DataProto.c 12 KB


  1. /**
  2. * @file DataProto.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <limits.h>
  25. #include <protocol/dataproto.h>
  26. #include <misc/byteorder.h>
  27. #include <misc/debug.h>
  28. #include <system/BLog.h>
  29. #include <client/DataProto.h>
  30. #include <generated/blog_channel_DataProto.h>
  31. static void monitor_handler (DataProtoSink *o);
  32. static void send_keepalive (DataProtoSink *o);
  33. static void refresh_up_job (DataProtoSink *o);
  34. static void receive_timer_handler (DataProtoSink *o);
  35. static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
  36. static void keepalive_job_handler (DataProtoSink *o);
  37. static void up_job_handler (DataProtoSink *o);
  38. void monitor_handler (DataProtoSink *o)
  39. {
  40. ASSERT(!o->freeing)
  41. DebugObject_Access(&o->d_obj);
  42. send_keepalive(o);
  43. }
  44. void send_keepalive (DataProtoSink *o)
  45. {
  46. ASSERT(!o->freeing)
  47. PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
  48. }
  49. void refresh_up_job (DataProtoSink *o)
  50. {
  51. if (o->up != o->up_report) {
  52. BPending_Set(&o->up_job);
  53. } else {
  54. BPending_Unset(&o->up_job);
  55. }
  56. }
  57. void receive_timer_handler (DataProtoSink *o)
  58. {
  59. DebugObject_Access(&o->d_obj);
  60. // consider down
  61. o->up = 0;
  62. refresh_up_job(o);
  63. }
  64. void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
  65. {
  66. ASSERT(data_len >= sizeof(struct dataproto_header))
  67. DebugObject_Access(&o->d_obj);
  68. // modify existing packet here
  69. struct dataproto_header *header = (struct dataproto_header *)data;
  70. header->flags = 0;
  71. // if we are receiving keepalives, set the flag
  72. if (BTimer_IsRunning(&o->receive_timer)) {
  73. header->flags |= DATAPROTO_FLAGS_RECEIVING_KEEPALIVES;
  74. }
  75. }
  76. void keepalive_job_handler (DataProtoSink *o)
  77. {
  78. ASSERT(!o->freeing)
  79. DebugObject_Access(&o->d_obj);
  80. send_keepalive(o);
  81. }
  82. void up_job_handler (DataProtoSink *o)
  83. {
  84. ASSERT(o->up != o->up_report)
  85. ASSERT(!o->freeing)
  86. DebugObject_Access(&o->d_obj);
  87. o->up_report = o->up;
  88. o->handler(o->user, o->up);
  89. return;
  90. }
  91. static void device_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
  92. {
  93. ASSERT(buf)
  94. ASSERT(recv_len >= 0)
  95. ASSERT(recv_len <= o->frame_mtu)
  96. DebugObject_Access(&o->d_obj);
  97. // remember packet
  98. o->current_buf = buf;
  99. o->current_recv_len = recv_len;
  100. // call handler
  101. o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len);
  102. return;
  103. }
  104. int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user)
  105. {
  106. ASSERT(PacketPassInterface_HasCancel(output))
  107. ASSERT(PacketPassInterface_GetMTU(output) >= DATAPROTO_MAX_OVERHEAD)
  108. // init arguments
  109. o->reactor = reactor;
  110. o->handler = handler;
  111. o->user = user;
  112. // set frame MTU
  113. o->frame_mtu = PacketPassInterface_GetMTU(output) - DATAPROTO_MAX_OVERHEAD;
  114. // schedule keep-alive (needs to be before the buffer)
  115. BPending_Init(&o->keepalive_job, BReactor_PendingGroup(o->reactor), (BPending_handler)keepalive_job_handler, o);
  116. BPending_Set(&o->keepalive_job);
  117. // init notifier
  118. PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor));
  119. PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o);
  120. // init monitor
  121. PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
  122. // init queue
  123. PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1);
  124. // init keepalive queue flow
  125. PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);
  126. // init keepalive source
  127. DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor));
  128. // init keepalive blocker
  129. PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor));
  130. // init keepalive buffer
  131. if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) {
  132. BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
  133. goto fail1;
  134. }
  135. // init receive timer
  136. BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o);
  137. // init handler job
  138. BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o);
  139. // set not up
  140. o->up = 0;
  141. o->up_report = 0;
  142. // set not freeing
  143. o->freeing = 0;
  144. DebugCounter_Init(&o->d_ctr);
  145. DebugObject_Init(&o->d_obj);
  146. return 1;
  147. fail1:
  148. PacketRecvBlocker_Free(&o->ka_blocker);
  149. DataProtoKeepaliveSource_Free(&o->ka_source);
  150. PacketPassFairQueueFlow_Free(&o->ka_qflow);
  151. PacketPassFairQueue_Free(&o->queue);
  152. PacketPassInactivityMonitor_Free(&o->monitor);
  153. PacketPassNotifier_Free(&o->notifier);
  154. BPending_Free(&o->keepalive_job);
  155. return 0;
  156. }
  157. void DataProtoSink_Free (DataProtoSink *o)
  158. {
  159. DebugCounter_Free(&o->d_ctr);
  160. DebugObject_Free(&o->d_obj);
  161. // free handler job
  162. BPending_Free(&o->up_job);
  163. // allow freeing queue flows
  164. PacketPassFairQueue_PrepareFree(&o->queue);
  165. // free receive timer
  166. BReactor_RemoveTimer(o->reactor, &o->receive_timer);
  167. // free keepalive buffer
  168. SinglePacketBuffer_Free(&o->ka_buffer);
  169. // free keepalive blocker
  170. PacketRecvBlocker_Free(&o->ka_blocker);
  171. // free keepalive source
  172. DataProtoKeepaliveSource_Free(&o->ka_source);
  173. // free keepalive queue flow
  174. PacketPassFairQueueFlow_Free(&o->ka_qflow);
  175. // free queue
  176. PacketPassFairQueue_Free(&o->queue);
  177. // free monitor
  178. PacketPassInactivityMonitor_Free(&o->monitor);
  179. // free notifier
  180. PacketPassNotifier_Free(&o->notifier);
  181. // free keepalive job
  182. BPending_Free(&o->keepalive_job);
  183. }
  184. void DataProtoSink_PrepareFree (DataProtoSink *o)
  185. {
  186. DebugObject_Access(&o->d_obj);
  187. // allow freeing queue flows
  188. PacketPassFairQueue_PrepareFree(&o->queue);
  189. // set freeing
  190. o->freeing = 1;
  191. }
  192. void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
  193. {
  194. ASSERT(peer_receiving == 0 || peer_receiving == 1)
  195. ASSERT(!o->freeing)
  196. DebugObject_Access(&o->d_obj);
  197. // reset receive timer
  198. BReactor_SetTimer(o->reactor, &o->receive_timer);
  199. if (!peer_receiving) {
  200. // peer reports not receiving, consider down
  201. o->up = 0;
  202. // send keep-alive to converge faster
  203. send_keepalive(o);
  204. } else {
  205. // consider up
  206. o->up = 1;
  207. }
  208. refresh_up_job(o);
  209. }
  210. int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
  211. {
  212. ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
  213. // init arguments
  214. o->handler = handler;
  215. o->user = user;
  216. o->reactor = reactor;
  217. // remember frame MTU
  218. o->frame_mtu = PacketRecvInterface_GetMTU(input);
  219. // init router
  220. if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)device_router_handler, o, BReactor_PendingGroup(reactor))) {
  221. goto fail1;
  222. }
  223. DebugObject_Init(&o->d_obj);
  224. DebugCounter_Init(&o->d_ctr);
  225. return 1;
  226. fail1:
  227. return 0;
  228. }
  229. void DataProtoSource_Free (DataProtoSource *o)
  230. {
  231. DebugCounter_Free(&o->d_ctr);
  232. DebugObject_Free(&o->d_obj);
  233. // free router
  234. PacketRouter_Free(&o->router);
  235. }
  236. int DataProtoFlow_Init (
  237. DataProtoFlow *o, DataProtoSource *device, peerid_t source_id, peerid_t dest_id, int num_packets,
  238. int inactivity_time, DataProtoFlow_handler_inactivity handler_inactivity, void *user
  239. )
  240. {
  241. ASSERT(num_packets > 0)
  242. // init arguments
  243. o->device = device;
  244. o->source_id = source_id;
  245. o->dest_id = dest_id;
  246. o->inactivity_time = inactivity_time;
  247. // init connector
  248. PacketPassConnector_Init(&o->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
  249. // init inactivity monitor
  250. PacketPassInterface *buf_out = PacketPassConnector_GetInput(&o->connector);
  251. if (o->inactivity_time >= 0) {
  252. PacketPassInactivityMonitor_Init(&o->monitor, buf_out, device->reactor, o->inactivity_time, handler_inactivity, user);
  253. buf_out = PacketPassInactivityMonitor_GetInput(&o->monitor);
  254. }
  255. // init route buffer
  256. if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, buf_out, num_packets)) {
  257. BLog(BLOG_ERROR, "RouteBuffer_Init failed");
  258. goto fail1;
  259. }
  260. // set no DataProto
  261. o->dp = NULL;
  262. DebugObject_Init(&o->d_obj);
  263. DebugCounter_Increment(&device->d_ctr);
  264. return 1;
  265. fail1:
  266. if (o->inactivity_time >= 0) {
  267. PacketPassInactivityMonitor_Free(&o->monitor);
  268. }
  269. PacketPassConnector_Free(&o->connector);
  270. fail0:
  271. return 0;
  272. }
  273. void DataProtoFlow_Free (DataProtoFlow *o)
  274. {
  275. ASSERT(!o->dp)
  276. DebugCounter_Decrement(&o->device->d_ctr);
  277. DebugObject_Free(&o->d_obj);
  278. // free route buffer
  279. RouteBuffer_Free(&o->rbuf);
  280. // free inactivity monitor
  281. if (o->inactivity_time >= 0) {
  282. PacketPassInactivityMonitor_Free(&o->monitor);
  283. }
  284. // free connector
  285. PacketPassConnector_Free(&o->connector);
  286. }
  287. void DataProtoFlow_Route (DataProtoFlow *o, int more)
  288. {
  289. ASSERT(more == 0 || more == 1)
  290. PacketRouter_AssertRoute(&o->device->router);
  291. ASSERT(o->device->current_buf)
  292. ASSERT(!o->dp || !o->dp->freeing)
  293. DebugObject_Access(&o->d_obj);
  294. // write header
  295. struct dataproto_header *header = (struct dataproto_header *)o->device->current_buf;
  296. // don't set flags, it will be set in notifier_handler
  297. header->from_id = htol16(o->source_id);
  298. header->num_peer_ids = htol16(1);
  299. struct dataproto_peer_id *id = (struct dataproto_peer_id *)(header + 1);
  300. id->id = htol16(o->dest_id);
  301. // route
  302. uint8_t *next_buf;
  303. if (!PacketRouter_Route(
  304. &o->device->router, DATAPROTO_MAX_OVERHEAD + o->device->current_recv_len, &o->rbuf,
  305. &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->device->current_recv_len : 0)
  306. )) {
  307. BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
  308. return;
  309. }
  310. o->device->current_buf = (more ? next_buf : NULL);
  311. }
  312. void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
  313. {
  314. ASSERT(dp)
  315. ASSERT(!o->dp)
  316. ASSERT(o->device->frame_mtu <= dp->frame_mtu)
  317. ASSERT(!dp->freeing)
  318. DebugObject_Access(&o->d_obj);
  319. DebugObject_Access(&dp->d_obj);
  320. // set DataProto
  321. o->dp = dp;
  322. // init queue flow
  323. PacketPassFairQueueFlow_Init(&o->dp_qflow, &dp->queue);
  324. // connect to queue flow
  325. PacketPassConnector_ConnectOutput(&o->connector, PacketPassFairQueueFlow_GetInput(&o->dp_qflow));
  326. DebugCounter_Increment(&dp->d_ctr);
  327. }
  328. void DataProtoFlow_Detach (DataProtoFlow *o)
  329. {
  330. ASSERT(o->dp)
  331. DebugObject_Access(&o->d_obj);
  332. DataProtoSink *dp = o->dp;
  333. // release flow if needed
  334. if (!o->dp->freeing && PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
  335. PacketPassFairQueueFlow_Release(&o->dp_qflow);
  336. }
  337. // disconnect from queue flow
  338. PacketPassConnector_DisconnectOutput(&o->connector);
  339. // free queue flow
  340. PacketPassFairQueueFlow_Free(&o->dp_qflow);
  341. // set no DataProto
  342. o->dp = NULL;
  343. DebugCounter_Decrement(&dp->d_ctr);
  344. }