Просмотр исходного кода

DataProto: use a job to report up/down events, instead of directly calling the handler from DataProtoDest_Received()

ambrop7 15 лет назад
Родитель
Сommit
0b72590fb7
3 измененных файлов с 37 добавлено и 25 удалено
  1. 32 14
      client/DataProto.c
  2. 5 9
      client/DataProto.h
  3. 0 2
      client/client.c

+ 32 - 14
client/DataProto.c

@@ -35,9 +35,11 @@
 
 static void monitor_handler (DataProtoDest *o);
 static void send_keepalive (DataProtoDest *o);
+static void refresh_up_job (DataProtoDest *o);
 static void receive_timer_handler (DataProtoDest *o);
 static void notifier_handler (DataProtoDest *o, uint8_t *data, int data_len);
 static void keepalive_job_handler (DataProtoDest *o);
+static void up_job_handler (DataProtoDest *o);
 
 void monitor_handler (DataProtoDest *o)
 {
@@ -54,20 +56,23 @@ void send_keepalive (DataProtoDest *o)
     PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
 }
 
+void refresh_up_job (DataProtoDest *o)
+{
+    if (o->up != o->up_report) {
+        BPending_Set(&o->up_job);
+    } else {
+        BPending_Unset(&o->up_job);
+    }
+}
+
 void receive_timer_handler (DataProtoDest *o)
 {
     DebugObject_Access(&o->d_obj);
     
-    int prev_up = o->up;
-    
     // consider down
     o->up = 0;
     
-    // call handler if up state changed
-    if (o->handler && o->up != prev_up) {
-        o->handler(o->user, o->up);
-        return;
-    }
+    refresh_up_job(o);
 }
 
 void notifier_handler (DataProtoDest *o, uint8_t *data, int data_len)
@@ -93,6 +98,18 @@ void keepalive_job_handler (DataProtoDest *o)
     send_keepalive(o);
 }
 
+void up_job_handler (DataProtoDest *o)
+{
+    ASSERT(o->up != o->up_report)
+    ASSERT(!o->freeing)
+    DebugObject_Access(&o->d_obj);
+    
+    o->up_report = o->up;
+    
+    o->handler(o->user, o->up);
+    return;
+}
+
 static void device_router_handler (DataProtoDevice *o, uint8_t *buf, int recv_len)
 {
     ASSERT(buf)
@@ -154,8 +171,12 @@ int DataProtoDest_Init (DataProtoDest *o, BReactor *reactor, PacketPassInterface
     // init receive timer
     BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o);
     
+    // init handler job
+    BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o);
+    
     // set not up
     o->up = 0;
+    o->up_report = 0;
     
     // set not freeing
     o->freeing = 0;
@@ -181,6 +202,9 @@ void DataProtoDest_Free (DataProtoDest *o)
     DebugCounter_Free(&o->flows_counter);
     DebugObject_Free(&o->d_obj);
     
+    // free handler job
+    BPending_Free(&o->up_job);
+    
     // allow freeing queue flows
     PacketPassFairQueue_PrepareFree(&o->queue);
     
@@ -229,8 +253,6 @@ void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
     ASSERT(!o->freeing)
     DebugObject_Access(&o->d_obj);
     
-    int prev_up = o->up;
-    
     // reset receive timer
     BReactor_SetTimer(o->reactor, &o->receive_timer);
     
@@ -244,11 +266,7 @@ void DataProtoDest_Received (DataProtoDest *o, int peer_receiving)
         o->up = 1;
     }
     
-    // call handler if up state changed
-    if (o->handler && o->up != prev_up) {
-        o->handler(o->user, o->up);
-        return;
-    }
+    refresh_up_job(o);
 }
 
 int DataProtoDevice_Init (DataProtoDevice *o, PacketRecvInterface *input, DataProtoDevice_handler handler, void *user, BReactor *reactor)

+ 5 - 9
client/DataProto.h

@@ -33,7 +33,6 @@
 #include <misc/debug.h>
 #include <system/DebugObject.h>
 #include <system/BReactor.h>
-#include <system/BPending.h>
 #include <flow/PacketPassFairQueue.h>
 #include <flow/PacketPassInactivityMonitor.h>
 #include <flow/PacketPassNotifier.h>
@@ -63,9 +62,11 @@ typedef struct {
     PacketPassFairQueueFlow ka_qflow;
     BTimer receive_timer;
     int up;
+    int up_report;
     DataProtoDest_handler handler;
     void *user;
     BPending keepalive_job;
+    BPending up_job;
     int freeing;
     DebugCounter flows_counter;
     DebugObject d_obj;
@@ -110,7 +111,7 @@ typedef struct {
  * @param o the object
  * @param reactor reactor we live in
  * @param output output interface. Must support cancel functionality. Its MTU must be
- *               >=sizeof(struct dataproto_header)+sizeof(struct dataproto_peer_id).
+ *               >=DATAPROTO_MAX_OVERHEAD.
  * @param keepalive_time keepalive time
  * @param tolerance_time after how long of not having received anything from the peer
  *                       to consider the link down
@@ -141,9 +142,6 @@ void DataProtoDest_PrepareFree (DataProtoDest *o);
 /**
  * Notifies the object that a packet was received from the peer.
  * Must not be in freeing state.
- * Must not be called from output Send calls.
- * May call the up state handler.
- * May invoke output I/O.
  * 
  * @param o the object
  * @param peer_receiving whether the DATAPROTO_FLAGS_RECEIVING_KEEPALIVES flag was set in the packet.
@@ -220,16 +218,14 @@ void DataProtoLocalSource_Route (DataProtoLocalSource *o, int more);
  * The object must be in not attached state.
  * 
  * @param o the object
- * @param dp destination to attach to. This object's frame_mtu must be <= destination's
- *           (output MTU)-(sizeof(struct dataproto_header)+sizeof(struct dataproto_peer_id)).
+ * @param dp destination to attach to. This object's frame_mtu must be <=
+ *           (output MTU of dp) - DATAPROTO_MAX_OVERHEAD.
  */
 void DataProtoLocalSource_Attach (DataProtoLocalSource *o, DataProtoDest *dp);
 
 /**
  * Detaches the object from a destination.
  * The object must be in attached state.
- * Unless the destination is in freeing state, must not be called from destination's
- * output Send calls.
  * 
  * @param o the object
  */

+ 0 - 2
client/client.c

@@ -2146,8 +2146,6 @@ void peer_recv_handler_send (struct peer_data *peer, uint8_t *data, int data_len
     
 out:
     // pass packet to device, or accept immediately
-    // NOTE: this must be done first, because DataProtoDest_SubmitRelayFrame needs the frame
-    // while it is evaluating!
     if (local) {
         PacketPassInterface_Sender_Send(peer->local_recv_if, data, data_len);
     } else {