DataProto.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. /**
  2. * @file DataProto.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <limits.h>
  25. #include <protocol/dataproto.h>
  26. #include <misc/byteorder.h>
  27. #include <misc/debug.h>
  28. #include <system/BLog.h>
  29. #include <client/DataProto.h>
  30. #include <generated/blog_channel_DataProto.h>
  31. static void monitor_handler (DataProtoSink *o);
  32. static void send_keepalive (DataProtoSink *o);
  33. static void refresh_up_job (DataProtoSink *o);
  34. static void receive_timer_handler (DataProtoSink *o);
  35. static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
  36. static void keepalive_job_handler (DataProtoSink *o);
  37. static void up_job_handler (DataProtoSink *o);
  38. void monitor_handler (DataProtoSink *o)
  39. {
  40. ASSERT(!o->freeing)
  41. DebugObject_Access(&o->d_obj);
  42. send_keepalive(o);
  43. }
  44. void send_keepalive (DataProtoSink *o)
  45. {
  46. ASSERT(!o->freeing)
  47. PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
  48. }
  49. void refresh_up_job (DataProtoSink *o)
  50. {
  51. if (o->up != o->up_report) {
  52. BPending_Set(&o->up_job);
  53. } else {
  54. BPending_Unset(&o->up_job);
  55. }
  56. }
  57. void receive_timer_handler (DataProtoSink *o)
  58. {
  59. DebugObject_Access(&o->d_obj);
  60. // consider down
  61. o->up = 0;
  62. refresh_up_job(o);
  63. }
  64. void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
  65. {
  66. ASSERT(data_len >= sizeof(struct dataproto_header))
  67. DebugObject_Access(&o->d_obj);
  68. // modify existing packet here
  69. struct dataproto_header *header = (struct dataproto_header *)data;
  70. header->flags = 0;
  71. // if we are receiving keepalives, set the flag
  72. if (BTimer_IsRunning(&o->receive_timer)) {
  73. header->flags |= DATAPROTO_FLAGS_RECEIVING_KEEPALIVES;
  74. }
  75. }
  76. void keepalive_job_handler (DataProtoSink *o)
  77. {
  78. ASSERT(!o->freeing)
  79. DebugObject_Access(&o->d_obj);
  80. send_keepalive(o);
  81. }
  82. void up_job_handler (DataProtoSink *o)
  83. {
  84. ASSERT(o->up != o->up_report)
  85. ASSERT(!o->freeing)
  86. DebugObject_Access(&o->d_obj);
  87. o->up_report = o->up;
  88. o->handler(o->user, o->up);
  89. return;
  90. }
  91. static void device_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
  92. {
  93. ASSERT(buf)
  94. ASSERT(recv_len >= 0)
  95. ASSERT(recv_len <= o->frame_mtu)
  96. DebugObject_Access(&o->d_obj);
  97. // remember packet
  98. o->current_buf = buf;
  99. o->current_recv_len = recv_len;
  100. // call handler
  101. o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len);
  102. return;
  103. }
  104. int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user)
  105. {
  106. ASSERT(PacketPassInterface_HasCancel(output))
  107. ASSERT(PacketPassInterface_GetMTU(output) >= DATAPROTO_MAX_OVERHEAD)
  108. // init arguments
  109. o->reactor = reactor;
  110. o->handler = handler;
  111. o->user = user;
  112. // set frame MTU
  113. o->frame_mtu = PacketPassInterface_GetMTU(output) - DATAPROTO_MAX_OVERHEAD;
  114. // schedule keep-alive (needs to be before the buffer)
  115. BPending_Init(&o->keepalive_job, BReactor_PendingGroup(o->reactor), (BPending_handler)keepalive_job_handler, o);
  116. BPending_Set(&o->keepalive_job);
  117. // init notifier
  118. PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor));
  119. PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o);
  120. // init monitor
  121. PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
  122. // init queue
  123. PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1);
  124. // init keepalive queue flow
  125. PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);
  126. // init keepalive source
  127. DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor));
  128. // init keepalive blocker
  129. PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor));
  130. // init keepalive buffer
  131. if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) {
  132. BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
  133. goto fail1;
  134. }
  135. // init receive timer
  136. BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o);
  137. // init handler job
  138. BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o);
  139. // set not up
  140. o->up = 0;
  141. o->up_report = 0;
  142. // set not freeing
  143. o->freeing = 0;
  144. DebugCounter_Init(&o->d_ctr);
  145. DebugObject_Init(&o->d_obj);
  146. return 1;
  147. fail1:
  148. PacketRecvBlocker_Free(&o->ka_blocker);
  149. DataProtoKeepaliveSource_Free(&o->ka_source);
  150. PacketPassFairQueueFlow_Free(&o->ka_qflow);
  151. PacketPassFairQueue_Free(&o->queue);
  152. PacketPassInactivityMonitor_Free(&o->monitor);
  153. PacketPassNotifier_Free(&o->notifier);
  154. BPending_Free(&o->keepalive_job);
  155. return 0;
  156. }
  157. void DataProtoSink_Free (DataProtoSink *o)
  158. {
  159. DebugCounter_Free(&o->d_ctr);
  160. DebugObject_Free(&o->d_obj);
  161. // free handler job
  162. BPending_Free(&o->up_job);
  163. // allow freeing queue flows
  164. PacketPassFairQueue_PrepareFree(&o->queue);
  165. // free receive timer
  166. BReactor_RemoveTimer(o->reactor, &o->receive_timer);
  167. // free keepalive buffer
  168. SinglePacketBuffer_Free(&o->ka_buffer);
  169. // free keepalive blocker
  170. PacketRecvBlocker_Free(&o->ka_blocker);
  171. // free keepalive source
  172. DataProtoKeepaliveSource_Free(&o->ka_source);
  173. // free keepalive queue flow
  174. PacketPassFairQueueFlow_Free(&o->ka_qflow);
  175. // free queue
  176. PacketPassFairQueue_Free(&o->queue);
  177. // free monitor
  178. PacketPassInactivityMonitor_Free(&o->monitor);
  179. // free notifier
  180. PacketPassNotifier_Free(&o->notifier);
  181. // free keepalive job
  182. BPending_Free(&o->keepalive_job);
  183. }
  184. void DataProtoSink_PrepareFree (DataProtoSink *o)
  185. {
  186. DebugObject_Access(&o->d_obj);
  187. // allow freeing queue flows
  188. PacketPassFairQueue_PrepareFree(&o->queue);
  189. // set freeing
  190. o->freeing = 1;
  191. }
  192. void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
  193. {
  194. ASSERT(peer_receiving == 0 || peer_receiving == 1)
  195. ASSERT(!o->freeing)
  196. DebugObject_Access(&o->d_obj);
  197. // reset receive timer
  198. BReactor_SetTimer(o->reactor, &o->receive_timer);
  199. if (!peer_receiving) {
  200. // peer reports not receiving, consider down
  201. o->up = 0;
  202. // send keep-alive to converge faster
  203. send_keepalive(o);
  204. } else {
  205. // consider up
  206. o->up = 1;
  207. }
  208. refresh_up_job(o);
  209. }
  210. int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
  211. {
  212. ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
  213. // init arguments
  214. o->handler = handler;
  215. o->user = user;
  216. o->reactor = reactor;
  217. // remember frame MTU
  218. o->frame_mtu = PacketRecvInterface_GetMTU(input);
  219. // init router
  220. if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)device_router_handler, o, BReactor_PendingGroup(reactor))) {
  221. goto fail1;
  222. }
  223. DebugObject_Init(&o->d_obj);
  224. DebugCounter_Init(&o->d_ctr);
  225. return 1;
  226. fail1:
  227. return 0;
  228. }
  229. void DataProtoSource_Free (DataProtoSource *o)
  230. {
  231. DebugCounter_Free(&o->d_ctr);
  232. DebugObject_Free(&o->d_obj);
  233. // free router
  234. PacketRouter_Free(&o->router);
  235. }
  236. int DataProtoFlow_Init (
  237. DataProtoFlow *o, DataProtoSource *device, peerid_t source_id, peerid_t dest_id, int num_packets,
  238. int inactivity_time, DataProtoFlow_handler_inactivity handler_inactivity, void *user
  239. )
  240. {
  241. ASSERT(num_packets > 0)
  242. // init arguments
  243. o->device = device;
  244. o->source_id = source_id;
  245. o->dest_id = dest_id;
  246. o->inactivity_time = inactivity_time;
  247. // init connector
  248. PacketPassConnector_Init(&o->connector, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, BReactor_PendingGroup(device->reactor));
  249. // init inactivity monitor
  250. PacketPassInterface *buf_out = PacketPassConnector_GetInput(&o->connector);
  251. if (o->inactivity_time >= 0) {
  252. PacketPassInactivityMonitor_Init(&o->monitor, buf_out, device->reactor, o->inactivity_time, handler_inactivity, user);
  253. buf_out = PacketPassInactivityMonitor_GetInput(&o->monitor);
  254. }
  255. // init route buffer
  256. if (!RouteBuffer_Init(&o->rbuf, DATAPROTO_MAX_OVERHEAD + device->frame_mtu, buf_out, num_packets)) {
  257. BLog(BLOG_ERROR, "RouteBuffer_Init failed");
  258. goto fail1;
  259. }
  260. // set no DataProto
  261. o->dp = NULL;
  262. DebugObject_Init(&o->d_obj);
  263. DebugCounter_Increment(&device->d_ctr);
  264. return 1;
  265. fail1:
  266. if (o->inactivity_time >= 0) {
  267. PacketPassInactivityMonitor_Free(&o->monitor);
  268. }
  269. PacketPassConnector_Free(&o->connector);
  270. fail0:
  271. return 0;
  272. }
  273. void DataProtoFlow_Free (DataProtoFlow *o)
  274. {
  275. ASSERT(!o->dp)
  276. DebugCounter_Decrement(&o->device->d_ctr);
  277. DebugObject_Free(&o->d_obj);
  278. // free route buffer
  279. RouteBuffer_Free(&o->rbuf);
  280. // free inactivity monitor
  281. if (o->inactivity_time >= 0) {
  282. PacketPassInactivityMonitor_Free(&o->monitor);
  283. }
  284. // free connector
  285. PacketPassConnector_Free(&o->connector);
  286. }
  287. void DataProtoFlow_Route (DataProtoFlow *o, int more)
  288. {
  289. ASSERT(more == 0 || more == 1)
  290. PacketRouter_AssertRoute(&o->device->router);
  291. ASSERT(o->device->current_buf)
  292. ASSERT(!o->dp || !o->dp->freeing)
  293. DebugObject_Access(&o->d_obj);
  294. // write header
  295. struct dataproto_header *header = (struct dataproto_header *)o->device->current_buf;
  296. // don't set flags, it will be set in notifier_handler
  297. header->from_id = htol16(o->source_id);
  298. header->num_peer_ids = htol16(1);
  299. struct dataproto_peer_id *id = (struct dataproto_peer_id *)(header + 1);
  300. id->id = htol16(o->dest_id);
  301. // route
  302. uint8_t *next_buf;
  303. if (!PacketRouter_Route(
  304. &o->device->router, DATAPROTO_MAX_OVERHEAD + o->device->current_recv_len, &o->rbuf,
  305. &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->device->current_recv_len : 0)
  306. )) {
  307. BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
  308. return;
  309. }
  310. o->device->current_buf = (more ? next_buf : NULL);
  311. }
  312. void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *dp)
  313. {
  314. ASSERT(dp)
  315. ASSERT(!o->dp)
  316. ASSERT(o->device->frame_mtu <= dp->frame_mtu)
  317. ASSERT(!dp->freeing)
  318. DebugObject_Access(&o->d_obj);
  319. DebugObject_Access(&dp->d_obj);
  320. // set DataProto
  321. o->dp = dp;
  322. // init queue flow
  323. PacketPassFairQueueFlow_Init(&o->dp_qflow, &dp->queue);
  324. // connect to queue flow
  325. PacketPassConnector_ConnectOutput(&o->connector, PacketPassFairQueueFlow_GetInput(&o->dp_qflow));
  326. DebugCounter_Increment(&dp->d_ctr);
  327. }
  328. void DataProtoFlow_Detach (DataProtoFlow *o)
  329. {
  330. ASSERT(o->dp)
  331. DebugObject_Access(&o->d_obj);
  332. DataProtoSink *dp = o->dp;
  333. // release flow if needed
  334. if (!o->dp->freeing && PacketPassFairQueueFlow_IsBusy(&o->dp_qflow)) {
  335. PacketPassFairQueueFlow_Release(&o->dp_qflow);
  336. }
  337. // disconnect from queue flow
  338. PacketPassConnector_DisconnectOutput(&o->connector);
  339. // free queue flow
  340. PacketPassFairQueueFlow_Free(&o->dp_qflow);
  341. // set no DataProto
  342. o->dp = NULL;
  343. DebugCounter_Decrement(&dp->d_ctr);
  344. }