ソースを参照

PacketPassFairQueue: add additional weight to packets

ambrop7 15 年 前
コミット
55d8881fde

+ 1 - 1
client/DataProto.c

@@ -321,7 +321,7 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, peerid_t dest_id, P
     PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
     
     // init queue
-    PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1);
+    PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1);
     
     // init keepalive queue flow
     PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);

+ 1 - 1
client/client.c

@@ -492,7 +492,7 @@ int main (int argc, char *argv[])
     }
     
     // init device output
-    PacketPassFairQueue_Init(&device.output_queue, BTap_GetInput(&device.btap), BReactor_PendingGroup(&ss), 1);
+    PacketPassFairQueue_Init(&device.output_queue, BTap_GetInput(&device.btap), BReactor_PendingGroup(&ss), 1, 1);
     
     // calculate data MTU
     data_mtu = DATAPROTO_MAX_OVERHEAD + device.mtu;

+ 1 - 1
examples/fairqueue_test.c

@@ -96,7 +96,7 @@ int main ()
     TimerPacketSink_Init(&sink, &reactor, 500, OUTPUT_INTERVAL);
     
     // initialize queue
-    PacketPassFairQueue_Init(&fq, TimerPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 1);
+    PacketPassFairQueue_Init(&fq, TimerPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 1, 1);
     
     // initialize inputs
     for (int i = 0; i < NUM_INPUTS; i++) {

+ 1 - 1
examples/fairqueue_test2.c

@@ -53,7 +53,7 @@ int main ()
     
     // initialize queue
     PacketPassFairQueue fq;
-    PacketPassFairQueue_Init(&fq, RandomPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 0);
+    PacketPassFairQueue_Init(&fq, RandomPacketSink_GetInput(&sink), BReactor_PendingGroup(&reactor), 0, 1);
     
     // initialize source 1
     PacketPassFairQueueFlow flow1;

+ 10 - 9
flow/PacketPassFairQueue.c

@@ -70,15 +70,13 @@ static uint64_t get_current_time (PacketPassFairQueue *m)
     return (have ? time : 0);
 }
 
-static void increment_sent_flow (PacketPassFairQueueFlow *flow, int iamount)
+static void increment_sent_flow (PacketPassFairQueueFlow *flow, uint64_t amount)
 {
-    ASSERT(iamount >= 0)
-    ASSERT(iamount <= FAIRQUEUE_MAX_TIME)
-    ASSERT(!flow->is_queued)
-    ASSERT(!flow->m->sending_flow)
-    
     PacketPassFairQueue *m = flow->m;
-    uint64_t amount = iamount;
+    
+    ASSERT(amount <= FAIRQUEUE_MAX_TIME)
+    ASSERT(!flow->is_queued)
+    ASSERT(!m->sending_flow)
     
     // does time overflow?
     if (amount > FAIRQUEUE_MAX_TIME - flow->time) {
@@ -195,7 +193,7 @@ static void output_handler_done (PacketPassFairQueue *m)
     m->previous_flow = flow;
     
     // update flow time by packet size
-    increment_sent_flow(flow, m->sending_len);
+    increment_sent_flow(flow, (uint64_t)m->packet_weight + m->sending_len);
     
     // schedule schedule
     BPending_Set(&m->schedule_job);
@@ -215,9 +213,11 @@ static void output_handler_done (PacketPassFairQueue *m)
     }
 }
 
-void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel)
+void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel, int packet_weight)
 {
     ASSERT(PacketPassInterface_GetMTU(output) <= FAIRQUEUE_MAX_TIME)
+    ASSERT(packet_weight > 0)
+    ASSERT(packet_weight <= FAIRQUEUE_MAX_TIME - PacketPassInterface_GetMTU(output))
     ASSERT(use_cancel == 0 || use_cancel == 1)
     ASSERT(!use_cancel || PacketPassInterface_HasCancel(output))
     
@@ -225,6 +225,7 @@ void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *outp
     m->output = output;
     m->pg = pg;
     m->use_cancel = use_cancel;
+    m->packet_weight = packet_weight;
     
     // init output
     PacketPassInterface_Sender_Init(m->output, (PacketPassInterface_handler_done)output_handler_done, m);

+ 6 - 2
flow/PacketPassFairQueue.h

@@ -47,6 +47,7 @@ typedef struct {
     PacketPassInterface *output;
     BPendingGroup *pg;
     int use_cancel;
+    int packet_weight;
     struct PacketPassFairQueueFlow_s *sending_flow;
     int sending_len;
     struct PacketPassFairQueueFlow_s *previous_flow;
@@ -76,14 +77,17 @@ typedef struct PacketPassFairQueueFlow_s {
 
 /**
  * Initializes the queue.
+ * (output MTU + packet_weight <= FAIRQUEUE_MAX_TIME) must hold.
  *
  * @param m the object
- * @param output output interface. Its MTU must be <=FAIRQUEUE_MAX_TIME.
+ * @param output output interface
  * @param pg pending group
  * @param use_cancel whether cancel functionality is required. Must be 0 or 1.
  *                   If 1, output must support cancel functionality.
+ * @param packet_weight additional weight a packet bears. Must be >0, to keep
+ *                      the queue fair for zero size packets.
  */
-void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel);
+void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg, int use_cancel, int packet_weight);
 
 /**
  * Frees the queue.

+ 1 - 1
server/server.c

@@ -1234,7 +1234,7 @@ int client_init_io (struct client_data *client)
     PacketPassPriorityQueueFlow_Init(&client->output_peers_qflow, &client->output_priorityqueue, 0);
     
     // init fair queue (for different peers)
-    PacketPassFairQueue_Init(&client->output_peers_fairqueue, PacketPassPriorityQueueFlow_GetInput(&client->output_peers_qflow), BReactor_PendingGroup(&ss), 0);
+    PacketPassFairQueue_Init(&client->output_peers_fairqueue, PacketPassPriorityQueueFlow_GetInput(&client->output_peers_qflow), BReactor_PendingGroup(&ss), 0, 1);
     
     // init list of flows
     LinkedList2_Init(&client->output_peers_flows);