|
|
@@ -34,7 +34,6 @@
|
|
|
#include <generated/blog_channel_DataProto.h>
|
|
|
|
|
|
static void monitor_handler (DataProtoSink *o);
|
|
|
-static void send_keepalive (DataProtoSink *o);
|
|
|
static void refresh_up_job (DataProtoSink *o);
|
|
|
static void receive_timer_handler (DataProtoSink *o);
|
|
|
static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
|
|
|
@@ -50,11 +49,7 @@ void monitor_handler (DataProtoSink *o)
|
|
|
{
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
- send_keepalive(o);
|
|
|
-}
|
|
|
-
|
|
|
-void send_keepalive (DataProtoSink *o)
|
|
|
-{
|
|
|
+ // send keep-alive
|
|
|
PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
|
|
|
}
|
|
|
|
|
|
@@ -79,8 +74,8 @@ void receive_timer_handler (DataProtoSink *o)
|
|
|
|
|
|
void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
|
|
|
{
|
|
|
- ASSERT(data_len >= sizeof(struct dataproto_header))
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
+ ASSERT(data_len >= sizeof(struct dataproto_header))
|
|
|
|
|
|
int flags = 0;
|
|
|
|
|
|
@@ -96,8 +91,8 @@ void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
|
|
|
|
|
|
void up_job_handler (DataProtoSink *o)
|
|
|
{
|
|
|
- ASSERT(o->up != o->up_report)
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
+ ASSERT(o->up != o->up_report)
|
|
|
|
|
|
o->up_report = o->up;
|
|
|
|
|
|
@@ -107,10 +102,10 @@ void up_job_handler (DataProtoSink *o)
|
|
|
|
|
|
void source_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
|
|
|
{
|
|
|
+ DebugObject_Access(&o->d_obj);
|
|
|
ASSERT(buf)
|
|
|
ASSERT(recv_len >= 0)
|
|
|
ASSERT(recv_len <= o->frame_mtu)
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
// remember packet
|
|
|
o->current_buf = buf;
|
|
|
@@ -293,8 +288,8 @@ fail1:
|
|
|
|
|
|
void DataProtoSink_Free (DataProtoSink *o)
|
|
|
{
|
|
|
- DebugCounter_Free(&o->d_ctr);
|
|
|
DebugObject_Free(&o->d_obj);
|
|
|
+ DebugCounter_Free(&o->d_ctr);
|
|
|
|
|
|
// allow freeing queue flows
|
|
|
PacketPassFairQueue_PrepareFree(&o->queue);
|
|
|
@@ -345,7 +340,7 @@ void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
|
|
|
// peer reports not receiving, consider down
|
|
|
o->up = 0;
|
|
|
// send keep-alive to converge faster
|
|
|
- send_keepalive(o);
|
|
|
+ PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
|
|
|
} else {
|
|
|
// consider up
|
|
|
o->up = 1;
|
|
|
@@ -368,22 +363,21 @@ int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataPr
|
|
|
|
|
|
// init router
|
|
|
if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) {
|
|
|
- goto fail1;
|
|
|
+ goto fail0;
|
|
|
}
|
|
|
|
|
|
- DebugObject_Init(&o->d_obj);
|
|
|
DebugCounter_Init(&o->d_ctr);
|
|
|
-
|
|
|
+ DebugObject_Init(&o->d_obj);
|
|
|
return 1;
|
|
|
|
|
|
-fail1:
|
|
|
+fail0:
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
void DataProtoSource_Free (DataProtoSource *o)
|
|
|
{
|
|
|
- DebugCounter_Free(&o->d_ctr);
|
|
|
DebugObject_Free(&o->d_obj);
|
|
|
+ DebugCounter_Free(&o->d_ctr);
|
|
|
|
|
|
// free router
|
|
|
PacketRouter_Free(&o->router);
|
|
|
@@ -437,9 +431,8 @@ int DataProtoFlow_Init (
|
|
|
// set no DataProto
|
|
|
b->dp = NULL;
|
|
|
|
|
|
- DebugObject_Init(&o->d_obj);
|
|
|
DebugCounter_Increment(&source->d_ctr);
|
|
|
-
|
|
|
+ DebugObject_Init(&o->d_obj);
|
|
|
return 1;
|
|
|
|
|
|
fail1:
|
|
|
@@ -454,10 +447,10 @@ fail0:
|
|
|
|
|
|
void DataProtoFlow_Free (DataProtoFlow *o)
|
|
|
{
|
|
|
+ DebugObject_Free(&o->d_obj);
|
|
|
+ DebugCounter_Decrement(&o->source->d_ctr);
|
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
|
ASSERT(!o->dp_desired)
|
|
|
- DebugCounter_Decrement(&o->source->d_ctr);
|
|
|
- DebugObject_Free(&o->d_obj);
|
|
|
|
|
|
if (b->dp) {
|
|
|
if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
|
|
|
@@ -482,11 +475,11 @@ void DataProtoFlow_Free (DataProtoFlow *o)
|
|
|
|
|
|
void DataProtoFlow_Route (DataProtoFlow *o, int more)
|
|
|
{
|
|
|
- struct DataProtoFlow_buffer *b = o->b;
|
|
|
- ASSERT(more == 0 || more == 1)
|
|
|
+ DebugObject_Access(&o->d_obj);
|
|
|
PacketRouter_AssertRoute(&o->source->router);
|
|
|
ASSERT(o->source->current_buf)
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
+ ASSERT(more == 0 || more == 1)
|
|
|
+ struct DataProtoFlow_buffer *b = o->b;
|
|
|
|
|
|
// write header
|
|
|
struct dataproto_header *header = (struct dataproto_header *)o->source->current_buf;
|
|
|
@@ -506,17 +499,18 @@ void DataProtoFlow_Route (DataProtoFlow *o, int more)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ // remember next buffer, or don't allow further routing if more==0
|
|
|
o->source->current_buf = (more ? next_buf : NULL);
|
|
|
}
|
|
|
|
|
|
void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
|
|
|
{
|
|
|
- struct DataProtoFlow_buffer *b = o->b;
|
|
|
- ASSERT(dp)
|
|
|
- ASSERT(!o->dp_desired)
|
|
|
- ASSERT(o->source->frame_mtu <= dp->frame_mtu)
|
|
|
DebugObject_Access(&o->d_obj);
|
|
|
DebugObject_Access(&dp->d_obj);
|
|
|
+ ASSERT(!o->dp_desired)
|
|
|
+ ASSERT(dp)
|
|
|
+ ASSERT(o->source->frame_mtu <= dp->frame_mtu)
|
|
|
+ struct DataProtoFlow_buffer *b = o->b;
|
|
|
|
|
|
if (b->dp) {
|
|
|
if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
|
|
|
@@ -540,10 +534,10 @@ void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
|
|
|
|
|
|
void DataProtoFlow_Detach (DataProtoFlow *o)
|
|
|
{
|
|
|
- struct DataProtoFlow_buffer *b = o->b;
|
|
|
+ DebugObject_Access(&o->d_obj);
|
|
|
ASSERT(o->dp_desired)
|
|
|
+ struct DataProtoFlow_buffer *b = o->b;
|
|
|
ASSERT(b->dp)
|
|
|
- DebugObject_Access(&o->d_obj);
|
|
|
|
|
|
DataProtoSink *dp = o->dp_desired;
|
|
|
|