Ver Fonte

client: use RouteBuffer's to route frames from the device to peers. Avoids a memcpy of the frame.

ambrop7 há 15 anos atrás
pai
commit
ae621a700a
4 ficheiros alterados com 174 adições e 161 exclusões
  1. 98 74
      client/DataProto.c
  2. 54 38
      client/DataProto.h
  3. 21 47
      client/client.c
  4. 1 2
      client/client.h

+ 98 - 74
client/DataProto.c

@@ -67,7 +67,7 @@ struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDes
     ASSERT(first_frame_len <= dp->frame_mtu)
     ASSERT(!BAVL_LookupExact(&rs->relay_flows_tree, &dp))
     ASSERT(num_packets > 0)
-    ASSERT(!dp->d_freeing)
+    ASSERT(!dp->freeing)
     
     // allocate flow structure
     struct dp_relay_flow *flow = malloc(sizeof(struct dp_relay_flow));
@@ -158,7 +158,7 @@ void dealloc_relay_flow (struct dp_relay_flow *flow)
 
 void release_relay_flow (struct dp_relay_flow *flow)
 {
-    ASSERT(!flow->dp->d_freeing)
+    ASSERT(!flow->dp->freeing)
     
     // release it if it's busy
     if (PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
@@ -171,7 +171,7 @@ void release_relay_flow (struct dp_relay_flow *flow)
 
 void flow_monitor_handler (struct dp_relay_flow *flow)
 {
-    ASSERT(!flow->dp->d_freeing)
+    ASSERT(!flow->dp->freeing)
     
     BLog(BLOG_NOTICE, "relay flow from peer %d to %d timed out", (int)flow->rs->source_id, (int)flow->dp->dest_id);
     
@@ -180,7 +180,7 @@ void flow_monitor_handler (struct dp_relay_flow *flow)
 
 void monitor_handler (DataProtoDest *o)
 {
-    ASSERT(!o->d_freeing)
+    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     send_keepalive(o);
@@ -188,7 +188,7 @@ void monitor_handler (DataProtoDest *o)
 
 void send_keepalive (DataProtoDest *o)
 {
-    ASSERT(!o->d_freeing)
+    ASSERT(!o->freeing)
     
     BLog(BLOG_DEBUG, "sending keepalive to peer %d", (int)o->dest_id);
     
@@ -241,7 +241,7 @@ int pointer_comparator (void *user, void **val1, void **val2)
 
 void keepalive_job_handler (DataProtoDest *o)
 {
-    ASSERT(!o->d_freeing)
+    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     send_keepalive(o);
@@ -287,6 +287,21 @@ void submit_relay_frame (struct dp_relay_flow *flow, uint8_t *frame, int frame_l
     BufferWriter_EndPacket(&flow->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + frame_len);
 }
 
+static void device_router_handler (DataProtoDevice *o, uint8_t *buf, int recv_len)
+{
+    ASSERT(recv_len >= 0)
+    ASSERT(recv_len <= o->frame_mtu)
+    DebugObject_Access(&o->d_obj);
+    
+    // remember packet
+    o->current_buf = buf;
+    o->current_recv_len = recv_len;
+    
+    // call handler
+    o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len);
+    return;
+}
+
 int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoDest_handler handler, void *user)
 {
     ASSERT(PacketPassInterface_HasCancel(output))
@@ -342,12 +357,14 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
     // init relay flows list
     LinkedList2_Init(&o->relay_flows_list);
     
+    // set not freeing
+    o->freeing = 0;
+    
     DebugCounter_Init(&o->flows_counter);
     DebugObject_Init(&o->d_obj);
     
     #ifndef NDEBUG
     o->d_output = output;
-    o->d_freeing = 0;
     #endif
     
     return 1;
@@ -413,9 +430,8 @@ void DataProtoDest_PrepareFree (DataProtoDest *o)
     // allow freeing queue flows
     PacketPassFairQueue_PrepareFree(&o->queue);
     
-    #ifndef NDEBUG
-    o->d_freeing = 1;
-    #endif
+    // set freeing
+    o->freeing = 1;
 }
 
 void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs, uint8_t *data, int data_len, int buffer_num_packets)
@@ -423,7 +439,7 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
     ASSERT(data_len >= 0)
     ASSERT(data_len <= o->frame_mtu)
     ASSERT(buffer_num_packets > 0)
-    ASSERT(!o->d_freeing)
+    ASSERT(!o->freeing)
     DebugObject_Access(&rs->d_obj);
     DebugObject_Access(&o->d_obj);
     
@@ -445,7 +461,7 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
 void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
 {
     ASSERT(peer_receiving == 0 || peer_receiving == 1)
-    ASSERT(!o->d_freeing)
+    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     int prev_up = o->up;
@@ -470,29 +486,56 @@ void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
     }
 }
 
-int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t source_id, peerid_t dest_id, int num_packets, BReactor *reactor)
+int DataProtoDevice_Init (DataProtoDevice *o, PacketRecvInterface *input, DataProtoDevice_handler handler, void *user, BReactor *reactor)
+{
+    ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
+    
+    // init arguments
+    o->handler = handler;
+    o->user = user;
+    o->reactor = reactor;
+    
+    // remember frame MTU
+    o->frame_mtu = PacketRecvInterface_GetMTU(input);
+    
+    // init router
+    if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)device_router_handler, o, BReactor_PendingGroup(reactor))) {
+        goto fail1;
+    }
+    
+    DebugObject_Init(&o->d_obj);
+    DebugCounter_Init(&o->d_ctr);
+    
+    return 1;
+    
+fail1:
+    return 0;
+}
+
+void DataProtoDevice_Free (DataProtoDevice *o)
+{
+    DebugCounter_Free(&o->d_ctr);
+    DebugObject_Free(&o->d_obj);
+    
+    // free router
+    PacketRouter_Free(&o->router);
+}
+
+int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets)
 {
-    ASSERT(frame_mtu >= 0)
-    ASSERT(frame_mtu <= INT_MAX - (sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id)))
     ASSERT(num_packets > 0)
     
     // init arguments
-    o->frame_mtu = frame_mtu;
+    o->device = device;
     o->source_id = source_id;
     o->dest_id = dest_id;
     
-    // calculate packet MTU
-    int packet_mtu = o->frame_mtu + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id);
-    
     // init connector
-    PacketPassConnector_Init(&o->connector, packet_mtu, BReactor_PendingGroup(reactor));
-    
-    // init async input
-    BufferWriter_Init(&o->ainput, packet_mtu, BReactor_PendingGroup(reactor));
+    PacketPassConnector_Init(&o->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
     
-    // init buffer
-    if (!PacketBuffer_Init(&o->buffer, BufferWriter_GetOutput(&o->ainput), PacketPassConnector_GetInput(&o->connector), num_packets, BReactor_PendingGroup(reactor))) {
-        BLog(BLOG_ERROR, "PacketBuffer_Init failed");
+    // init route buffer
+    if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, PacketPassConnector_GetInput(&o->connector), num_packets)) {
+        BLog(BLOG_ERROR, "RouteBuffer_Init failed");
         goto fail1;
     }
     
@@ -500,11 +543,11 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t
     o->dp = NULL;
     
     DebugObject_Init(&o->d_obj);
+    DebugCounter_Increment(&device->d_ctr);
     
     return 1;
     
 fail1:
-    BufferWriter_Free(&o->ainput);
     PacketPassConnector_Free(&o->connector);
 fail0:
     return 0;
@@ -513,57 +556,54 @@ fail0:
 void DataProtoLocalSource_Free (DataProtoLocalSource *o)
 {
     ASSERT(!o->dp)
+    DebugCounter_Decrement(&o->device->d_ctr);
     DebugObject_Free(&o->d_obj);
     
-    // free buffer
-    PacketBuffer_Free(&o->buffer);
-    
-    // free async input
-    BufferWriter_Free(&o->ainput);
+    // free route buffer
+    RouteBuffer_Free(&o->rbuf);
     
     // free connector
     PacketPassConnector_Free(&o->connector);
 }
 
-void DataProtoLocalSource_SubmitFrame (DataProtoLocalSource *o, uint8_t *data, int data_len)
+void DataProtoLocalSource_Route (DataProtoLocalSource *o, int more)
 {
-    ASSERT(data_len >= 0)
-    ASSERT(data_len <= o->frame_mtu)
+    ASSERT(more == 0 || more == 1)
+    PacketRouter_AssertRoute(&o->device->router);
     if (o->dp) {
-        ASSERT(!o->d_dp_released)
-        ASSERT(!o->dp->d_freeing)
+        ASSERT(!o->dp->freeing)
     }
     DebugObject_Access(&o->d_obj);
     
-    // get a buffer
-    uint8_t *out;
-    // safe because of PacketBufferAsyncInput
-    if (!BufferWriter_StartPacket(&o->ainput, &out)) {
-        BLog(BLOG_NOTICE, "out of buffer for frame from peer %d to %d", (int)o->source_id, (int)o->dest_id);
-        return;
-    }
-    
     // write header
-    struct dataproto_header *header = (struct dataproto_header *)out;
+    struct dataproto_header *header = (struct dataproto_header *)o->device->current_buf;
     // don't set flags, it will be set in notifier_handler
     header->from_id = htol16(o->source_id);
     header->num_peer_ids = htol16(1);
-    struct dataproto_peer_id *id = (struct dataproto_peer_id *)(out + sizeof(struct dataproto_header));
+    struct dataproto_peer_id *id = (struct dataproto_peer_id *)(header + 1);
     id->id = htol16(o->dest_id);
     
-    // write data
-    memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), data, data_len);
+    // route
+    uint8_t *next_buf;
+    if (!PacketRouter_Route(
+        &o->device->router, DATAPROTO_MAX_OVERHEAD + o->device->current_recv_len, &o->rbuf,
+        (more ? &next_buf : NULL), DATAPROTO_MAX_OVERHEAD, (more ? o->device->current_recv_len : 0)
+    )) {
+        BLog(BLOG_NOTICE, "out of buffer for frame from peer %d to %d", (int)o->source_id, (int)o->dest_id);
+        return;
+    }
     
-    // submit it
-    BufferWriter_EndPacket(&o->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
+    if (more) {
+        o->device->current_buf = next_buf;
+    }
 }
 
 void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp)
 {
     ASSERT(dp)
     ASSERT(!o->dp)
-    ASSERT(o->frame_mtu <= dp->frame_mtu)
-    ASSERT(!dp->d_freeing)
+    ASSERT(o->device->frame_mtu <= dp->frame_mtu)
+    ASSERT(!dp->freeing)
     DebugObject_Access(&o->d_obj);
     DebugObject_Access(&dp->d_obj);
     
@@ -578,38 +618,22 @@ void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp)
     
     // increment flows counter
     DebugCounter_Increment(&dp->flows_counter);
-    
-    #ifndef NDEBUG
-    o->d_dp_released = 0;
-    #endif
-}
-
-void DataProtoLocalSource_Release (DataProtoLocalSource *o)
-{
-    ASSERT(o->dp)
-    ASSERT(!o->d_dp_released)
-    ASSERT(!o->dp->d_freeing)
-    DebugObject_Access(&o->d_obj);
-    
-    if (PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
-        PacketPassFairQueueFlow_Release(&o->dp_qflow);
-    }
-    
-    #ifndef NDEBUG
-    o->d_dp_released = 1;
-    #endif
 }
 
 void DataProtoLocalSource_Detach (DataProtoLocalSource *o)
 {
     #ifndef NDEBUG
     ASSERT(o->dp)
-    ASSERT(o->d_dp_released || o->dp->d_freeing)
     #endif
     DebugObject_Access(&o->d_obj);
     
     DataProtoDest *dp = o->dp;
     
+    // release flow if needed
+    if (!o->dp->freeing && PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
+        PacketPassFairQueueFlow_Release(&o->dp_qflow);
+    }
+    
     // decrement flows counter
     DebugCounter_Decrement(&dp->flows_counter);
     

+ 54 - 38
client/DataProto.h

@@ -46,8 +46,10 @@
 #include <flow/BufferWriter.h>
 #include <flow/PacketBuffer.h>
 #include <flow/PacketPassConnector.h>
+#include <flow/PacketRouter.h>
 
 typedef void (*DataProtoDest_handler) (void *user, int up);
+typedef void (*DataProtoDevice_handler) (void *user, const uint8_t *frame, int frame_len);
 
 struct dp_relay_flow;
 
@@ -73,31 +75,43 @@ typedef struct {
     void *user;
     LinkedList2 relay_flows_list;
     BPending keepalive_job;
+    int freeing;
     DebugCounter flows_counter;
     DebugObject d_obj;
     #ifndef NDEBUG
     PacketPassInterface *d_output;
-    int d_freeing;
     #endif
 } DataProtoDest;
 
+/**
+ * Object that receives frames from a device and routes
+ * them to buffers in {@link DataProtoLocalSource} objects.
+ */
+typedef struct {
+    DataProtoDevice_handler handler;
+    void *user;
+    BReactor *reactor;
+    int frame_mtu;
+    PacketRouter router;
+    uint8_t *current_buf;
+    int current_recv_len;
+    DebugObject d_obj;
+    DebugCounter d_ctr;
+} DataProtoDevice;
+
 /**
  * Local frame source.
  * Buffers frames received from the TAP device, addressed to a particular peer.
  */
 typedef struct {
-    int frame_mtu;
+    DataProtoDevice *device;
     peerid_t source_id;
     peerid_t dest_id;
-    BufferWriter ainput;
-    PacketBuffer buffer;
+    RouteBuffer rbuf;
     PacketPassConnector connector;
     DataProtoDest *dp;
     PacketPassFairQueueFlow dp_qflow;
     DebugObject d_obj;
-    #ifndef NDEBUG
-    int d_dp_released;
-    #endif
 } DataProtoLocalSource;
 
 /**
@@ -189,22 +203,40 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
  */
 void DataProtoDest_Received (DataProtoDest *o, int peer_receiving);
 
+/**
+ * Initiazes the object.
+ * 
+ * @param o the object
+ * @param input device input. Its input MTU must be <= INT_MAX - DATAPROTO_MAX_OVERHEAD.
+ * @param handler handler called when a packet arrives to allow the user to route it to
+ *                appropriate {@link DataProtoLocalSource} objects.
+ * @param user value passed to handler
+ * @param reactor reactor we live in
+ * @return 1 on success, 0 on failure
+ */
+int DataProtoDevice_Init (DataProtoDevice *o, PacketRecvInterface *input, DataProtoDevice_handler handler, void *user, BReactor *reactor) WARN_UNUSED;
+
+/**
+ * Frees the object.
+ * There must be no {@link DataProtoLocalSource} objects referring to this device.
+ * 
+ * @param o the object
+ */
+void DataProtoDevice_Free (DataProtoDevice *o);
+
 /**
  * Initializes the object.
  * The object is initialized in not attached state.
  * 
  * @param o the object
- * @param frame_mtu maximum frame size. Must be >=0.
- *                  Must be <= INT_MAX - (sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id)).
- * @param source_id ID of the peer from which the frames submitted to this object originate from,
- *                  i.e. our ID
- * @param dest_id ID of the peer to which the frames are to be delivered to
+ * @param device device to receive frames from
+ * @param source_id source peer ID to encode in the headers (i.e. our ID)
+ * @param dest_id destination peer ID to encode in the headers (i.e. ID if the peer this
+ *                object belongs to)
  * @param num_packets number of packets the buffer should hold. Must be >0.
- * @param reactor reactor we live in. Must be the same as with all destinations this
- *                source will be attached to.
  * @return 1 on success, 0 on failure
  */
-int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t source_id, peerid_t dest_id, int num_packets, BReactor *reactor) WARN_UNUSED;
+int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets) WARN_UNUSED;
 
 /**
  * Frees the object.
@@ -215,23 +247,20 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t
 void DataProtoLocalSource_Free (DataProtoLocalSource *o);
 
 /**
- * Submits a frame.
- * If the object is in attached state:
- * - The object must be in not released state.
- * - The destination must not be in freeing state.
- * - Must not be called from destination's output Send calls.
- * - May invoke the destination's output I/O.
+ * Routes a frame from the device to this object.
+ * Must be called from within the job context of the {@link DataProtoDevice_handler} handler.
+ * Must not be called after this has been called with more=0 for the current frame.
  * 
  * @param o the object
- * @param data frame data
- * @param data_len frame length. Must be >=0 and <=frame_mtu.
+ * @param more whether the current frame may have to be routed to more
+ *             objects. If 0, must not be called again until the handler is
+ *             called for the next frame. Must be 0 or 1.
  */
-void DataProtoLocalSource_SubmitFrame (DataProtoLocalSource *o, uint8_t *data, int data_len);
+void DataProtoLocalSource_Route (DataProtoLocalSource *o, int more);
 
 /**
  * Attaches the object to a destination.
  * The object must be in not attached state.
- * The object enters attached and not released state.
  * 
  * @param o the object
  * @param dp destination to attach to. This object's frame_mtu must be <= destination's
@@ -239,22 +268,9 @@ void DataProtoLocalSource_SubmitFrame (DataProtoLocalSource *o, uint8_t *data, i
  */
 void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp);
 
-/**
- * Releases the object to allow detaching it from the destination.
- * The object must be in attached and not released state.
- * The destination must not be in freeing state.
- * The object enters attached and released state.
- * Must not be called from destination's output Send calls.
- * May invoke the destination's output Cancel call.
- * 
- * @param o the object
- */
-void DataProtoLocalSource_Release (DataProtoLocalSource *o);
-
 /**
  * Detaches the object from a destination.
  * The object must be in attached state.
- * Either the object must be in released state, or the destination must be in freeing state.
  * Unless the destination is in freeing state, must not be called from destination's
  * output Send calls.
  * 

+ 21 - 47
client/client.c

@@ -240,11 +240,7 @@ static void peer_dealloc_relay_provider (struct peer_data *peer);
 static void peer_install_relaying (struct peer_data *peer, struct peer_data *relay);
 
 // uninstall relaying for a peer
-static void peer_uninstall_relaying (struct peer_data *peer);
-
-// deallocates relaying for a peer. Used when the relay is beeing freed,
-// and when uninstalling relaying after having released the connection.
-static void peer_dealloc_relaying (struct peer_data *peer);
+static void peer_free_relaying (struct peer_data *peer);
 
 // handle a peer that needs a relay
 static void peer_need_relay (struct peer_data *peer);
@@ -312,8 +308,8 @@ static struct peer_data * find_peer_by_id (peerid_t id);
 // device error handler
 static void device_error_handler (void *unused);
 
-// PacketPassInterfacre handler for packets from the device
-static void device_input_handler_send (void *unused, uint8_t *data, int data_len);
+// DataProtoDevice handler for packets from the device
+static void device_input_dpd_handler (void *unused, const uint8_t *frame, int frame_len);
 
 // assign relays to clients waiting for them
 static void assign_relays (void);
@@ -490,8 +486,8 @@ int main (int argc, char *argv[])
     BLog(BLOG_INFO, "device MTU is %d", device.mtu);
     
     // init device input
-    PacketPassInterface_Init(&device.input_interface, device.mtu, device_input_handler_send, NULL, BReactor_PendingGroup(&ss));
-    if (!SinglePacketBuffer_Init(&device.input_buffer, BTap_GetOutput(&device.btap), &device.input_interface, BReactor_PendingGroup(&ss))) {
+    if (!DataProtoDevice_Init(&device.input_dpd, BTap_GetOutput(&device.btap), device_input_dpd_handler, NULL, &ss)) {
+        BLog(BLOG_ERROR, "DataProtoDevice_Init failed");
         goto fail5a;
     }
     
@@ -563,11 +559,9 @@ int main (int argc, char *argv[])
     ServerConnection_Free(&server);
 fail10:
     FrameDecider_Free(&frame_decider);
-fail7:
     PacketPassFairQueue_Free(&device.output_queue);
-    SinglePacketBuffer_Free(&device.input_buffer);
+    DataProtoDevice_Free(&device.input_dpd);
 fail5a:
-    PacketPassInterface_Free(&device.input_interface);
     BTap_Free(&device.btap);
 fail5:
     if (options.transport_mode == TRANSPORT_MODE_TCP) {
@@ -1221,7 +1215,7 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     }
     
     // init local flow
-    if (!DataProtoLocalSource_Init(&peer->local_dpflow, device.mtu, my_id, peer->id, options.send_buffer_size, &ss)) {
+    if (!DataProtoLocalSource_Init(&peer->local_dpflow, &device.input_dpd, my_id, peer->id, options.send_buffer_size)) {
         peer_log(peer, BLOG_ERROR, "DataProtoLocalSource_Init failed");
         goto fail1;
     }
@@ -1294,7 +1288,7 @@ void peer_remove (struct peer_data *peer)
     
     // uninstall relaying
     if (peer->have_relaying) {
-        peer_uninstall_relaying(peer);
+        peer_free_relaying(peer);
     }
     
     // disable relay provider
@@ -1501,7 +1495,7 @@ int peer_new_link (struct peer_data *peer)
         peer_free_link(peer);
     }
     else if (peer->have_relaying) {
-        peer_uninstall_relaying(peer);
+        peer_free_relaying(peer);
     }
     else if (peer->waiting_relay) {
         peer_unregister_need_relay(peer);
@@ -1550,7 +1544,7 @@ void peer_disable_relay_provider (struct peer_data *peer)
         ASSERT(relay_user->relaying_peer == peer)
         
         // disconnect relay user
-        peer_uninstall_relaying(relay_user);
+        peer_free_relaying(relay_user);
         
         // add it to need relay list
         peer_register_need_relay(relay_user);
@@ -1584,7 +1578,7 @@ void peer_dealloc_relay_provider (struct peer_data *peer)
         ASSERT(relay_user->relaying_peer == peer)
         
         // disconnect relay user
-        peer_dealloc_relaying(relay_user);
+        peer_free_relaying(relay_user);
         
         // add it to need relay list
         peer_register_need_relay(relay_user);
@@ -1620,7 +1614,7 @@ void peer_install_relaying (struct peer_data *peer, struct peer_data *relay)
     peer->have_relaying = 1;
 }
 
-void peer_uninstall_relaying (struct peer_data *peer)
+void peer_free_relaying (struct peer_data *peer)
 {
     ASSERT(peer->have_relaying)
     
@@ -1633,24 +1627,6 @@ void peer_uninstall_relaying (struct peer_data *peer)
     
     peer_log(peer, BLOG_INFO, "uninstalling relaying through %d", (int)relay->id);
     
-    // release local flow before detaching it
-    DataProtoLocalSource_Release(&peer->local_dpflow);
-    
-    // link out relay
-    peer_dealloc_relaying(peer);
-}
-
-void peer_dealloc_relaying (struct peer_data *peer)
-{
-    ASSERT(peer->have_relaying)
-    
-    ASSERT(!peer->have_link)
-    ASSERT(!peer->waiting_relay)
-    
-    struct peer_data *relay = peer->relaying_peer;
-    ASSERT(relay->is_relay)
-    ASSERT(relay->have_link)
-    
     // detach local flow from relay
     DataProtoLocalSource_Detach(&peer->local_dpflow);
     
@@ -1669,7 +1645,7 @@ void peer_need_relay (struct peer_data *peer)
     }
     
     if (peer->have_relaying) {
-        peer_uninstall_relaying(peer);
+        peer_free_relaying(peer);
     }
     
     if (peer->waiting_relay) {
@@ -2546,22 +2522,20 @@ void device_error_handler (void *unused)
     return;
 }
 
-void device_input_handler_send (void *unused, uint8_t *data, int data_len)
+void device_input_dpd_handler (void *unused, const uint8_t *frame, int frame_len)
 {
-    ASSERT(data_len >= 0)
-    ASSERT(data_len <= device.mtu)
-    
-    // accept packet
-    PacketPassInterface_Done(&device.input_interface);
+    ASSERT(frame_len >= 0)
     
     // give frame to decider
-    FrameDecider_AnalyzeAndDecide(&frame_decider, data, data_len);
+    FrameDecider_AnalyzeAndDecide(&frame_decider, frame, frame_len);
     
     // forward frame to peers
-    FrameDeciderPeer *decider_peer;
-    while (decider_peer = FrameDecider_NextDestination(&frame_decider)) {
+    FrameDeciderPeer *decider_peer = FrameDecider_NextDestination(&frame_decider);
+    while (decider_peer) {
+        FrameDeciderPeer *next = FrameDecider_NextDestination(&frame_decider);
         struct peer_data *peer = UPPER_OBJECT(decider_peer, struct peer_data, decider_peer);
-        DataProtoLocalSource_SubmitFrame(&peer->local_dpflow, data, data_len);
+        DataProtoLocalSource_Route(&peer->local_dpflow, !!next);
+        decider_peer = next;
     }
 }
 

+ 1 - 2
client/client.h

@@ -85,8 +85,7 @@ struct device_data {
     int mtu;
     
     // input
-    SinglePacketBuffer input_buffer;
-    PacketPassInterface input_interface;
+    DataProtoDevice input_dpd;
     
     // output
     PacketPassFairQueue output_queue;