Просмотр исходного кода

DPRelay: don't require every peer to have its own buffer for the purpose of relaying to other peers; use just one global buffer

ambrop7 15 лет назад
Родитель
Сommit
f13a0e9a0a
3 измененных файлов с 70 добавлено и 33 удалено
  1. 44 24
      client/DPRelay.c
  2. 13 4
      client/DPRelay.h
  3. 13 5
      client/client.c

+ 44 - 24
client/DPRelay.c

@@ -48,7 +48,7 @@ static struct DPRelay_flow * create_flow (DPRelaySource *src, DPRelaySink *sink,
     flow->sink = sink;
     
     // init DataProtoLocalSource
-    if (!DataProtoLocalSource_Init(&flow->dpls, &src->device, src->source_id, sink->dest_id, num_packets, inactivity_time, (DataProtoLocalSource_handler_inactivity)flow_inactivity_handler, flow)) {
+    if (!DataProtoLocalSource_Init(&flow->dpls, &src->router->device, src->source_id, sink->dest_id, num_packets, inactivity_time, (DataProtoLocalSource_handler_inactivity)flow_inactivity_handler, flow)) {
         BLog(BLOG_ERROR, "relay flow %d->%d: DataProtoLocalSource_Init failed", (int)src->source_id, (int)sink->dest_id);
         goto fail1;
     }
@@ -81,9 +81,9 @@ static void free_flow (struct DPRelay_flow *flow)
         DataProtoLocalSource_Detach(&flow->dpls);
     }
     
-    // remove from source's current flow
-    if (flow->src->current_flow == flow) {
-        flow->src->current_flow = NULL;
+    // remove posible router reference
+    if (flow->src->router->current_flow == flow) {
+        flow->src->router->current_flow = NULL;
     }
     
     // remove from sink list
@@ -121,7 +121,7 @@ static struct DPRelay_flow * source_find_flow (DPRelaySource *o, DPRelaySink *si
     return NULL;
 }
 
-static void source_device_handler (DPRelaySource *o, const uint8_t *frame, int frame_len)
+static void source_device_handler (DPRelayRouter *o, const uint8_t *frame, int frame_len)
 {
     DebugObject_Access(&o->d_obj);
     
@@ -136,13 +136,12 @@ static void source_device_handler (DPRelaySource *o, const uint8_t *frame, int f
     o->current_flow = NULL;
 }
 
-int DPRelaySource_Init (DPRelaySource *o, peerid_t source_id, int frame_mtu, BReactor *reactor)
+int DPRelayRouter_Init (DPRelayRouter *o, int frame_mtu, BReactor *reactor)
 {
     ASSERT(frame_mtu >= 0)
     ASSERT(frame_mtu <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
     
     // init arguments
-    o->source_id = source_id;
     o->frame_mtu = frame_mtu;
     
     // init BufferWriter
@@ -153,13 +152,11 @@ int DPRelaySource_Init (DPRelaySource *o, peerid_t source_id, int frame_mtu, BRe
         goto fail1;
     }
     
-    // init flows list
-    LinkedList1_Init(&o->flows_list);
-    
     // have no current flow
     o->current_flow = NULL;
     
     DebugObject_Init(&o->d_obj);
+    DebugCounter_Init(&o->d_ctr);
     
     return 1;
     
@@ -168,19 +165,12 @@ fail1:
     return 0;
 }
 
-void DPRelaySource_Free (DPRelaySource *o)
+void DPRelayRouter_Free (DPRelayRouter *o)
 {
+    ASSERT(!o->current_flow) // have no sources
+    DebugCounter_Free(&o->d_ctr);
     DebugObject_Free(&o->d_obj);
     
-    // free flows, detaching them if needed
-    LinkedList1Node *node;
-    while (node = LinkedList1_GetFirst(&o->flows_list)) {
-        struct DPRelay_flow *flow = UPPER_OBJECT(node, struct DPRelay_flow, src_list_node);
-        free_flow(flow);
-    }
-    
-    ASSERT(!o->current_flow)
-    
     // free DataProtoDevice
     DataProtoDevice_Free(&o->device);
     
@@ -188,19 +178,21 @@ void DPRelaySource_Free (DPRelaySource *o)
     BufferWriter_Free(&o->writer);
 }
 
-void DPRelaySource_SubmitFrame (DPRelaySource *o, DPRelaySink *sink, uint8_t *data, int data_len, int num_packets, int inactivity_time)
+void DPRelayRouter_SubmitFrame (DPRelayRouter *o, DPRelaySource *src, DPRelaySink *sink, uint8_t *data, int data_len, int num_packets, int inactivity_time)
 {
     ASSERT(data_len >= 0)
     ASSERT(data_len <= o->frame_mtu)
     ASSERT(num_packets > 0)
     ASSERT(!o->current_flow)
+    ASSERT(src->router == o)
     DebugObject_Access(&o->d_obj);
+    DebugObject_Access(&src->d_obj);
     DebugObject_Access(&sink->d_obj);
     
     // get memory location
     uint8_t *out;
     if (!BufferWriter_StartPacket(&o->writer, &out)) {
-        BLog(BLOG_ERROR, "BufferWriter_StartPacket failed for frame %d->%d !?", (int)o->source_id, (int)sink->dest_id);
+        BLog(BLOG_ERROR, "BufferWriter_StartPacket failed for frame %d->%d !?", (int)src->source_id, (int)sink->dest_id);
         return;
     }
     
@@ -212,9 +204,9 @@ void DPRelaySource_SubmitFrame (DPRelaySource *o, DPRelaySink *sink, uint8_t *da
     
     // get a flow
     // this comes _after_ writing the packet, in case flow initialization schedules jobs
-    struct DPRelay_flow *flow = source_find_flow(o, sink);
+    struct DPRelay_flow *flow = source_find_flow(src, sink);
     if (!flow) {
-        if (!(flow = create_flow(o, sink, num_packets, inactivity_time))) {
+        if (!(flow = create_flow(src, sink, num_packets, inactivity_time))) {
             return;
         }
     }
@@ -223,6 +215,34 @@ void DPRelaySource_SubmitFrame (DPRelaySource *o, DPRelaySink *sink, uint8_t *da
     o->current_flow = flow;
 }
 
+void DPRelaySource_Init (DPRelaySource *o, DPRelayRouter *router, peerid_t source_id, BReactor *reactor)
+{
+    DebugObject_Access(&router->d_obj);
+    
+    // init arguments
+    o->router = router;
+    o->source_id = source_id;
+    
+    // init flows list
+    LinkedList1_Init(&o->flows_list);
+    
+    DebugObject_Init(&o->d_obj);
+    DebugCounter_Increment(&o->router->d_ctr);
+}
+
+void DPRelaySource_Free (DPRelaySource *o)
+{
+    DebugCounter_Decrement(&o->router->d_ctr);
+    DebugObject_Free(&o->d_obj);
+    
+    // free flows, detaching them if needed
+    LinkedList1Node *node;
+    while (node = LinkedList1_GetFirst(&o->flows_list)) {
+        struct DPRelay_flow *flow = UPPER_OBJECT(node, struct DPRelay_flow, src_list_node);
+        free_flow(flow);
+    }
+}
+
 void DPRelaySource_PrepareFreeDestinations (DPRelaySource *o)
 {
     DebugObject_Access(&o->d_obj);

+ 13 - 4
client/DPRelay.h

@@ -37,13 +37,19 @@
 struct DPRelay_flow;
 
 typedef struct {
-    peerid_t source_id;
     int frame_mtu;
     BufferWriter writer;
     DataProtoDevice device;
-    LinkedList1 flows_list;
     struct DPRelay_flow *current_flow;
     DebugObject d_obj;
+    DebugCounter d_ctr;
+} DPRelayRouter;
+
+typedef struct {
+    DPRelayRouter *router;
+    peerid_t source_id;
+    LinkedList1 flows_list;
+    DebugObject d_obj;
 } DPRelaySource;
 
 typedef struct {
@@ -61,9 +67,12 @@ struct DPRelay_flow {
     LinkedList1Node sink_list_node;
 };
 
-int DPRelaySource_Init (DPRelaySource *o, peerid_t source_id, int frame_mtu, BReactor *reactor) WARN_UNUSED;
+int DPRelayRouter_Init (DPRelayRouter *o, int frame_mtu, BReactor *reactor) WARN_UNUSED;
+void DPRelayRouter_Free (DPRelayRouter *o);
+void DPRelayRouter_SubmitFrame (DPRelayRouter *o, DPRelaySource *src, DPRelaySink *sink, uint8_t *data, int data_len, int num_packets, int inactivity_time);
+
+void DPRelaySource_Init (DPRelaySource *o, DPRelayRouter *router, peerid_t source_id, BReactor *reactor);
 void DPRelaySource_Free (DPRelaySource *o);
-void DPRelaySource_SubmitFrame (DPRelaySource *o, DPRelaySink *sink, uint8_t *data, int data_len, int num_packets, int inactivity_time);
 void DPRelaySource_PrepareFreeDestinations (DPRelaySource *o);
 
 void DPRelaySink_Init (DPRelaySink *o, peerid_t dest_id, BReactor *reactor);

+ 13 - 5
client/client.c

@@ -172,6 +172,9 @@ LinkedList2 relays;
 // peers than need a relay
 LinkedList2 waiting_relay_peers;
 
+// object for queuing relay packets to relay flows
+DPRelayRouter relay_router;
+
 // server connection
 ServerConnection server;
 
@@ -518,6 +521,12 @@ int main (int argc, char *argv[])
     // init need relay list
     LinkedList2_Init(&waiting_relay_peers);
     
+    // init DPRelayRouter
+    if (!DPRelayRouter_Init(&relay_router, device.mtu, &ss)) {
+        BLog(BLOG_ERROR, "DPRelayRouter_Init failed");
+        goto fail8a;
+    }
+    
     // start connecting to server
     if (!ServerConnection_Init(
         &server, &ss, server_addr, SC_KEEPALIVE_INTERVAL, SERVER_BUFFER_MIN_PACKETS, options.ssl, client_cert, client_key, server_name, NULL,
@@ -563,6 +572,8 @@ int main (int argc, char *argv[])
     
     ServerConnection_Free(&server);
 fail9:
+    DPRelayRouter_Free(&relay_router);
+fail8a:
     FrameDecider_Free(&frame_decider);
 fail8:
     PacketPassFairQueue_Free(&device.output_queue);
@@ -1231,9 +1242,7 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     PacketPassInterface_Sender_Init(peer->local_recv_if, (PacketPassInterface_handler_done)local_recv_qflow_output_handler_done, peer);
     
     // init relay source
-    if (!DPRelaySource_Init(&peer->relay_source, peer->id, device.mtu, &ss)) {
-        goto fail2;
-    }
+    DPRelaySource_Init(&peer->relay_source, &relay_router, peer->id, &ss);
     
     // init relay sink
     DPRelaySink_Init(&peer->relay_sink, peer->id, &ss);
@@ -1285,7 +1294,6 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
 fail5:
     DPRelaySink_Free(&peer->relay_sink);
     DPRelaySource_Free(&peer->relay_source);
-fail2:
     PacketPassFairQueueFlow_Free(&peer->local_recv_qflow);
     DataProtoLocalSource_Free(&peer->local_dpflow);
 fail1:
@@ -2154,7 +2162,7 @@ out:
     
     // relay frame
     if (relay_dest) {
-        DPRelaySource_SubmitFrame(&src_peer->relay_source, &relay_dest->relay_sink, data, data_len, options.send_buffer_relay_size, PEER_RELAY_FLOW_INACTIVITY_TIME);
+        DPRelayRouter_SubmitFrame(&relay_router, &src_peer->relay_source, &relay_dest->relay_sink, data, data_len, options.send_buffer_relay_size, PEER_RELAY_FLOW_INACTIVITY_TIME);
     }
     
     // inform DataProto of received packet