Răsfoiți Sursa

PacketCopier: revert using jobs to call Done, it makes no sense

ambrop7 15 ani în urmă
părinte
comite
9e183b6265
4 a modificat fișierele cu 21 adăugiri și 49 ștergeri
  1. 1 1
      client/StreamPeerIO.c
  2. 18 40
      flow/PacketCopier.c
  3. 1 7
      flow/PacketCopier.h
  4. 1 1
      ipc/BIPC.c

+ 1 - 1
client/StreamPeerIO.c

@@ -374,7 +374,7 @@ int init_persistent_io (StreamPeerIO *pio, PacketPassInterface *user_recv_if)
     FlowErrorDomain_Init(&pio->ioerrdomain, (FlowErrorDomain_handler)error_handler, pio);
     
     // init sending objects
-    PacketCopier_Init(&pio->output_user_copier, pio->payload_mtu, BReactor_PendingGroup(pio->reactor));
+    PacketCopier_Init(&pio->output_user_copier, pio->payload_mtu);
     PacketProtoEncoder_Init(&pio->output_user_ppe, PacketCopier_GetOutput(&pio->output_user_copier));
     PacketPassConnector_Init(&pio->output_connector, PACKETPROTO_ENCLEN(pio->payload_mtu), BReactor_PendingGroup(pio->reactor));
     if (!SinglePacketBuffer_Init(&pio->output_user_spb, PacketProtoEncoder_GetOutput(&pio->output_user_ppe), PacketPassConnector_GetInput(&pio->output_connector), BReactor_PendingGroup(pio->reactor))) {

+ 18 - 40
flow/PacketCopier.c

@@ -31,18 +31,21 @@ static int input_handler_send (PacketCopier *o, uint8_t *data, int data_len)
     ASSERT(o->in_len == -1)
     ASSERT(data_len >= 0)
     
-    if (!o->out_have || o->out_got_len >= 0) {
+    if (!o->out_have) {
         o->in_len = data_len;
         o->in = data;
-        o->in_got = 0;
         return 0;
     }
     
     memcpy(o->out, data, data_len);
     
-    o->out_got_len = data_len;
+    o->out_have = 0;
     
-    BPending_Set(&o->continue_job_output);
+    DEAD_ENTER(o->dead)
+    PacketRecvInterface_Done(&o->output, data_len);
+    if (DEAD_LEAVE(o->dead)) {
+        return -1;
+    }
     
     return 1;
 }
@@ -59,46 +62,29 @@ static int output_handler_recv (PacketCopier *o, uint8_t *data, int *data_len)
 {
     ASSERT(!o->out_have)
     
-    if (o->in_len < 0 || o->in_got) {
+    if (o->in_len < 0) {
         o->out_have = 1;
         o->out = data;
-        o->out_got_len = -1;
         return 0;
     }
     
-    memcpy(data, o->in, o->in_len);
-    
-    o->in_got =  1;
+    int len = o->in_len;
     
-    BPending_Set(&o->continue_job_input);
-    
-    *data_len = o->in_len;
-    return 1;
-}
-
-static void input_job_handler (PacketCopier *o)
-{
-    ASSERT(o->in_len >= 0)
-    ASSERT(o->in_got)
+    memcpy(data, o->in, len);
     
     o->in_len = -1;
     
+    DEAD_ENTER(o->dead)
     PacketPassInterface_Done(&o->input);
-    return;
-}
-
-static void output_job_handler (PacketCopier *o)
-{
-    ASSERT(o->out_have)
-    ASSERT(o->out_got_len >= 0)
-    
-    o->out_have = 0;
+    if (DEAD_LEAVE(o->dead)) {
+        return -1;
+    }
     
-    PacketRecvInterface_Done(&o->output, o->out_got_len);
-    return;
+    *data_len = len;
+    return 1;
 }
 
-void PacketCopier_Init (PacketCopier *o, int mtu, BPendingGroup *pg)
+void PacketCopier_Init (PacketCopier *o, int mtu)
 {
     ASSERT(mtu >= 0)
     
@@ -118,10 +104,6 @@ void PacketCopier_Init (PacketCopier *o, int mtu, BPendingGroup *pg)
     // set no output packet
     o->out_have = 0;
     
-    // init continue jobs
-    BPending_Init(&o->continue_job_input, pg, (BPending_handler)input_job_handler, o);
-    BPending_Init(&o->continue_job_output, pg, (BPending_handler)output_job_handler, o);
-    
     // init debug object
     DebugObject_Init(&o->d_obj);
 }
@@ -130,11 +112,7 @@ void PacketCopier_Free (PacketCopier *o)
 {
     // free debug object
     DebugObject_Free(&o->d_obj);
-    
-    // free continue jobs
-    BPending_Free(&o->continue_job_output);
-    BPending_Free(&o->continue_job_input);
-    
+
     // free output
     PacketRecvInterface_Free(&o->output);
     

+ 1 - 7
flow/PacketCopier.h

@@ -30,7 +30,6 @@
 #include <stdint.h>
 
 #include <misc/dead.h>
-#include <system/BPending.h>
 #include <flow/PacketPassInterface.h>
 #include <flow/PacketRecvInterface.h>
 
@@ -46,12 +45,8 @@ typedef struct {
     PacketRecvInterface output;
     int in_len;
     uint8_t *in;
-    int in_got;
     int out_have;
     uint8_t *out;
-    int out_got_len;
-    BPending continue_job_input;
-    BPending continue_job_output;
 } PacketCopier;
 
 /**
@@ -59,9 +54,8 @@ typedef struct {
  * 
  * @param o the object
  * @param mtu maximum packet size. Must be >=0.
- * @param pg pending group
  */
-void PacketCopier_Init (PacketCopier *o, int mtu, BPendingGroup *pg);
+void PacketCopier_Init (PacketCopier *o, int mtu);
 
 /**
  * Frees the object.

+ 1 - 1
ipc/BIPC.c

@@ -50,7 +50,7 @@ static int init_io (BIPC *o, int send_mtu, PacketPassInterface *recv_if, BReacto
     // init sending
     StreamSocketSink_Init(&o->send_sink, FlowErrorReporter_Create(&o->domain, COMPONENT_SINK), &o->sock);
     PacketStreamSender_Init(&o->send_pss, StreamSocketSink_GetInput(&o->send_sink), PACKETPROTO_ENCLEN(send_mtu));
-    PacketCopier_Init(&o->send_copier, send_mtu, BReactor_PendingGroup(reactor));
+    PacketCopier_Init(&o->send_copier, send_mtu);
     PacketProtoEncoder_Init(&o->send_encoder, PacketCopier_GetOutput(&o->send_copier));
     if (!SinglePacketBuffer_Init(&o->send_buf, PacketProtoEncoder_GetOutput(&o->send_encoder), PacketStreamSender_GetInput(&o->send_pss), BReactor_PendingGroup(reactor))) {
         goto fail1;