ambrop7 15 лет назад
Родитель
Сommit
c2167ac4b0

+ 1 - 2
client/DataProto.c

@@ -280,8 +280,7 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
     PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
     
     // init queue
-    PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor));
-    PacketPassFairQueue_EnableCancel(&o->queue);
+    PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1);
     
     // init keepalive queue flow
     PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);

+ 1 - 2
client/client.c

@@ -558,8 +558,7 @@ int main (int argc, char *argv[])
     }
     
     // init device output
-    PacketPassFairQueue_Init(&device.output_queue, BTap_GetInput(&device.btap), BReactor_PendingGroup(&ss));
-    PacketPassFairQueue_EnableCancel(&device.output_queue);
+    PacketPassFairQueue_Init(&device.output_queue, BTap_GetInput(&device.btap), BReactor_PendingGroup(&ss), 1);
     
     // calculate data MTU
     data_mtu = DATAPROTO_MAX_OVERHEAD + device.mtu;

+ 1 - 2
examples/fairqueue_test.c

@@ -96,8 +96,7 @@ int main ()
     TimerPacketSink_Init(&sink, &reactor, 500, OUTPUT_INTERVAL);
     
     // initialize queue
-    PacketPassFairQueue_Init(&fq, TimerPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor));
-    PacketPassFairQueue_EnableCancel(&fq);
+    PacketPassFairQueue_Init(&fq, TimerPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 1);
     
     // initialize inputs
     for (int i = 0; i < NUM_INPUTS; i++) {

+ 1 - 1
examples/fairqueue_test2.c

@@ -53,7 +53,7 @@ int main ()
     
     // initialize queue
     PacketPassFairQueue fq;
-    PacketPassFairQueue_Init(&fq, RandomPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor));
+    PacketPassFairQueue_Init(&fq, RandomPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 0);
     
     // initialize source 1
     PacketPassFairQueueFlow flow1;

+ 2 - 0
flow/BufferWriter.c

@@ -87,6 +87,8 @@ int BufferWriter_StartPacket (BufferWriter *o, uint8_t **buf)
 
 void BufferWriter_EndPacket (BufferWriter *o, int len)
 {
+    ASSERT(len >= 0)
+    ASSERT(len <= o->d_mtu)
     ASSERT(o->out_have)
     ASSERT(o->d_writing)
     DebugObject_Access(&o->d_obj);

+ 1 - 1
flow/KeepaliveIO.c

@@ -43,7 +43,7 @@ int KeepaliveIO_Init (KeepaliveIO *o, BReactor *reactor, PacketPassInterface *ou
     PacketPassInactivityMonitor_Init(&o->kasender, output, o->reactor, keepalive_interval_ms, (PacketPassInactivityMonitor_handler)keepalive_handler, o);
     
     // init queue
-    PacketPassPriorityQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->kasender), BReactor_PendingGroup(o->reactor));
+    PacketPassPriorityQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->kasender), BReactor_PendingGroup(o->reactor), 0);
     
     // init keepalive flow
     PacketPassPriorityQueueFlow_Init(&o->ka_qflow, &o->queue, -1);

+ 10 - 14
flow/PacketPassFairQueue.c

@@ -213,8 +213,13 @@ static void output_handler_done (PacketPassFairQueue *m)
     }
 }
 
-void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg)
+void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel)
 {
+    ASSERT(use_cancel == 0 || use_cancel == 1)
+    if (use_cancel) {
+        ASSERT(PacketPassInterface_HasCancel(output))
+    }
+    
     // init arguments
     m->output = output;
     m->pg = pg;
@@ -237,8 +242,8 @@ void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *outp
     // not freeing
     m->freeing = 0;
     
-    // not using cancel
-    m->use_cancel = 0;
+    // set if using cancel
+    m->use_cancel = use_cancel;
     
     // init schedule job
     BPending_Init(&m->schedule_job, m->pg, (BPending_handler)schedule_job_handler, m);
@@ -249,8 +254,9 @@ void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *outp
 
 void PacketPassFairQueue_Free (PacketPassFairQueue *m)
 {
-    ASSERT(!BHeap_GetFirst(&m->queued_heap))
     ASSERT(LinkedList2_IsEmpty(&m->queued_list))
+    ASSERT(!BHeap_GetFirst(&m->queued_heap))
+    ASSERT(!m->previous_flow)
     ASSERT(!m->sending_flow)
     DebugCounter_Free(&m->d_ctr);
     DebugObject_Free(&m->d_obj);
@@ -259,16 +265,6 @@ void PacketPassFairQueue_Free (PacketPassFairQueue *m)
     BPending_Free(&m->schedule_job);
 }
 
-void PacketPassFairQueue_EnableCancel (PacketPassFairQueue *m)
-{
-    ASSERT(!m->use_cancel)
-    ASSERT(PacketPassInterface_HasCancel(m->output))
-    DebugObject_Access(&m->d_obj);
-    
-    // using cancel
-    m->use_cancel = 1;
-}
-
 void PacketPassFairQueue_PrepareFree (PacketPassFairQueue *m)
 {
     DebugObject_Access(&m->d_obj);

+ 4 - 10
flow/PacketPassFairQueue.h

@@ -46,8 +46,8 @@ struct PacketPassFairQueueFlow_s;
 typedef struct {
     PacketPassInterface *output;
     struct PacketPassFairQueueFlow_s *sending_flow;
-    struct PacketPassFairQueueFlow_s *previous_flow;
     int sending_len;
+    struct PacketPassFairQueueFlow_s *previous_flow;
     BHeap queued_heap;
     LinkedList2 queued_list;
     int freeing;
@@ -81,8 +81,10 @@ typedef struct PacketPassFairQueueFlow_s {
  * @param m the object
  * @param output output interface
  * @param pg pending group
+ * @param use_cancel whether cancel functionality is required. Must be 0 or 1.
+ *                   If 1, output must support cancel functionality.
  */
-void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg);
+void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel);
 
 /**
  * Frees the queue.
@@ -92,14 +94,6 @@ void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *outp
  */
 void PacketPassFairQueue_Free (PacketPassFairQueue *m);
 
-/**
- * Enables cancel functionality.
- * This allows freeing flows even if they're busy by releasing them.
- * Output must support {@link PacketPassInterface} cancel functionality.
- * May only be called once.
- */
-void PacketPassFairQueue_EnableCancel (PacketPassFairQueue *m);
-
 /**
  * Prepares for freeing the entire queue. Must be called to allow freeing
  * the flows in the process of freeing the entire queue.

+ 8 - 13
flow/PacketPassPriorityQueue.c

@@ -120,8 +120,13 @@ static void output_handler_done (PacketPassPriorityQueue *m)
     }
 }
 
-void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterface *output, BPendingGroup *pg)
+void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel)
 {
+    ASSERT(use_cancel == 0 || use_cancel == 1)
+    if (use_cancel) {
+        ASSERT(PacketPassInterface_HasCancel(output))
+    }
+    
     // init arguments
     m->output = output;
     m->pg = pg;
@@ -138,8 +143,8 @@ void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterfa
     // not freeing
     m->freeing = 0;
     
-    // not using cancel
-    m->use_cancel = 0;
+    // set if using cancel
+    m->use_cancel = use_cancel;
     
     // init schedule job
     BPending_Init(&m->schedule_job, m->pg, (BPending_handler)schedule_job_handler, m);
@@ -159,16 +164,6 @@ void PacketPassPriorityQueue_Free (PacketPassPriorityQueue *m)
     BPending_Free(&m->schedule_job);
 }
 
-void PacketPassPriorityQueue_EnableCancel (PacketPassPriorityQueue *m)
-{
-    ASSERT(!m->use_cancel)
-    ASSERT(PacketPassInterface_HasCancel(m->output))
-    DebugObject_Access(&m->d_obj);
-    
-    // using cancel
-    m->use_cancel = 1;
-}
-
 void PacketPassPriorityQueue_PrepareFree (PacketPassPriorityQueue *m)
 {
     DebugObject_Access(&m->d_obj);

+ 3 - 9
flow/PacketPassPriorityQueue.h

@@ -76,8 +76,10 @@ typedef struct PacketPassPriorityQueueFlow_s {
  * @param m the object
  * @param output output interface
  * @param pg pending group
+ * @param use_cancel whether cancel functionality is required. Must be 0 or 1.
+ *                   If 1, output must support cancel functionality.
  */
-void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterface *output, BPendingGroup *pg);
+void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel);
 
 /**
  * Frees the queue.
@@ -87,14 +89,6 @@ void PacketPassPriorityQueue_Init (PacketPassPriorityQueue *m, PacketPassInterfa
  */
 void PacketPassPriorityQueue_Free (PacketPassPriorityQueue *m);
 
-/**
- * Enables cancel functionality.
- * This allows freeing flows even if they're busy by releasing them.
- * Output must support {@link PacketPassInterface} cancel functionality.
- * May only be called once.
- */
-void PacketPassPriorityQueue_EnableCancel (PacketPassPriorityQueue *m);
-
 /**
  * Prepares for freeing the entire queue. Must be called to allow freeing
  * the flows in the process of freeing the entire queue.

+ 2 - 2
server/server.c

@@ -1296,7 +1296,7 @@ int client_init_io (struct client_data *client)
     PacketStreamSender_Init(&client->output_sender, sink_interface, PACKETPROTO_ENCLEN(SC_MAX_ENC), BReactor_PendingGroup(&ss));
     
     // init queue
-    PacketPassPriorityQueue_Init(&client->output_priorityqueue, PacketStreamSender_GetInput(&client->output_sender), BReactor_PendingGroup(&ss));
+    PacketPassPriorityQueue_Init(&client->output_priorityqueue, PacketStreamSender_GetInput(&client->output_sender), BReactor_PendingGroup(&ss), 0);
     
     // init output control flow
     
@@ -1321,7 +1321,7 @@ int client_init_io (struct client_data *client)
     PacketPassPriorityQueueFlow_Init(&client->output_peers_qflow, &client->output_priorityqueue, 0);
     
     // init fair queue (for different peers)
-    PacketPassFairQueue_Init(&client->output_peers_fairqueue, PacketPassPriorityQueueFlow_GetInput(&client->output_peers_qflow), BReactor_PendingGroup(&ss));
+    PacketPassFairQueue_Init(&client->output_peers_fairqueue, PacketPassPriorityQueueFlow_GetInput(&client->output_peers_qflow), BReactor_PendingGroup(&ss), 0);
     
     // init list of flows
     LinkedList2_Init(&client->output_peers_flows);

+ 1 - 1
server_connection/ServerConnection.c

@@ -175,7 +175,7 @@ void connect_handler (ServerConnection *o, int event)
     }
     
     // init queue
-    PacketPassPriorityQueue_Init(&o->output_queue, KeepaliveIO_GetInput(&o->output_keepaliveio), BReactor_PendingGroup(o->reactor));
+    PacketPassPriorityQueue_Init(&o->output_queue, KeepaliveIO_GetInput(&o->output_keepaliveio), BReactor_PendingGroup(o->reactor), 0);
     
     // init output local flow