|
|
@@ -37,17 +37,12 @@
|
|
|
#define DATAPROTO_TIMEOUT 30000
|
|
|
|
|
|
static int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2);
|
|
|
-static struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets, uint8_t *first_frame, int first_frame_len);
|
|
|
-static void free_relay_flow (struct dp_relay_flow *flow);
|
|
|
-static void flow_monitor_handler (struct dp_relay_flow *flow);
|
|
|
static void monitor_handler (DataProtoDest *o);
|
|
|
static void send_keepalive (DataProtoDest *o);
|
|
|
static void receive_timer_handler (DataProtoDest *o);
|
|
|
static void notifier_handler (DataProtoDest *o, uint8_t *data, int data_len);
|
|
|
static int pointer_comparator (void *user, void **val1, void **val2);
|
|
|
static void keepalive_job_handler (DataProtoDest *o);
|
|
|
-static void relay_job_handler (struct dp_relay_flow *flow);
|
|
|
-static void submit_relay_frame (struct dp_relay_flow *flow, uint8_t *frame, int frame_len);
|
|
|
|
|
|
int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2)
|
|
|
{
|
|
|
@@ -60,113 +55,6 @@ int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2)
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets, uint8_t *first_frame, int first_frame_len)
|
|
|
-{
|
|
|
- ASSERT(first_frame_len >= 0)
|
|
|
- ASSERT(first_frame_len <= dp->frame_mtu)
|
|
|
- ASSERT(!BAVL_LookupExact(&rs->relay_flows_tree, &dp))
|
|
|
- ASSERT(num_packets > 0)
|
|
|
- ASSERT(!dp->freeing)
|
|
|
-
|
|
|
- // allocate flow structure
|
|
|
- struct dp_relay_flow *flow = malloc(sizeof(struct dp_relay_flow));
|
|
|
- if (!flow) {
|
|
|
- BLog(BLOG_ERROR, "failed to allocate flow structure for relay flow from peer %d to %d", (int)rs->source_id, (int)dp->dest_id);
|
|
|
- goto fail0;
|
|
|
- }
|
|
|
-
|
|
|
- // init arguments
|
|
|
- flow->rs = rs;
|
|
|
- flow->dp = dp;
|
|
|
- flow->first_frame = first_frame;
|
|
|
- flow->first_frame_len = first_frame_len;
|
|
|
-
|
|
|
- // init first frame job
|
|
|
- BPending_Init(&flow->first_frame_job, BReactor_PendingGroup(dp->reactor), (BPending_handler)relay_job_handler, flow);
|
|
|
- BPending_Set(&flow->first_frame_job);
|
|
|
-
|
|
|
- // init queue flow
|
|
|
- PacketPassFairQueueFlow_Init(&flow->qflow, &dp->queue);
|
|
|
-
|
|
|
- // init inacitvity monitor
|
|
|
- PacketPassInactivityMonitor_Init(&flow->monitor, PacketPassFairQueueFlow_GetInput(&flow->qflow), dp->reactor, DATAPROTO_TIMEOUT, (PacketPassInactivityMonitor_handler)flow_monitor_handler, flow);
|
|
|
-
|
|
|
- // init async input
|
|
|
- BufferWriter_Init(&flow->ainput, dp->mtu, BReactor_PendingGroup(dp->reactor));
|
|
|
-
|
|
|
- // init buffer
|
|
|
- if (!PacketBuffer_Init(&flow->buffer, BufferWriter_GetOutput(&flow->ainput), PacketPassInactivityMonitor_GetInput(&flow->monitor), num_packets, BReactor_PendingGroup(dp->reactor))) {
|
|
|
- BLog(BLOG_ERROR, "PacketBuffer_Init failed for relay flow from peer %d to %d", (int)rs->source_id, (int)dp->dest_id);
|
|
|
- goto fail1;
|
|
|
- }
|
|
|
-
|
|
|
- // insert to source list
|
|
|
- LinkedList2_Append(&rs->relay_flows_list, &flow->source_list_node);
|
|
|
-
|
|
|
- // insert to source tree
|
|
|
- ASSERT_EXECUTE(BAVL_Insert(&rs->relay_flows_tree, &flow->source_tree_node, NULL))
|
|
|
-
|
|
|
- // insert to dp list
|
|
|
- LinkedList2_Append(&dp->relay_flows_list, &flow->dp_list_node);
|
|
|
-
|
|
|
- BLog(BLOG_NOTICE, "created relay flow from peer %d to %d", (int)rs->source_id, (int)dp->dest_id);
|
|
|
-
|
|
|
- return flow;
|
|
|
-
|
|
|
-fail1:
|
|
|
- BufferWriter_Free(&flow->ainput);
|
|
|
- PacketPassInactivityMonitor_Free(&flow->monitor);
|
|
|
- PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
- BPending_Free(&flow->first_frame_job);
|
|
|
- free(flow);
|
|
|
-fail0:
|
|
|
- return NULL;
|
|
|
-}
|
|
|
-
|
|
|
-void free_relay_flow (struct dp_relay_flow *flow)
|
|
|
-{
|
|
|
- // release it if it's busy
|
|
|
- if (!flow->dp->freeing && PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
|
|
|
- PacketPassFairQueueFlow_Release(&flow->qflow);
|
|
|
- }
|
|
|
-
|
|
|
- // remove from dp list
|
|
|
- LinkedList2_Remove(&flow->dp->relay_flows_list, &flow->dp_list_node);
|
|
|
-
|
|
|
- // remove from source tree
|
|
|
- BAVL_Remove(&flow->rs->relay_flows_tree, &flow->source_tree_node);
|
|
|
-
|
|
|
- // remove from source list
|
|
|
- LinkedList2_Remove(&flow->rs->relay_flows_list, &flow->source_list_node);
|
|
|
-
|
|
|
- // free buffer
|
|
|
- PacketBuffer_Free(&flow->buffer);
|
|
|
-
|
|
|
- // free async input
|
|
|
- BufferWriter_Free(&flow->ainput);
|
|
|
-
|
|
|
- // free inacitvity monitor
|
|
|
- PacketPassInactivityMonitor_Free(&flow->monitor);
|
|
|
-
|
|
|
- // free queue flow
|
|
|
- PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
-
|
|
|
- // free first frame job
|
|
|
- BPending_Free(&flow->first_frame_job);
|
|
|
-
|
|
|
- // free flow structure
|
|
|
- free(flow);
|
|
|
-}
|
|
|
-
|
|
|
-void flow_monitor_handler (struct dp_relay_flow *flow)
|
|
|
-{
|
|
|
- ASSERT(!flow->dp->freeing)
|
|
|
-
|
|
|
- BLog(BLOG_NOTICE, "relay flow from peer %d to %d timed out", (int)flow->rs->source_id, (int)flow->dp->dest_id);
|
|
|
-
|
|
|
- free_relay_flow(flow);
|
|
|
-}
|
|
|
-
|
|
|
void monitor_handler (DataProtoDest *o)
|
|
|
{
|
|
|
ASSERT(!o->freeing)
|
|
|
@@ -179,8 +67,6 @@ void send_keepalive (DataProtoDest *o)
|
|
|
{
|
|
|
ASSERT(!o->freeing)
|
|
|
|
|
|
- BLog(BLOG_DEBUG, "sending keepalive to peer %d", (int)o->dest_id);
|
|
|
-
|
|
|
PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
|
|
|
}
|
|
|
|
|
|
@@ -188,8 +74,6 @@ void receive_timer_handler (DataProtoDest *o)
|
|
|
{
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
- BLog(BLOG_DEBUG, "receive timer triggered for peer %d", (int)o->dest_id);
|
|
|
-
|
|
|
int prev_up = o->up;
|
|
|
|
|
|
// consider down
|
|
|
@@ -236,46 +120,6 @@ void keepalive_job_handler (DataProtoDest *o)
|
|
|
send_keepalive(o);
|
|
|
}
|
|
|
|
|
|
-void relay_job_handler (struct dp_relay_flow *flow)
|
|
|
-{
|
|
|
- ASSERT(flow->first_frame_len >= 0)
|
|
|
-
|
|
|
- int frame_len = flow->first_frame_len;
|
|
|
-
|
|
|
- // set no first frame
|
|
|
- flow->first_frame_len = -1;
|
|
|
-
|
|
|
- // submit first frame
|
|
|
- submit_relay_frame(flow, flow->first_frame, frame_len);
|
|
|
-}
|
|
|
-
|
|
|
-void submit_relay_frame (struct dp_relay_flow *flow, uint8_t *frame, int frame_len)
|
|
|
-{
|
|
|
- ASSERT(flow->first_frame_len == -1)
|
|
|
-
|
|
|
- // get a buffer
|
|
|
- uint8_t *out;
|
|
|
- // safe because of PacketBufferAsyncInput
|
|
|
- if (!BufferWriter_StartPacket(&flow->ainput, &out)) {
|
|
|
- BLog(BLOG_NOTICE, "out of buffer for relayed frame from peer %d to %d", (int)flow->rs->source_id, (int)flow->dp->dest_id);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // write header
|
|
|
- struct dataproto_header *header = (struct dataproto_header *)out;
|
|
|
- // don't set flags, it will be set in notifier_handler
|
|
|
- header->from_id = htol16(flow->rs->source_id);
|
|
|
- header->num_peer_ids = htol16(1);
|
|
|
- struct dataproto_peer_id *id = (struct dataproto_peer_id *)(out + sizeof(struct dataproto_header));
|
|
|
- id->id = htol16(flow->dp->dest_id);
|
|
|
-
|
|
|
- // write data
|
|
|
- memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), frame, frame_len);
|
|
|
-
|
|
|
- // submit it
|
|
|
- BufferWriter_EndPacket(&flow->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + frame_len);
|
|
|
-}
|
|
|
-
|
|
|
static void device_router_handler (DataProtoDevice *o, uint8_t *buf, int recv_len)
|
|
|
{
|
|
|
ASSERT(buf)
|
|
|
@@ -292,14 +136,13 @@ static void device_router_handler (DataProtoDevice *o, uint8_t *buf, int recv_le
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoDest_handler handler, void *user)
|
|
|
+int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoDest_handler handler, void *user)
|
|
|
{
|
|
|
ASSERT(PacketPassInterface_HasCancel(output))
|
|
|
ASSERT(PacketPassInterface_GetMTU(output) >= sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id))
|
|
|
|
|
|
// init arguments
|
|
|
o->reactor = reactor;
|
|
|
- o->dest_id = dest_id;
|
|
|
o->handler = handler;
|
|
|
o->user = user;
|
|
|
|
|
|
@@ -344,9 +187,6 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
|
|
|
// set not up
|
|
|
o->up = 0;
|
|
|
|
|
|
- // init relay flows list
|
|
|
- LinkedList2_Init(&o->relay_flows_list);
|
|
|
-
|
|
|
// set not freeing
|
|
|
o->freeing = 0;
|
|
|
|
|
|
@@ -378,16 +218,6 @@ void DataProtoDest_Free (DataProtoDest *o)
|
|
|
// allow freeing queue flows
|
|
|
PacketPassFairQueue_PrepareFree(&o->queue);
|
|
|
|
|
|
- // set freeing so free_relay_flow will not attempt releasing
|
|
|
- o->freeing = 1;
|
|
|
-
|
|
|
- // free relay flows
|
|
|
- LinkedList2Node *node;
|
|
|
- while (node = LinkedList2_GetFirst(&o->relay_flows_list)) {
|
|
|
- struct dp_relay_flow *flow = UPPER_OBJECT(node, struct dp_relay_flow, dp_list_node);
|
|
|
- free_relay_flow(flow);
|
|
|
- }
|
|
|
-
|
|
|
// free receive timer
|
|
|
BReactor_RemoveTimer(o->reactor, &o->receive_timer);
|
|
|
|
|
|
@@ -427,30 +257,6 @@ void DataProtoDest_PrepareFree (DataProtoDest *o)
|
|
|
o->freeing = 1;
|
|
|
}
|
|
|
|
|
|
-void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs, uint8_t *data, int data_len, int buffer_num_packets)
|
|
|
-{
|
|
|
- ASSERT(data_len >= 0)
|
|
|
- ASSERT(data_len <= o->frame_mtu)
|
|
|
- ASSERT(buffer_num_packets > 0)
|
|
|
- ASSERT(!o->freeing)
|
|
|
- DebugObject_Access(&rs->d_obj);
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
-
|
|
|
- // lookup relay flow from source to this DataProto
|
|
|
- struct dp_relay_flow *flow;
|
|
|
- BAVLNode *node = BAVL_LookupExact(&rs->relay_flows_tree, &o);
|
|
|
- if (!node) {
|
|
|
- // create new flow
|
|
|
- create_relay_flow(rs, o, buffer_num_packets, data, data_len);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- flow = UPPER_OBJECT(node, struct dp_relay_flow, source_tree_node);
|
|
|
-
|
|
|
- // submit frame
|
|
|
- submit_relay_frame(flow, data, data_len);
|
|
|
-}
|
|
|
-
|
|
|
void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
|
|
|
{
|
|
|
ASSERT(peer_receiving == 0 || peer_receiving == 1)
|
|
|
@@ -638,55 +444,3 @@ void DataProtoLocalSource_Detach (DataProtoLocalSource *o)
|
|
|
// set no DataProto
|
|
|
o->dp = NULL;
|
|
|
}
|
|
|
-
|
|
|
-void DataProtoRelaySource_Init (DataProtoRelaySource *o, peerid_t source_id)
|
|
|
-{
|
|
|
- // init arguments
|
|
|
- o->source_id = source_id;
|
|
|
-
|
|
|
- // init relay flows list
|
|
|
- LinkedList2_Init(&o->relay_flows_list);
|
|
|
-
|
|
|
- // init relay flows tree
|
|
|
- BAVL_Init(&o->relay_flows_tree, OFFSET_DIFF(struct dp_relay_flow, dp, source_tree_node), (BAVL_comparator)pointer_comparator, NULL);
|
|
|
-
|
|
|
- DebugObject_Init(&o->d_obj);
|
|
|
-}
|
|
|
-
|
|
|
-void DataProtoRelaySource_Free (DataProtoRelaySource *o)
|
|
|
-{
|
|
|
- ASSERT(BAVL_IsEmpty(&o->relay_flows_tree))
|
|
|
- ASSERT(LinkedList2_IsEmpty(&o->relay_flows_list))
|
|
|
- DebugObject_Free(&o->d_obj);
|
|
|
-}
|
|
|
-
|
|
|
-void DataProtoRelaySource_AssertFree (DataProtoRelaySource *o)
|
|
|
-{
|
|
|
- ASSERT(LinkedList2_IsEmpty(&o->relay_flows_list))
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
-}
|
|
|
-
|
|
|
-void DataProtoRelaySource_Release (DataProtoRelaySource *o)
|
|
|
-{
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
-
|
|
|
- LinkedList2Node *node;
|
|
|
- while (node = LinkedList2_GetFirst(&o->relay_flows_list)) {
|
|
|
- struct dp_relay_flow *flow = UPPER_OBJECT(node, struct dp_relay_flow, source_list_node);
|
|
|
-
|
|
|
- free_relay_flow(flow);
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-void DataProtoRelaySource_FreeRelease (DataProtoRelaySource *o)
|
|
|
-{
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
-
|
|
|
- LinkedList2Node *node;
|
|
|
- while (node = LinkedList2_GetFirst(&o->relay_flows_list)) {
|
|
|
- struct dp_relay_flow *flow = UPPER_OBJECT(node, struct dp_relay_flow, source_list_node);
|
|
|
-
|
|
|
- DataProtoDest_PrepareFree(flow->dp);
|
|
|
- free_relay_flow(flow);
|
|
|
- }
|
|
|
-}
|