فهرست منبع

minor changes

ambrop7 15 سال پیش
والد
کامیت
084597a46d
2فایلهای تغییر یافته به همراه29 افزوده شده و 59 حذف شده
  1. 23 52
      flow/FragmentProtoAssembler.c
  2. 6 7
      flow/FragmentProtoAssembler.h

+ 23 - 52
flow/FragmentProtoAssembler.c

@@ -32,8 +32,6 @@
 
 #include <generated/blog_channel_FragmentProtoAssembler.h>
 
-#define FPA_MAX_TIME UINT32_MAX
-
 static int frame_id_comparator (void *unused, fragmentproto_frameid *v1, fragmentproto_frameid *v2)
 {
     if (*v1 < *v2) {
@@ -153,9 +151,8 @@ static void reduce_times (FragmentProtoAssembler *o)
     o->time -= min_time;
 }
 
-static void process_chunk (FragmentProtoAssembler *o, fragmentproto_frameid frame_id, int chunk_start, int chunk_len, int is_last, uint8_t *payload)
+static int process_chunk (FragmentProtoAssembler *o, fragmentproto_frameid frame_id, int chunk_start, int chunk_len, int is_last, uint8_t *payload)
 {
-    ASSERT(!o->output_ready)
     ASSERT(chunk_start >= 0)
     ASSERT(chunk_len >= 0)
     ASSERT(is_last == 0 || is_last == 1)
@@ -165,13 +162,13 @@ static void process_chunk (FragmentProtoAssembler *o, fragmentproto_frameid fram
     // check start
     if (chunk_start > o->output_mtu) {
         BLog(BLOG_INFO, "chunk starts outside");
-        return;
+        return 0;
     }
     
     // check frame size bound
     if (chunk_len > o->output_mtu - chunk_start) {
         BLog(BLOG_INFO, "chunk ends outside");
-        return;
+        return 0;
     }
     
     // calculate end
@@ -267,7 +264,7 @@ static void process_chunk (FragmentProtoAssembler *o, fragmentproto_frameid fram
         }
         
         // wait for more chunks
-        return;
+        return 0;
     }
     
     ASSERT(frame->sum == frame->length)
@@ -277,20 +274,19 @@ static void process_chunk (FragmentProtoAssembler *o, fragmentproto_frameid fram
     // free frame entry
     free_frame(o, frame);
     
-    // remember frame
-    o->output_ready = 1;
-    o->output_packet_data = frame->buffer;
-    o->output_packet_len = frame->length;
-    return;
+    // send frame
+    PacketPassInterface_Sender_Send(o->output, frame->buffer, frame->length);
+    
+    return 1;
     
 fail_frame:
     free_frame(o, frame);
+    return 0;
 }
 
 static void process_input (FragmentProtoAssembler *o)
 {
     ASSERT(o->in_len >= 0)
-    ASSERT(!o->output_ready)
     
     // read chunks
     while (o->in_pos < o->in_len) {
@@ -319,18 +315,15 @@ static void process_input (FragmentProtoAssembler *o)
         }
         
         // process chunk
-        process_chunk(o, frame_id, chunk_start, chunk_len, is_last, o->in + o->in_pos);
+        int res = process_chunk(o, frame_id, chunk_start, chunk_len, is_last, o->in + o->in_pos);
         o->in_pos += chunk_len;
         
-        // if output is blocking, stop processing input
-        if (o->output_ready) {
+        if (res) {
+            // sending complete frame, stop processing input
             return;
         }
     }
     
-    // all input processed
-    o->in_len = -1;
-    
     // increment packet time
     if (o->time == FPA_MAX_TIME) {
         reduce_times(o);
@@ -345,31 +338,18 @@ static void process_input (FragmentProtoAssembler *o)
     } else {
         o->time++;
     }
-}
-
-static void do_io (FragmentProtoAssembler *o)
-{
-    ASSERT(o->in_len >= 0)
-    ASSERT(!o->output_ready)
-    
-    // process input
-    process_input(o);
     
-    ASSERT((o->in_len >= 0) == o->output_ready)
+    // set no input packet
+    o->in_len = -1;
     
-    if (o->output_ready) {
-        PacketPassInterface_Sender_Send(o->output, o->output_packet_data, o->output_packet_len);
-    } else {
-        PacketPassInterface_Done(&o->input);
-    }
+    // finish input
+    PacketPassInterface_Done(&o->input);
 }
 
 static void input_handler_send (FragmentProtoAssembler *o, uint8_t *data, int data_len)
 {
     ASSERT(data_len >= 0)
-    ASSERT(data_len <= o->input_mtu)
     ASSERT(o->in_len == -1)
-    ASSERT(!o->output_ready)
     DebugObject_Access(&o->d_obj);
     
     // save input packet
@@ -377,19 +357,15 @@ static void input_handler_send (FragmentProtoAssembler *o, uint8_t *data, int da
     o->in = data;
     o->in_pos = 0;
     
-    do_io(o);
+    process_input(o);
 }
 
 static void output_handler_done (FragmentProtoAssembler *o)
 {
     ASSERT(o->in_len >= 0)
-    ASSERT(o->output_ready)
     DebugObject_Access(&o->d_obj);
     
-    // output no longer blocking
-    o->output_ready = 0;
-    
-    do_io(o);
+    process_input(o);
 }
 
 int FragmentProtoAssembler_Init (FragmentProtoAssembler *o, int input_mtu, PacketPassInterface *output, int num_frames, int num_chunks, BPendingGroup *pg)
@@ -400,13 +376,11 @@ int FragmentProtoAssembler_Init (FragmentProtoAssembler *o, int input_mtu, Packe
     ASSERT(num_chunks > 0)
     
     // init arguments
-    o->input_mtu = input_mtu;
     o->output = output;
-    o->num_frames = num_frames;
     o->num_chunks = num_chunks;
     
     // init input
-    PacketPassInterface_Init(&o->input, o->input_mtu, (PacketPassInterface_handler_send)input_handler_send, o, pg);
+    PacketPassInterface_Init(&o->input, input_mtu, (PacketPassInterface_handler_send)input_handler_send, o, pg);
     
     // init output
     PacketPassInterface_Sender_Init(o->output, (PacketPassInterface_handler_done)output_handler_done, o);
@@ -418,20 +392,20 @@ int FragmentProtoAssembler_Init (FragmentProtoAssembler *o, int input_mtu, Packe
     o->time = 0;
     
     // set time tolerance to num_frames
-    o->time_tolerance = o->num_frames;
+    o->time_tolerance = num_frames;
     
     // allocate frames
-    if (!(o->frames_entries = BAllocArray(o->num_frames, sizeof(struct FragmentProtoAssembler_frame)))) {
+    if (!(o->frames_entries = BAllocArray(num_frames, sizeof(struct FragmentProtoAssembler_frame)))) {
         goto fail1;
     }
     
     // allocate chunks
-    if (!(o->frames_chunks = BAllocArray2(o->num_frames, o->num_chunks, sizeof(struct FragmentProtoAssembler_chunk)))) {
+    if (!(o->frames_chunks = BAllocArray2(num_frames, o->num_chunks, sizeof(struct FragmentProtoAssembler_chunk)))) {
         goto fail2;
     }
     
     // allocate buffers
-    if (!(o->frames_buffer = BAllocArray(o->num_frames, o->output_mtu))) {
+    if (!(o->frames_buffer = BAllocArray(num_frames, o->output_mtu))) {
         goto fail3;
     }
     
@@ -456,9 +430,6 @@ int FragmentProtoAssembler_Init (FragmentProtoAssembler *o, int input_mtu, Packe
     // have no input packet
     o->in_len = -1;
     
-    // output not blocking
-    o->output_ready = 0;
-    
     DebugObject_Init(&o->d_obj);
     
     return 1;

+ 6 - 7
flow/FragmentProtoAssembler.h

@@ -36,6 +36,8 @@
 #include <structure/BAVL.h>
 #include <flow/PacketPassInterface.h>
 
+#define FPA_MAX_TIME UINT32_MAX
+
 struct FragmentProtoAssembler_chunk {
     int start;
     int len;
@@ -62,12 +64,9 @@ struct FragmentProtoAssembler_frame {
  * Output is with {@link PacketPassInterface}.
  */
 typedef struct {
-    DebugObject d_obj;
     PacketPassInterface input;
-    int input_mtu;
     PacketPassInterface *output;
     int output_mtu;
-    int num_frames;
     int num_chunks;
     uint32_t time;
     int time_tolerance;
@@ -80,9 +79,7 @@ typedef struct {
     int in_len;
     uint8_t *in;
     int in_pos;
-    int output_ready;
-    uint8_t *output_packet_data;
-    int output_packet_len;
+    DebugObject d_obj;
 } FragmentProtoAssembler;
 
 /**
@@ -92,7 +89,9 @@ typedef struct {
  * @param o the object
  * @param input_mtu maximum input packet size. Must be >=0.
  * @param output output interface
- * @param num_frames number of frames we can hold. Must be >0 and < UINT32_MAX.
+ * @param num_frames number of frames we can hold. Must be >0 and < FPA_MAX_TIME.
+ *  To make the assembler tolerate out-of-order input of degree D, set to D+2.
+ *  Here, D is the minimum size of a hypothetical buffer needed to order the input.
  * @param num_chunks maximum number of chunks a frame can come in. Must be >0.
  * @param pg pending group
  * @return 1 on success, 0 on failure