BReactor.c 39 KB


  1. /**
  2. * @file BReactor.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdlib.h>
  30. #include <string.h>
  31. #include <stdio.h>
  32. #include <stddef.h>
  33. #ifdef BADVPN_USE_WINAPI
  34. #include <windows.h>
  35. #else
  36. #include <limits.h>
  37. #include <sys/types.h>
  38. #include <errno.h>
  39. #include <unistd.h>
  40. #endif
  41. #include <misc/debug.h>
  42. #include <misc/offset.h>
  43. #include <misc/balloc.h>
  44. #include <base/BLog.h>
  45. #include <system/BReactor.h>
  46. #include <generated/blog_channel_BReactor.h>
  47. #define KEVENT_TAG_FD 1
  48. #define KEVENT_TAG_KEVENT 2
  49. static int timer_comparator (void *user, btime_t *val1, btime_t *val2)
  50. {
  51. if (*val1 < *val2) {
  52. return -1;
  53. }
  54. if (*val1 > *val2) {
  55. return 1;
  56. }
  57. return 0;
  58. }
  59. static int move_expired_timers (BReactor *bsys, btime_t now)
  60. {
  61. int moved = 0;
  62. // move timed out timers to the expired list
  63. BHeapNode *heap_node;
  64. while (heap_node = BHeap_GetFirst(&bsys->timers_heap)) {
  65. BTimer *timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  66. ASSERT(timer->active)
  67. // if it's in the future, stop
  68. if (timer->absTime > now) {
  69. break;
  70. }
  71. moved = 1;
  72. // remove from running timers heap
  73. BHeap_Remove(&bsys->timers_heap, &timer->heap_node);
  74. // add to expired timers list
  75. LinkedList1_Append(&bsys->timers_expired_list, &timer->list_node);
  76. // set expired
  77. timer->expired = 1;
  78. }
  79. return moved;
  80. }
  81. static void move_first_timers (BReactor *bsys)
  82. {
  83. // get the time of the first timer
  84. BHeapNode *heap_node = BHeap_GetFirst(&bsys->timers_heap);
  85. ASSERT(heap_node)
  86. BTimer *first_timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  87. ASSERT(first_timer->active)
  88. btime_t first_time = first_timer->absTime;
  89. // remove from running timers heap
  90. BHeap_Remove(&bsys->timers_heap, &first_timer->heap_node);
  91. // add to expired timers list
  92. LinkedList1_Append(&bsys->timers_expired_list, &first_timer->list_node);
  93. // set expired
  94. first_timer->expired = 1;
  95. // also move other timers with the same timeout
  96. while (heap_node = BHeap_GetFirst(&bsys->timers_heap)) {
  97. BTimer *timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  98. ASSERT(timer->active)
  99. ASSERT(timer->absTime >= first_time)
  100. // if it's in the future, stop
  101. if (timer->absTime > first_time) {
  102. break;
  103. }
  104. // remove from running timers heap
  105. BHeap_Remove(&bsys->timers_heap, &timer->heap_node);
  106. // add to expired timers list
  107. LinkedList1_Append(&bsys->timers_expired_list, &timer->list_node);
  108. // set expired
  109. timer->expired = 1;
  110. }
  111. }
  112. #ifdef BADVPN_USE_WINAPI
  113. static void set_iocp_ready (BReactorIOCPOverlapped *olap, int succeeded, DWORD bytes)
  114. {
  115. BReactor *reactor = olap->reactor;
  116. ASSERT(!olap->is_ready)
  117. // set parameters
  118. olap->ready_succeeded = succeeded;
  119. olap->ready_bytes = bytes;
  120. // insert to IOCP ready list
  121. LinkedList1_Append(&reactor->iocp_ready_list, &olap->ready_list_node);
  122. // set ready
  123. olap->is_ready = 1;
  124. }
  125. #endif
  126. #ifdef BADVPN_USE_EPOLL
  127. static void set_epoll_fd_pointers (BReactor *bsys)
  128. {
  129. // Write pointers to our entry pointers into file descriptors.
  130. // If a handler function frees some other file descriptor, the
  131. // free routine will set our pointer to NULL so we don't dispatch it.
  132. for (int i = 0; i < bsys->epoll_results_num; i++) {
  133. struct epoll_event *event = &bsys->epoll_results[i];
  134. ASSERT(event->data.ptr)
  135. BFileDescriptor *bfd = (BFileDescriptor *)event->data.ptr;
  136. ASSERT(bfd->active)
  137. ASSERT(!bfd->epoll_returned_ptr)
  138. bfd->epoll_returned_ptr = (BFileDescriptor **)&event->data.ptr;
  139. }
  140. }
  141. #endif
  142. #ifdef BADVPN_USE_KEVENT
  143. static void set_kevent_fd_pointers (BReactor *bsys)
  144. {
  145. for (int i = 0; i < bsys->kevent_results_num; i++) {
  146. struct kevent *event = &bsys->kevent_results[i];
  147. ASSERT(event->udata)
  148. int *tag = event->udata;
  149. switch (*tag) {
  150. case KEVENT_TAG_FD: {
  151. BFileDescriptor *bfd = UPPER_OBJECT(tag, BFileDescriptor, kevent_tag);
  152. ASSERT(bfd->active)
  153. ASSERT(!bfd->kevent_returned_ptr)
  154. bfd->kevent_returned_ptr = (int **)&event->udata;
  155. } break;
  156. case KEVENT_TAG_KEVENT: {
  157. BReactorKEvent *kev = UPPER_OBJECT(tag, BReactorKEvent, kevent_tag);
  158. ASSERT(kev->reactor == bsys)
  159. ASSERT(!kev->kevent_returned_ptr)
  160. kev->kevent_returned_ptr = (int **)&event->udata;
  161. } break;
  162. default:
  163. ASSERT(0);
  164. }
  165. }
  166. }
  167. static void update_kevent_fd_events (BReactor *bsys, BFileDescriptor *bs, int events)
  168. {
  169. struct kevent event;
  170. if (!(bs->waitEvents & BREACTOR_READ) && (events & BREACTOR_READ)) {
  171. memset(&event, 0, sizeof(event));
  172. event.ident = bs->fd;
  173. event.filter = EVFILT_READ;
  174. event.flags = EV_ADD;
  175. event.udata = &bs->kevent_tag;
  176. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  177. }
  178. else if ((bs->waitEvents & BREACTOR_READ) && !(events & BREACTOR_READ)) {
  179. memset(&event, 0, sizeof(event));
  180. event.ident = bs->fd;
  181. event.filter = EVFILT_READ;
  182. event.flags = EV_DELETE;
  183. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  184. }
  185. if (!(bs->waitEvents & BREACTOR_WRITE) && (events & BREACTOR_WRITE)) {
  186. memset(&event, 0, sizeof(event));
  187. event.ident = bs->fd;
  188. event.filter = EVFILT_WRITE;
  189. event.flags = EV_ADD;
  190. event.udata = &bs->kevent_tag;
  191. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  192. }
  193. else if ((bs->waitEvents & BREACTOR_WRITE) && !(events & BREACTOR_WRITE)) {
  194. memset(&event, 0, sizeof(event));
  195. event.ident = bs->fd;
  196. event.filter = EVFILT_WRITE;
  197. event.flags = EV_DELETE;
  198. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  199. }
  200. }
  201. #endif
  202. #ifdef BADVPN_USE_POLL
  203. static void set_poll_fd_pointers (BReactor *bsys)
  204. {
  205. for (int i = 0; i < bsys->poll_results_num; i++) {
  206. BFileDescriptor *bfd = bsys->poll_results_bfds[i];
  207. ASSERT(bfd)
  208. ASSERT(bfd->active)
  209. ASSERT(bfd->poll_returned_index == -1)
  210. bfd->poll_returned_index = i;
  211. }
  212. }
  213. #endif
  214. static void wait_for_events (BReactor *bsys)
  215. {
  216. // must have processed all pending events
  217. ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs))
  218. ASSERT(LinkedList1_IsEmpty(&bsys->timers_expired_list))
  219. #ifdef BADVPN_USE_WINAPI
  220. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_ready_list))
  221. #endif
  222. #ifdef BADVPN_USE_EPOLL
  223. ASSERT(bsys->epoll_results_pos == bsys->epoll_results_num)
  224. #endif
  225. #ifdef BADVPN_USE_KEVENT
  226. ASSERT(bsys->kevent_results_pos == bsys->kevent_results_num)
  227. #endif
  228. #ifdef BADVPN_USE_POLL
  229. ASSERT(bsys->poll_results_pos == bsys->poll_results_num)
  230. #endif
  231. // clean up epoll results
  232. #ifdef BADVPN_USE_EPOLL
  233. bsys->epoll_results_num = 0;
  234. bsys->epoll_results_pos = 0;
  235. #endif
  236. // clean up kevent results
  237. #ifdef BADVPN_USE_KEVENT
  238. bsys->kevent_results_num = 0;
  239. bsys->kevent_results_pos = 0;
  240. #endif
  241. // clean up poll results
  242. #ifdef BADVPN_USE_POLL
  243. bsys->poll_results_num = 0;
  244. bsys->poll_results_pos = 0;
  245. #endif
  246. // timeout vars
  247. int have_timeout = 0;
  248. btime_t timeout_abs;
  249. btime_t now;
  250. // compute timeout
  251. BHeapNode *first_node;
  252. if (first_node = BHeap_GetFirst(&bsys->timers_heap)) {
  253. // get current time
  254. now = btime_gettime();
  255. // if some timers have already timed out, return them immediately
  256. if (move_expired_timers(bsys, now)) {
  257. BLog(BLOG_DEBUG, "Got already expired timers");
  258. return;
  259. }
  260. // timeout is first timer, remember absolute time
  261. BTimer *first_timer = UPPER_OBJECT(first_node, BTimer, heap_node);
  262. have_timeout = 1;
  263. timeout_abs = first_timer->absTime;
  264. }
  265. // wait until the timeout is reached or the file descriptor / handle in ready
  266. while (1) {
  267. // compute timeout
  268. btime_t timeout_rel;
  269. btime_t timeout_rel_trunc;
  270. if (have_timeout) {
  271. timeout_rel = timeout_abs - now;
  272. timeout_rel_trunc = timeout_rel;
  273. }
  274. // perform wait
  275. #ifdef BADVPN_USE_WINAPI
  276. if (have_timeout) {
  277. if (timeout_rel_trunc > INFINITE - 1) {
  278. timeout_rel_trunc = INFINITE - 1;
  279. }
  280. }
  281. DWORD bytes = 0;
  282. ULONG_PTR key;
  283. BReactorIOCPOverlapped *olap = NULL;
  284. BOOL res = GetQueuedCompletionStatus(bsys->iocp_handle, &bytes, &key, (OVERLAPPED **)&olap, (have_timeout ? timeout_rel_trunc : INFINITE));
  285. ASSERT_FORCE(olap || have_timeout)
  286. if (olap || timeout_rel_trunc == timeout_rel) {
  287. if (olap) {
  288. BLog(BLOG_DEBUG, "GetQueuedCompletionStatus returned event");
  289. DebugObject_Access(&olap->d_obj);
  290. ASSERT(olap->reactor == bsys)
  291. ASSERT(!olap->is_ready)
  292. set_iocp_ready(olap, (res == TRUE), bytes);
  293. } else {
  294. BLog(BLOG_DEBUG, "GetQueuedCompletionStatus timed out");
  295. move_first_timers(bsys);
  296. }
  297. break;
  298. }
  299. #endif
  300. #ifdef BADVPN_USE_EPOLL
  301. if (have_timeout) {
  302. if (timeout_rel_trunc > INT_MAX) {
  303. timeout_rel_trunc = INT_MAX;
  304. }
  305. }
  306. BLog(BLOG_DEBUG, "Calling epoll_wait");
  307. int waitres = epoll_wait(bsys->efd, bsys->epoll_results, BSYSTEM_MAX_RESULTS, (have_timeout ? timeout_rel_trunc : -1));
  308. if (waitres < 0) {
  309. int error = errno;
  310. if (error == EINTR) {
  311. BLog(BLOG_DEBUG, "epoll_wait interrupted");
  312. goto try_again;
  313. }
  314. perror("epoll_wait");
  315. ASSERT_FORCE(0)
  316. }
  317. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  318. ASSERT_FORCE(waitres <= BSYSTEM_MAX_RESULTS)
  319. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  320. if (waitres != 0) {
  321. BLog(BLOG_DEBUG, "epoll_wait returned %d file descriptors", waitres);
  322. bsys->epoll_results_num = waitres;
  323. set_epoll_fd_pointers(bsys);
  324. } else {
  325. BLog(BLOG_DEBUG, "epoll_wait timed out");
  326. move_first_timers(bsys);
  327. }
  328. break;
  329. }
  330. #endif
  331. #ifdef BADVPN_USE_KEVENT
  332. struct timespec ts;
  333. if (have_timeout) {
  334. if (timeout_rel_trunc > 86400000) {
  335. timeout_rel_trunc = 86400000;
  336. }
  337. ts.tv_sec = timeout_rel_trunc / 1000;
  338. ts.tv_nsec = (timeout_rel_trunc % 1000) * 1000000;
  339. }
  340. BLog(BLOG_DEBUG, "Calling kevent");
  341. int waitres = kevent(bsys->kqueue_fd, NULL, 0, bsys->kevent_results, BSYSTEM_MAX_RESULTS, (have_timeout ? &ts : NULL));
  342. if (waitres < 0) {
  343. int error = errno;
  344. if (error == EINTR) {
  345. BLog(BLOG_DEBUG, "kevent interrupted");
  346. goto try_again;
  347. }
  348. perror("kevent");
  349. ASSERT_FORCE(0)
  350. }
  351. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  352. ASSERT_FORCE(waitres <= BSYSTEM_MAX_RESULTS)
  353. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  354. if (waitres != 0) {
  355. BLog(BLOG_DEBUG, "kevent returned %d events", waitres);
  356. bsys->kevent_results_num = waitres;
  357. set_kevent_fd_pointers(bsys);
  358. } else {
  359. BLog(BLOG_DEBUG, "kevent timed out");
  360. move_first_timers(bsys);
  361. }
  362. break;
  363. }
  364. #endif
  365. #ifdef BADVPN_USE_POLL
  366. if (have_timeout) {
  367. if (timeout_rel_trunc > INT_MAX) {
  368. timeout_rel_trunc = INT_MAX;
  369. }
  370. }
  371. ASSERT(bsys->poll_num_enabled_fds >= 0)
  372. ASSERT(bsys->poll_num_enabled_fds <= BSYSTEM_MAX_POLL_FDS)
  373. int num_fds = 0;
  374. LinkedList1Node *list_node = LinkedList1_GetFirst(&bsys->poll_enabled_fds_list);
  375. while (list_node) {
  376. BFileDescriptor *bfd = UPPER_OBJECT(list_node, BFileDescriptor, poll_enabled_fds_list_node);
  377. ASSERT(bfd->active)
  378. ASSERT(bfd->poll_returned_index == -1)
  379. // calculate poll events
  380. int pevents = 0;
  381. if ((bfd->waitEvents & BREACTOR_READ)) {
  382. pevents |= POLLIN;
  383. }
  384. if ((bfd->waitEvents & BREACTOR_WRITE)) {
  385. pevents |= POLLOUT;
  386. }
  387. // write pollfd entry
  388. struct pollfd *pfd = &bsys->poll_results_pollfds[num_fds];
  389. pfd->fd = bfd->fd;
  390. pfd->events = pevents;
  391. pfd->revents = 0;
  392. // write BFileDescriptor reference entry
  393. bsys->poll_results_bfds[num_fds] = bfd;
  394. // increment number of fds in array
  395. num_fds++;
  396. list_node = LinkedList1Node_Next(list_node);
  397. }
  398. BLog(BLOG_DEBUG, "Calling poll");
  399. int waitres = poll(bsys->poll_results_pollfds, num_fds, (have_timeout ? timeout_rel_trunc : -1));
  400. if (waitres < 0) {
  401. int error = errno;
  402. if (error == EINTR) {
  403. BLog(BLOG_DEBUG, "poll interrupted");
  404. goto try_again;
  405. }
  406. perror("poll");
  407. ASSERT_FORCE(0)
  408. }
  409. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  410. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  411. if (waitres != 0) {
  412. BLog(BLOG_DEBUG, "poll returned %d file descriptors", waitres);
  413. bsys->poll_results_num = num_fds;
  414. bsys->poll_results_pos = 0;
  415. set_poll_fd_pointers(bsys);
  416. } else {
  417. BLog(BLOG_DEBUG, "poll timed out");
  418. move_first_timers(bsys);
  419. }
  420. break;
  421. }
  422. #endif
  423. try_again:
  424. if (have_timeout) {
  425. // get current time
  426. now = btime_gettime();
  427. // check if we already reached the time we're waiting for
  428. if (now >= timeout_abs) {
  429. BLog(BLOG_DEBUG, "already timed out while trying again");
  430. move_first_timers(bsys);
  431. break;
  432. }
  433. }
  434. }
  435. // reset limit objects
  436. LinkedList1Node *list_node;
  437. while (list_node = LinkedList1_GetFirst(&bsys->active_limits_list)) {
  438. BReactorLimit *limit = UPPER_OBJECT(list_node, BReactorLimit, active_limits_list_node);
  439. ASSERT(limit->count > 0)
  440. limit->count = 0;
  441. LinkedList1_Remove(&bsys->active_limits_list, &limit->active_limits_list_node);
  442. }
  443. }
  444. #ifndef BADVPN_USE_WINAPI
  445. void BFileDescriptor_Init (BFileDescriptor *bs, int fd, BFileDescriptor_handler handler, void *user)
  446. {
  447. bs->fd = fd;
  448. bs->handler = handler;
  449. bs->user = user;
  450. bs->active = 0;
  451. }
  452. #endif
  453. void BTimer_Init (BTimer *bt, btime_t msTime, BTimer_handler handler, void *handler_pointer)
  454. {
  455. bt->msTime = msTime;
  456. bt->handler = handler;
  457. bt->handler_pointer = handler_pointer;
  458. bt->active = 0;
  459. }
  460. int BTimer_IsRunning (BTimer *bt)
  461. {
  462. ASSERT(bt->active == 0 || bt->active == 1)
  463. return bt->active;
  464. }
  465. int BReactor_Init (BReactor *bsys)
  466. {
  467. BLog(BLOG_DEBUG, "Reactor initializing");
  468. // set not exiting
  469. bsys->exiting = 0;
  470. // init jobs
  471. BPendingGroup_Init(&bsys->pending_jobs);
  472. // init timers
  473. BHeap_Init(&bsys->timers_heap, OFFSET_DIFF(BTimer, absTime, heap_node), (BHeap_comparator)timer_comparator, NULL);
  474. LinkedList1_Init(&bsys->timers_expired_list);
  475. // init limits
  476. LinkedList1_Init(&bsys->active_limits_list);
  477. #ifdef BADVPN_USE_WINAPI
  478. // init IOCP list
  479. LinkedList1_Init(&bsys->iocp_list);
  480. // init IOCP handle
  481. if (!(bsys->iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1))) {
  482. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  483. goto fail0;
  484. }
  485. // init IOCP ready list
  486. LinkedList1_Init(&bsys->iocp_ready_list);
  487. #endif
  488. #ifdef BADVPN_USE_EPOLL
  489. // create epoll fd
  490. if ((bsys->efd = epoll_create(10)) < 0) {
  491. BLog(BLOG_ERROR, "epoll_create failed");
  492. goto fail0;
  493. }
  494. // init results array
  495. bsys->epoll_results_num = 0;
  496. bsys->epoll_results_pos = 0;
  497. #endif
  498. #ifdef BADVPN_USE_KEVENT
  499. // create kqueue fd
  500. if ((bsys->kqueue_fd = kqueue()) < 0) {
  501. BLog(BLOG_ERROR, "kqueue failed");
  502. goto fail0;
  503. }
  504. // init results array
  505. bsys->kevent_results_num = 0;
  506. bsys->kevent_results_pos = 0;
  507. #endif
  508. #ifdef BADVPN_USE_POLL
  509. // init enabled fds list
  510. LinkedList1_Init(&bsys->poll_enabled_fds_list);
  511. // set zero enabled fds
  512. bsys->poll_num_enabled_fds = 0;
  513. // allocate results arrays
  514. if (!(bsys->poll_results_pollfds = BAllocArray(BSYSTEM_MAX_POLL_FDS, sizeof(bsys->poll_results_pollfds[0])))) {
  515. BLog(BLOG_ERROR, "BAllocArray failed");
  516. goto fail0;
  517. }
  518. if (!(bsys->poll_results_bfds = BAllocArray(BSYSTEM_MAX_POLL_FDS, sizeof(bsys->poll_results_bfds[0])))) {
  519. BLog(BLOG_ERROR, "BAllocArray failed");
  520. goto fail1;
  521. }
  522. // init results array
  523. bsys->poll_results_num = 0;
  524. bsys->poll_results_pos = 0;
  525. #endif
  526. DebugObject_Init(&bsys->d_obj);
  527. #ifndef BADVPN_USE_WINAPI
  528. DebugCounter_Init(&bsys->d_fds_counter);
  529. #endif
  530. #ifdef BADVPN_USE_KEVENT
  531. DebugCounter_Init(&bsys->d_kevent_ctr);
  532. #endif
  533. DebugCounter_Init(&bsys->d_limits_ctr);
  534. return 1;
  535. #ifdef BADVPN_USE_POLL
  536. fail1:
  537. BFree(bsys->poll_results_pollfds);
  538. #endif
  539. fail0:
  540. BPendingGroup_Free(&bsys->pending_jobs);
  541. BLog(BLOG_ERROR, "Reactor failed to initialize");
  542. return 0;
  543. }
  544. void BReactor_Free (BReactor *bsys)
  545. {
  546. DebugObject_Access(&bsys->d_obj);
  547. #ifdef BADVPN_USE_WINAPI
  548. while (!LinkedList1_IsEmpty(&bsys->iocp_list)) {
  549. BReactorIOCPOverlapped *olap = UPPER_OBJECT(LinkedList1_GetLast(&bsys->iocp_list), BReactorIOCPOverlapped, iocp_list_node);
  550. ASSERT(olap->reactor == bsys)
  551. olap->handler(olap->user, BREACTOR_IOCP_EVENT_EXITING, 0);
  552. }
  553. #endif
  554. // {pending group has no BPending objects}
  555. ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs))
  556. ASSERT(!BHeap_GetFirst(&bsys->timers_heap))
  557. ASSERT(LinkedList1_IsEmpty(&bsys->timers_expired_list))
  558. ASSERT(LinkedList1_IsEmpty(&bsys->active_limits_list))
  559. DebugObject_Free(&bsys->d_obj);
  560. #ifdef BADVPN_USE_WINAPI
  561. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_ready_list))
  562. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_list))
  563. #endif
  564. #ifndef BADVPN_USE_WINAPI
  565. DebugCounter_Free(&bsys->d_fds_counter);
  566. #endif
  567. #ifdef BADVPN_USE_KEVENT
  568. DebugCounter_Free(&bsys->d_kevent_ctr);
  569. #endif
  570. DebugCounter_Free(&bsys->d_limits_ctr);
  571. #ifdef BADVPN_USE_POLL
  572. ASSERT(bsys->poll_num_enabled_fds == 0)
  573. ASSERT(LinkedList1_IsEmpty(&bsys->poll_enabled_fds_list))
  574. #endif
  575. BLog(BLOG_DEBUG, "Reactor freeing");
  576. #ifdef BADVPN_USE_WINAPI
  577. // close IOCP handle
  578. ASSERT_FORCE(CloseHandle(bsys->iocp_handle))
  579. #endif
  580. #ifdef BADVPN_USE_EPOLL
  581. // close epoll fd
  582. ASSERT_FORCE(close(bsys->efd) == 0)
  583. #endif
  584. #ifdef BADVPN_USE_KEVENT
  585. // close kqueue fd
  586. ASSERT_FORCE(close(bsys->kqueue_fd) == 0)
  587. #endif
  588. #ifdef BADVPN_USE_POLL
  589. // free results arrays
  590. BFree(bsys->poll_results_bfds);
  591. BFree(bsys->poll_results_pollfds);
  592. #endif
  593. // free jobs
  594. BPendingGroup_Free(&bsys->pending_jobs);
  595. }
  596. int BReactor_Exec (BReactor *bsys)
  597. {
  598. BLog(BLOG_DEBUG, "Entering event loop");
  599. while (!bsys->exiting) {
  600. // dispatch job
  601. if (BPendingGroup_HasJobs(&bsys->pending_jobs)) {
  602. BPendingGroup_ExecuteJob(&bsys->pending_jobs);
  603. continue;
  604. }
  605. // dispatch timer
  606. LinkedList1Node *list_node = LinkedList1_GetFirst(&bsys->timers_expired_list);
  607. if (list_node) {
  608. BTimer *timer = UPPER_OBJECT(list_node, BTimer, list_node);
  609. ASSERT(timer->active)
  610. ASSERT(timer->expired)
  611. // remove from expired list
  612. LinkedList1_Remove(&bsys->timers_expired_list, &timer->list_node);
  613. // set inactive
  614. timer->active = 0;
  615. // call handler
  616. BLog(BLOG_DEBUG, "Dispatching timer");
  617. timer->handler(timer->handler_pointer);
  618. continue;
  619. }
  620. #ifdef BADVPN_USE_WINAPI
  621. if (!LinkedList1_IsEmpty(&bsys->iocp_ready_list)) {
  622. BReactorIOCPOverlapped *olap = UPPER_OBJECT(LinkedList1_GetFirst(&bsys->iocp_ready_list), BReactorIOCPOverlapped, ready_list_node);
  623. ASSERT(olap->is_ready)
  624. ASSERT(olap->handler)
  625. // remove from ready list
  626. LinkedList1_Remove(&bsys->iocp_ready_list, &olap->ready_list_node);
  627. // set not ready
  628. olap->is_ready = 0;
  629. int event = (olap->ready_succeeded ? BREACTOR_IOCP_EVENT_SUCCEEDED : BREACTOR_IOCP_EVENT_FAILED);
  630. // call handler
  631. olap->handler(olap->user, event, olap->ready_bytes);
  632. continue;
  633. }
  634. #endif
  635. #ifdef BADVPN_USE_EPOLL
  636. // dispatch file descriptor
  637. if (bsys->epoll_results_pos < bsys->epoll_results_num) {
  638. // grab event
  639. struct epoll_event *event = &bsys->epoll_results[bsys->epoll_results_pos];
  640. bsys->epoll_results_pos++;
  641. // check if the BFileDescriptor was removed
  642. if (!event->data.ptr) {
  643. continue;
  644. }
  645. // get BFileDescriptor
  646. BFileDescriptor *bfd = (BFileDescriptor *)event->data.ptr;
  647. ASSERT(bfd->active)
  648. ASSERT(bfd->epoll_returned_ptr == (BFileDescriptor **)&event->data.ptr)
  649. // zero pointer to the epoll entry
  650. bfd->epoll_returned_ptr = NULL;
  651. // calculate events to report
  652. int events = 0;
  653. if ((bfd->waitEvents&BREACTOR_READ) && (event->events&EPOLLIN)) {
  654. events |= BREACTOR_READ;
  655. }
  656. if ((bfd->waitEvents&BREACTOR_WRITE) && (event->events&EPOLLOUT)) {
  657. events |= BREACTOR_WRITE;
  658. }
  659. if ((event->events&EPOLLERR) || (event->events&EPOLLHUP)) {
  660. events |= BREACTOR_ERROR;
  661. }
  662. if (!events) {
  663. BLog(BLOG_ERROR, "no events detected?");
  664. continue;
  665. }
  666. // call handler
  667. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  668. bfd->handler(bfd->user, events);
  669. continue;
  670. }
  671. #endif
  672. #ifdef BADVPN_USE_KEVENT
  673. // dispatch kevent
  674. if (bsys->kevent_results_pos < bsys->kevent_results_num) {
  675. // grab event
  676. struct kevent *event = &bsys->kevent_results[bsys->kevent_results_pos];
  677. bsys->kevent_results_pos++;
  678. // check if the event was removed
  679. if (!event->udata) {
  680. continue;
  681. }
  682. // check tag
  683. int *tag = event->udata;
  684. switch (*tag) {
  685. case KEVENT_TAG_FD: {
  686. // get BFileDescriptor
  687. BFileDescriptor *bfd = UPPER_OBJECT(tag, BFileDescriptor, kevent_tag);
  688. ASSERT(bfd->active)
  689. ASSERT(bfd->kevent_returned_ptr == (int **)&event->udata)
  690. // zero pointer to the kevent entry
  691. bfd->kevent_returned_ptr = NULL;
  692. // calculate event to report
  693. int events = 0;
  694. if ((bfd->waitEvents&BREACTOR_READ) && event->filter == EVFILT_READ) {
  695. events |= BREACTOR_READ;
  696. }
  697. if ((bfd->waitEvents&BREACTOR_WRITE) && event->filter == EVFILT_WRITE) {
  698. events |= BREACTOR_WRITE;
  699. }
  700. if (!events) {
  701. BLog(BLOG_ERROR, "no events detected?");
  702. continue;
  703. }
  704. // call handler
  705. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  706. bfd->handler(bfd->user, events);
  707. continue;
  708. } break;
  709. case KEVENT_TAG_KEVENT: {
  710. // get BReactorKEvent
  711. BReactorKEvent *kev = UPPER_OBJECT(tag, BReactorKEvent, kevent_tag);
  712. ASSERT(kev->reactor == bsys)
  713. ASSERT(kev->kevent_returned_ptr == (int **)&event->udata)
  714. // zero pointer to the kevent entry
  715. kev->kevent_returned_ptr = NULL;
  716. // call handler
  717. BLog(BLOG_DEBUG, "Dispatching kevent");
  718. kev->handler(kev->user, event->fflags, event->data);
  719. continue;
  720. } break;
  721. default:
  722. ASSERT(0);
  723. }
  724. }
  725. #endif
  726. #ifdef BADVPN_USE_POLL
  727. if (bsys->poll_results_pos < bsys->poll_results_num) {
  728. // grab event
  729. struct pollfd *pfd = &bsys->poll_results_pollfds[bsys->poll_results_pos];
  730. BFileDescriptor *bfd = bsys->poll_results_bfds[bsys->poll_results_pos];
  731. bsys->poll_results_pos++;
  732. // skip removed entry
  733. if (!bfd) {
  734. continue;
  735. }
  736. ASSERT(bfd->active)
  737. ASSERT(bfd->poll_returned_index == bsys->poll_results_pos - 1)
  738. // remove result reference
  739. bfd->poll_returned_index = -1;
  740. // calculate events to report
  741. int events = 0;
  742. if ((bfd->waitEvents & BREACTOR_READ) && (pfd->revents & POLLIN)) {
  743. events |= BREACTOR_READ;
  744. }
  745. if ((bfd->waitEvents & BREACTOR_WRITE) && (pfd->revents & POLLOUT)) {
  746. events |= BREACTOR_WRITE;
  747. }
  748. if ((pfd->revents & POLLERR) || (pfd->revents & POLLHUP)) {
  749. events |= BREACTOR_ERROR;
  750. }
  751. if (!events) {
  752. continue;
  753. }
  754. // call handler
  755. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  756. bfd->handler(bfd->user, events);
  757. continue;
  758. }
  759. #endif
  760. wait_for_events(bsys);
  761. }
  762. BLog(BLOG_DEBUG, "Exiting event loop, exit code %d", bsys->exit_code);
  763. return bsys->exit_code;
  764. }
  765. void BReactor_Quit (BReactor *bsys, int code)
  766. {
  767. bsys->exiting = 1;
  768. bsys->exit_code = code;
  769. }
  770. void BReactor_SetTimer (BReactor *bsys, BTimer *bt)
  771. {
  772. BReactor_SetTimerAfter(bsys, bt, bt->msTime);
  773. }
  774. void BReactor_SetTimerAfter (BReactor *bsys, BTimer *bt, btime_t after)
  775. {
  776. BReactor_SetTimerAbsolute(bsys, bt, btime_add(btime_gettime(), after));
  777. }
  778. void BReactor_SetTimerAbsolute (BReactor *bsys, BTimer *bt, btime_t time)
  779. {
  780. // unlink it if it's already in the list
  781. BReactor_RemoveTimer(bsys, bt);
  782. // initialize timer
  783. bt->active = 1;
  784. bt->expired = 0;
  785. bt->absTime = time;
  786. // insert to running timers heap
  787. BHeap_Insert(&bsys->timers_heap, &bt->heap_node);
  788. }
  789. void BReactor_RemoveTimer (BReactor *bsys, BTimer *bt)
  790. {
  791. if (!bt->active) {
  792. return;
  793. }
  794. if (bt->expired) {
  795. // remove from expired list
  796. LinkedList1_Remove(&bsys->timers_expired_list, &bt->list_node);
  797. } else {
  798. // remove from running heap
  799. BHeap_Remove(&bsys->timers_heap, &bt->heap_node);
  800. }
  801. // set inactive
  802. bt->active = 0;
  803. }
  804. BPendingGroup * BReactor_PendingGroup (BReactor *bsys)
  805. {
  806. return &bsys->pending_jobs;
  807. }
  808. int BReactor_Synchronize (BReactor *bsys, BPending *ref)
  809. {
  810. ASSERT(ref)
  811. while (!bsys->exiting) {
  812. ASSERT(BPendingGroup_HasJobs(&bsys->pending_jobs))
  813. if (BPendingGroup_PeekJob(&bsys->pending_jobs) == ref) {
  814. return 1;
  815. }
  816. BPendingGroup_ExecuteJob(&bsys->pending_jobs);
  817. }
  818. return 0;
  819. }
  820. #ifndef BADVPN_USE_WINAPI
  821. int BReactor_AddFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
  822. {
  823. ASSERT(!bs->active)
  824. #ifdef BADVPN_USE_EPOLL
  825. // add epoll entry
  826. struct epoll_event event;
  827. memset(&event, 0, sizeof(event));
  828. event.events = 0;
  829. event.data.ptr = bs;
  830. if (epoll_ctl(bsys->efd, EPOLL_CTL_ADD, bs->fd, &event) < 0) {
  831. int error = errno;
  832. BLog(BLOG_ERROR, "epoll_ctl failed: %d", error);
  833. return 0;
  834. }
  835. // set epoll returned pointer
  836. bs->epoll_returned_ptr = NULL;
  837. #endif
  838. #ifdef BADVPN_USE_KEVENT
  839. // set kevent tag
  840. bs->kevent_tag = KEVENT_TAG_FD;
  841. // set kevent returned pointer
  842. bs->kevent_returned_ptr = NULL;
  843. #endif
  844. #ifdef BADVPN_USE_POLL
  845. if (bsys->poll_num_enabled_fds == BSYSTEM_MAX_POLL_FDS) {
  846. BLog(BLOG_ERROR, "too many fds");
  847. return 0;
  848. }
  849. // append to enabled fds list
  850. LinkedList1_Append(&bsys->poll_enabled_fds_list, &bs->poll_enabled_fds_list_node);
  851. bsys->poll_num_enabled_fds++;
  852. // set not returned
  853. bs->poll_returned_index = -1;
  854. #endif
  855. bs->active = 1;
  856. bs->waitEvents = 0;
  857. DebugCounter_Increment(&bsys->d_fds_counter);
  858. return 1;
  859. }
  860. void BReactor_RemoveFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
  861. {
  862. ASSERT(bs->active)
  863. DebugCounter_Decrement(&bsys->d_fds_counter);
  864. bs->active = 0;
  865. #ifdef BADVPN_USE_EPOLL
  866. // delete epoll entry
  867. struct epoll_event event;
  868. memset(&event, 0, sizeof(event));
  869. ASSERT_FORCE(epoll_ctl(bsys->efd, EPOLL_CTL_DEL, bs->fd, &event) == 0)
  870. // write through epoll returned pointer
  871. if (bs->epoll_returned_ptr) {
  872. *bs->epoll_returned_ptr = NULL;
  873. }
  874. #endif
  875. #ifdef BADVPN_USE_KEVENT
  876. // delete kevents
  877. update_kevent_fd_events(bsys, bs, 0);
  878. // write through kevent returned pointer
  879. if (bs->kevent_returned_ptr) {
  880. *bs->kevent_returned_ptr = NULL;
  881. }
  882. #endif
  883. #ifdef BADVPN_USE_POLL
  884. // invalidate results entry
  885. if (bs->poll_returned_index != -1) {
  886. ASSERT(bs->poll_returned_index >= bsys->poll_results_pos)
  887. ASSERT(bs->poll_returned_index < bsys->poll_results_num)
  888. ASSERT(bsys->poll_results_bfds[bs->poll_returned_index] == bs)
  889. bsys->poll_results_bfds[bs->poll_returned_index] = NULL;
  890. }
  891. // remove from enabled fds list
  892. LinkedList1_Remove(&bsys->poll_enabled_fds_list, &bs->poll_enabled_fds_list_node);
  893. bsys->poll_num_enabled_fds--;
  894. #endif
  895. }
  896. void BReactor_SetFileDescriptorEvents (BReactor *bsys, BFileDescriptor *bs, int events)
  897. {
  898. ASSERT(bs->active)
  899. ASSERT(!(events&~(BREACTOR_READ|BREACTOR_WRITE)))
  900. if (bs->waitEvents == events) {
  901. return;
  902. }
  903. #ifdef BADVPN_USE_EPOLL
  904. // calculate epoll events
  905. int eevents = 0;
  906. if ((events & BREACTOR_READ)) {
  907. eevents |= EPOLLIN;
  908. }
  909. if ((events & BREACTOR_WRITE)) {
  910. eevents |= EPOLLOUT;
  911. }
  912. // update epoll entry
  913. struct epoll_event event;
  914. memset(&event, 0, sizeof(event));
  915. event.events = eevents;
  916. event.data.ptr = bs;
  917. ASSERT_FORCE(epoll_ctl(bsys->efd, EPOLL_CTL_MOD, bs->fd, &event) == 0)
  918. #endif
  919. #ifdef BADVPN_USE_KEVENT
  920. update_kevent_fd_events(bsys, bs, events);
  921. #endif
  922. // update events
  923. bs->waitEvents = events;
  924. }
  925. #endif
  926. void BReactorLimit_Init (BReactorLimit *o, BReactor *reactor, int limit)
  927. {
  928. DebugObject_Access(&reactor->d_obj);
  929. ASSERT(limit > 0)
  930. // init arguments
  931. o->reactor = reactor;
  932. o->limit = limit;
  933. // set count zero
  934. o->count = 0;
  935. DebugCounter_Increment(&reactor->d_limits_ctr);
  936. DebugObject_Init(&o->d_obj);
  937. }
  938. void BReactorLimit_Free (BReactorLimit *o)
  939. {
  940. BReactor *reactor = o->reactor;
  941. DebugObject_Free(&o->d_obj);
  942. DebugCounter_Decrement(&reactor->d_limits_ctr);
  943. // remove from active limits list
  944. if (o->count > 0) {
  945. LinkedList1_Remove(&reactor->active_limits_list, &o->active_limits_list_node);
  946. }
  947. }
  948. int BReactorLimit_Increment (BReactorLimit *o)
  949. {
  950. BReactor *reactor = o->reactor;
  951. DebugObject_Access(&o->d_obj);
  952. // check count against limit
  953. if (o->count >= o->limit) {
  954. return 0;
  955. }
  956. // increment count
  957. o->count++;
  958. // if limit was zero, add to active limits list
  959. if (o->count == 1) {
  960. LinkedList1_Append(&reactor->active_limits_list, &o->active_limits_list_node);
  961. }
  962. return 1;
  963. }
  964. void BReactorLimit_SetLimit (BReactorLimit *o, int limit)
  965. {
  966. DebugObject_Access(&o->d_obj);
  967. ASSERT(limit > 0)
  968. // set limit
  969. o->limit = limit;
  970. }
  971. #ifdef BADVPN_USE_KEVENT
  972. int BReactorKEvent_Init (BReactorKEvent *o, BReactor *reactor, BReactorKEvent_handler handler, void *user, uintptr_t ident, short filter, u_int fflags, intptr_t data)
  973. {
  974. DebugObject_Access(&reactor->d_obj);
  975. // init arguments
  976. o->reactor = reactor;
  977. o->handler = handler;
  978. o->user = user;
  979. o->ident = ident;
  980. o->filter = filter;
  981. // add kevent
  982. struct kevent event;
  983. memset(&event, 0, sizeof(event));
  984. event.ident = o->ident;
  985. event.filter = o->filter;
  986. event.flags = EV_ADD;
  987. event.fflags = fflags;
  988. event.data = data;
  989. event.udata = &o->kevent_tag;
  990. if (kevent(o->reactor->kqueue_fd, &event, 1, NULL, 0, NULL) < 0) {
  991. return 0;
  992. }
  993. // set kevent tag
  994. o->kevent_tag = KEVENT_TAG_KEVENT;
  995. // set kevent returned pointer
  996. o->kevent_returned_ptr = NULL;
  997. DebugObject_Init(&o->d_obj);
  998. DebugCounter_Increment(&o->reactor->d_kevent_ctr);
  999. return 1;
  1000. }
  1001. void BReactorKEvent_Free (BReactorKEvent *o)
  1002. {
  1003. DebugObject_Free(&o->d_obj);
  1004. DebugCounter_Decrement(&o->reactor->d_kevent_ctr);
  1005. // write through kevent returned pointer
  1006. if (o->kevent_returned_ptr) {
  1007. *o->kevent_returned_ptr = NULL;
  1008. }
  1009. // delete kevent
  1010. struct kevent event;
  1011. memset(&event, 0, sizeof(event));
  1012. event.ident = o->ident;
  1013. event.filter = o->filter;
  1014. event.flags = EV_DELETE;
  1015. ASSERT_FORCE(kevent(o->reactor->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  1016. }
  1017. #endif
  1018. #ifdef BADVPN_USE_WINAPI
  1019. HANDLE BReactor_GetIOCPHandle (BReactor *reactor)
  1020. {
  1021. DebugObject_Access(&reactor->d_obj);
  1022. return reactor->iocp_handle;
  1023. }
  1024. void BReactorIOCPOverlapped_Init (BReactorIOCPOverlapped *o, BReactor *reactor, void *user, BReactorIOCPOverlapped_handler handler)
  1025. {
  1026. DebugObject_Access(&reactor->d_obj);
  1027. // init arguments
  1028. o->reactor = reactor;
  1029. o->user = user;
  1030. o->handler = handler;
  1031. // zero overlapped
  1032. memset(&o->olap, 0, sizeof(o->olap));
  1033. // append to IOCP list
  1034. LinkedList1_Append(&reactor->iocp_list, &o->iocp_list_node);
  1035. // set not ready
  1036. o->is_ready = 0;
  1037. DebugObject_Init(&o->d_obj);
  1038. }
  1039. void BReactorIOCPOverlapped_Free (BReactorIOCPOverlapped *o)
  1040. {
  1041. BReactor *reactor = o->reactor;
  1042. DebugObject_Free(&o->d_obj);
  1043. // remove from IOCP ready list
  1044. if (o->is_ready) {
  1045. LinkedList1_Remove(&reactor->iocp_ready_list, &o->ready_list_node);
  1046. }
  1047. // remove from IOCP list
  1048. LinkedList1_Remove(&reactor->iocp_list, &o->iocp_list_node);
  1049. }
  1050. void BReactorIOCPOverlapped_Wait (BReactorIOCPOverlapped *o, int *out_succeeded, DWORD *out_bytes)
  1051. {
  1052. BReactor *reactor = o->reactor;
  1053. DebugObject_Access(&o->d_obj);
  1054. // wait for IOCP events until we get an event for this olap
  1055. while (!o->is_ready) {
  1056. DWORD bytes = 0;
  1057. ULONG_PTR key;
  1058. BReactorIOCPOverlapped *olap = NULL;
  1059. BOOL res = GetQueuedCompletionStatus(reactor->iocp_handle, &bytes, &key, (OVERLAPPED **)&olap, INFINITE);
  1060. ASSERT_FORCE(olap)
  1061. DebugObject_Access(&olap->d_obj);
  1062. ASSERT(olap->reactor == reactor)
  1063. // regular I/O should be done synchronously, so we shoudln't ever get a second completion before an
  1064. // existing one is dispatched. If however PostQueuedCompletionStatus is being used to signal events,
  1065. // just discard any excess events.
  1066. if (!olap->is_ready) {
  1067. set_iocp_ready(olap, (res == TRUE), bytes);
  1068. }
  1069. }
  1070. // remove from IOCP ready list
  1071. LinkedList1_Remove(&reactor->iocp_ready_list, &o->ready_list_node);
  1072. // set not ready
  1073. o->is_ready = 0;
  1074. if (out_succeeded) {
  1075. *out_succeeded = o->ready_succeeded;
  1076. }
  1077. if (out_bytes) {
  1078. *out_bytes = o->ready_bytes;
  1079. }
  1080. }
  1081. #endif