|
|
@@ -39,8 +39,7 @@
|
|
|
struct dp_relay_flow {
|
|
|
DataProtoRelaySource *rs;
|
|
|
DataProtoDest *dp;
|
|
|
- BestEffortPacketWriteInterface *ainput_if;
|
|
|
- PacketBufferAsyncInput ainput;
|
|
|
+ BufferWriter ainput;
|
|
|
PacketBuffer buffer;
|
|
|
PacketPassInactivityMonitor monitor;
|
|
|
PacketPassFairQueueFlow qflow;
|
|
|
@@ -52,10 +51,10 @@ struct dp_relay_flow {
|
|
|
static int peerid_comparator (void *user, peerid_t *val1, peerid_t *val2);
|
|
|
static struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDest *dp, int num_packets);
|
|
|
static void dealloc_relay_flow (struct dp_relay_flow *flow);
|
|
|
-static int release_relay_flow (struct dp_relay_flow *flow);
|
|
|
+static void release_relay_flow (struct dp_relay_flow *flow);
|
|
|
static void flow_monitor_handler (struct dp_relay_flow *flow);
|
|
|
static void monitor_handler (DataProtoDest *o);
|
|
|
-static int send_keepalive (DataProtoDest *o);
|
|
|
+static void send_keepalive (DataProtoDest *o);
|
|
|
static void receive_timer_handler (DataProtoDest *o);
|
|
|
static void notifier_handler (DataProtoDest *o, uint8_t *data, int data_len);
|
|
|
static int pointer_comparator (void *user, void **val1, void **val2);
|
|
|
@@ -77,7 +76,6 @@ struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDes
|
|
|
ASSERT(!BAVL_LookupExact(&rs->relay_flows_tree, &dp))
|
|
|
ASSERT(num_packets > 0)
|
|
|
ASSERT(!dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(dp->d_output))
|
|
|
|
|
|
// allocate flow structure
|
|
|
struct dp_relay_flow *flow = malloc(sizeof(struct dp_relay_flow));
|
|
|
@@ -97,11 +95,10 @@ struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDes
|
|
|
PacketPassInactivityMonitor_Init(&flow->monitor, PacketPassFairQueueFlow_GetInput(&flow->qflow), dp->reactor, DATAPROTO_TIMEOUT, (PacketPassInactivityMonitor_handler)flow_monitor_handler, flow);
|
|
|
|
|
|
// init async input
|
|
|
- PacketBufferAsyncInput_Init(&flow->ainput, dp->mtu);
|
|
|
- flow->ainput_if = PacketBufferAsyncInput_GetInput(&flow->ainput);
|
|
|
+ BufferWriter_Init(&flow->ainput, dp->mtu, BReactor_PendingGroup(dp->reactor));
|
|
|
|
|
|
// init buffer
|
|
|
- if (!PacketBuffer_Init(&flow->buffer, PacketBufferAsyncInput_GetOutput(&flow->ainput), PacketPassInactivityMonitor_GetInput(&flow->monitor), num_packets, BReactor_PendingGroup(dp->reactor))) {
|
|
|
+ if (!PacketBuffer_Init(&flow->buffer, BufferWriter_GetOutput(&flow->ainput), PacketPassInactivityMonitor_GetInput(&flow->monitor), num_packets, BReactor_PendingGroup(dp->reactor))) {
|
|
|
BLog(BLOG_ERROR, "PacketBuffer_Init failed for relay flow from peer %d to %d", (int)rs->source_id, (int)dp->dest_id);
|
|
|
goto fail1;
|
|
|
}
|
|
|
@@ -120,7 +117,7 @@ struct dp_relay_flow * create_relay_flow (DataProtoRelaySource *rs, DataProtoDes
|
|
|
return flow;
|
|
|
|
|
|
fail1:
|
|
|
- PacketBufferAsyncInput_Free(&flow->ainput);
|
|
|
+ BufferWriter_Free(&flow->ainput);
|
|
|
PacketPassInactivityMonitor_Free(&flow->monitor);
|
|
|
PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
free(flow);
|
|
|
@@ -133,12 +130,9 @@ void dealloc_relay_flow (struct dp_relay_flow *flow)
|
|
|
#ifndef NDEBUG
|
|
|
if (!flow->dp->d_freeing) {
|
|
|
ASSERT(!PacketPassFairQueueFlow_IsBusy(&flow->qflow))
|
|
|
- ASSERT(!PacketPassInterface_InClient(flow->dp->d_output))
|
|
|
}
|
|
|
#endif
|
|
|
|
|
|
- DataProtoDest *o = flow->dp;
|
|
|
-
|
|
|
// remove from dp list
|
|
|
LinkedList2_Remove(&flow->dp->relay_flows_list, &flow->dp_list_node);
|
|
|
|
|
|
@@ -152,7 +146,7 @@ void dealloc_relay_flow (struct dp_relay_flow *flow)
|
|
|
PacketBuffer_Free(&flow->buffer);
|
|
|
|
|
|
// free async input
|
|
|
- PacketBufferAsyncInput_Free(&flow->ainput);
|
|
|
+ BufferWriter_Free(&flow->ainput);
|
|
|
|
|
|
// free inacitvity monitor
|
|
|
PacketPassInactivityMonitor_Free(&flow->monitor);
|
|
|
@@ -164,63 +158,43 @@ void dealloc_relay_flow (struct dp_relay_flow *flow)
|
|
|
free(flow);
|
|
|
}
|
|
|
|
|
|
-int release_relay_flow (struct dp_relay_flow *flow)
|
|
|
+void release_relay_flow (struct dp_relay_flow *flow)
|
|
|
{
|
|
|
ASSERT(!flow->dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(flow->dp->d_output))
|
|
|
-
|
|
|
- DataProtoDest *o = flow->dp;
|
|
|
|
|
|
+ // release it if it's busy
|
|
|
if (PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
|
|
|
- // release it
|
|
|
- DEAD_ENTER(o->dead)
|
|
|
PacketPassFairQueueFlow_Release(&flow->qflow);
|
|
|
- if (DEAD_LEAVE(o->dead)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
// remove flow
|
|
|
dealloc_relay_flow(flow);
|
|
|
-
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
void flow_monitor_handler (struct dp_relay_flow *flow)
|
|
|
{
|
|
|
ASSERT(!flow->dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(flow->dp->d_output))
|
|
|
|
|
|
BLog(BLOG_NOTICE, "relay flow from peer %d to %d timed out", (int)flow->rs->source_id, (int)flow->dp->dest_id);
|
|
|
|
|
|
release_relay_flow(flow);
|
|
|
- return;
|
|
|
}
|
|
|
|
|
|
void monitor_handler (DataProtoDest *o)
|
|
|
{
|
|
|
ASSERT(!o->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->d_output))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
send_keepalive(o);
|
|
|
- return;
|
|
|
}
|
|
|
|
|
|
-int send_keepalive (DataProtoDest *o)
|
|
|
+void send_keepalive (DataProtoDest *o)
|
|
|
{
|
|
|
ASSERT(!o->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->d_output))
|
|
|
|
|
|
BLog(BLOG_DEBUG, "sending keepalive to peer %d", (int)o->dest_id);
|
|
|
|
|
|
- DEAD_ENTER(o->dead)
|
|
|
PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
|
|
|
- if (DEAD_LEAVE(o->dead)) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
void receive_timer_handler (DataProtoDest *o)
|
|
|
@@ -269,12 +243,9 @@ int pointer_comparator (void *user, void **val1, void **val2)
|
|
|
|
|
|
void keepalive_job_handler (DataProtoDest *o)
|
|
|
{
|
|
|
- ASSERT(!o->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->d_output))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
- send_keepalive(o);
|
|
|
- return;
|
|
|
+ PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
|
|
|
}
|
|
|
|
|
|
int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoDest_handler handler, void *user)
|
|
|
@@ -297,8 +268,12 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
|
|
|
// set frame MTU
|
|
|
o->frame_mtu = o->mtu - (sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id));
|
|
|
|
|
|
+ // schedule keep-alive (needs to be before the buffer)
|
|
|
+ BPending_Init(&o->keepalive_job, BReactor_PendingGroup(o->reactor), (BPending_handler)keepalive_job_handler, o);
|
|
|
+ BPending_Set(&o->keepalive_job);
|
|
|
+
|
|
|
// init notifier
|
|
|
- PacketPassNotifier_Init(&o->notifier, output);
|
|
|
+ PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor));
|
|
|
PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o);
|
|
|
|
|
|
// init monitor
|
|
|
@@ -312,10 +287,10 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
|
|
|
PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);
|
|
|
|
|
|
// init keepalive source
|
|
|
- DataProtoKeepaliveSource_Init(&o->ka_source);
|
|
|
+ DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor));
|
|
|
|
|
|
// init keepalive blocker
|
|
|
- PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source));
|
|
|
+ PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor));
|
|
|
|
|
|
// init keepalive buffer
|
|
|
if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) {
|
|
|
@@ -332,14 +307,7 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
|
|
|
// init relay flows list
|
|
|
LinkedList2_Init(&o->relay_flows_list);
|
|
|
|
|
|
- // init keepalive job
|
|
|
- BPending_Init(&o->keepalive_job, BReactor_PendingGroup(o->reactor), (BPending_handler)keepalive_job_handler, o);
|
|
|
- BPending_Set(&o->keepalive_job);
|
|
|
-
|
|
|
- // init flows counter
|
|
|
DebugCounter_Init(&o->flows_counter);
|
|
|
-
|
|
|
- // init debug object
|
|
|
DebugObject_Init(&o->d_obj);
|
|
|
|
|
|
#ifndef NDEBUG
|
|
|
@@ -356,6 +324,7 @@ fail0:
|
|
|
PacketPassFairQueue_Free(&o->queue);
|
|
|
PacketPassInactivityMonitor_Free(&o->monitor);
|
|
|
PacketPassNotifier_Free(&o->notifier);
|
|
|
+ BPending_Free(&o->keepalive_job);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -364,9 +333,6 @@ void DataProtoDest_Free (DataProtoDest *o)
|
|
|
DebugCounter_Free(&o->flows_counter);
|
|
|
DebugObject_Free(&o->d_obj);
|
|
|
|
|
|
- // free keepalive job
|
|
|
- BPending_Free(&o->keepalive_job);
|
|
|
-
|
|
|
// allow freeing queue flows
|
|
|
PacketPassFairQueue_PrepareFree(&o->queue);
|
|
|
|
|
|
@@ -401,6 +367,9 @@ void DataProtoDest_Free (DataProtoDest *o)
|
|
|
// free notifier
|
|
|
PacketPassNotifier_Free(&o->notifier);
|
|
|
|
|
|
+ // free keepalive job
|
|
|
+ BPending_Free(&o->keepalive_job);
|
|
|
+
|
|
|
// free dead var
|
|
|
DEAD_KILL(o->dead);
|
|
|
}
|
|
|
@@ -423,7 +392,6 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
|
|
|
ASSERT(data_len <= o->frame_mtu)
|
|
|
ASSERT(buffer_num_packets > 0)
|
|
|
ASSERT(!o->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->d_output))
|
|
|
DebugObject_Access(&rs->d_obj);
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
@@ -442,7 +410,7 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
|
|
|
// get a buffer
|
|
|
uint8_t *out;
|
|
|
// safe because of PacketBufferAsyncInput
|
|
|
- if (!BestEffortPacketWriteInterface_Sender_StartPacket(flow->ainput_if, &out)) {
|
|
|
+ if (!BufferWriter_StartPacket(&flow->ainput, &out)) {
|
|
|
BLog(BLOG_NOTICE, "out of buffer for relayed frame from peer %d to %d", (int)rs->source_id, (int)o->dest_id);
|
|
|
return;
|
|
|
}
|
|
|
@@ -459,15 +427,13 @@ void DataProtoDest_SubmitRelayFrame (DataProtoDest *o, DataProtoRelaySource *rs,
|
|
|
memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), data, data_len);
|
|
|
|
|
|
// submit it
|
|
|
- BestEffortPacketWriteInterface_Sender_EndPacket(flow->ainput_if, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
|
|
|
- return;
|
|
|
+ BufferWriter_EndPacket(&flow->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
|
|
|
}
|
|
|
|
|
|
void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
|
|
|
{
|
|
|
ASSERT(peer_receiving == 0 || peer_receiving == 1)
|
|
|
ASSERT(!o->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->d_output))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
int prev_up = o->up;
|
|
|
@@ -479,9 +445,7 @@ void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
|
|
|
// peer reports not receiving, consider down
|
|
|
o->up = 0;
|
|
|
// send keep-alive to converge faster
|
|
|
- if (send_keepalive(o) < 0) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ send_keepalive(o);
|
|
|
} else {
|
|
|
// consider up
|
|
|
o->up = 1;
|
|
|
@@ -515,11 +479,10 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t
|
|
|
PacketPassConnector_Init(&o->connector, packet_mtu, BReactor_PendingGroup(reactor));
|
|
|
|
|
|
// init async input
|
|
|
- PacketBufferAsyncInput_Init(&o->ainput, packet_mtu);
|
|
|
- o->ainput_if = PacketBufferAsyncInput_GetInput(&o->ainput);
|
|
|
+ BufferWriter_Init(&o->ainput, packet_mtu, BReactor_PendingGroup(reactor));
|
|
|
|
|
|
// init buffer
|
|
|
- if (!PacketBuffer_Init(&o->buffer, PacketBufferAsyncInput_GetOutput(&o->ainput), PacketPassConnector_GetInput(&o->connector), num_packets, BReactor_PendingGroup(reactor))) {
|
|
|
+ if (!PacketBuffer_Init(&o->buffer, BufferWriter_GetOutput(&o->ainput), PacketPassConnector_GetInput(&o->connector), num_packets, BReactor_PendingGroup(reactor))) {
|
|
|
BLog(BLOG_ERROR, "PacketBuffer_Init failed");
|
|
|
goto fail1;
|
|
|
}
|
|
|
@@ -527,13 +490,12 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, int frame_mtu, peerid_t
|
|
|
// set no DataProto
|
|
|
o->dp = NULL;
|
|
|
|
|
|
- // init debug object
|
|
|
DebugObject_Init(&o->d_obj);
|
|
|
|
|
|
return 1;
|
|
|
|
|
|
fail1:
|
|
|
- PacketBufferAsyncInput_Free(&o->ainput);
|
|
|
+ BufferWriter_Free(&o->ainput);
|
|
|
PacketPassConnector_Free(&o->connector);
|
|
|
fail0:
|
|
|
return 0;
|
|
|
@@ -548,7 +510,7 @@ void DataProtoLocalSource_Free (DataProtoLocalSource *o)
|
|
|
PacketBuffer_Free(&o->buffer);
|
|
|
|
|
|
// free async input
|
|
|
- PacketBufferAsyncInput_Free(&o->ainput);
|
|
|
+ BufferWriter_Free(&o->ainput);
|
|
|
|
|
|
// free connector
|
|
|
PacketPassConnector_Free(&o->connector);
|
|
|
@@ -564,14 +526,13 @@ void DataProtoLocalSource_SubmitFrame (DataProtoLocalSource *o, uint8_t *data, i
|
|
|
if (o->dp) {
|
|
|
ASSERT(!o->d_dp_released)
|
|
|
ASSERT(!o->dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->dp->d_output))
|
|
|
}
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
// get a buffer
|
|
|
uint8_t *out;
|
|
|
// safe because of PacketBufferAsyncInput
|
|
|
- if (!BestEffortPacketWriteInterface_Sender_StartPacket(o->ainput_if, &out)) {
|
|
|
+ if (!BufferWriter_StartPacket(&o->ainput, &out)) {
|
|
|
BLog(BLOG_NOTICE, "out of buffer for frame from peer %d to %d", (int)o->source_id, (int)o->dest_id);
|
|
|
return;
|
|
|
}
|
|
|
@@ -588,8 +549,7 @@ void DataProtoLocalSource_SubmitFrame (DataProtoLocalSource *o, uint8_t *data, i
|
|
|
memcpy(out + sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id), data, data_len);
|
|
|
|
|
|
// submit it
|
|
|
- BestEffortPacketWriteInterface_Sender_EndPacket(o->ainput_if, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
|
|
|
- return;
|
|
|
+ BufferWriter_EndPacket(&o->ainput, sizeof(struct dataproto_header) + sizeof(struct dataproto_peer_id) + data_len);
|
|
|
}
|
|
|
|
|
|
void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp)
|
|
|
@@ -598,7 +558,6 @@ void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp)
|
|
|
ASSERT(!o->dp)
|
|
|
ASSERT(o->frame_mtu <= dp->frame_mtu)
|
|
|
ASSERT(!dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(dp->d_output))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
DebugObject_Access(&dp->d_obj);
|
|
|
|
|
|
@@ -624,15 +583,10 @@ void DataProtoLocalSource_Release (DataProtoLocalSource *o)
|
|
|
ASSERT(o->dp)
|
|
|
ASSERT(!o->d_dp_released)
|
|
|
ASSERT(!o->dp->d_freeing)
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->dp->d_output))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
if (PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
|
|
|
- DEAD_ENTER(o->dead)
|
|
|
PacketPassFairQueueFlow_Release(&o->dp_qflow);
|
|
|
- if (DEAD_LEAVE(o->dead)) {
|
|
|
- return;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
#ifndef NDEBUG
|
|
|
@@ -645,9 +599,6 @@ void DataProtoLocalSource_Detach (DataProtoLocalSource *o)
|
|
|
#ifndef NDEBUG
|
|
|
ASSERT(o->dp)
|
|
|
ASSERT(o->d_dp_released || o->dp->d_freeing)
|
|
|
- if (!o->dp->d_freeing) {
|
|
|
- ASSERT(!PacketPassInterface_InClient(o->dp->d_output))
|
|
|
- }
|
|
|
#endif
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
@@ -680,7 +631,6 @@ void DataProtoRelaySource_Init (DataProtoRelaySource *o, peerid_t source_id)
|
|
|
// init relay flows tree
|
|
|
BAVL_Init(&o->relay_flows_tree, OFFSET_DIFF(struct dp_relay_flow, dp, source_tree_node), (BAVL_comparator)pointer_comparator, NULL);
|
|
|
|
|
|
- // init debug object
|
|
|
DebugObject_Init(&o->d_obj);
|
|
|
}
|
|
|
|
|
|
@@ -709,11 +659,7 @@ void DataProtoRelaySource_Release (DataProtoRelaySource *o)
|
|
|
while (node = LinkedList2_GetFirst(&o->relay_flows_list)) {
|
|
|
struct dp_relay_flow *flow = UPPER_OBJECT(node, struct dp_relay_flow, source_list_node);
|
|
|
|
|
|
- DEAD_ENTER(o->dead)
|
|
|
release_relay_flow(flow);
|
|
|
- if (DEAD_LEAVE(o->dead)) {
|
|
|
- return;
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|