|
@@ -39,7 +39,7 @@ static void receive_timer_handler (DataProtoSink *o);
|
|
|
static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
|
|
static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
|
|
|
static void up_job_handler (DataProtoSink *o);
|
|
static void up_job_handler (DataProtoSink *o);
|
|
|
static void flow_buffer_free (struct DataProtoFlow_buffer *b);
|
|
static void flow_buffer_free (struct DataProtoFlow_buffer *b);
|
|
|
-static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *dp);
|
|
|
|
|
|
|
+static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink);
|
|
|
static void flow_buffer_detach (struct DataProtoFlow_buffer *b);
|
|
static void flow_buffer_detach (struct DataProtoFlow_buffer *b);
|
|
|
static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b);
|
|
static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b);
|
|
|
static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b);
|
|
static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b);
|
|
@@ -118,7 +118,7 @@ void source_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
|
|
|
|
|
|
|
|
void flow_buffer_free (struct DataProtoFlow_buffer *b)
|
|
void flow_buffer_free (struct DataProtoFlow_buffer *b)
|
|
|
{
|
|
{
|
|
|
- ASSERT(!b->dp)
|
|
|
|
|
|
|
+ ASSERT(!b->sink)
|
|
|
|
|
|
|
|
// free route buffer
|
|
// free route buffer
|
|
|
RouteBuffer_Free(&b->rbuf);
|
|
RouteBuffer_Free(&b->rbuf);
|
|
@@ -135,65 +135,65 @@ void flow_buffer_free (struct DataProtoFlow_buffer *b)
|
|
|
free(b);
|
|
free(b);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *dp)
|
|
|
|
|
|
|
+void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink)
|
|
|
{
|
|
{
|
|
|
- ASSERT(!b->dp)
|
|
|
|
|
|
|
+ ASSERT(!b->sink)
|
|
|
|
|
|
|
|
// init queue flow
|
|
// init queue flow
|
|
|
- PacketPassFairQueueFlow_Init(&b->dp_qflow, &dp->queue);
|
|
|
|
|
|
|
+ PacketPassFairQueueFlow_Init(&b->sink_qflow, &sink->queue);
|
|
|
|
|
|
|
|
// connect to queue flow
|
|
// connect to queue flow
|
|
|
- PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->dp_qflow));
|
|
|
|
|
|
|
+ PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->sink_qflow));
|
|
|
|
|
|
|
|
// set DataProto
|
|
// set DataProto
|
|
|
- b->dp = dp;
|
|
|
|
|
|
|
+ b->sink = sink;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void flow_buffer_detach (struct DataProtoFlow_buffer *b)
|
|
void flow_buffer_detach (struct DataProtoFlow_buffer *b)
|
|
|
{
|
|
{
|
|
|
- ASSERT(b->dp)
|
|
|
|
|
- PacketPassFairQueueFlow_AssertFree(&b->dp_qflow);
|
|
|
|
|
|
|
+ ASSERT(b->sink)
|
|
|
|
|
+ PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
|
|
|
|
|
|
|
|
// disconnect from queue flow
|
|
// disconnect from queue flow
|
|
|
PacketPassConnector_DisconnectOutput(&b->connector);
|
|
PacketPassConnector_DisconnectOutput(&b->connector);
|
|
|
|
|
|
|
|
// free queue flow
|
|
// free queue flow
|
|
|
- PacketPassFairQueueFlow_Free(&b->dp_qflow);
|
|
|
|
|
|
|
+ PacketPassFairQueueFlow_Free(&b->sink_qflow);
|
|
|
|
|
|
|
|
// clear reference to this buffer in the sink
|
|
// clear reference to this buffer in the sink
|
|
|
- if (b->dp->detaching_buffer == b) {
|
|
|
|
|
- b->dp->detaching_buffer = NULL;
|
|
|
|
|
|
|
+ if (b->sink->detaching_buffer == b) {
|
|
|
|
|
+ b->sink->detaching_buffer = NULL;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// set no DataProto
|
|
// set no DataProto
|
|
|
- b->dp = NULL;
|
|
|
|
|
|
|
+ b->sink = NULL;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b)
|
|
void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b)
|
|
|
{
|
|
{
|
|
|
- ASSERT(b->dp)
|
|
|
|
|
- ASSERT(PacketPassFairQueueFlow_IsBusy(&b->dp_qflow))
|
|
|
|
|
- ASSERT(!b->dp->detaching_buffer || b->dp->detaching_buffer == b)
|
|
|
|
|
|
|
+ ASSERT(b->sink)
|
|
|
|
|
+ ASSERT(PacketPassFairQueueFlow_IsBusy(&b->sink_qflow))
|
|
|
|
|
+ ASSERT(!b->sink->detaching_buffer || b->sink->detaching_buffer == b)
|
|
|
|
|
|
|
|
- if (b->dp->detaching_buffer == b) {
|
|
|
|
|
|
|
+ if (b->sink->detaching_buffer == b) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// request cancel
|
|
// request cancel
|
|
|
- PacketPassFairQueueFlow_RequestCancel(&b->dp_qflow);
|
|
|
|
|
|
|
+ PacketPassFairQueueFlow_RequestCancel(&b->sink_qflow);
|
|
|
|
|
|
|
|
// set busy handler
|
|
// set busy handler
|
|
|
- PacketPassFairQueueFlow_SetBusyHandler(&b->dp_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b);
|
|
|
|
|
|
|
+ PacketPassFairQueueFlow_SetBusyHandler(&b->sink_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b);
|
|
|
|
|
|
|
|
// remember this buffer in the sink so it can handle us if it goes away
|
|
// remember this buffer in the sink so it can handle us if it goes away
|
|
|
- b->dp->detaching_buffer = b;
|
|
|
|
|
|
|
+ b->sink->detaching_buffer = b;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
|
|
void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
|
|
|
{
|
|
{
|
|
|
- ASSERT(b->dp)
|
|
|
|
|
- ASSERT(b->dp->detaching_buffer == b)
|
|
|
|
|
- PacketPassFairQueueFlow_AssertFree(&b->dp_qflow);
|
|
|
|
|
|
|
+ ASSERT(b->sink)
|
|
|
|
|
+ ASSERT(b->sink->detaching_buffer == b)
|
|
|
|
|
+ PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
|
|
|
|
|
|
|
|
// detach
|
|
// detach
|
|
|
flow_buffer_detach(b);
|
|
flow_buffer_detach(b);
|
|
@@ -201,17 +201,17 @@ void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
|
|
|
if (!b->flow) {
|
|
if (!b->flow) {
|
|
|
// free
|
|
// free
|
|
|
flow_buffer_free(b);
|
|
flow_buffer_free(b);
|
|
|
- } else if (b->flow->dp_desired) {
|
|
|
|
|
|
|
+ } else if (b->flow->sink_desired) {
|
|
|
// attach
|
|
// attach
|
|
|
- flow_buffer_attach(b, b->flow->dp_desired);
|
|
|
|
|
|
|
+ flow_buffer_attach(b, b->flow->sink_desired);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b)
|
|
void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b)
|
|
|
{
|
|
{
|
|
|
- ASSERT(b->dp)
|
|
|
|
|
- ASSERT(b->dp->detaching_buffer == b)
|
|
|
|
|
- PacketPassFairQueueFlow_AssertFree(&b->dp_qflow);
|
|
|
|
|
|
|
+ ASSERT(b->sink)
|
|
|
|
|
+ ASSERT(b->sink->detaching_buffer == b)
|
|
|
|
|
+ PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
|
|
|
|
|
|
|
|
flow_buffer_finish_detach(b);
|
|
flow_buffer_finish_detach(b);
|
|
|
}
|
|
}
|
|
@@ -296,7 +296,7 @@ void DataProtoSink_Free (DataProtoSink *o)
|
|
|
|
|
|
|
|
// release detaching buffer
|
|
// release detaching buffer
|
|
|
if (o->detaching_buffer) {
|
|
if (o->detaching_buffer) {
|
|
|
- ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->dp_desired != o)
|
|
|
|
|
|
|
+ ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->sink_desired != o)
|
|
|
flow_buffer_finish_detach(o->detaching_buffer);
|
|
flow_buffer_finish_detach(o->detaching_buffer);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -396,7 +396,7 @@ int DataProtoFlow_Init (
|
|
|
o->dest_id = dest_id;
|
|
o->dest_id = dest_id;
|
|
|
|
|
|
|
|
// set no desired sink
|
|
// set no desired sink
|
|
|
- o->dp_desired = NULL;
|
|
|
|
|
|
|
+ o->sink_desired = NULL;
|
|
|
|
|
|
|
|
// allocate buffer structure
|
|
// allocate buffer structure
|
|
|
struct DataProtoFlow_buffer *b = malloc(sizeof(*b));
|
|
struct DataProtoFlow_buffer *b = malloc(sizeof(*b));
|
|
@@ -429,7 +429,7 @@ int DataProtoFlow_Init (
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// set no DataProto
|
|
// set no DataProto
|
|
|
- b->dp = NULL;
|
|
|
|
|
|
|
+ b->sink = NULL;
|
|
|
|
|
|
|
|
DebugCounter_Increment(&source->d_ctr);
|
|
DebugCounter_Increment(&source->d_ctr);
|
|
|
DebugObject_Init(&o->d_obj);
|
|
DebugObject_Init(&o->d_obj);
|
|
@@ -450,10 +450,10 @@ void DataProtoFlow_Free (DataProtoFlow *o)
|
|
|
DebugObject_Free(&o->d_obj);
|
|
DebugObject_Free(&o->d_obj);
|
|
|
DebugCounter_Decrement(&o->source->d_ctr);
|
|
DebugCounter_Decrement(&o->source->d_ctr);
|
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
|
- ASSERT(!o->dp_desired)
|
|
|
|
|
|
|
+ ASSERT(!o->sink_desired)
|
|
|
|
|
|
|
|
- if (b->dp) {
|
|
|
|
|
- if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
|
|
|
|
|
|
|
+ if (b->sink) {
|
|
|
|
|
+ if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
|
|
|
// schedule detach, free buffer after detach
|
|
// schedule detach, free buffer after detach
|
|
|
flow_buffer_schedule_detach(b);
|
|
flow_buffer_schedule_detach(b);
|
|
|
b->flow = NULL;
|
|
b->flow = NULL;
|
|
@@ -503,45 +503,45 @@ void DataProtoFlow_Route (DataProtoFlow *o, int more)
|
|
|
o->source->current_buf = (more ? next_buf : NULL);
|
|
o->source->current_buf = (more ? next_buf : NULL);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
|
|
|
|
|
|
|
+void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *sink)
|
|
|
{
|
|
{
|
|
|
DebugObject_Access(&o->d_obj);
|
|
DebugObject_Access(&o->d_obj);
|
|
|
- DebugObject_Access(&dp->d_obj);
|
|
|
|
|
- ASSERT(!o->dp_desired)
|
|
|
|
|
- ASSERT(dp)
|
|
|
|
|
- ASSERT(o->source->frame_mtu <= dp->frame_mtu)
|
|
|
|
|
|
|
+ DebugObject_Access(&sink->d_obj);
|
|
|
|
|
+ ASSERT(!o->sink_desired)
|
|
|
|
|
+ ASSERT(sink)
|
|
|
|
|
+ ASSERT(o->source->frame_mtu <= sink->frame_mtu)
|
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
|
|
|
|
|
|
- if (b->dp) {
|
|
|
|
|
- if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
|
|
|
|
|
|
|
+ if (b->sink) {
|
|
|
|
|
+ if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
|
|
|
// schedule detach and reattach
|
|
// schedule detach and reattach
|
|
|
flow_buffer_schedule_detach(b);
|
|
flow_buffer_schedule_detach(b);
|
|
|
} else {
|
|
} else {
|
|
|
// detach and reattach now
|
|
// detach and reattach now
|
|
|
flow_buffer_detach(b);
|
|
flow_buffer_detach(b);
|
|
|
- flow_buffer_attach(b, dp);
|
|
|
|
|
|
|
+ flow_buffer_attach(b, sink);
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
// attach
|
|
// attach
|
|
|
- flow_buffer_attach(b, dp);
|
|
|
|
|
|
|
+ flow_buffer_attach(b, sink);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// set desired sink
|
|
// set desired sink
|
|
|
- o->dp_desired = dp;
|
|
|
|
|
|
|
+ o->sink_desired = sink;
|
|
|
|
|
|
|
|
- DebugCounter_Increment(&dp->d_ctr);
|
|
|
|
|
|
|
+ DebugCounter_Increment(&sink->d_ctr);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void DataProtoFlow_Detach (DataProtoFlow *o)
|
|
void DataProtoFlow_Detach (DataProtoFlow *o)
|
|
|
{
|
|
{
|
|
|
DebugObject_Access(&o->d_obj);
|
|
DebugObject_Access(&o->d_obj);
|
|
|
- ASSERT(o->dp_desired)
|
|
|
|
|
|
|
+ ASSERT(o->sink_desired)
|
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
struct DataProtoFlow_buffer *b = o->b;
|
|
|
- ASSERT(b->dp)
|
|
|
|
|
|
|
+ ASSERT(b->sink)
|
|
|
|
|
|
|
|
- DataProtoSink *dp = o->dp_desired;
|
|
|
|
|
|
|
+ DataProtoSink *sink = o->sink_desired;
|
|
|
|
|
|
|
|
- if (PacketPassFairQueueFlow_IsBusy(&b->dp_qflow)) {
|
|
|
|
|
|
|
+ if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
|
|
|
// schedule detach
|
|
// schedule detach
|
|
|
flow_buffer_schedule_detach(b);
|
|
flow_buffer_schedule_detach(b);
|
|
|
} else {
|
|
} else {
|
|
@@ -550,7 +550,7 @@ void DataProtoFlow_Detach (DataProtoFlow *o)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// set no desired sink
|
|
// set no desired sink
|
|
|
- o->dp_desired = NULL;
|
|
|
|
|
|
|
+ o->sink_desired = NULL;
|
|
|
|
|
|
|
|
- DebugCounter_Decrement(&dp->d_ctr);
|
|
|
|
|
|
|
+ DebugCounter_Decrement(&sink->d_ctr);
|
|
|
}
|
|
}
|