Ver código fonte

DataProto: DataProtoLocalSource: add inactivity reporting

ambrop7 15 anos atrás
pai
commit
0b49e90306
4 arquivos alterados com 36 adições e 5 exclusões
  1. 1 1
      client/DPRelay.c
  2. 21 2
      client/DataProto.c
  3. 13 1
      client/DataProto.h
  4. 1 1
      client/client.c

+ 1 - 1
client/DPRelay.c

@@ -46,7 +46,7 @@ static struct DPRelay_flow * create_flow (DPRelaySource *src, DPRelaySink *sink,
     flow->sink = sink;
     
     // init DataProtoLocalSource
-    if (!DataProtoLocalSource_Init(&flow->dpls, &src->device, src->source_id, sink->dest_id, num_packets)) {
+    if (!DataProtoLocalSource_Init(&flow->dpls, &src->device, src->source_id, sink->dest_id, num_packets, -1, NULL, NULL)) {
         BLog(BLOG_ERROR, "relay flow %d->%d: DataProtoLocalSource_Init failed", (int)src->source_id, (int)sink->dest_id);
         goto fail1;
     }

+ 21 - 2
client/DataProto.c

@@ -286,7 +286,10 @@ void DataProtoDevice_Free (DataProtoDevice *o)
     PacketRouter_Free(&o->router);
 }
 
-int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets)
+int DataProtoLocalSource_Init (
+    DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets,
+    int inactivity_time, DataProtoLocalSource_handler_inactivity handler_inactivity, void *user
+)
 {
     ASSERT(num_packets > 0)
     
@@ -294,12 +297,20 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device,
     o->device = device;
     o->source_id = source_id;
     o->dest_id = dest_id;
+    o->inactivity_time = inactivity_time;
     
     // init connector
     PacketPassConnector_Init(&o->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
     
+    // init inactivity monitor
+    PacketPassInterface *buf_out = PacketPassConnector_GetInput(&o->connector);
+    if (o->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Init(&o->monitor, buf_out, device->reactor, o->inactivity_time, handler_inactivity, user);
+        buf_out = PacketPassInactivityMonitor_GetInput(&o->monitor);
+    }
+    
     // init route buffer
-    if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, PacketPassConnector_GetInput(&o->connector), num_packets)) {
+    if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, buf_out, num_packets)) {
         BLog(BLOG_ERROR, "RouteBuffer_Init failed");
         goto fail1;
     }
@@ -313,6 +324,9 @@ int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device,
     return 1;
     
 fail1:
+    if (o->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Free(&o->monitor);
+    }
     PacketPassConnector_Free(&o->connector);
 fail0:
     return 0;
@@ -327,6 +341,11 @@ void DataProtoLocalSource_Free (DataProtoLocalSource *o)
     // free route buffer
     RouteBuffer_Free(&o->rbuf);
     
+    // free inactivity monitor
+    if (o->inactivity_time >= 0) {
+        PacketPassInactivityMonitor_Free(&o->monitor);
+    }
+    
     // free connector
     PacketPassConnector_Free(&o->connector);
 }

+ 13 - 1
client/DataProto.h

@@ -45,6 +45,7 @@
 
 typedef void (*DataProtoDest_handler) (void *user, int up);
 typedef void (*DataProtoDevice_handler) (void *user, const uint8_t *frame, int frame_len);
+typedef void (*DataProtoLocalSource_handler_inactivity) (void *user);
 
 /**
  * Frame destination.
@@ -94,7 +95,9 @@ typedef struct {
     DataProtoDevice *device;
     peerid_t source_id;
     peerid_t dest_id;
+    int inactivity_time;
     RouteBuffer rbuf;
+    PacketPassInactivityMonitor monitor;
     PacketPassConnector connector;
     DataProtoDest *dp;
     PacketPassFairQueueFlow dp_qflow;
@@ -179,9 +182,18 @@ void DataProtoDevice_Free (DataProtoDevice *o);
  * @param dest_id destination peer ID to encode in the headers (i.e. ID if the peer this
  *                object belongs to)
  * @param num_packets number of packets the buffer should hold. Must be >0.
+ * @param inactivity_time milliseconds of output inactivity after which to call the
+ *                        inactivity handler; <0 to disable. Note that the object is considered
+ *                        active as long as its buffer is non-empty, even if is not attached to
+ *                        a {@link DataProtoDest}.
+ * @param handler_inactivity inactivity handler, if inactivity_time >=0
+ * @param user value to pass to handler
  * @return 1 on success, 0 on failure
  */
-int DataProtoLocalSource_Init (DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets) WARN_UNUSED;
+int DataProtoLocalSource_Init (
+    DataProtoLocalSource *o, DataProtoDevice *device, peerid_t source_id, peerid_t dest_id, int num_packets,
+    int inactivity_time, DataProtoLocalSource_handler_inactivity handler_inactivity, void *user
+) WARN_UNUSED;
 
 /**
  * Frees the object.

+ 1 - 1
client/client.c

@@ -1220,7 +1220,7 @@ int peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     }
     
     // init local flow
-    if (!DataProtoLocalSource_Init(&peer->local_dpflow, &device.input_dpd, my_id, peer->id, options.send_buffer_size)) {
+    if (!DataProtoLocalSource_Init(&peer->local_dpflow, &device.input_dpd, my_id, peer->id, options.send_buffer_size, -1, NULL, NULL)) {
         peer_log(peer, BLOG_ERROR, "DataProtoLocalSource_Init failed");
         goto fail1;
     }