PacketPassFairQueue.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  /**
   * @file PacketPassFairQueue.c
   * @author Ambroz Bizjak <ambrop7@gmail.com>
   *
   * @section LICENSE
   *
   * This file is part of BadVPN.
   *
   * BadVPN is free software: you can redistribute it and/or modify
   * it under the terms of the GNU General Public License version 2
   * as published by the Free Software Foundation.
   *
   * BadVPN is distributed in the hope that it will be useful,
   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   * GNU General Public License for more details.
   *
   * You should have received a copy of the GNU General Public License along
   * with this program; if not, write to the Free Software Foundation, Inc.,
   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   */
  22. #include <stdlib.h>
  23. #include <misc/debug.h>
  24. #include <misc/offset.h>
  25. #include <flow/PacketPassFairQueue.h>
  26. static int call_send (PacketPassFairQueue *m, uint8_t *data, int data_len)
  27. {
  28. DebugIn_GoIn(&m->in_output);
  29. DEAD_ENTER(m->dead)
  30. int res = PacketPassInterface_Sender_Send(m->output, data, data_len);
  31. if (DEAD_LEAVE(m->dead)) {
  32. return -1;
  33. }
  34. DebugIn_GoOut(&m->in_output);
  35. ASSERT(!m->freeing)
  36. ASSERT(res == 0 || res == 1)
  37. return res;
  38. }
  39. static int call_cancel (PacketPassFairQueue *m)
  40. {
  41. DebugIn_GoIn(&m->in_output);
  42. DEAD_ENTER(m->dead)
  43. PacketPassInterface_Sender_Cancel(m->output);
  44. if (DEAD_LEAVE(m->dead)) {
  45. return -1;
  46. }
  47. DebugIn_GoOut(&m->in_output);
  48. ASSERT(!m->freeing)
  49. return 0;
  50. }
  51. static int call_done (PacketPassFairQueue *m, PacketPassFairQueueFlow *flow)
  52. {
  53. DEAD_ENTER_N(m, m->dead)
  54. DEAD_ENTER_N(flow, flow->dead)
  55. PacketPassInterface_Done(&flow->input);
  56. DEAD_LEAVE_N(m, m->dead);
  57. DEAD_LEAVE_N(flow, flow->dead);
  58. if (DEAD_KILLED_N(m)) {
  59. return -1;
  60. }
  61. ASSERT(!m->freeing)
  62. if (!DEAD_KILLED_N(flow)) {
  63. ASSERT(flow->have_time)
  64. if (flow != m->sending_flow && !flow->is_queued) {
  65. flow->have_time = 0;
  66. }
  67. }
  68. return 0;
  69. }
  70. static uint64_t get_current_time (PacketPassFairQueue *m)
  71. {
  72. if (m->sending_flow) {
  73. return m->sending_flow->time;
  74. }
  75. BHeapNode *heap_node = BHeap_GetFirst(&m->queued_heap);
  76. if (!heap_node) {
  77. return 0;
  78. }
  79. PacketPassFairQueueFlow *first_flow = UPPER_OBJECT(heap_node, PacketPassFairQueueFlow, queued.heap_node);
  80. ASSERT(first_flow->is_queued)
  81. ASSERT(first_flow->have_time)
  82. return first_flow->time;
  83. }
  84. static void increment_sent_flow (PacketPassFairQueueFlow *flow, int iamount)
  85. {
  86. ASSERT(iamount >= 0)
  87. ASSERT(iamount <= UINT64_MAX)
  88. ASSERT(flow->have_time)
  89. ASSERT(!flow->is_queued)
  90. ASSERT(!flow->m->sending_flow)
  91. PacketPassFairQueue *m = flow->m;
  92. uint64_t amount = iamount;
  93. // does time overflow?
  94. if (!(flow->time + amount < flow->time)) {
  95. flow->time += amount;
  96. } else {
  97. // get flow with lowest time
  98. BHeapNode *heap_node = BHeap_GetFirst(&m->queued_heap);
  99. if (!heap_node) {
  100. flow->time = amount;
  101. } else {
  102. PacketPassFairQueueFlow *first_flow = UPPER_OBJECT(heap_node, PacketPassFairQueueFlow, queued.heap_node);
  103. ASSERT(first_flow->is_queued)
  104. ASSERT(first_flow->have_time)
  105. // subtract lowest time from all queued flows
  106. uint64_t subtract = first_flow->time;
  107. LinkedList2Iterator it;
  108. LinkedList2Iterator_InitForward(&it, &m->queued_list);
  109. LinkedList2Node *list_node;
  110. while (list_node = LinkedList2Iterator_Next(&it)) {
  111. PacketPassFairQueueFlow *queue_flow = UPPER_OBJECT(list_node, PacketPassFairQueueFlow, queued.list_node);
  112. ASSERT(queue_flow->is_queued)
  113. ASSERT(queue_flow->have_time)
  114. queue_flow->time -= subtract;
  115. }
  116. // update the given flow's time; note we subtract because it isn't in the queue
  117. // TODO: prove this is correct
  118. flow->time = flow->time - subtract + amount;
  119. }
  120. }
  121. }
  122. static void process_queue (PacketPassFairQueue *m)
  123. {
  124. ASSERT(!m->freeing)
  125. ASSERT(!m->sending_flow)
  126. do {
  127. // get first queued flow
  128. BHeapNode *heap_node = BHeap_GetFirst(&m->queued_heap);
  129. if (!heap_node) {
  130. return;
  131. }
  132. PacketPassFairQueueFlow *qflow = UPPER_OBJECT(heap_node, PacketPassFairQueueFlow, queued.heap_node);
  133. ASSERT(qflow->is_queued)
  134. ASSERT(qflow->have_time)
  135. // remove flow from queue
  136. BHeap_Remove(&m->queued_heap, &qflow->queued.heap_node);
  137. LinkedList2_Remove(&m->queued_list, &qflow->queued.list_node);
  138. qflow->is_queued = 0;
  139. // try to send the packet
  140. int res = call_send(m, qflow->queued.data, qflow->queued.data_len);
  141. if (res < 0) {
  142. return;
  143. }
  144. if (res == 0) {
  145. // sending in progress
  146. m->sending_flow = qflow;
  147. m->sending_len = qflow->queued.data_len;
  148. return;
  149. }
  150. // increment flow time
  151. increment_sent_flow(qflow, qflow->queued.data_len);
  152. // notify sender
  153. if (call_done(m, qflow) < 0) {
  154. return;
  155. }
  156. } while (!m->sending_flow);
  157. }
  /**
   * Three-way comparison of two flow times, used as the heap comparator.
   *
   * @param user unused
   * @param time1 pointer to the first time
   * @param time2 pointer to the second time
   * @return -1 if *time1 < *time2, 1 if *time1 > *time2, 0 if they are equal
   */
  static int time_comparator (void *user, uint64_t *time1, uint64_t *time2)
  {
      uint64_t lhs = *time1;
      uint64_t rhs = *time2;

      // each comparison yields 0 or 1, so the difference is -1, 0 or 1
      return (lhs > rhs) - (lhs < rhs);
  }
  168. static int input_handler_send (PacketPassFairQueueFlow *flow, uint8_t *data, int data_len)
  169. {
  170. ASSERT(!flow->m->freeing)
  171. ASSERT(flow != flow->m->sending_flow)
  172. ASSERT(!flow->is_queued)
  173. DebugIn_AmOut(&flow->m->in_output);
  174. PacketPassFairQueue *m = flow->m;
  175. // assign time if needed
  176. int had_time = flow->have_time;
  177. if (!flow->have_time) {
  178. flow->time = get_current_time(m);
  179. flow->have_time = 1;
  180. }
  181. // if nothing is being sent and queue is empty, send immediately without queueing
  182. if (!m->sending_flow && !BHeap_GetFirst(&m->queued_heap)) {
  183. int res = call_send(m, data, data_len);
  184. if (res < 0) {
  185. return -1;
  186. }
  187. if (res == 0) {
  188. // output busy, continue in output_handler_done
  189. m->sending_flow = flow;
  190. m->sending_len = data_len;
  191. return 0;
  192. }
  193. // if flow had no time before it shouldn't have after
  194. if (!had_time) {
  195. flow->have_time = 0;
  196. }
  197. return 1;
  198. }
  199. // add flow to queue
  200. flow->queued.data = data;
  201. flow->queued.data_len = data_len;
  202. BHeap_Insert(&m->queued_heap, &flow->queued.heap_node);
  203. LinkedList2_Append(&m->queued_list, &flow->queued.list_node);
  204. flow->is_queued = 1;
  205. return 0;
  206. }
  207. static void output_handler_done (PacketPassFairQueue *m)
  208. {
  209. ASSERT(!m->freeing)
  210. ASSERT(m->sending_flow)
  211. ASSERT(!m->sending_flow->is_queued)
  212. ASSERT(m->sending_flow->have_time)
  213. DebugIn_AmOut(&m->in_output);
  214. PacketPassFairQueueFlow *flow = m->sending_flow;
  215. // sending finished
  216. m->sending_flow = NULL;
  217. // update flow time by packet size
  218. increment_sent_flow(flow, m->sending_len);
  219. // call busy handler if set
  220. if (flow->handler_busy) {
  221. // handler is one-shot, unset it before calling
  222. PacketPassFairQueue_handler_busy handler = flow->handler_busy;
  223. flow->handler_busy = NULL;
  224. // call handler
  225. DEAD_ENTER_N(m, m->dead)
  226. DEAD_ENTER_N(flow, flow->dead)
  227. handler(flow->user);
  228. DEAD_LEAVE_N(m, m->dead);
  229. DEAD_LEAVE_N(flow, flow->dead);
  230. if (DEAD_KILLED_N(m)) {
  231. return;
  232. }
  233. if (DEAD_KILLED_N(flow)) {
  234. flow = NULL;
  235. }
  236. ASSERT(!m->freeing)
  237. }
  238. // report completion to sender
  239. if (flow) {
  240. if (call_done(m, flow) < 0) {
  241. return;
  242. }
  243. }
  244. // process queued flows
  245. if (!m->sending_flow) {
  246. process_queue(m);
  247. return;
  248. }
  249. }
  250. static void job_handler (PacketPassFairQueue *m)
  251. {
  252. ASSERT(!m->freeing)
  253. if (!m->sending_flow) {
  254. process_queue(m);
  255. return;
  256. }
  257. }
  258. void PacketPassFairQueue_Init (PacketPassFairQueue *m, PacketPassInterface *output, BPendingGroup *pg)
  259. {
  260. // init arguments
  261. m->output = output;
  262. // init dead var
  263. DEAD_INIT(m->dead);
  264. // init output
  265. PacketPassInterface_Sender_Init(m->output, (PacketPassInterface_handler_done)output_handler_done, m);
  266. // not sending
  267. m->sending_flow = NULL;
  268. // init queued heap
  269. BHeap_Init(&m->queued_heap, OFFSET_DIFF(PacketPassFairQueueFlow, time, queued.heap_node), (BHeap_comparator)time_comparator, NULL);
  270. // init queued list
  271. LinkedList2_Init(&m->queued_list);
  272. // not freeing
  273. m->freeing = 0;
  274. // not using cancel
  275. m->use_cancel = 0;
  276. // init continue job
  277. BPending_Init(&m->continue_job, pg, (BPending_handler)job_handler, m);
  278. // init debug counter
  279. DebugCounter_Init(&m->d_ctr);
  280. // init debug in output
  281. DebugIn_Init(&m->in_output);
  282. // init debug object
  283. DebugObject_Init(&m->d_obj);
  284. }
  285. void PacketPassFairQueue_Free (PacketPassFairQueue *m)
  286. {
  287. ASSERT(!BHeap_GetFirst(&m->queued_heap))
  288. ASSERT(LinkedList2_IsEmpty(&m->queued_list))
  289. ASSERT(!m->sending_flow)
  290. DebugCounter_Free(&m->d_ctr);
  291. DebugObject_Free(&m->d_obj);
  292. // free continue job
  293. BPending_Free(&m->continue_job);
  294. // free dead var
  295. DEAD_KILL(m->dead);
  296. }
  297. void PacketPassFairQueue_EnableCancel (PacketPassFairQueue *m)
  298. {
  299. ASSERT(!m->use_cancel)
  300. ASSERT(PacketPassInterface_HasCancel(m->output))
  301. // using cancel
  302. m->use_cancel = 1;
  303. }
  304. void PacketPassFairQueue_PrepareFree (PacketPassFairQueue *m)
  305. {
  306. m->freeing = 1;
  307. }
  308. void PacketPassFairQueueFlow_Init (PacketPassFairQueueFlow *flow, PacketPassFairQueue *m)
  309. {
  310. ASSERT(!m->freeing)
  311. DebugIn_AmOut(&m->in_output);
  312. // init arguments
  313. flow->m = m;
  314. // init dead var
  315. DEAD_INIT(flow->dead);
  316. // have no canfree handler
  317. flow->handler_busy = NULL;
  318. // init input
  319. PacketPassInterface_Init(&flow->input, PacketPassInterface_GetMTU(flow->m->output), (PacketPassInterface_handler_send)input_handler_send, flow);
  320. // doesn't have time
  321. flow->have_time = 0;
  322. // is not queued
  323. flow->is_queued = 0;
  324. // increment debug counter
  325. DebugCounter_Increment(&m->d_ctr);
  326. // init debug object
  327. DebugObject_Init(&flow->d_obj);
  328. }
  329. void PacketPassFairQueueFlow_Free (PacketPassFairQueueFlow *flow)
  330. {
  331. if (!flow->m->freeing) {
  332. ASSERT(flow != flow->m->sending_flow)
  333. DebugIn_AmOut(&flow->m->in_output);
  334. }
  335. DebugCounter_Decrement(&flow->m->d_ctr);
  336. DebugObject_Free(&flow->d_obj);
  337. PacketPassFairQueue *m = flow->m;
  338. // remove current flow
  339. if (flow == flow->m->sending_flow) {
  340. flow->m->sending_flow = NULL;
  341. }
  342. // remove from queue
  343. if (flow->is_queued) {
  344. BHeap_Remove(&m->queued_heap, &flow->queued.heap_node);
  345. LinkedList2_Remove(&m->queued_list, &flow->queued.list_node);
  346. }
  347. // free input
  348. PacketPassInterface_Free(&flow->input);
  349. // free dead var
  350. DEAD_KILL(flow->dead);
  351. }
  352. int PacketPassFairQueueFlow_IsBusy (PacketPassFairQueueFlow *flow)
  353. {
  354. ASSERT(!flow->m->freeing)
  355. DebugIn_AmOut(&flow->m->in_output);
  356. return (flow == flow->m->sending_flow);
  357. }
  358. void PacketPassFairQueueFlow_Release (PacketPassFairQueueFlow *flow)
  359. {
  360. ASSERT(flow->m->use_cancel)
  361. ASSERT(flow == flow->m->sending_flow)
  362. ASSERT(!flow->m->freeing)
  363. DebugIn_AmOut(&flow->m->in_output);
  364. PacketPassFairQueue *m = flow->m;
  365. // cancel current packet
  366. if (call_cancel(m) < 0) {
  367. return;
  368. }
  369. // set no sending flow
  370. m->sending_flow = NULL;
  371. // set continue job
  372. BPending_Set(&m->continue_job);
  373. }
  374. void PacketPassFairQueueFlow_SetBusyHandler (PacketPassFairQueueFlow *flow, PacketPassFairQueue_handler_busy handler, void *user)
  375. {
  376. ASSERT(flow == flow->m->sending_flow)
  377. ASSERT(!flow->m->freeing)
  378. DebugIn_AmOut(&flow->m->in_output);
  379. flow->handler_busy = handler;
  380. flow->user = user;
  381. }
  382. PacketPassInterface * PacketPassFairQueueFlow_GetInput (PacketPassFairQueueFlow *flow)
  383. {
  384. return &flow->input;
  385. }