Sfoglia il codice sorgente

OTPGenerator: parallelize OTP generation using BThreadWork

ambrop7 15 anni fa
parent
commit
c97534f8e9

+ 3 - 2
client/DatagramPeerIO.c

@@ -147,7 +147,8 @@ int DatagramPeerIO_Init (
     PacketPassInterface *recv_userif,
     int otp_warning_count,
     DatagramPeerIO_handler_otp_warning handler_otp_warning,
-    void *user
+    void *user,
+    BThreadWorkDispatcher *twd
 )
 {
     ASSERT(payload_mtu >= 0)
@@ -219,7 +220,7 @@ int DatagramPeerIO_Init (
     FragmentProtoDisassembler_Init(&o->send_disassembler, o->reactor, o->payload_mtu, o->spproto_payload_mtu, -1, latency);
     
     // init encoder
-    if (!SPProtoEncoder_Init(&o->send_encoder, FragmentProtoDisassembler_GetOutput(&o->send_disassembler), o->sp_params, otp_warning_count, handler_otp_warning, user, BReactor_PendingGroup(o->reactor))) {
+    if (!SPProtoEncoder_Init(&o->send_encoder, FragmentProtoDisassembler_GetOutput(&o->send_disassembler), o->sp_params, otp_warning_count, handler_otp_warning, user, BReactor_PendingGroup(o->reactor), twd)) {
         BLog(BLOG_ERROR, "SPProtoEncoder_Init failed");
         goto fail3;
     }

+ 3 - 1
client/DatagramPeerIO.h

@@ -135,6 +135,7 @@ typedef struct {
  *                          In this case, must be >0 and <=sp_params.otp_num.
  * @param handler_otp_warning OTP warning handler
  * @param user value to pass to handler
+ * @param twd thread work dispatcher
  * @return 1 on success, 0 on failure
  */
 int DatagramPeerIO_Init (
@@ -148,7 +149,8 @@ int DatagramPeerIO_Init (
     PacketPassInterface *recv_userif,
     int otp_warning_count,
     DatagramPeerIO_handler_otp_warning handler_otp_warning,
-    void *user
+    void *user,
+    BThreadWorkDispatcher *twd
 ) WARN_UNUSED;
 
 /**

+ 14 - 1
client/client.c

@@ -148,6 +148,9 @@ char server_name[256];
 // reactor
 BReactor ss;
 
+// thread work dispatcher
+BThreadWorkDispatcher twd;
+
 // client certificate if using SSL
 CERTCertificate *client_cert;
 
@@ -426,6 +429,12 @@ int main (int argc, char *argv[])
         goto fail2;
     }
     
+    // init thread work dispatcher
+    if (!BThreadWorkDispatcher_Init(&twd, &ss, -1)) {
+        BLog(BLOG_ERROR, "BThreadWorkDispatcher_Init failed");
+        goto fail2a;
+    }
+    
     if (options.ssl) {
         // init NSPR
         PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
@@ -602,6 +611,8 @@ fail3:
         ASSERT_FORCE(PR_Cleanup() == PR_SUCCESS)
         PL_ArenaFinish();
     }
+    BThreadWorkDispatcher_Free(&twd);
+fail2a:
     BSignal_Finish();
 fail2:
     BReactor_Free(&ss);
@@ -1523,7 +1534,9 @@ int peer_init_link (struct peer_data *peer)
         if (!DatagramPeerIO_Init(
             &peer->pio.udp.pio, &ss, data_mtu, CLIENT_UDP_MTU, sp_params,
             options.fragmentation_latency, PEER_UDP_ASSEMBLER_NUM_FRAMES, &peer->recv_ppi,
-            options.otp_num_warn, (DatagramPeerIO_handler_otp_warning)peer_udp_pio_handler_seed_warning, peer
+            options.otp_num_warn,
+            (DatagramPeerIO_handler_otp_warning)peer_udp_pio_handler_seed_warning,
+            peer, &twd
         )) {
             peer_log(peer, BLOG_ERROR, "DatagramPeerIO_Init failed");
             goto fail1;

+ 15 - 6
flow/SPProtoEncoder.c

@@ -166,7 +166,19 @@ static void handler_job_hander (SPProtoEncoder *o)
     return;
 }
 
-int SPProtoEncoder_Init (SPProtoEncoder *o, PacketRecvInterface *input, struct spproto_security_params sp_params, int otp_warning_count, SPProtoEncoder_handler handler, void *user, BPendingGroup *pg)
+static void otpgenerator_handler (SPProtoEncoder *o)
+{
+    ASSERT(SPPROTO_HAVE_OTP(o->sp_params))
+    DebugObject_Access(&o->d_obj);
+    
+    // remember seed ID
+    o->otpgen_seed_id = o->otpgen_pending_seed_id;
+    
+    // possibly continue I/O
+    maybe_encode(o);
+}
+
+int SPProtoEncoder_Init (SPProtoEncoder *o, PacketRecvInterface *input, struct spproto_security_params sp_params, int otp_warning_count, SPProtoEncoder_handler handler, void *user, BPendingGroup *pg, BThreadWorkDispatcher *twd)
 {
     spproto_assert_security_params(sp_params);
     ASSERT(spproto_carrier_mtu_for_payload_mtu(sp_params, PacketRecvInterface_GetMTU(input)) >= 0)
@@ -196,7 +208,7 @@ int SPProtoEncoder_Init (SPProtoEncoder *o, PacketRecvInterface *input, struct s
     
     // init otp generator
     if (SPPROTO_HAVE_OTP(o->sp_params)) {
-        if (!OTPGenerator_Init(&o->otpgen, o->sp_params.otp_num, o->sp_params.otp_mode)) {
+        if (!OTPGenerator_Init(&o->otpgen, o->sp_params.otp_num, o->sp_params.otp_mode, twd, (OTPGenerator_handler)otpgenerator_handler, o)) {
             goto fail0;
         }
     }
@@ -324,10 +336,7 @@ void SPProtoEncoder_SetOTPSeed (SPProtoEncoder *o, uint16_t seed_id, uint8_t *ke
     OTPGenerator_SetSeed(&o->otpgen, key, iv);
     
     // remember seed ID
-    o->otpgen_seed_id = seed_id;
-    
-    // possibly continue I/O
-    maybe_encode(o);
+    o->otpgen_pending_seed_id = seed_id;
 }
 
 void SPProtoEncoder_RemoveOTPSeed (SPProtoEncoder *o)

+ 4 - 1
flow/SPProtoEncoder.h

@@ -35,6 +35,7 @@
 #include <security/BEncryption.h>
 #include <security/OTPGenerator.h>
 #include <flow/PacketRecvInterface.h>
+#include <threadwork/BThreadWork.h>
 
 /**
  * Event context handler called when the remaining number of
@@ -61,6 +62,7 @@ typedef struct {
     int enc_key_size;
     OTPGenerator otpgen;
     uint16_t otpgen_seed_id;
+    uint16_t otpgen_pending_seed_id;
     int have_encryption_key;
     BEncryption encryptor;
     int input_mtu;
@@ -87,9 +89,10 @@ typedef struct {
  * @param handler OTP warning handler
  * @param user value to pass to handler
  * @param pg pending group
+ * @param twd thread work dispatcher
  * @return 1 on success, 0 on failure
  */
-int SPProtoEncoder_Init (SPProtoEncoder *o, PacketRecvInterface *input, struct spproto_security_params sp_params, int otp_warning_count, SPProtoEncoder_handler handler, void *user, BPendingGroup *pg) WARN_UNUSED;
+int SPProtoEncoder_Init (SPProtoEncoder *o, PacketRecvInterface *input, struct spproto_security_params sp_params, int otp_warning_count, SPProtoEncoder_handler handler, void *user, BPendingGroup *pg, BThreadWorkDispatcher *twd) WARN_UNUSED;
 
 /**
  * Frees the object.

+ 1 - 1
security/CMakeLists.txt

@@ -6,4 +6,4 @@ add_library(security
     OTPChecker.c
     OTPGenerator.c
 )
-target_link_libraries(security system ${LIBCRYPTO_LIBRARIES})
+target_link_libraries(security system threadwork ${LIBCRYPTO_LIBRARIES})

+ 80 - 9
security/OTPGenerator.c

@@ -20,61 +20,132 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
+#include <string.h>
+
 #include <security/OTPGenerator.h>
 
-int OTPGenerator_Init (OTPGenerator *g, int num_otps, int cipher)
+static void work_func (OTPGenerator *g)
+{
+    g->otps[!g->cur_calc] = OTPCalculator_Generate(&g->calc[!g->cur_calc], g->tw_key, g->tw_iv, 1);
+}
+
+static void work_done_handler (OTPGenerator *g)
+{
+    DebugObject_Access(&g->d_obj);
+    
+    // free work
+    BThreadWork_Free(&g->tw);
+    g->tw_have = 0;
+    
+    // use new OTPs
+    g->cur_calc = !g->cur_calc;
+    g->position = 0;
+    
+    // call handler
+    g->handler(g->user);
+    return;
+}
+
+int OTPGenerator_Init (OTPGenerator *g, int num_otps, int cipher, BThreadWorkDispatcher *twd, OTPGenerator_handler handler, void *user)
 {
     ASSERT(num_otps >= 0)
     ASSERT(BEncryption_cipher_valid(cipher))
     
     // init arguments
     g->num_otps = num_otps;
+    g->cipher = cipher;
+    g->twd = twd;
+    g->handler = handler;
+    g->user = user;
     
     // init position
     g->position = g->num_otps;
     
     // init calculator
-    if (!OTPCalculator_Init(&g->calc, g->num_otps, cipher)) {
+    if (!OTPCalculator_Init(&g->calc[0], g->num_otps, g->cipher)) {
         goto fail0;
     }
     
-    // init debug object
+    // init calculator
+    if (!OTPCalculator_Init(&g->calc[1], g->num_otps, g->cipher)) {
+        goto fail1;
+    }
+    
+    // set current calculator
+    g->cur_calc = 0;
+    
+    // have no work
+    g->tw_have = 0;
+    
     DebugObject_Init(&g->d_obj);
-
     return 1;
     
+fail1:
+    OTPCalculator_Free(&g->calc[0]);
 fail0:
     return 0;
 }
 
 void OTPGenerator_Free (OTPGenerator *g)
 {
-    // free debug object
     DebugObject_Free(&g->d_obj);
     
+    // free work
+    if (g->tw_have) {
+        BThreadWork_Free(&g->tw);
+    }
+    
+    // free calculator
+    OTPCalculator_Free(&g->calc[1]);
+    
     // free calculator
-    OTPCalculator_Free(&g->calc);
+    OTPCalculator_Free(&g->calc[0]);
 }
 
 void OTPGenerator_SetSeed (OTPGenerator *g, uint8_t *key, uint8_t *iv)
 {
-    g->otps = OTPCalculator_Generate(&g->calc, key, iv, 1);
-    g->position = 0;
+    DebugObject_Access(&g->d_obj);
+    
+    // free existing work
+    if (g->tw_have) {
+        BThreadWork_Free(&g->tw);
+    }
+    
+    // copy key and IV
+    memcpy(g->tw_key, key, BEncryption_cipher_key_size(g->cipher));
+    memcpy(g->tw_iv, iv, BEncryption_cipher_block_size(g->cipher));
+    
+    // start work
+    BThreadWork_Init(&g->tw, g->twd, (BThreadWork_handler_done)work_done_handler, g, (BThreadWork_work_func)work_func, g);
+    
+    // set have work
+    g->tw_have = 1;
 }
 
 int OTPGenerator_GetPosition (OTPGenerator *g)
 {
+    DebugObject_Access(&g->d_obj);
+    
     return g->position;
 }
 
 void OTPGenerator_Reset (OTPGenerator *g)
 {
+    DebugObject_Access(&g->d_obj);
+    
+    // free existing work
+    if (g->tw_have) {
+        BThreadWork_Free(&g->tw);
+        g->tw_have = 0;
+    }
+    
     g->position = g->num_otps;
 }
 
 otp_t OTPGenerator_GetOTP (OTPGenerator *g)
 {
     ASSERT(g->position < g->num_otps)
+    DebugObject_Access(&g->d_obj);
     
-    return g->otps[g->position++];
+    return g->otps[g->cur_calc][g->position++];
 }

+ 32 - 6
security/OTPGenerator.h

@@ -30,16 +30,34 @@
 #include <misc/debug.h>
 #include <security/OTPCalculator.h>
 #include <system/DebugObject.h>
+#include <threadwork/BThreadWork.h>
+
+/**
+ * Handler called when OTP generation for a seed is finished.
+ * The OTP position is reset to zero before the handler is called.
+ * 
+ * @param user as in {@link OTPGenerator_Init}
+ */
+typedef void (*OTPGenerator_handler) (void *user);
 
 /**
  * Object which generates OTPs for use in sending packets.
  */
 typedef struct {
-    DebugObject d_obj;
     int num_otps;
+    int cipher;
+    BThreadWorkDispatcher *twd;
+    OTPGenerator_handler handler;
+    void *user;
     int position;
-    OTPCalculator calc;
-    otp_t *otps;
+    int cur_calc;
+    OTPCalculator calc[2];
+    otp_t *otps[2];
+    int tw_have;
+    BThreadWork tw;
+    uint8_t tw_key[BENCRYPTION_MAX_KEY_SIZE];
+    uint8_t tw_iv[BENCRYPTION_MAX_BLOCK_SIZE];
+    DebugObject d_obj;
 } OTPGenerator;
 
 /**
@@ -50,9 +68,13 @@ typedef struct {
  * @param num_otps number of OTPs to generate from a seed. Must be >=0.
  * @param cipher encryption cipher for calculating the OTPs. Must be valid
  *               according to {@link BEncryption_cipher_valid}.
+ * @param twd thread work dispatcher
+ * @param handler handler to call when generation of new OTPs is complete,
+ *                after {@link OTPGenerator_SetSeed} was called.
+ * @param user argument to handler
  * @return 1 on success, 0 on failure
  */
-int OTPGenerator_Init (OTPGenerator *g, int num_otps, int cipher) WARN_UNUSED;
+int OTPGenerator_Init (OTPGenerator *g, int num_otps, int cipher, BThreadWorkDispatcher *twd, OTPGenerator_handler handler, void *user) WARN_UNUSED;
 
 /**
  * Frees the generator.
@@ -62,8 +84,12 @@ int OTPGenerator_Init (OTPGenerator *g, int num_otps, int cipher) WARN_UNUSED;
 void OTPGenerator_Free (OTPGenerator *g);
 
 /**
- * Assigns a seed to use for generating OTPs.
- * Sets the number of used OTPs to 0.
+ * Starts generating OTPs for a seed.
+ * When generation is complete and the new OTPs may be used, the {@link OTPGenerator_handler}
+ * handler will be called.
+ * If OTPs are still being generated for a previous seed, it will be forgotten.
+ * This call by itself does not affect the OTP position; rather the position is set to zero
+ * before the handler is called.
  *
  * @param g the object
  * @param key encryption key