Просмотр исходного кода

PacketPassInterface: remove Cancel in favour of RequestCancel with non-immediate effect, update
everything to work with it.
client: move processing received packets into DPReceive

ambrop7 15 лет назад
Родитель
Сommit
d32126c2c9

+ 1 - 0
blog_channels.txt

@@ -46,3 +46,4 @@ BSocketPRFileDesc 4
 PacketProtoDecoder 4
 DPRelay 4
 BThreadWork 4
+DPReceive 4

+ 1 - 0
client/CMakeLists.txt

@@ -7,6 +7,7 @@ add_executable(badvpn-client
     PasswordSender.c
     FrameDecider.c
     DPRelay.c
+    DPReceive.c
 )
 target_link_libraries(badvpn-client system flow tuntap server_conection security ${NSPR_LIBRARIES} ${NSS_LIBRARIES})
 

+ 424 - 0
client/DPReceive.c

@@ -0,0 +1,424 @@
+/**
+ * @file DPReceive.c
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <stddef.h>
+#include <limits.h>
+
+#include <protocol/dataproto.h>
+#include <misc/byteorder.h>
+#include <misc/offset.h>
+#include <system/BLog.h>
+
+#include <client/DPReceive.h>
+
+#include <generated/blog_channel_DPReceive.h>
+
+static DPReceivePeer * find_peer (DPReceiveDevice *o, peerid_t id)
+{
+    LinkedList2Node *node = LinkedList2_GetFirst(&o->peers_list);
+    while (node) {
+        DPReceivePeer *p = UPPER_OBJECT(node, DPReceivePeer, list_node);
+        if (p->peer_id == id) {
+            return p;
+        }
+        node = LinkedList2Node_Next(node);
+    }
+    
+    return NULL;
+}
+
+static void receiver_recv_handler_send (DPReceiveReceiver *o, uint8_t *packet, int packet_len)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->peer)
+    DPReceivePeer *peer = o->peer;
+    DPReceiveDevice *device = peer->device;
+    ASSERT(packet_len >= 0)
+    ASSERT(packet_len <= device->packet_mtu)
+    
+    uint8_t *data = packet;
+    int data_len = packet_len;
+    
+    int local = 0;
+    DPReceivePeer *src_peer;
+    DPReceivePeer *relay_dest_peer = NULL;
+    
+    // check header
+    if (data_len < sizeof(struct dataproto_header)) {
+        BLog(BLOG_WARNING, "no dataproto header");
+        goto out;
+    }
+    struct dataproto_header *header = (struct dataproto_header *)data;
+    data += sizeof(*header);
+    data_len -= sizeof(*header);
+    uint8_t flags = ltoh8(header->flags);
+    peerid_t from_id = ltoh16(header->from_id);
+    int num_ids = ltoh16(header->num_peer_ids);
+    
+    // check destination ID
+    if (!(num_ids == 0 || num_ids == 1)) {
+        BLog(BLOG_WARNING, "wrong number of destinations");
+        goto out;
+    }
+    peerid_t to_id;
+    if (num_ids == 1) {
+        if (data_len < sizeof(to_id)) {
+            BLog(BLOG_WARNING, "missing destination");
+            goto out;
+        }
+        to_id = ltoh16(*((peerid_t *)data));
+        data += sizeof(to_id);
+        data_len -= sizeof(to_id);
+    }
+    
+    // check remaining data
+    if (data_len > device->device_mtu) {
+        BLog(BLOG_WARNING, "frame too large");
+        goto out;
+    }
+    
+    // inform sink of received packet
+    if (peer->dp_sink) {
+        DataProtoSink_Received(peer->dp_sink, !!(flags & DATAPROTO_FLAGS_RECEIVING_KEEPALIVES));
+    }
+    
+    if (num_ids == 1) {
+        // find source peer
+        if (!(src_peer = find_peer(device, from_id))) {
+            BLog(BLOG_INFO, "source peer %d not known", (int)from_id);
+            goto out;
+        }
+        
+        // is frame for device or another peer?
+        if (device->have_peer_id && to_id == device->peer_id) {
+            // let the frame decider analyze the frame
+            FrameDeciderPeer_Analyze(src_peer->decider_peer, data, data_len);
+            
+            // pass frame to device
+            local = 1;
+        } else {
+            // TODO check permitted
+            if (!peer->is_relay_client) {
+                BLog(BLOG_WARNING, "relaying not allowed");
+                goto out;
+            }
+            
+            // provided source ID must be the peer sending the frame
+            if (src_peer != peer) {
+                BLog(BLOG_WARNING, "relay source must be the sending peer");
+                goto out;
+            }
+            
+            // find destination peer
+            DPReceivePeer *dest_peer = find_peer(device, to_id);
+            if (!dest_peer) {
+                BLog(BLOG_INFO, "relay destination peer not known");
+                goto out;
+            }
+            
+            // destination cannot be source
+            if (dest_peer == src_peer) {
+                BLog(BLOG_WARNING, "relay destination cannot be the source");
+                goto out;
+            }
+            
+            relay_dest_peer = dest_peer;
+        }
+    }
+    
+out:
+    // pass packet to device or accept right away
+    if (local) {
+        PacketPassInterface_Sender_Send(o->qflow_if, data, data_len);
+    } else {
+        PacketPassInterface_Done(&o->recv_if);
+    }
+    
+    // relay frame
+    if (relay_dest_peer) {
+        DPRelayRouter_SubmitFrame(&device->relay_router, &src_peer->relay_source, &relay_dest_peer->relay_sink, data, data_len, device->relay_flow_buffer_size, device->relay_flow_inactivity_time);
+    }
+}
+
+static void receiver_qflow_handler_done (DPReceiveReceiver *o)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->peer)
+    
+    PacketPassInterface_Done(&o->recv_if);
+}
+
+static void device_call_forgotten_cb (DPReceiveDevice *o)
+{
+    ASSERT(o->forgotten_receiver)
+    
+    DPReceiveReceiver *r = o->forgotten_receiver;
+    ASSERT(!r->peer)
+    
+    r->forgotten_cb(r->forgotten_user);
+    
+    ASSERT(!o->forgotten_receiver)
+}
+
+static void receiver_qflow_handler_busy (DPReceiveReceiver *o)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(!o->peer)
+    DPReceiveDevice *device = o->device;
+    ASSERT(device->forgotten_receiver == o)
+    
+    device_call_forgotten_cb(device);
+}
+
+int DPReceiveDevice_Init (DPReceiveDevice *o, PacketPassInterface *output, BReactor *reactor, int relay_flow_buffer_size, int relay_flow_inactivity_time)
+{
+    ASSERT(PacketPassInterface_GetMTU(output) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
+    ASSERT(relay_flow_buffer_size > 0)
+    
+    // init arguments
+    o->reactor = reactor;
+    o->relay_flow_buffer_size = relay_flow_buffer_size;
+    o->relay_flow_inactivity_time = relay_flow_inactivity_time;
+    
+    // remember device MTU
+    o->device_mtu = PacketPassInterface_GetMTU(output);
+    
+    // remember packet MTU
+    o->packet_mtu = DATAPROTO_MAX_OVERHEAD + o->device_mtu;
+    
+    // init relay router
+    if (!DPRelayRouter_Init(&o->relay_router, o->device_mtu, o->reactor)) {
+        BLog(BLOG_ERROR, "DPRelayRouter_Init failed");
+        goto fail0;
+    }
+    
+    // init queue
+    PacketPassFairQueue_Init(&o->queue, output, BReactor_PendingGroup(o->reactor), 0, 1);
+    
+    // have no peer ID
+    o->have_peer_id = 0;
+    
+    // set not freeing
+    o->freeing = 0;
+    
+    // init peers list
+    LinkedList2_Init(&o->peers_list);
+    
+    // set no forgotten receiver
+    o->forgotten_receiver = NULL;
+    
+    DebugObject_Init(&o->d_obj);
+    return 1;
+    
+fail0:
+    return 0;
+}
+
+void DPReceiveDevice_Free (DPReceiveDevice *o)
+{
+    ASSERT(!o->forgotten_receiver)
+    ASSERT(LinkedList2_IsEmpty(&o->peers_list))
+    DebugObject_Free(&o->d_obj);
+    
+    // free queue
+    PacketPassFairQueue_Free(&o->queue);
+    
+    // free relay router
+    DPRelayRouter_Free(&o->relay_router);
+}
+
+void DPReceiveDevice_PrepareFree (DPReceiveDevice *o)
+{
+    DebugObject_Access(&o->d_obj);
+    
+    // prepare queue for freeing
+    PacketPassFairQueue_PrepareFree(&o->queue);
+    
+    // set freeing
+    o->freeing = 1;
+    
+    // call callback for forgotten receiver
+    if (o->forgotten_receiver) {
+        device_call_forgotten_cb(o);
+    }
+}
+
+void DPReceiveDevice_SetPeerID (DPReceiveDevice *o, peerid_t peer_id)
+{
+    DebugObject_Access(&o->d_obj);
+    
+    // remember peer ID
+    o->peer_id = peer_id;
+    o->have_peer_id = 1;
+}
+
+void DPReceivePeer_Init (DPReceivePeer *o, DPReceiveDevice *device, peerid_t peer_id, FrameDeciderPeer *decider_peer, int is_relay_client)
+{
+    ASSERT(is_relay_client == 0 || is_relay_client == 1)
+    DebugObject_Access(&device->d_obj);
+    
+    // init arguments
+    o->device = device;
+    o->peer_id = peer_id;
+    o->decider_peer = decider_peer;
+    o->is_relay_client = is_relay_client;
+    
+    // init relay source
+    DPRelaySource_Init(&o->relay_source, &device->relay_router, o->peer_id, device->reactor);
+    
+    // init relay sink
+    DPRelaySink_Init(&o->relay_sink, o->peer_id);
+    
+    // have no sink
+    o->dp_sink = NULL;
+    
+    // insert to peers list
+    LinkedList2_Append(&device->peers_list, &o->list_node);
+    
+    DebugObject_Init(&o->d_obj);
+    DebugCounter_Init(&o->d_receivers_ctr);
+}
+
+void DPReceivePeer_Free (DPReceivePeer *o)
+{
+    DPReceiveDevice *device = o->device;
+    ASSERT(!o->dp_sink)
+    DebugCounter_Free(&o->d_receivers_ctr);
+    DebugObject_Free(&o->d_obj);
+    
+    // remove from peers list
+    LinkedList2_Remove(&device->peers_list, &o->list_node);
+    
+    // free relay sink
+    DPRelaySink_Free(&o->relay_sink);
+    
+    // free relay source
+    DPRelaySource_Free(&o->relay_source);
+}
+
+void DPReceivePeer_AttachSink (DPReceivePeer *o, DataProtoSink *dp_sink)
+{
+    ASSERT(dp_sink)
+    ASSERT(!o->dp_sink)
+    DebugObject_Access(&o->d_obj);
+    
+    // attach relay sink
+    DPRelaySink_Attach(&o->relay_sink, dp_sink);
+    
+    o->dp_sink = dp_sink;
+}
+
+void DPReceivePeer_DetachSink (DPReceivePeer *o)
+{
+    ASSERT(o->dp_sink)
+    DebugObject_Access(&o->d_obj);
+    
+    // detach relay sink
+    DPRelaySink_Detach(&o->relay_sink);
+    
+    o->dp_sink = NULL;
+}
+
+void DPReceiveReceiver_Init (DPReceiveReceiver *o, DPReceivePeer *peer)
+{
+    DebugObject_Access(&peer->d_obj);
+    DPReceiveDevice *device = peer->device;
+    
+    // remember peer
+    o->peer = peer;
+    
+    // remember device
+    o->device = device;
+    
+    // init queue flow
+    PacketPassFairQueueFlow_Init(&o->qflow, &device->queue);
+    o->qflow_if = PacketPassFairQueueFlow_GetInput(&o->qflow);
+    PacketPassInterface_Sender_Init(o->qflow_if, (PacketPassInterface_handler_done)receiver_qflow_handler_done, o);
+    
+    // init receive interface
+    PacketPassInterface_Init(&o->recv_if, device->packet_mtu, (PacketPassInterface_handler_send)receiver_recv_handler_send, o, BReactor_PendingGroup(device->reactor));
+    
+    // increment peer's receivers counter
+    DebugCounter_Increment(&peer->d_receivers_ctr);
+    
+    DebugObject_Init(&o->d_obj);
+}
+
+void DPReceiveReceiver_Free (DPReceiveReceiver *o)
+{
+    DebugObject_Free(&o->d_obj);
+    PacketPassFairQueueFlow_AssertFree(&o->qflow);
+    
+    if (o->peer) {
+        // decrement peer's receivers counter
+        DebugCounter_Decrement(&o->peer->d_receivers_ctr);
+    } else {
+        // clear forgotten receiver reference in the device
+        ASSERT(o->device->forgotten_receiver == o)
+        o->device->forgotten_receiver = NULL;
+    }
+    
+    // free receive interface
+    PacketPassInterface_Free(&o->recv_if);
+    
+    // free queue flow
+    PacketPassFairQueueFlow_Free(&o->qflow);
+}
+
+PacketPassInterface * DPReceiveReceiver_GetInput (DPReceiveReceiver *o)
+{
+    DebugObject_Access(&o->d_obj);
+    
+    return &o->recv_if;
+}
+
+int DPReceiveReceiver_IsBusy (DPReceiveReceiver *o)
+{
+    DebugObject_Access(&o->d_obj);
+    
+    return (o->device->freeing ? 0 : PacketPassFairQueueFlow_IsBusy(&o->qflow));
+}
+
+void DPReceiveReceiver_Forget (DPReceiveReceiver *o, DPReceiveReceiver_forgotten_cb forgotten_cb, void *user)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->peer)
+    ASSERT(!o->device->freeing)
+    ASSERT(PacketPassFairQueueFlow_IsBusy(&o->qflow))
+    
+    // decrement peer's receivers counter
+    DebugCounter_Decrement(&o->peer->d_receivers_ctr);
+    
+    // add forgotten receiver reference in the device
+    ASSERT(!o->device->forgotten_receiver)
+    o->device->forgotten_receiver = o;
+    
+    // set queue flow's busy handler
+    PacketPassFairQueueFlow_SetBusyHandler(&o->qflow, (PacketPassFairQueue_handler_busy)receiver_qflow_handler_busy, o);
+    
+    // remember callback
+    o->forgotten_cb = forgotten_cb;
+    o->forgotten_user = user;
+    
+    // forget peer
+    o->peer = NULL;
+}

+ 100 - 0
client/DPReceive.h

@@ -0,0 +1,100 @@
+/**
+ * @file DPReceive.h
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ * 
+ * @section DESCRIPTION
+ * 
+ * Receive processing for the VPN client.
+ */
+
+#ifndef BADVPN_CLIENT_DPRECEIVE_H
+#define BADVPN_CLIENT_DPRECEIVE_H
+
+#include <protocol/scproto.h>
+#include <misc/debugcounter.h>
+#include <misc/debug.h>
+#include <structure/LinkedList2.h>
+#include <system/DebugObject.h>
+#include <flow/PacketPassFairQueue.h>
+#include <client/DataProto.h>
+#include <client/DPRelay.h>
+#include <client/FrameDecider.h>
+
+typedef void (*DPReceiveReceiver_forgotten_cb) (void *user);
+
+struct DPReceiveReceiver_s;
+
+typedef struct {
+    BReactor *reactor;
+    int relay_flow_buffer_size;
+    int relay_flow_inactivity_time;
+    int device_mtu;
+    int packet_mtu;
+    DPRelayRouter relay_router;
+    PacketPassFairQueue queue;
+    int have_peer_id;
+    peerid_t peer_id;
+    int freeing;
+    LinkedList2 peers_list;
+    struct DPReceiveReceiver_s *forgotten_receiver;
+    DebugObject d_obj;
+} DPReceiveDevice;
+
+typedef struct {
+    DPReceiveDevice *device;
+    peerid_t peer_id;
+    FrameDeciderPeer *decider_peer;
+    int is_relay_client;
+    DPRelaySource relay_source;
+    DPRelaySink relay_sink;
+    DataProtoSink *dp_sink;
+    LinkedList2Node list_node;
+    DebugObject d_obj;
+    DebugCounter d_receivers_ctr;
+} DPReceivePeer;
+
+typedef struct DPReceiveReceiver_s {
+    DPReceivePeer *peer;
+    DPReceiveDevice *device;
+    PacketPassFairQueueFlow qflow;
+    PacketPassInterface *qflow_if;
+    PacketPassInterface recv_if;
+    DPReceiveReceiver_forgotten_cb forgotten_cb;
+    void *forgotten_user;
+    DebugObject d_obj;
+} DPReceiveReceiver;
+
+int DPReceiveDevice_Init (DPReceiveDevice *o, PacketPassInterface *output, BReactor *reactor, int relay_flow_buffer_size, int relay_flow_inactivity_time) WARN_UNUSED;
+void DPReceiveDevice_Free (DPReceiveDevice *o);
+void DPReceiveDevice_PrepareFree (DPReceiveDevice *o);
+void DPReceiveDevice_SetPeerID (DPReceiveDevice *o, peerid_t peer_id);
+
+void DPReceivePeer_Init (DPReceivePeer *o, DPReceiveDevice *device, peerid_t peer_id, FrameDeciderPeer *decider_peer, int is_relay_client);
+void DPReceivePeer_Free (DPReceivePeer *o);
+void DPReceivePeer_AttachSink (DPReceivePeer *o, DataProtoSink *dp_sink);
+void DPReceivePeer_DetachSink (DPReceivePeer *o);
+
+void DPReceiveReceiver_Init (DPReceiveReceiver *o, DPReceivePeer *peer);
+void DPReceiveReceiver_Free (DPReceiveReceiver *o);
+PacketPassInterface * DPReceiveReceiver_GetInput (DPReceiveReceiver *o);
+int DPReceiveReceiver_IsBusy (DPReceiveReceiver *o);
+void DPReceiveReceiver_Forget (DPReceiveReceiver *o, DPReceiveReceiver_forgotten_cb forgotten_cb, void *user);
+
+#endif

+ 0 - 14
client/DPRelay.c

@@ -243,20 +243,6 @@ void DPRelaySource_Free (DPRelaySource *o)
     }
 }
 
-void DPRelaySource_PrepareFreeDestinations (DPRelaySource *o)
-{
-    DebugObject_Access(&o->d_obj);
-    
-    LinkedList1Node *node = LinkedList1_GetFirst(&o->flows_list);
-    while (node) {
-        struct DPRelay_flow *flow = UPPER_OBJECT(node, struct DPRelay_flow, src_list_node);
-        if (flow->sink->dest) {
-            DataProtoSink_PrepareFree(flow->sink->dest);
-        }
-        node = LinkedList1Node_Next(node);
-    }
-}
-
 void DPRelaySink_Init (DPRelaySink *o, peerid_t dest_id)
 {
     // init arguments

+ 0 - 1
client/DPRelay.h

@@ -73,7 +73,6 @@ void DPRelayRouter_SubmitFrame (DPRelayRouter *o, DPRelaySource *src, DPRelaySin
 
 void DPRelaySource_Init (DPRelaySource *o, DPRelayRouter *router, peerid_t source_id, BReactor *reactor);
 void DPRelaySource_Free (DPRelaySource *o);
-void DPRelaySource_PrepareFreeDestinations (DPRelaySource *o);
 
 void DPRelaySink_Init (DPRelaySink *o, peerid_t dest_id);
 void DPRelaySink_Free (DPRelaySink *o);

+ 193 - 68
client/DataProto.c

@@ -40,10 +40,15 @@ static void receive_timer_handler (DataProtoSink *o);
 static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
 static void keepalive_job_handler (DataProtoSink *o);
 static void up_job_handler (DataProtoSink *o);
+static void flow_buffer_free (struct DataProtoFlow_buffer *b);
+static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *dp);
+static void flow_buffer_detach (struct DataProtoFlow_buffer *b);
+static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b);
+static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b);
+static void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b);
 
 void monitor_handler (DataProtoSink *o)
 {
-    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     send_keepalive(o);
@@ -51,8 +56,6 @@ void monitor_handler (DataProtoSink *o)
 
 void send_keepalive (DataProtoSink *o)
 {
-    ASSERT(!o->freeing)
-    
     PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
 }
 
@@ -92,7 +95,6 @@ void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
 
 void keepalive_job_handler (DataProtoSink *o)
 {
-    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     send_keepalive(o);
@@ -101,7 +103,6 @@ void keepalive_job_handler (DataProtoSink *o)
 void up_job_handler (DataProtoSink *o)
 {
     ASSERT(o->up != o->up_report)
-    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     o->up_report = o->up;
@@ -110,7 +111,7 @@ void up_job_handler (DataProtoSink *o)
     return;
 }
 
-static void device_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
+void device_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
 {
     ASSERT(buf)
     ASSERT(recv_len >= 0)
@@ -126,6 +127,102 @@ static void device_router_handler (DataProtoSource *o, uint8_t *buf, int recv_le
     return;
 }
 
+void flow_buffer_free (struct DataProtoFlow_buffer *b)
+{
+    ASSERT(!b->dp)
+    
+    // free route buffer
+    RouteBuffer_Free(&b->rbuf);
+    
+    // free inactivity monitor
+    if (b->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Free(&b->monitor);
+    }
+    
+    // free connector
+    PacketPassConnector_Free(&b->connector);
+    
+    // free buffer structure
+    free(b);
+}
+
+void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *dp)
+{
+    ASSERT(!b->dp)
+    
+    // init queue flow
+    PacketPassFairQueueFlow_Init(&b->dp_qflow, &dp->queue);
+    
+    // connect to queue flow
+    PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->dp_qflow));
+    
+    // set DataProto
+    b->dp = dp;
+}
+
+void flow_buffer_detach (struct DataProtoFlow_buffer *b)
+{
+    ASSERT(b->dp)
+    PacketPassFairQueueFlow_AssertFree(&b->dp_qflow);
+    
+    // disconnect from queue flow
+    PacketPassConnector_DisconnectOutput(&b->connector);
+    
+    // free queue flow
+    PacketPassFairQueueFlow_Free(&b->dp_qflow);
+    
+    // clear reference to this buffer in the sink
+    if (b->dp->detaching_buffer == b) {
+        b->dp->detaching_buffer = NULL;
+    }
+    
+    // set no DataProto
+    b->dp = NULL;
+}
+
+void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b)
+{
+    ASSERT(b->dp)
+    ASSERT(PacketPassFairQueueFlow_IsBusy(&b->dp_qflow))
+    ASSERT(!b->dp->detaching_buffer || b->dp->detaching_buffer == b)
+    
+    if (b->dp->detaching_buffer == b) {
+        return;
+    }
+    
+    // request cancel
+    PacketPassFairQueueFlow_RequestCancel(&b->dp_qflow);
+    
+    // set busy handler
+    PacketPassFairQueueFlow_SetBusyHandler(&b->dp_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b);
+    
+    // remember this buffer in the sink so it can handle us if it goes away
+    b->dp->detaching_buffer = b;
+}
+
+void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
+{
+    ASSERT(b->dp)
+    ASSERT(b->dp->detaching_buffer == b)
+    PacketPassFairQueueFlow_AssertFree(&b->dp_qflow);
+    
+    // detach
+    flow_buffer_detach(b);
+    
+    if (!b->flow) {
+        // free
+        flow_buffer_free(b);
+    } else if (b->flow->dp_desired) {
+        // attach
+        flow_buffer_attach(b, b->flow->dp_desired);
+    }
+}
+
+void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b)
+{
+    flow_buffer_finish_detach(b);
+}
+
 int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user)
 {
     ASSERT(PacketPassInterface_HasCancel(output))
@@ -178,8 +275,8 @@ int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface
     o->up = 0;
     o->up_report = 0;
     
-    // set not freeing
-    o->freeing = 0;
+    // set no detaching buffer
+    o->detaching_buffer = NULL;
     
     DebugCounter_Init(&o->d_ctr);
     DebugObject_Init(&o->d_obj);
@@ -202,12 +299,18 @@ void DataProtoSink_Free (DataProtoSink *o)
     DebugCounter_Free(&o->d_ctr);
     DebugObject_Free(&o->d_obj);
     
-    // free handler job
-    BPending_Free(&o->up_job);
-    
     // allow freeing queue flows
     PacketPassFairQueue_PrepareFree(&o->queue);
     
+    // release detaching buffer
+    if (o->detaching_buffer) {
+        ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->dp_desired != o)
+        flow_buffer_finish_detach(o->detaching_buffer);
+    }
+    
+    // free handler job
+    BPending_Free(&o->up_job);
+    
     // free receive timer
     BReactor_RemoveTimer(o->reactor, &o->receive_timer);
     
@@ -236,21 +339,9 @@ void DataProtoSink_Free (DataProtoSink *o)
     BPending_Free(&o->keepalive_job);
 }
 
-void DataProtoSink_PrepareFree (DataProtoSink *o)
-{
-    DebugObject_Access(&o->d_obj);
-    
-    // allow freeing queue flows
-    PacketPassFairQueue_PrepareFree(&o->queue);
-    
-    // set freeing
-    o->freeing = 1;
-}
-
 void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
 {
     ASSERT(peer_receiving == 0 || peer_receiving == 1)
-    ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
     // reset receive timer
@@ -315,26 +406,42 @@ int DataProtoFlow_Init (
     o->device = device;
     o->source_id = source_id;
     o->dest_id = dest_id;
-    o->inactivity_time = inactivity_time;
+    
+    // set no desired sink
+    o->dp_desired = NULL;
+    
+    // allocate buffer structure
+    struct DataProtoFlow_buffer *b = malloc(sizeof(*b));
+    if (!b) {
+        BLog(BLOG_ERROR, "malloc failed");
+        goto fail0;
+    }
+    o->b = b;
+    
+    // set parent
+    b->flow = o;
+    
+    // remember inactivity time
+    b->inactivity_time = inactivity_time;
     
     // init connector
-    PacketPassConnector_Init(&o->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
+    PacketPassConnector_Init(&b->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
     
     // init inactivity monitor
-    PacketPassInterface *buf_out = PacketPassConnector_GetInput(&o->connector);
-    if (o->inactivity_time >= 0) {
-        PacketPassInactivityMonitor_Init(&o->monitor, buf_out, device->reactor, o->inactivity_time, handler_inactivity, user);
-        buf_out = PacketPassInactivityMonitor_GetInput(&o->monitor);
+    PacketPassInterface *buf_out = PacketPassConnector_GetInput(&b->connector);
+    if (b->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Init(&b->monitor, buf_out, device->reactor, b->inactivity_time, handler_inactivity, user);
+        buf_out = PacketPassInactivityMonitor_GetInput(&b->monitor);
     }
     
     // init route buffer
-    if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, buf_out, num_packets)) {
+    if (!RouteBuffer_Init(&b->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, buf_out, num_packets)) {
         BLog(BLOG_ERROR, "RouteBuffer_Init failed");
         goto fail1;
     }
     
     // set no DataProto
-    o->dp = NULL;
+    b->dp = NULL;
     
     DebugObject_Init(&o->d_obj);
     DebugCounter_Increment(&device->d_ctr);
@@ -342,38 +449,49 @@ int DataProtoFlow_Init (
     return 1;
     
 fail1:
-    if (o->inactivity_time >= 0) {
-        PacketPassInactivityMonitor_Free(&o->monitor);
+    if (b->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Free(&b->monitor);
     }
-    PacketPassConnector_Free(&o->connector);
+    PacketPassConnector_Free(&b->connector);
+    free(b);
 fail0:
     return 0;
 }
 
 void DataProtoFlow_Free (DataProtoFlow *o)
 {
-    ASSERT(!o->dp)
+    struct DataProtoFlow_buffer *b = o->b;
+    ASSERT(!o->dp_desired)
     DebugCounter_Decrement(&o->device->d_ctr);
     DebugObject_Free(&o->d_obj);
     
-    // free route buffer
-    RouteBuffer_Free(&o->rbuf);
-    
-    // free inactivity monitor
-    if (o->inactivity_time >= 0) {
-        PacketPassInactivityMonitor_Free(&o->monitor);
+    if (b->dp) {
+        if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
+            // schedule detach, free buffer after detach
+            flow_buffer_schedule_detach(b);
+            b->flow = NULL;
+            
+            // remove inactivity handler
+            if (b->inactivity_time >= 0) {
+                PacketPassInactivityMonitor_SetHandler(&b->monitor, NULL, NULL);
+            }
+        } else {
+            // detach and free buffer now
+            flow_buffer_detach(b);
+            flow_buffer_free(b);
+        }
+    } else {
+        // free buffer
+        flow_buffer_free(b);
     }
-    
-    // free connector
-    PacketPassConnector_Free(&o->connector);
 }
 
 void DataProtoFlow_Route (DataProtoFlow *o, int more)
 {
+    struct DataProtoFlow_buffer *b = o->b;
     ASSERT(more == 0 || more == 1)
     PacketRouter_AssertRoute(&o->device->router);
     ASSERT(o->device->current_buf)
-    ASSERT(!o->dp || !o->dp->freeing)
     DebugObject_Access(&o->d_obj);
     
     // write header
@@ -387,7 +505,7 @@ void DataProtoFlow_Route (DataProtoFlow *o, int more)
     // route
     uint8_t *next_buf;
     if (!PacketRouter_Route(
-        &o->device->router, DATAPROTO_MAX_OVERHEAD + o->device->current_recv_len, &o->rbuf,
+        &o->device->router, DATAPROTO_MAX_OVERHEAD + o->device->current_recv_len, &b->rbuf,
         &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->device->current_recv_len : 0)
     )) {
         BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
@@ -399,45 +517,52 @@ void DataProtoFlow_Route (DataProtoFlow *o, int more)
 
 void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
 {
+    struct DataProtoFlow_buffer *b = o->b;
     ASSERT(dp)
-    ASSERT(!o->dp)
+    ASSERT(!o->dp_desired)
     ASSERT(o->device->frame_mtu <= dp->frame_mtu)
-    ASSERT(!dp->freeing)
     DebugObject_Access(&o->d_obj);
     DebugObject_Access(&dp->d_obj);
     
-    // set DataProto
-    o->dp = dp;
-    
-    // init queue flow
-    PacketPassFairQueueFlow_Init(&o->dp_qflow, &dp->queue);
+    if (b->dp) {
+        if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
+            // schedule detach and reattach
+            flow_buffer_schedule_detach(b);
+        } else {
+            // detach and reattach now
+            flow_buffer_detach(b);
+            flow_buffer_attach(b, dp);
+        }
+    } else {
+        // attach
+        flow_buffer_attach(b, dp);
+    }
     
-    // connect to queue flow
-    PacketPassConnector_ConnectOutput(&o->connector, PacketPassFairQueueFlow_GetInput(&o->dp_qflow));
+    // set desired sink
+    o->dp_desired = dp;
     
     DebugCounter_Increment(&dp->d_ctr);
 }
 
 void DataProtoFlow_Detach (DataProtoFlow *o)
 {
-    ASSERT(o->dp)
+    struct DataProtoFlow_buffer *b = o->b;
+    ASSERT(o->dp_desired)
+    ASSERT(b->dp)
     DebugObject_Access(&o->d_obj);
     
-    DataProtoSink *dp = o->dp;
+    DataProtoSink *dp = o->dp_desired;
     
-    // release flow if needed
-    if (!o->dp->freeing && PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
-        PacketPassFairQueueFlow_Release(&o->dp_qflow);
+    if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
+        // schedule detach
+        flow_buffer_schedule_detach(b);
+    } else {
+        // detach now
+        flow_buffer_detach(b);
     }
     
-    // disconnect from queue flow
-    PacketPassConnector_DisconnectOutput(&o->connector);
-    
-    // free queue flow
-    PacketPassFairQueueFlow_Free(&o->dp_qflow);
-    
-    // set no DataProto
-    o->dp = NULL;
+    // set no desired sink
+    o->dp_desired = NULL;
     
     DebugCounter_Decrement(&dp->d_ctr);
 }

+ 11 - 13
client/DataProto.h

@@ -46,6 +46,8 @@ typedef void (*DataProtoSink_handler) (void *user, int up);
 typedef void (*DataProtoSource_handler) (void *user, const uint8_t *frame, int frame_len);
 typedef void (*DataProtoFlow_handler_inactivity) (void *user);
 
+struct DataProtoFlow_buffer;
+
 /**
  * Frame destination.
  * Represents a peer as a destination for sending frames to.
@@ -67,7 +69,7 @@ typedef struct {
     void *user;
     BPending keepalive_job;
     BPending up_job;
-    int freeing;
+    struct DataProtoFlow_buffer *detaching_buffer;
     DebugObject d_obj;
     DebugCounter d_ctr;
 } DataProtoSink;
@@ -96,14 +98,20 @@ typedef struct {
     DataProtoSource *device;
     peerid_t source_id;
     peerid_t dest_id;
+    DataProtoSink *dp_desired;
+    struct DataProtoFlow_buffer *b;
+    DebugObject d_obj;
+} DataProtoFlow;
+
+struct DataProtoFlow_buffer {
+    DataProtoFlow *flow;
     int inactivity_time;
     RouteBuffer rbuf;
     PacketPassInactivityMonitor monitor;
     PacketPassConnector connector;
     DataProtoSink *dp;
     PacketPassFairQueueFlow dp_qflow;
-    DebugObject d_obj;
-} DataProtoFlow;
+};
 
 /**
  * Initializes the object.
@@ -129,16 +137,6 @@ int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface
  */
 void DataProtoSink_Free (DataProtoSink *o);
 
-/**
- * Prepares for freeing the object by allowing freeing of local sources.
- * The object enters freeing state.
- * The object must be freed before returning control to the reactor,
- * and before any further I/O (output or submitting frames).
- * 
- * @param o the object
- */
-void DataProtoSink_PrepareFree (DataProtoSink *o);
-
 /**
  * Notifies the object that a packet was received from the peer.
  * Must not be in freeing state.

+ 46 - 260
client/client.c

@@ -221,9 +221,6 @@ static int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len);
 // removes a peer
 static void peer_remove (struct peer_data *peer);
 
-// deallocates peer resources
-static void peer_dealloc (struct peer_data *peer);
-
 // passes a message to the logger, prepending it info about the peer
 static void peer_log (struct peer_data *peer, int level, const char *fmt, ...);
 
@@ -245,10 +242,6 @@ static void peer_enable_relay_provider (struct peer_data *peer);
 // unregisters the peer as a relay provider
 static void peer_disable_relay_provider (struct peer_data *peer);
 
-// deallocates peer relay provider resources. Inserts relay users to the
-// need relay list. Used while freeing a peer.
-static void peer_dealloc_relay_provider (struct peer_data *peer);
-
 // install relaying for a peer
 static void peer_install_relaying (struct peer_data *peer, struct peer_data *relay);
 
@@ -291,11 +284,6 @@ static void peer_tcp_pio_handler_error (struct peer_data *peer);
 // wither when we detect an error, or the peer reports an error.
 static void peer_reset_timer_handler (struct peer_data *peer);
 
-// PacketPassInterface handler for receiving packets from the link 
-static void peer_recv_handler_send (struct peer_data *peer, uint8_t *data, int data_len);
-
-static void local_recv_qflow_output_handler_done (struct peer_data *peer);
-
 // start binding, according to the protocol
 static int peer_start_binding (struct peer_data *peer);
 
@@ -521,8 +509,11 @@ int main (int argc, char *argv[])
         goto fail7;
     }
     
-    // init device output
-    PacketPassFairQueue_Init(&device.output_queue, BTap_GetInput(&device.btap), BReactor_PendingGroup(&ss), 1, 1);
+    // init receive device
+    if (!DPReceiveDevice_Init(&device.output_dprd, BTap_GetInput(&device.btap), &ss, options.send_buffer_relay_size, PEER_RELAY_FLOW_INACTIVITY_TIME)) {
+        BLog(BLOG_ERROR, "DPReceiveDevice_Init failed");
+        goto fail7a;
+    }
     
     // calculate data MTU
     if (device.mtu > INT_MAX - DATAPROTO_MAX_OVERHEAD) {
@@ -566,34 +557,14 @@ int main (int argc, char *argv[])
     BLog(BLOG_NOTICE, "entering event loop");
     BReactor_Exec(&ss);
     
-    // allow freeing local receive flows
-    PacketPassFairQueue_PrepareFree(&device.output_queue);
+    // allow freeing receive receivers in peers' links
+    DPReceiveDevice_PrepareFree(&device.output_dprd);
     
     // free peers
     LinkedList2Node *node;
     while (node = LinkedList2_GetFirst(&peers)) {
         struct peer_data *peer = UPPER_OBJECT(node, struct peer_data, list_node);
-        
-        // free relaying
-        if (peer->have_relaying) {
-            struct peer_data *relay = peer->relaying_peer;
-            ASSERT(relay->is_relay)
-            ASSERT(relay->have_link)
-            
-            // free relay provider
-            peer_dealloc_relay_provider(relay);
-        }
-        
-        // free relay provider
-        if (peer->is_relay) {
-            peer_dealloc_relay_provider(peer);
-        }
-        
-        // free relay source
-        DPRelaySource_PrepareFreeDestinations(&peer->relay_source);
-        
-        // deallocate peer
-        peer_dealloc(peer);
+        peer_remove(peer);
     }
     
     ServerConnection_Free(&server);
@@ -602,7 +573,8 @@ fail9:
 fail8a:
     FrameDecider_Free(&frame_decider);
 fail8:
-    PacketPassFairQueue_Free(&device.output_queue);
+    DPReceiveDevice_Free(&device.output_dprd);
+fail7a:
     DataProtoSource_Free(&device.input_dpd);
 fail7:
     BTap_Free(&device.btap);
@@ -1367,19 +1339,17 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     // init local flow
     if (!DataProtoFlow_Init(&peer->local_dpflow, &device.input_dpd, my_id, peer->id, options.send_buffer_size, -1, NULL, NULL)) {
         peer_log(peer, BLOG_ERROR, "DataProtoFlow_Init failed");
-        goto fail1a;
+        goto fail2;
     }
     
-    // init local receive flow
-    PacketPassFairQueueFlow_Init(&peer->local_recv_qflow, &device.output_queue);
-    peer->local_recv_if = PacketPassFairQueueFlow_GetInput(&peer->local_recv_qflow);
-    PacketPassInterface_Sender_Init(peer->local_recv_if, (PacketPassInterface_handler_done)local_recv_qflow_output_handler_done, peer);
-    
-    // init relay source
-    DPRelaySource_Init(&peer->relay_source, &relay_router, peer->id, &ss);
+    // init frame decider peer
+    if (!FrameDeciderPeer_Init(&peer->decider_peer, &frame_decider)) {
+        peer_log(peer, BLOG_ERROR, "FrameDeciderPeer_Init failed");
+        goto fail3;
+    }
     
-    // init relay sink
-    DPRelaySink_Init(&peer->relay_sink, peer->id);
+    // init receive peer
+    DPReceivePeer_Init(&peer->receive_peer, &device.output_dprd, peer->id, &peer->decider_peer, !!(peer->flags & SCID_NEWCLIENT_FLAG_RELAY_CLIENT));
     
     // have no link
     peer->have_link = 0;
@@ -1393,11 +1363,6 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     // init retry timer
     BTimer_Init(&peer->reset_timer, PEER_RETRY_TIME, (BTimer_handler)peer_reset_timer_handler, peer);
     
-    // init frame decider peer
-    if (!FrameDeciderPeer_Init(&peer->decider_peer, &frame_decider)) {
-        goto fail5;
-    }
-    
     // is not relay server
     peer->is_relay = 0;
     
@@ -1425,12 +1390,9 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         return 0;
     }
     
-fail5:
-    DPRelaySink_Free(&peer->relay_sink);
-    DPRelaySource_Free(&peer->relay_source);
-    PacketPassFairQueueFlow_Free(&peer->local_recv_qflow);
+fail3:
     DataProtoFlow_Free(&peer->local_dpflow);
-fail1a:
+fail2:
     if (peer->common_name) {
         PORT_Free(peer->common_name);
     }
@@ -1450,32 +1412,10 @@ void peer_remove (struct peer_data *peer)
     }
     
     // disable relay provider
-    // this inserts former relay users into the need relay list
     if (peer->is_relay) {
-        peer_dealloc_relay_provider(peer);
-    }
-    
-    // release local receive flow
-    if (PacketPassFairQueueFlow_IsBusy(&peer->local_recv_qflow)) {
-        PacketPassFairQueueFlow_Release(&peer->local_recv_qflow);
+        peer_disable_relay_provider(peer);
     }
     
-    // deallocate peer
-    peer_dealloc(peer);
-    
-    // assign relays because former relay users are disconnected above
-    assign_relays();
-}
-
-void peer_dealloc (struct peer_data *peer)
-{
-    ASSERT(!peer->have_relaying)
-    ASSERT(!peer->is_relay)
-    PacketPassFairQueueFlow_AssertFree(&peer->local_recv_qflow);
-    
-    LinkedList2Iterator it;
-    LinkedList2Node *node;
-    
     // remove from waiting relay list
     if (peer->waiting_relay) {
         peer_unregister_need_relay(peer);
@@ -1486,6 +1426,11 @@ void peer_dealloc (struct peer_data *peer)
         peer_free_link(peer);
     }
     
+    ASSERT(!peer->have_relaying)
+    ASSERT(!peer->is_relay)
+    ASSERT(!peer->waiting_relay)
+    ASSERT(!peer->have_link)
+    
     // decrement number of peers
     num_peers--;
     
@@ -1498,20 +1443,14 @@ void peer_dealloc (struct peer_data *peer)
     // free jobs
     BPending_Free(&peer->job_send_seed_after_binding);
     
-    // free frame decider
-    FrameDeciderPeer_Free(&peer->decider_peer);
-    
     // free retry timer
     BReactor_RemoveTimer(&ss, &peer->reset_timer);
     
-    // free relay sink
-    DPRelaySink_Free(&peer->relay_sink);
-    
-    // free relay source
-    DPRelaySource_Free(&peer->relay_source);
+    // free receive peer
+    DPReceivePeer_Free(&peer->receive_peer);
     
-    // free local receive flow
-    PacketPassFairQueueFlow_Free(&peer->local_recv_qflow);
+    // free frame decider
+    FrameDeciderPeer_Free(&peer->decider_peer);
     
     // free local flow
     DataProtoFlow_Free(&peer->local_dpflow);
@@ -1551,8 +1490,9 @@ int peer_init_link (struct peer_data *peer)
     
     ASSERT(!peer->is_relay)
     
-    // init link receive interface
-    PacketPassInterface_Init(&peer->recv_ppi, data_mtu, (PacketPassInterface_handler_send)peer_recv_handler_send, peer,  BReactor_PendingGroup(&ss));
+    // init receive receiver
+    DPReceiveReceiver_Init(&peer->receive_receiver, &peer->receive_peer);
+    PacketPassInterface *recv_if = DPReceiveReceiver_GetInput(&peer->receive_receiver);
     
     // init transport-specific link objects
     PacketPassInterface *link_if;
@@ -1560,7 +1500,7 @@ int peer_init_link (struct peer_data *peer)
         // init DatagramPeerIO
         if (!DatagramPeerIO_Init(
             &peer->pio.udp.pio, &ss, data_mtu, CLIENT_UDP_MTU, sp_params,
-            options.fragmentation_latency, PEER_UDP_ASSEMBLER_NUM_FRAMES, &peer->recv_ppi,
+            options.fragmentation_latency, PEER_UDP_ASSEMBLER_NUM_FRAMES, recv_if,
             options.otp_num_warn, &twd
         )) {
             peer_log(peer, BLOG_ERROR, "DatagramPeerIO_Init failed");
@@ -1588,7 +1528,7 @@ int peer_init_link (struct peer_data *peer)
             &peer->pio.tcp.pio, &ss, options.peer_ssl,
             (options.peer_ssl ? peer->cert : NULL),
             (options.peer_ssl ? peer->cert_len : -1),
-            data_mtu, &peer->recv_ppi,
+            data_mtu, recv_if,
             (StreamPeerIO_handler_error)peer_tcp_pio_handler_error, peer
         )) {
             peer_log(peer, BLOG_ERROR, "StreamPeerIO_Init failed");
@@ -1607,8 +1547,8 @@ int peer_init_link (struct peer_data *peer)
     // attach local flow to our DataProtoSink
     DataProtoFlow_Attach(&peer->local_dpflow, &peer->send_dp);
     
-    // attach relay sink flows to our DataProtoSink
-    DPRelaySink_Attach(&peer->relay_sink, &peer->send_dp);
+    // attach receive peer to our DataProtoSink
+    DPReceivePeer_AttachSink(&peer->receive_peer, &peer->send_dp);
     
     peer->have_link = 1;
     
@@ -1621,7 +1561,7 @@ fail2:
         StreamPeerIO_Free(&peer->pio.tcp.pio);
     }
 fail1:
-    PacketPassInterface_Free(&peer->recv_ppi);
+    DPReceiveReceiver_Free(&peer->receive_receiver);
     return 0;
 }
 
@@ -1633,11 +1573,10 @@ void peer_free_link (struct peer_data *peer)
     ASSERT(!peer->have_relaying)
     ASSERT(!peer->waiting_relay)
     
-    // allow detaching DataProto flows
-    DataProtoSink_PrepareFree(&peer->send_dp);
+    ASSERT(!DPReceiveReceiver_IsBusy(&peer->receive_receiver)) // TODO
     
-    // detach relay sink flows from our DataProtoSink
-    DPRelaySink_Detach(&peer->relay_sink);
+    // detach receive peer from our DataProtoSink
+    DPReceivePeer_DetachSink(&peer->receive_peer);
     
     // detach local flow from our DataProtoSink
     DataProtoFlow_Detach(&peer->local_dpflow);
@@ -1647,15 +1586,13 @@ void peer_free_link (struct peer_data *peer)
     
     // free transport-specific link objects
     if (options.transport_mode == TRANSPORT_MODE_UDP) {
-        // free DatagramPeerIO
         DatagramPeerIO_Free(&peer->pio.udp.pio);
     } else {
-        // free StreamPeerIO
         StreamPeerIO_Free(&peer->pio.tcp.pio);
     }
     
-    // free common link objects
-    PacketPassInterface_Free(&peer->recv_ppi);
+    // free receive receiver
+    DPReceiveReceiver_Free(&peer->receive_receiver);
     
     peer->have_link = 0;
 }
@@ -1734,37 +1671,6 @@ void peer_disable_relay_provider (struct peer_data *peer)
     assign_relays();
 }
 
-void peer_dealloc_relay_provider (struct peer_data *peer)
-{
-    ASSERT(peer->is_relay)
-    
-    ASSERT(peer->have_link)
-    ASSERT(!peer->have_relaying)
-    ASSERT(!peer->waiting_relay)
-    
-    // allow detaching DataProto flows from the relay peer
-    DataProtoSink_PrepareFree(&peer->send_dp);
-    
-    // disconnect relay users
-    LinkedList2Node *list_node;
-    while (list_node = LinkedList2_GetFirst(&peer->relay_users)) {
-        struct peer_data *relay_user = UPPER_OBJECT(list_node, struct peer_data, relaying_list_node);
-        ASSERT(relay_user->have_relaying)
-        ASSERT(relay_user->relaying_peer == peer)
-        
-        // disconnect relay user
-        peer_free_relaying(relay_user);
-        
-        // add it to need relay list
-        peer_register_need_relay(relay_user);
-    }
-    
-    // remove from relays list
-    LinkedList2_Remove(&relays, &peer->relay_list_node);
-    
-    peer->is_relay = 0;
-}
-
 void peer_install_relaying (struct peer_data *peer, struct peer_data *relay)
 {
     ASSERT(!peer->have_relaying)
@@ -2220,127 +2126,6 @@ void peer_reset_timer_handler (struct peer_data *peer)
     return;
 }
 
-void peer_recv_handler_send (struct peer_data *peer, uint8_t *data, int data_len)
-{
-    ASSERT(peer->have_link)
-    ASSERT(data_len >= 0)
-    ASSERT(data_len <= data_mtu)
-    
-    uint8_t *orig_data = data;
-    int orig_data_len = data_len;
-    
-    int dp_good = 0;
-    struct peer_data *src_peer;
-    struct peer_data *relay_dest = NULL;
-    int local = 0;
-    
-    // check dataproto header
-    if (data_len < sizeof(struct dataproto_header)) {
-        peer_log(peer, BLOG_NOTICE, "receive: no dataproto header");
-        goto out;
-    }
-    struct dataproto_header *header = (struct dataproto_header *)data;
-    data += sizeof(struct dataproto_header);
-    data_len -= sizeof(struct dataproto_header);
-    uint8_t flags = header->flags;
-    peerid_t from_id = ltoh16(header->from_id);
-    int num_ids = ltoh16(header->num_peer_ids);
-    
-    // check destination IDs
-    if (num_ids > 1) {
-        peer_log(peer, BLOG_NOTICE, "receive: too many destination IDs");
-        goto out;
-    }
-    if (data_len < num_ids * sizeof(struct dataproto_peer_id)) {
-        peer_log(peer, BLOG_NOTICE, "receive: invalid length for destination IDs");
-        goto out;
-    }
-    struct dataproto_peer_id *ids = (struct dataproto_peer_id *)data;
-    data += num_ids * sizeof(struct dataproto_peer_id);
-    data_len -= num_ids * sizeof(struct dataproto_peer_id);
-    
-    // check remaining data
-    if (data_len > device.mtu) {
-        peer_log(peer, BLOG_NOTICE, "receive: frame too large");
-        goto out;
-    }
-    
-    dp_good = 1;
-    
-    if (num_ids == 0) {
-        goto out;
-    }
-    
-    // find source peer
-    if (!(src_peer = find_peer_by_id(from_id))) {
-        peer_log(peer, BLOG_NOTICE, "receive: source peer %d not known", (int)from_id);
-        goto out;
-    }
-    
-    // find destination
-    peerid_t id = ltoh16(ids[0].id);
-    if (id == my_id) {
-        // frame is for us
-        
-        // let the frame decider analyze the frame
-        FrameDeciderPeer_Analyze(&src_peer->decider_peer, data, data_len);
-        
-        local = 1;
-    } else {
-        // frame is for someone else
-        
-        // make sure the client is allowed to relay though us
-        if (!(peer->flags & SCID_NEWCLIENT_FLAG_RELAY_CLIENT)) {
-            peer_log(peer, BLOG_NOTICE, "relaying not allowed");
-            goto out;
-        }
-        
-        // provided source ID must be the peer sending the frame
-        if (src_peer != peer) {
-            peer_log(peer, BLOG_NOTICE, "relay source must be the sending peer");
-            goto out;
-        }
-        
-        // lookup destination peer
-        struct peer_data *dest_peer = find_peer_by_id(id);
-        if (!dest_peer) {
-            peer_log(peer, BLOG_NOTICE, "relay destination peer not known");
-            goto out;
-        }
-        
-        // destination cannot be source
-        if (dest_peer == src_peer) {
-            peer_log(peer, BLOG_NOTICE, "relay destination cannot be the source");
-            goto out;
-        }
-        
-        relay_dest = dest_peer;
-    }
-    
-out:
-    // pass packet to device, or accept immediately
-    if (local) {
-        PacketPassInterface_Sender_Send(peer->local_recv_if, data, data_len);
-    } else {
-        PacketPassInterface_Done(&peer->recv_ppi);
-    }
-    
-    // relay frame
-    if (relay_dest) {
-        DPRelayRouter_SubmitFrame(&relay_router, &src_peer->relay_source, &relay_dest->relay_sink, data, data_len, options.send_buffer_relay_size, PEER_RELAY_FLOW_INACTIVITY_TIME);
-    }
-    
-    // inform DataProto of received packet
-    if (dp_good) {
-        DataProtoSink_Received(&peer->send_dp, !!(flags & DATAPROTO_FLAGS_RECEIVING_KEEPALIVES));
-    }
-}
-
-void local_recv_qflow_output_handler_done (struct peer_data *peer)
-{
-    PacketPassInterface_Done(&peer->recv_ppi);
-}
-
 int peer_start_binding (struct peer_data *peer)
 {
     peer->binding = 1;
@@ -2682,8 +2467,6 @@ void peer_dataproto_handler (struct peer_data *peer, int up)
 {
     ASSERT(peer->have_link)
     
-    // peer_recv_handler_send relies on this not bringing everything down
-    
     if (up) {
         peer_log(peer, BLOG_INFO, "up");
         
@@ -2808,6 +2591,9 @@ void server_handler_ready (void *user, peerid_t param_my_id, uint32_t ext_ip)
         }
     }
     
+    // give receive device the ID
+    DPReceiveDevice_SetPeerID(&device.output_dprd, my_id);
+    
     BLog(BLOG_INFO, "server: ready, my ID is %d", (int)my_id);
 }
 

+ 11 - 20
client/client.h

@@ -26,13 +26,11 @@
 #include <protocol/scproto.h>
 #include <structure/LinkedList2.h>
 #include <structure/BAVL.h>
-#include <flow/SinglePacketBuffer.h>
-#include <flow/PacketPassFairQueue.h>
 #include <tuntap/BTap.h>
 #include <client/DatagramPeerIO.h>
 #include <client/StreamPeerIO.h>
 #include <client/DataProto.h>
-#include <client/DPRelay.h>
+#include <client/DPReceive.h>
 #include <client/FrameDecider.h>
 
 // NOTE: all time values are in milliseconds
@@ -91,7 +89,7 @@ struct device_data {
     DataProtoSource input_dpd;
     
     // output
-    PacketPassFairQueue output_queue;
+    DPReceiveDevice output_dprd;
 };
 
 struct peer_data {
@@ -109,24 +107,17 @@ struct peer_data {
     // local flow
     DataProtoFlow local_dpflow;
     
-    // local receive flow
-    PacketPassInterface *local_recv_if;
-    PacketPassFairQueueFlow local_recv_qflow;
-    
-    // relay source
-    DPRelaySource relay_source;
+    // frame decider peer
+    FrameDeciderPeer decider_peer;
     
-    // relay sink
-    DPRelaySink relay_sink;
+    // receive peer
+    DPReceivePeer receive_peer;
     
     // flag if link objects are initialized
     int have_link;
     
-    // link sending
-    DataProtoSink send_dp;
-    
-    // link receive interface
-    PacketPassInterface recv_ppi;
+    // receive receiver
+    DPReceiveReceiver receive_receiver;
     
     // transport-specific link objects
     union {
@@ -144,6 +135,9 @@ struct peer_data {
         } tcp;
     } pio;
     
+    // link sending
+    DataProtoSink send_dp;
+    
     // flag if relaying is installed
     int have_relaying;
     
@@ -158,9 +152,6 @@ struct peer_data {
     // retry timer
     BTimer reset_timer;
     
-    // frame decider peer
-    FrameDeciderPeer decider_peer;
-    
     // relay server specific
     int is_relay;
     LinkedList2Node relay_list_node;

+ 3 - 2
examples/RandomPacketSink.h

@@ -55,12 +55,13 @@ static void _RandomPacketSink_input_handler_send (RandomPacketSink *s, uint8_t *
     }
 }
 
-static void _RandomPacketSink_input_handler_cancel (RandomPacketSink *s)
+static void _RandomPacketSink_input_handler_requestcancel (RandomPacketSink *s)
 {
     DebugObject_Access(&s->d_obj);
     
     printf("sink: cancelled\n");
     BReactor_RemoveTimer(s->reactor, &s->timer);
+    PacketPassInterface_Done(&s->input);
 }
 
 static void _RandomPacketSink_timer_handler (RandomPacketSink *s)
@@ -77,7 +78,7 @@ static void RandomPacketSink_Init (RandomPacketSink *s, BReactor *reactor, int m
     
     // init input
     PacketPassInterface_Init(&s->input, mtu, (PacketPassInterface_handler_send)_RandomPacketSink_input_handler_send, s, BReactor_PendingGroup(reactor));
-    PacketPassInterface_EnableCancel(&s->input, (PacketPassInterface_handler_cancel)_RandomPacketSink_input_handler_cancel);
+    PacketPassInterface_EnableCancel(&s->input, (PacketPassInterface_handler_requestcancel)_RandomPacketSink_input_handler_requestcancel);
     
     // init timer
     BTimer_Init(&s->timer, ms, (BTimer_handler)_RandomPacketSink_timer_handler, s);

+ 3 - 2
examples/TimerPacketSink.h

@@ -43,11 +43,12 @@ static void _TimerPacketSink_input_handler_send (TimerPacketSink *s, uint8_t *da
     BReactor_SetTimer(s->reactor, &s->timer);
 }
 
-static void _TimerPacketSink_input_handler_cancel (TimerPacketSink *s)
+static void _TimerPacketSink_input_handler_requestcancel (TimerPacketSink *s)
 {
     printf("sink: cancelled\n");
     
     BReactor_RemoveTimer(s->reactor, &s->timer);
+    PacketPassInterface_Done(&s->input);
 }
 
 static void _TimerPacketSink_timer_handler (TimerPacketSink *s)
@@ -64,7 +65,7 @@ static void TimerPacketSink_Init (TimerPacketSink *s, BReactor *reactor, int mtu
     
     // init input
     PacketPassInterface_Init(&s->input, mtu, (PacketPassInterface_handler_send)_TimerPacketSink_input_handler_send, s, BReactor_PendingGroup(s->reactor));
-    PacketPassInterface_EnableCancel(&s->input, (PacketPassInterface_handler_cancel)_TimerPacketSink_input_handler_cancel);
+    PacketPassInterface_EnableCancel(&s->input, (PacketPassInterface_handler_requestcancel)_TimerPacketSink_input_handler_requestcancel);
     
     // init timer
     BTimer_Init(&s->timer, ms, (BTimer_handler)_TimerPacketSink_timer_handler, s);

+ 24 - 6
examples/fairqueue_test.c

@@ -22,6 +22,7 @@
 
 #include <string.h>
 #include <stdio.h>
+#include <stddef.h>
 
 #include <misc/debug.h>
 #include <system/BReactor.h>
@@ -56,14 +57,11 @@ static void free_input (int i)
     PacketPassFairQueueFlow_Free(&flows[i]);
 }
 
-static void timer_handler (void *user)
+static void reset_input (void)
 {
-    printf("removing %d\n", current_cancel);
+    PacketPassFairQueueFlow_AssertFree(&flows[current_cancel]);
     
-    // release flow
-    if (PacketPassFairQueueFlow_IsBusy(&flows[current_cancel])) {
-        PacketPassFairQueueFlow_Release(&flows[current_cancel]);
-    }
+    printf("removing %d\n", current_cancel);
     
     // remove flow
     free_input(current_cancel);
@@ -78,6 +76,26 @@ static void timer_handler (void *user)
     BReactor_SetTimer(&reactor, &timer);
 }
 
+static void flow_handler_busy (void *user)
+{
+    PacketPassFairQueueFlow_AssertFree(&flows[current_cancel]);
+    
+    reset_input();
+}
+
+static void timer_handler (void *user)
+{
+    // if flow is busy, request cancel and wait for it
+    if (PacketPassFairQueueFlow_IsBusy(&flows[current_cancel])) {
+        printf("cancelling %d\n", current_cancel);
+        PacketPassFairQueueFlow_RequestCancel(&flows[current_cancel]);
+        PacketPassFairQueueFlow_SetBusyHandler(&flows[current_cancel], flow_handler_busy, NULL);
+        return;
+    }
+    
+    reset_input();
+}
+
 int main ()
 {
     // initialize logging

+ 5 - 2
flow/FragmentProtoDisassembler.c

@@ -112,13 +112,16 @@ static void input_handler_send (FragmentProtoDisassembler *o, uint8_t *data, int
     write_chunks(o);
 }
 
-static void input_handler_cancel (FragmentProtoDisassembler *o)
+static void input_handler_requestcancel (FragmentProtoDisassembler *o)
 {
     ASSERT(o->in_len >= 0)
     ASSERT(!o->out)
     
     // set no input packet
     o->in_len = -1;
+    
+    // finish input
+    PacketPassInterface_Done(&o->input);
 }
 
 static void output_handler_recv (FragmentProtoDisassembler *o, uint8_t *data)
@@ -166,7 +169,7 @@ void FragmentProtoDisassembler_Init (FragmentProtoDisassembler *o, BReactor *rea
     
     // init input
     PacketPassInterface_Init(&o->input, input_mtu, (PacketPassInterface_handler_send)input_handler_send, o, BReactor_PendingGroup(reactor));
-    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_cancel)input_handler_cancel);
+    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_requestcancel)input_handler_requestcancel);
     
     // init output
     PacketRecvInterface_Init(&o->output, o->output_mtu, (PacketRecvInterface_handler_recv)output_handler_recv, o, BReactor_PendingGroup(reactor));

+ 5 - 2
flow/PacketCopier.c

@@ -49,12 +49,15 @@ static void input_handler_send (PacketCopier *o, uint8_t *data, int data_len)
     o->out_have = 0;
 }
 
-static void input_handler_cancel (PacketCopier *o)
+static void input_handler_requestcancel (PacketCopier *o)
 {
     ASSERT(o->in_len >= 0)
     ASSERT(!o->out_have)
     DebugObject_Access(&o->d_obj);
     
+    // finish input packet
+    PacketPassInterface_Done(&o->input);
+    
     o->in_len = -1;
 }
 
@@ -86,7 +89,7 @@ void PacketCopier_Init (PacketCopier *o, int mtu, BPendingGroup *pg)
     
     // init input
     PacketPassInterface_Init(&o->input, mtu, (PacketPassInterface_handler_send)input_handler_send, o, pg);
-    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_cancel)input_handler_cancel);
+    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_requestcancel)input_handler_requestcancel);
     
     // init output
     PacketRecvInterface_Init(&o->output, mtu, (PacketRecvInterface_handler_recv)output_handler_recv, o, pg);

+ 3 - 9
flow/PacketPassFairQueue.c

@@ -345,7 +345,7 @@ int PacketPassFairQueueFlow_IsBusy (PacketPassFairQueueFlow *flow)
     return (flow == m->sending_flow);
 }
 
-void PacketPassFairQueueFlow_Release (PacketPassFairQueueFlow *flow)
+void PacketPassFairQueueFlow_RequestCancel (PacketPassFairQueueFlow *flow)
 {
     PacketPassFairQueue *m = flow->m;
     
@@ -355,14 +355,8 @@ void PacketPassFairQueueFlow_Release (PacketPassFairQueueFlow *flow)
     ASSERT(!BPending_IsSet(&m->schedule_job))
     DebugObject_Access(&flow->d_obj);
     
-    // set no sending flow
-    m->sending_flow = NULL;
-    
-    // schedule schedule
-    BPending_Set(&m->schedule_job);
-    
-    // cancel current packet
-    PacketPassInterface_Sender_Cancel(m->output);
+    // request cancel
+    PacketPassInterface_Sender_RequestCancel(m->output);
 }
 
 void PacketPassFairQueueFlow_SetBusyHandler (PacketPassFairQueueFlow *flow, PacketPassFairQueue_handler_busy handler, void *user)

+ 2 - 7
flow/PacketPassFairQueue.h

@@ -152,19 +152,14 @@ void PacketPassFairQueueFlow_AssertFree (PacketPassFairQueueFlow *flow);
 int PacketPassFairQueueFlow_IsBusy (PacketPassFairQueueFlow *flow);
 
 /**
- * Cancels the packet that is currently being sent to output in order
- * to allow freeing the flow.
+ * Requests the output to stop processing the current packet as soon as possible.
  * Cancel functionality must be enabled for the queue.
  * The flow must be busy as indicated by {@link PacketPassFairQueueFlow_IsBusy}.
  * Queue must not be in freeing state.
- * Must not be called from queue calls to output.
- * Will call Cancel on output. Will not invoke any input I/O.
- * After this, {@link PacketPassFairQueueFlow_IsBusy} will report the flow as not busy.
- * The flow's input's Done will never be called (the flow will become inoperable).
  * 
  * @param flow the object
  */
-void PacketPassFairQueueFlow_Release (PacketPassFairQueueFlow *flow);
+void PacketPassFairQueueFlow_RequestCancel (PacketPassFairQueueFlow *flow);
 
 /**
  * Sets up a callback to be called when the flow is no longer busy.

+ 4 - 7
flow/PacketPassInactivityMonitor.c

@@ -33,15 +33,12 @@ static void input_handler_send (PacketPassInactivityMonitor *o, uint8_t *data, i
     BReactor_RemoveTimer(o->reactor, &o->timer);
 }
 
-static void input_handler_cancel (PacketPassInactivityMonitor *o)
+static void input_handler_requestcancel (PacketPassInactivityMonitor *o)
 {
     DebugObject_Access(&o->d_obj);
     
-    // output no longer busy, restart timer
-    BReactor_SetTimer(o->reactor, &o->timer);
-    
-    // call cancel
-    PacketPassInterface_Sender_Cancel(o->output);
+    // request cancel
+    PacketPassInterface_Sender_RequestCancel(o->output);
 }
 
 static void output_handler_done (PacketPassInactivityMonitor *o)
@@ -80,7 +77,7 @@ void PacketPassInactivityMonitor_Init (PacketPassInactivityMonitor *o, PacketPas
     // init input
     PacketPassInterface_Init(&o->input, PacketPassInterface_GetMTU(o->output), (PacketPassInterface_handler_send)input_handler_send, o, BReactor_PendingGroup(o->reactor));
     if (PacketPassInterface_HasCancel(o->output)) {
-        PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_cancel)input_handler_cancel);
+        PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_requestcancel)input_handler_requestcancel);
     }
     
     // init output

+ 12 - 0
flow/PacketPassInterface.c

@@ -35,6 +35,18 @@ void _PacketPassInterface_job_operation (PacketPassInterface *i)
     return;
 }
 
+void _PacketPassInterface_job_requestcancel (PacketPassInterface *i)
+{
+    ASSERT(i->state == PPI_STATE_BUSY)
+    ASSERT(i->cancel_requested)
+    ASSERT(i->handler_requestcancel)
+    DebugObject_Access(&i->d_obj);
+    
+    // call handler
+    i->handler_requestcancel(i->user_provider);
+    return;
+}
+
 void _PacketPassInterface_job_done (PacketPassInterface *i)
 {
     ASSERT(i->state == PPI_STATE_DONE_PENDING)

+ 42 - 23
flow/PacketPassInterface.h

@@ -41,7 +41,7 @@
 
 typedef void (*PacketPassInterface_handler_send) (void *user, uint8_t *data, int data_len);
 
-typedef void (*PacketPassInterface_handler_cancel) (void *user);
+typedef void (*PacketPassInterface_handler_requestcancel) (void *user);
 
 typedef void (*PacketPassInterface_handler_done) (void *user);
 
@@ -49,7 +49,7 @@ typedef struct {
     // provider data
     int mtu;
     PacketPassInterface_handler_send handler_operation;
-    PacketPassInterface_handler_cancel handler_cancel;
+    PacketPassInterface_handler_requestcancel handler_requestcancel;
     void *user_provider;
     
     // user data
@@ -61,11 +61,15 @@ typedef struct {
     uint8_t *job_operation_data;
     int job_operation_len;
     
+    // requestcancel job
+    BPending job_requestcancel;
+    
     // done job
     BPending job_done;
     
     // state
     int state;
+    int cancel_requested;
     
     DebugObject d_obj;
 } PacketPassInterface;
@@ -74,7 +78,7 @@ static void PacketPassInterface_Init (PacketPassInterface *i, int mtu, PacketPas
 
 static void PacketPassInterface_Free (PacketPassInterface *i);
 
-static void PacketPassInterface_EnableCancel (PacketPassInterface *i, PacketPassInterface_handler_cancel handler_cancel);
+static void PacketPassInterface_EnableCancel (PacketPassInterface *i, PacketPassInterface_handler_requestcancel handler_requestcancel);
 
 static void PacketPassInterface_Done (PacketPassInterface *i);
 
@@ -84,11 +88,12 @@ static void PacketPassInterface_Sender_Init (PacketPassInterface *i, PacketPassI
 
 static void PacketPassInterface_Sender_Send (PacketPassInterface *i, uint8_t *data, int data_len);
 
-static void PacketPassInterface_Sender_Cancel (PacketPassInterface *i);
+static void PacketPassInterface_Sender_RequestCancel (PacketPassInterface *i);
 
 static int PacketPassInterface_HasCancel (PacketPassInterface *i);
 
 void _PacketPassInterface_job_operation (PacketPassInterface *i);
+void _PacketPassInterface_job_requestcancel (PacketPassInterface *i);
 void _PacketPassInterface_job_done (PacketPassInterface *i);
 
 void PacketPassInterface_Init (PacketPassInterface *i, int mtu, PacketPassInterface_handler_send handler_operation, void *user, BPendingGroup *pg)
@@ -98,7 +103,7 @@ void PacketPassInterface_Init (PacketPassInterface *i, int mtu, PacketPassInterf
     // init arguments
     i->mtu = mtu;
     i->handler_operation = handler_operation;
-    i->handler_cancel = NULL;
+    i->handler_requestcancel = NULL;
     i->user_provider = user;
     
     // set no user
@@ -106,6 +111,7 @@ void PacketPassInterface_Init (PacketPassInterface *i, int mtu, PacketPassInterf
     
     // init jobs
     BPending_Init(&i->job_operation, pg, (BPending_handler)_PacketPassInterface_job_operation, i);
+    BPending_Init(&i->job_requestcancel, pg, (BPending_handler)_PacketPassInterface_job_requestcancel, i);
     BPending_Init(&i->job_done, pg, (BPending_handler)_PacketPassInterface_job_done, i);
     
     // set state
@@ -120,16 +126,17 @@ void PacketPassInterface_Free (PacketPassInterface *i)
     
     // free jobs
     BPending_Free(&i->job_done);
+    BPending_Free(&i->job_requestcancel);
     BPending_Free(&i->job_operation);
 }
 
-void PacketPassInterface_EnableCancel (PacketPassInterface *i, PacketPassInterface_handler_cancel handler_cancel)
+void PacketPassInterface_EnableCancel (PacketPassInterface *i, PacketPassInterface_handler_requestcancel handler_requestcancel)
 {
-    ASSERT(!i->handler_cancel)
+    ASSERT(!i->handler_requestcancel)
     ASSERT(!i->handler_done)
-    ASSERT(handler_cancel)
+    ASSERT(handler_requestcancel)
     
-    i->handler_cancel = handler_cancel;
+    i->handler_requestcancel = handler_requestcancel;
 }
 
 void PacketPassInterface_Done (PacketPassInterface *i)
@@ -137,6 +144,9 @@ void PacketPassInterface_Done (PacketPassInterface *i)
     ASSERT(i->state == PPI_STATE_BUSY)
     DebugObject_Access(&i->d_obj);
     
+    // unset requestcancel job
+    BPending_Unset(&i->job_requestcancel);
+    
     // schedule done
     BPending_Set(&i->job_done);
     
@@ -177,34 +187,43 @@ void PacketPassInterface_Sender_Send (PacketPassInterface *i, uint8_t *data, int
     
     // set state
     i->state = PPI_STATE_OPERATION_PENDING;
+    i->cancel_requested = 0;
 }
 
-void PacketPassInterface_Sender_Cancel (PacketPassInterface *i)
+void PacketPassInterface_Sender_RequestCancel (PacketPassInterface *i)
 {
     ASSERT(i->state == PPI_STATE_OPERATION_PENDING || i->state == PPI_STATE_BUSY || i->state == PPI_STATE_DONE_PENDING)
-    ASSERT(i->handler_cancel)
+    ASSERT(i->handler_requestcancel)
     DebugObject_Access(&i->d_obj);
     
-    int prev_state = i->state;
-    
-    // unset jobs
-    BPending_Unset(&i->job_operation);
-    BPending_Unset(&i->job_done);
-    
-    // set state
-    i->state = PPI_STATE_NONE;
-    
-    if (prev_state == PPI_STATE_BUSY) {
-        i->handler_cancel(i->user_provider);
+    // ignore multiple cancel requests
+    if (i->cancel_requested) {
         return;
     }
+    
+    // remember we requested cancel
+    i->cancel_requested = 1;
+    
+    if (i->state == PPI_STATE_OPERATION_PENDING) {
+        // unset operation job
+        BPending_Unset(&i->job_operation);
+        
+        // set done job
+        BPending_Set(&i->job_done);
+        
+        // set state
+        i->state = PPI_STATE_DONE_PENDING;
+    } else if (i->state == PPI_STATE_BUSY) {
+        // set requestcancel job
+        BPending_Set(&i->job_requestcancel);
+    }
 }
 
 int PacketPassInterface_HasCancel (PacketPassInterface *i)
 {
     DebugObject_Access(&i->d_obj);
     
-    return !!i->handler_cancel;
+    return !!i->handler_requestcancel;
 }
 
 #endif

+ 3 - 3
flow/PacketPassNotifier.c

@@ -38,11 +38,11 @@ void input_handler_send (PacketPassNotifier *o, uint8_t *data, int data_len)
     }
 }
 
-void input_handler_cancel (PacketPassNotifier *o)
+void input_handler_requestcancel (PacketPassNotifier *o)
 {
     DebugObject_Access(&o->d_obj);
     
-    PacketPassInterface_Sender_Cancel(o->output);
+    PacketPassInterface_Sender_RequestCancel(o->output);
 }
 
 void output_handler_done (PacketPassNotifier *o)
@@ -60,7 +60,7 @@ void PacketPassNotifier_Init (PacketPassNotifier *o, PacketPassInterface *output
     // init input
     PacketPassInterface_Init(&o->input, PacketPassInterface_GetMTU(o->output), (PacketPassInterface_handler_send)input_handler_send, o, pg);
     if (PacketPassInterface_HasCancel(o->output)) {
-        PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_cancel)input_handler_cancel);
+        PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_requestcancel)input_handler_requestcancel);
     }
     
     // init output

+ 3 - 9
flow/PacketPassPriorityQueue.c

@@ -228,7 +228,7 @@ int PacketPassPriorityQueueFlow_IsBusy (PacketPassPriorityQueueFlow *flow)
     return (flow == m->sending_flow);
 }
 
-void PacketPassPriorityQueueFlow_Release (PacketPassPriorityQueueFlow *flow)
+void PacketPassPriorityQueueFlow_RequestCancel (PacketPassPriorityQueueFlow *flow)
 {
     PacketPassPriorityQueue *m = flow->m;
     
@@ -238,14 +238,8 @@ void PacketPassPriorityQueueFlow_Release (PacketPassPriorityQueueFlow *flow)
     ASSERT(!BPending_IsSet(&m->schedule_job))
     DebugObject_Access(&flow->d_obj);
     
-    // set no sending flow
-    m->sending_flow = NULL;
-    
-    // schedule schedule
-    BPending_Set(&m->schedule_job);
-    
-    // cancel current packet
-    PacketPassInterface_Sender_Cancel(m->output);
+    // request cancel
+    PacketPassInterface_Sender_RequestCancel(m->output);
 }
 
 void PacketPassPriorityQueueFlow_SetBusyHandler (PacketPassPriorityQueueFlow *flow, PacketPassPriorityQueue_handler_busy handler, void *user)

+ 2 - 7
flow/PacketPassPriorityQueue.h

@@ -141,19 +141,14 @@ void PacketPassPriorityQueueFlow_AssertFree (PacketPassPriorityQueueFlow *flow);
 int PacketPassPriorityQueueFlow_IsBusy (PacketPassPriorityQueueFlow *flow);
 
 /**
- * Cancels the packet that is currently being sent to output in order
- * to allow freeing the flow.
+ * Requests the output to stop processing the current packet as soon as possible.
  * Cancel functionality must be enabled for the queue.
  * The flow must be busy as indicated by {@link PacketPassPriorityQueueFlow_IsBusy}.
  * Queue must not be in freeing state.
- * Must not be called from queue calls to output.
- * Will call Cancel on output. Will not invoke any input I/O.
- * After this, {@link PacketPassPriorityQueueFlow_IsBusy} will report the flow as not busy.
- * The flow's input's Done will never be called (the flow will become inoperable).
  * 
  * @param flow the object
  */
-void PacketPassPriorityQueueFlow_Release (PacketPassPriorityQueueFlow *flow);
+void PacketPassPriorityQueueFlow_RequestCancel (PacketPassPriorityQueueFlow *flow);
 
 /**
  * Sets up a callback to be called when the flow is no longer busy.

+ 4 - 0
generated/blog_channel_DPReceive.h

@@ -0,0 +1,4 @@
+#ifdef BLOG_CURRENT_CHANNEL
+#undef BLOG_CURRENT_CHANNEL
+#endif
+#define BLOG_CURRENT_CHANNEL BLOG_CHANNEL_DPReceive

+ 2 - 1
generated/blog_channels_defines.h

@@ -46,4 +46,5 @@
 #define BLOG_CHANNEL_PacketProtoDecoder 45
 #define BLOG_CHANNEL_DPRelay 46
 #define BLOG_CHANNEL_BThreadWork 47
-#define BLOG_NUM_CHANNELS 48
+#define BLOG_CHANNEL_DPReceive 48
+#define BLOG_NUM_CHANNELS 49

+ 1 - 0
generated/blog_channels_list.h

@@ -46,3 +46,4 @@
 {.name = "PacketProtoDecoder", .loglevel = 4},
 {.name = "DPRelay", .loglevel = 4},
 {.name = "BThreadWork", .loglevel = 4},
+{.name = "DPReceive", .loglevel = 4},

+ 6 - 3
tuntap/BTap.c

@@ -53,7 +53,7 @@
 
 static void report_error (BTap *o);
 static void input_handler_send (BTap *o, uint8_t *data, int data_len);
-static void input_handler_cancel (BTap *o);
+static void input_handler_requestcancel (BTap *o);
 static void output_handler_recv (BTap *o, uint8_t *data);
 
 #ifdef BADVPN_USE_WINAPI
@@ -341,7 +341,7 @@ void input_handler_send (BTap *o, uint8_t *data, int data_len)
     PacketPassInterface_Done(&o->input);
 }
 
-void input_handler_cancel (BTap *o)
+void input_handler_requestcancel (BTap *o)
 {
     DebugObject_Access(&o->d_obj);
     DebugError_AssertNoError(&o->d_err);
@@ -378,6 +378,9 @@ void input_handler_cancel (BTap *o)
     
     #endif
     
+    // finish input packet
+    PacketPassInterface_Done(&o->input);
+    
     // set no input packet
     o->input_packet_len = -1;
 }
@@ -681,7 +684,7 @@ fail0:
 success:
     // init input
     PacketPassInterface_Init(&o->input, o->frame_mtu, (PacketPassInterface_handler_send)input_handler_send, o, BReactor_PendingGroup(o->reactor));
-    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_cancel)input_handler_cancel);
+    PacketPassInterface_EnableCancel(&o->input, (PacketPassInterface_handler_requestcancel)input_handler_requestcancel);
     
     // init output
     PacketRecvInterface_Init(&o->output, o->frame_mtu, (PacketRecvInterface_handler_recv)output_handler_recv, o, BReactor_PendingGroup(o->reactor));