Преглед на файлове

DataProto: don't fail at sending the first relayed frame. We need a job for this.

ambrop7 преди 15 години
родител
ревизия
60ec61b3c8
променени са 3 файла, в които са добавени 84 реда и са изтрити 60 реда
  1. 64 45
      client/DataProto.c
  2. 19 2
      client/DataProto.h
  3. 1 13
      client/client.c

+ 64 - 45
client/DataProto.c

@@ -36,20 +36,8 @@
 
 #define DATAPROTO_TIMEOUT 30000
 
-struct dp_relay_flow {
-    DataProtoRelaySource *rs;
-    DataProtoDest *dp;
-    BufferWriter ainput;
-    PacketBuffer buffer;
-    PacketPassInactivityMonitor monitor;
-    PacketPassFairQueueFlow qflow;
-    LinkedList2Node source_list_node;
-    BAVLNode source_tree_node;
-    LinkedList2Node dp_list_node;
-};
-
 static int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2);
-static struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets);
+static struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets, uint8_t *first_frame, int first_frame_len);
 static void dealloc_relay_flow (struct dp_relay_flow *flow);
 static void release_relay_flow (struct dp_relay_flow *flow);
 static void flow_monitor_handler (struct dp_relay_flow *flow);
@@ -59,6 +47,8 @@ static void receive_timer_handler (DataProtoDest *o);
 static void notifier_handler (DataProtoDest *o, uint8_t *data, int data_len);
 static int pointer_comparator (void *user, void **val1, void **val2);
 static void keepalive_job_handler (DataProtoDest *o);
+static void relay_job_handler (struct dp_relay_flow *flow);
+static void submit_relay_frame (struct dp_relay_flow *flow, uint8_t *frame, int frame_len);
 
 int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2)
 {
@@ -71,8 +61,10 @@ int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2)
     return 0;
 }
 
-struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets)
+struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets, uint8_t *first_frame, int first_frame_len)
 {
+    ASSERT(first_frame_len >= 0)
+    ASSERT(first_frame_len <= dp->frame_mtu)
     ASSERT(!BAVL_LookupExact(&rs->relay_flows_tree, &dp))
     ASSERT(num_packets > 0)
     ASSERT(!dp->d_freeing)
@@ -84,9 +76,15 @@ struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDes
         goto fail0;
     }
     
-    // set source and dp
+    // init arguments
     flow->rs = rs;
     flow->dp = dp;
+    flow->first_frame = first_frame;
+    flow->first_frame_len = first_frame_len;
+    
+    // init first frame job
+    BPending_Init(&flow->first_frame_job, BReactor_PendingGroup(dp->reactor), (BPending_handler)relay_job_handler, flow);
+    BPending_Set(&flow->first_frame_job);
     
     // init queue flow
     PacketPassFairQueueFlow_Init(&flow->qflow, &dp->queue);
@@ -120,6 +118,7 @@ fail1:
     BufferWriter_Free(&flow->ainput);
     PacketPassInactivityMonitor_Free(&flow->monitor);
     PacketPassFairQueueFlow_Free(&flow->qflow);
+    BPending_Free(&flow->first_frame_job);
     free(flow);
 fail0:
     return NULL;
@@ -127,11 +126,7 @@ fail0:
 
 void dealloc_relay_flow (struct dp_relay_flow *flow)
 {
-    #ifndef NDEBUG
-    if (!flow->dp->d_freeing) {
-        ASSERT(!PacketPassFairQueueFlow_IsBusy(&flow->qflow))
-    }
-    #endif
+    PacketPassFairQueueFlow_AssertFree(&flow->qflow);
     
     // remove from dp list
     LinkedList2_Remove(&flow->dp->relay_flows_list, &flow->dp_list_node);
@@ -154,6 +149,9 @@ void dealloc_relay_flow (struct dp_relay_flow *flow)
     // free queue flow
     PacketPassFairQueueFlow_Free(&flow->qflow);
     
+    // free first frame job
+    BPending_Free(&flow->first_frame_job);
+    
     // free flow structure
     free(flow);
 }
@@ -243,9 +241,50 @@ int pointer_comparator (void *user, void **val1, void **val2)
 
 void keepalive_job_handler (DataProtoDest *o)
 {
+    ASSERT(!o->d_freeing)
     DebugObject_Access(&o->d_obj);
     
-    PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
+    send_keepalive(o);
+}
+
+void relay_job_handler (struct dp_relay_flow *flow)
+{
+    ASSERT(flow->first_frame_len >= 0)
+    
+    int frame_len = flow->first_frame_len;
+    
+    // set no first frame
+    flow->first_frame_len = -1;
+    
+    // submit first frame
+    submit_relay_frame(flow, flow->first_frame, frame_len);
+}
+
+void submit_relay_frame (struct dp_relay_flow *flow, uint8_t *frame, int frame_len)
+{
+    ASSERT(flow->first_frame_len == -1)
+    
+    // get a buffer
+    uint8_t *out;
+    // safe because of PacketBufferAsyncInput
+    if (!BufferWriter_StartPacket(&flow->ainput, &out)) {
+        BLog(BLOG_NOTICE, "out of buffer for relayed frame from peer %d to %d", (int)flow->rs->source_id, (int)flow->dp->dest_id);
+        return;
+    }
+    
+    // write header
+    struct dataproto_header *header = (struct dataproto_header *)out;
+    // don't set flags, it will be set in notifier_handler
+    header->from_id = htol16(flow->rs->source_id);
+    header->num_peer_ids = htol16(1);
+    struct dataproto_peer_id *id = (struct dataproto_peer_id *)(out + sizeof(struct dataproto_header));
+    id->id = htol16(flow->dp->dest_id);
+    
+    // write data
+    memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), frame, frame_len);
+    
+    // submit it
+    BufferWriter_EndPacket(&flow->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + frame_len);
 }
 
 int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoDest_handler handler, void *user)
@@ -399,34 +438,14 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
     BAVLNode *node = BAVL_LookupExact(&rs->relay_flows_tree, &o);
     if (!node) {
         // create new flow
-        if (!(flow = create_relay_flow(rs, o, buffer_num_packets))) {
-            return;
-        }
-    } else {
-        flow = UPPER_OBJECT(node, struct dp_relay_flow, source_tree_node);
-    }
-    
-    // get a buffer
-    uint8_t *out;
-    // safe because of PacketBufferAsyncInput
-    if (!BufferWriter_StartPacket(&flow->ainput, &out)) {
-        BLog(BLOG_NOTICE, "out of buffer for relayed frame from peer %d to %d", (int)rs->source_id, (int)o->dest_id);
+        create_relay_flow(rs, o, buffer_num_packets, data, data_len);
         return;
     }
     
-    // write header
-    struct dataproto_header *header = (struct dataproto_header *)out;
-    // don't set flags, it will be set in notifier_handler
-    header->from_id = htol16(rs->source_id);
-    header->num_peer_ids = htol16(1);
-    struct dataproto_peer_id *id = (struct dataproto_peer_id *)(out + sizeof(struct dataproto_header));
-    id->id = htol16(o->dest_id);
-    
-    // write data
-    memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), data, data_len);
+    flow = UPPER_OBJECT(node, struct dp_relay_flow, source_tree_node);
     
-    // submit it
-    BufferWriter_EndPacket(&flow->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
+    // submit frame
+    submit_relay_frame(flow, data, data_len);
 }
 
 void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)

+ 19 - 2
client/DataProto.h

@@ -50,6 +50,8 @@
 
 typedef void (*DataProtoDest_handler) (void *user, int up);
 
+struct dp_relay_flow;
+
 /**
  * Frame destination.
  * Represents a peer as a destination for sending frames to.
@@ -113,6 +115,21 @@ typedef struct {
     DebugObject d_obj;
 } DataProtoRelaySource;
 
+struct dp_relay_flow {
+    DataProtoRelaySource *rs;
+    DataProtoDest *dp;
+    BufferWriter ainput;
+    PacketBuffer buffer;
+    PacketPassInactivityMonitor monitor;
+    PacketPassFairQueueFlow qflow;
+    LinkedList2Node source_list_node;
+    BAVLNode source_tree_node;
+    LinkedList2Node dp_list_node;
+    BPending first_frame_job;
+    uint8_t *first_frame;
+    int first_frame_len;
+};
+
 /**
  * Initializes the object.
  * 
@@ -151,11 +168,11 @@ void DataProtoDest_PrepareFree (DataProtoDest *o);
 /**
  * Submits a relayed frame.
  * Must not be in freeing state.
- * Must not be called from output Send calls.
+ * Must not be called before it evaluates.
  * 
  * @param o the object
  * @param rs relay source object representing the peer this frame came from
- * @param data frame data
+ * @param data frame data. The frame must remain accessible until this evaluates.
  * @param data_len frame length. Must be >=0.
  *                 Must be <= (output MTU) - (sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id)).
  * @param buffer_num_packets number of packets the relay buffer should hold, in case it doesn't exist.

+ 1 - 13
client/client.c

@@ -303,9 +303,6 @@ static int peer_generate_and_send_seed (struct peer_data *peer);
 
 static int peer_send_confirmseed (struct peer_data *peer, uint16_t seed_id);
 
-// submits a relayed frame for sending to the peer
-static void peer_submit_relayed_frame (struct peer_data *peer, struct peer_data *source_peer, uint8_t *frame, int frame_len);
-
 // handler for peer DataProto up state changes
 static void peer_dataproto_handler (struct peer_data *peer, int up);
 
@@ -2218,7 +2215,7 @@ out:
     
     // relay frame
     if (relay_dest) {
-        peer_submit_relayed_frame(relay_dest, src_peer, data, data_len);
+        DataProtoDest_SubmitRelayFrame(&relay_dest->send_dp, &src_peer->relay_source, data, data_len, options.send_buffer_relay_size);
     }
     
     // submit to device
@@ -2574,15 +2571,6 @@ int peer_send_confirmseed (struct peer_data *peer, uint16_t seed_id)
     return 0;
 }
 
-void peer_submit_relayed_frame (struct peer_data *peer, struct peer_data *source_peer, uint8_t *frame, int frame_len)
-{
-    ASSERT(peer->have_link)
-    ASSERT(frame_len >= 0)
-    ASSERT(frame_len <= device.mtu)
-    
-    DataProtoDest_SubmitRelayFrame(&peer->send_dp, &source_peer->relay_source, frame, frame_len, options.send_buffer_relay_size);
-}
-
 void peer_dataproto_handler (struct peer_data *peer, int up)
 {
     ASSERT(peer->have_link)