|
@@ -352,6 +352,7 @@ void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
|
|
|
int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
|
|
int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
|
|
|
{
|
|
{
|
|
|
ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
|
|
ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
|
|
|
|
|
+ ASSERT(handler)
|
|
|
|
|
|
|
|
// init arguments
|
|
// init arguments
|
|
|
o->handler = handler;
|
|
o->handler = handler;
|
|
@@ -363,6 +364,7 @@ int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataPr
|
|
|
|
|
|
|
|
// init router
|
|
// init router
|
|
|
if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) {
|
|
if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) {
|
|
|
|
|
+ BLog(BLOG_ERROR, "PacketRouter_Init failed");
|
|
|
goto fail0;
|
|
goto fail0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -383,12 +385,12 @@ void DataProtoSource_Free (DataProtoSource *o)
|
|
|
PacketRouter_Free(&o->router);
|
|
PacketRouter_Free(&o->router);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-int DataProtoFlow_Init (
|
|
|
|
|
- DataProtoFlow *o, DataProtoSource *source, peerid_t source_id, peerid_t dest_id, int num_packets,
|
|
|
|
|
- int inactivity_time, DataProtoFlow_handler_inactivity handler_inactivity, void *user
|
|
|
|
|
-)
|
|
|
|
|
|
|
+int DataProtoFlow_Init (DataProtoFlow *o, DataProtoSource *source, peerid_t source_id, peerid_t dest_id, int num_packets, int inactivity_time, void *user,
|
|
|
|
|
+ DataProtoFlow_handler_inactivity handler_inactivity)
|
|
|
{
|
|
{
|
|
|
|
|
+ DebugObject_Access(&source->d_obj);
|
|
|
ASSERT(num_packets > 0)
|
|
ASSERT(num_packets > 0)
|
|
|
|
|
+ ASSERT(!(inactivity_time >= 0) || handler_inactivity)
|
|
|
|
|
|
|
|
// init arguments
|
|
// init arguments
|
|
|
o->source = source;
|
|
o->source = source;
|
|
@@ -428,7 +430,7 @@ int DataProtoFlow_Init (
|
|
|
goto fail1;
|
|
goto fail1;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // set no DataProto
|
|
|
|
|
|
|
+ // set no sink
|
|
|
b->sink = NULL;
|
|
b->sink = NULL;
|
|
|
|
|
|
|
|
DebugCounter_Increment(&source->d_ctr);
|
|
DebugCounter_Increment(&source->d_ctr);
|
|
@@ -449,8 +451,8 @@ void DataProtoFlow_Free (DataProtoFlow *o)
|
|
|
{
|
|
{
|
|
|
DebugObject_Free(&o->d_obj);
|
|
DebugObject_Free(&o->d_obj);
|
|
|
DebugCounter_Decrement(&o->source->d_ctr);
|
|
DebugCounter_Decrement(&o->source->d_ctr);
|
|
|
- struct DataProtoFlow_buffer *b = o->b;
|
|
|
|
|
ASSERT(!o->sink_desired)
|
|
ASSERT(!o->sink_desired)
|
|
|
|
|
+ struct DataProtoFlow_buffer *b = o->b;
|
|
|
|
|
|
|
|
if (b->sink) {
|
|
if (b->sink) {
|
|
|
if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
|
|
if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
|
|
@@ -491,9 +493,8 @@ void DataProtoFlow_Route (DataProtoFlow *o, int more)
|
|
|
|
|
|
|
|
// route
|
|
// route
|
|
|
uint8_t *next_buf;
|
|
uint8_t *next_buf;
|
|
|
- if (!PacketRouter_Route(
|
|
|
|
|
- &o->source->router, DATAPROTO_MAX_OVERHEAD + o->source->current_recv_len, &b->rbuf,
|
|
|
|
|
- &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->source->current_recv_len : 0)
|
|
|
|
|
|
|
+ if (!PacketRouter_Route(&o->source->router, DATAPROTO_MAX_OVERHEAD + o->source->current_recv_len, &b->rbuf,
|
|
|
|
|
+ &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->source->current_recv_len : 0)
|
|
|
)) {
|
|
)) {
|
|
|
BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
|
|
BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
|
|
|
return;
|
|
return;
|