BReactor.c 38 KB


  1. /**
  2. * @file BReactor.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <stdio.h>
  25. #include <stddef.h>
  26. #ifdef BADVPN_USE_WINAPI
  27. #include <windows.h>
  28. #else
  29. #include <limits.h>
  30. #include <sys/types.h>
  31. #include <errno.h>
  32. #include <unistd.h>
  33. #endif
  34. #include <misc/debug.h>
  35. #include <misc/offset.h>
  36. #include <misc/balloc.h>
  37. #include <base/BLog.h>
  38. #include <system/BReactor.h>
  39. #include <generated/blog_channel_BReactor.h>
  40. #define KEVENT_TAG_FD 1
  41. #define KEVENT_TAG_KEVENT 2
  42. static int timer_comparator (void *user, btime_t *val1, btime_t *val2)
  43. {
  44. if (*val1 < *val2) {
  45. return -1;
  46. }
  47. if (*val1 > *val2) {
  48. return 1;
  49. }
  50. return 0;
  51. }
  52. static int move_expired_timers (BReactor *bsys, btime_t now)
  53. {
  54. int moved = 0;
  55. // move timed out timers to the expired list
  56. BHeapNode *heap_node;
  57. while (heap_node = BHeap_GetFirst(&bsys->timers_heap)) {
  58. BTimer *timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  59. ASSERT(timer->active)
  60. // if it's in the future, stop
  61. if (timer->absTime > now) {
  62. break;
  63. }
  64. moved = 1;
  65. // remove from running timers heap
  66. BHeap_Remove(&bsys->timers_heap, &timer->heap_node);
  67. // add to expired timers list
  68. LinkedList1_Append(&bsys->timers_expired_list, &timer->list_node);
  69. // set expired
  70. timer->expired = 1;
  71. }
  72. return moved;
  73. }
  74. static void move_first_timers (BReactor *bsys)
  75. {
  76. // get the time of the first timer
  77. BHeapNode *heap_node = BHeap_GetFirst(&bsys->timers_heap);
  78. ASSERT(heap_node)
  79. BTimer *first_timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  80. ASSERT(first_timer->active)
  81. btime_t first_time = first_timer->absTime;
  82. // remove from running timers heap
  83. BHeap_Remove(&bsys->timers_heap, &first_timer->heap_node);
  84. // add to expired timers list
  85. LinkedList1_Append(&bsys->timers_expired_list, &first_timer->list_node);
  86. // set expired
  87. first_timer->expired = 1;
  88. // also move other timers with the same timeout
  89. while (heap_node = BHeap_GetFirst(&bsys->timers_heap)) {
  90. BTimer *timer = UPPER_OBJECT(heap_node, BTimer, heap_node);
  91. ASSERT(timer->active)
  92. ASSERT(timer->absTime >= first_time)
  93. // if it's in the future, stop
  94. if (timer->absTime > first_time) {
  95. break;
  96. }
  97. // remove from running timers heap
  98. BHeap_Remove(&bsys->timers_heap, &timer->heap_node);
  99. // add to expired timers list
  100. LinkedList1_Append(&bsys->timers_expired_list, &timer->list_node);
  101. // set expired
  102. timer->expired = 1;
  103. }
  104. }
  105. #ifdef BADVPN_USE_WINAPI
  106. static void set_iocp_ready (BReactorIOCPOverlapped *olap, int succeeded, DWORD bytes)
  107. {
  108. BReactor *reactor = olap->reactor;
  109. ASSERT(!olap->is_ready)
  110. // set parameters
  111. olap->ready_succeeded = succeeded;
  112. olap->ready_bytes = bytes;
  113. // insert to IOCP ready list
  114. LinkedList1_Append(&reactor->iocp_ready_list, &olap->ready_list_node);
  115. // set ready
  116. olap->is_ready = 1;
  117. }
  118. #endif
  119. #ifdef BADVPN_USE_EPOLL
  120. static void set_epoll_fd_pointers (BReactor *bsys)
  121. {
  122. // Write pointers to our entry pointers into file descriptors.
  123. // If a handler function frees some other file descriptor, the
  124. // free routine will set our pointer to NULL so we don't dispatch it.
  125. for (int i = 0; i < bsys->epoll_results_num; i++) {
  126. struct epoll_event *event = &bsys->epoll_results[i];
  127. ASSERT(event->data.ptr)
  128. BFileDescriptor *bfd = (BFileDescriptor *)event->data.ptr;
  129. ASSERT(bfd->active)
  130. ASSERT(!bfd->epoll_returned_ptr)
  131. bfd->epoll_returned_ptr = (BFileDescriptor **)&event->data.ptr;
  132. }
  133. }
  134. #endif
  135. #ifdef BADVPN_USE_KEVENT
  136. static void set_kevent_fd_pointers (BReactor *bsys)
  137. {
  138. for (int i = 0; i < bsys->kevent_results_num; i++) {
  139. struct kevent *event = &bsys->kevent_results[i];
  140. ASSERT(event->udata)
  141. int *tag = event->udata;
  142. switch (*tag) {
  143. case KEVENT_TAG_FD: {
  144. BFileDescriptor *bfd = UPPER_OBJECT(tag, BFileDescriptor, kevent_tag);
  145. ASSERT(bfd->active)
  146. ASSERT(!bfd->kevent_returned_ptr)
  147. bfd->kevent_returned_ptr = (int **)&event->udata;
  148. } break;
  149. case KEVENT_TAG_KEVENT: {
  150. BReactorKEvent *kev = UPPER_OBJECT(tag, BReactorKEvent, kevent_tag);
  151. ASSERT(kev->reactor == bsys)
  152. ASSERT(!kev->kevent_returned_ptr)
  153. kev->kevent_returned_ptr = (int **)&event->udata;
  154. } break;
  155. default:
  156. ASSERT(0);
  157. }
  158. }
  159. }
  160. static void update_kevent_fd_events (BReactor *bsys, BFileDescriptor *bs, int events)
  161. {
  162. struct kevent event;
  163. if (!(bs->waitEvents & BREACTOR_READ) && (events & BREACTOR_READ)) {
  164. memset(&event, 0, sizeof(event));
  165. event.ident = bs->fd;
  166. event.filter = EVFILT_READ;
  167. event.flags = EV_ADD;
  168. event.udata = &bs->kevent_tag;
  169. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  170. }
  171. else if ((bs->waitEvents & BREACTOR_READ) && !(events & BREACTOR_READ)) {
  172. memset(&event, 0, sizeof(event));
  173. event.ident = bs->fd;
  174. event.filter = EVFILT_READ;
  175. event.flags = EV_DELETE;
  176. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  177. }
  178. if (!(bs->waitEvents & BREACTOR_WRITE) && (events & BREACTOR_WRITE)) {
  179. memset(&event, 0, sizeof(event));
  180. event.ident = bs->fd;
  181. event.filter = EVFILT_WRITE;
  182. event.flags = EV_ADD;
  183. event.udata = &bs->kevent_tag;
  184. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  185. }
  186. else if ((bs->waitEvents & BREACTOR_WRITE) && !(events & BREACTOR_WRITE)) {
  187. memset(&event, 0, sizeof(event));
  188. event.ident = bs->fd;
  189. event.filter = EVFILT_WRITE;
  190. event.flags = EV_DELETE;
  191. ASSERT_FORCE(kevent(bsys->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  192. }
  193. }
  194. #endif
  195. #ifdef BADVPN_USE_POLL
  196. static void set_poll_fd_pointers (BReactor *bsys)
  197. {
  198. for (int i = 0; i < bsys->poll_results_num; i++) {
  199. BFileDescriptor *bfd = bsys->poll_results_bfds[i];
  200. ASSERT(bfd)
  201. ASSERT(bfd->active)
  202. ASSERT(bfd->poll_returned_index == -1)
  203. bfd->poll_returned_index = i;
  204. }
  205. }
  206. #endif
  207. static void wait_for_events (BReactor *bsys)
  208. {
  209. // must have processed all pending events
  210. ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs))
  211. ASSERT(LinkedList1_IsEmpty(&bsys->timers_expired_list))
  212. #ifdef BADVPN_USE_WINAPI
  213. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_ready_list))
  214. #endif
  215. #ifdef BADVPN_USE_EPOLL
  216. ASSERT(bsys->epoll_results_pos == bsys->epoll_results_num)
  217. #endif
  218. #ifdef BADVPN_USE_KEVENT
  219. ASSERT(bsys->kevent_results_pos == bsys->kevent_results_num)
  220. #endif
  221. #ifdef BADVPN_USE_POLL
  222. ASSERT(bsys->poll_results_pos == bsys->poll_results_num)
  223. #endif
  224. // clean up epoll results
  225. #ifdef BADVPN_USE_EPOLL
  226. bsys->epoll_results_num = 0;
  227. bsys->epoll_results_pos = 0;
  228. #endif
  229. // clean up kevent results
  230. #ifdef BADVPN_USE_KEVENT
  231. bsys->kevent_results_num = 0;
  232. bsys->kevent_results_pos = 0;
  233. #endif
  234. // clean up poll results
  235. #ifdef BADVPN_USE_POLL
  236. bsys->poll_results_num = 0;
  237. bsys->poll_results_pos = 0;
  238. #endif
  239. // timeout vars
  240. int have_timeout = 0;
  241. btime_t timeout_abs;
  242. btime_t now;
  243. // compute timeout
  244. BHeapNode *first_node;
  245. if (first_node = BHeap_GetFirst(&bsys->timers_heap)) {
  246. // get current time
  247. now = btime_gettime();
  248. // if some timers have already timed out, return them immediately
  249. if (move_expired_timers(bsys, now)) {
  250. BLog(BLOG_DEBUG, "Got already expired timers");
  251. return;
  252. }
  253. // timeout is first timer, remember absolute time
  254. BTimer *first_timer = UPPER_OBJECT(first_node, BTimer, heap_node);
  255. have_timeout = 1;
  256. timeout_abs = first_timer->absTime;
  257. }
  258. // wait until the timeout is reached or the file descriptor / handle in ready
  259. while (1) {
  260. // compute timeout
  261. btime_t timeout_rel;
  262. btime_t timeout_rel_trunc;
  263. if (have_timeout) {
  264. timeout_rel = timeout_abs - now;
  265. timeout_rel_trunc = timeout_rel;
  266. }
  267. // perform wait
  268. #ifdef BADVPN_USE_WINAPI
  269. if (have_timeout) {
  270. if (timeout_rel_trunc > INFINITE - 1) {
  271. timeout_rel_trunc = INFINITE - 1;
  272. }
  273. }
  274. DWORD bytes = 0;
  275. ULONG_PTR key;
  276. BReactorIOCPOverlapped *olap = NULL;
  277. BOOL res = GetQueuedCompletionStatus(bsys->iocp_handle, &bytes, &key, (OVERLAPPED **)&olap, (have_timeout ? timeout_rel_trunc : INFINITE));
  278. ASSERT_FORCE(olap || have_timeout)
  279. if (olap || timeout_rel_trunc == timeout_rel) {
  280. if (olap) {
  281. BLog(BLOG_DEBUG, "GetQueuedCompletionStatus returned event");
  282. DebugObject_Access(&olap->d_obj);
  283. ASSERT(olap->reactor == bsys)
  284. ASSERT(!olap->is_ready)
  285. set_iocp_ready(olap, (res == TRUE), bytes);
  286. } else {
  287. BLog(BLOG_DEBUG, "GetQueuedCompletionStatus timed out");
  288. move_first_timers(bsys);
  289. }
  290. break;
  291. }
  292. #endif
  293. #ifdef BADVPN_USE_EPOLL
  294. if (have_timeout) {
  295. if (timeout_rel_trunc > INT_MAX) {
  296. timeout_rel_trunc = INT_MAX;
  297. }
  298. }
  299. BLog(BLOG_DEBUG, "Calling epoll_wait");
  300. int waitres = epoll_wait(bsys->efd, bsys->epoll_results, BSYSTEM_MAX_RESULTS, (have_timeout ? timeout_rel_trunc : -1));
  301. if (waitres < 0) {
  302. int error = errno;
  303. if (error == EINTR) {
  304. BLog(BLOG_DEBUG, "epoll_wait interrupted");
  305. goto try_again;
  306. }
  307. perror("epoll_wait");
  308. ASSERT_FORCE(0)
  309. }
  310. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  311. ASSERT_FORCE(waitres <= BSYSTEM_MAX_RESULTS)
  312. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  313. if (waitres != 0) {
  314. BLog(BLOG_DEBUG, "epoll_wait returned %d file descriptors", waitres);
  315. bsys->epoll_results_num = waitres;
  316. set_epoll_fd_pointers(bsys);
  317. } else {
  318. BLog(BLOG_DEBUG, "epoll_wait timed out");
  319. move_first_timers(bsys);
  320. }
  321. break;
  322. }
  323. #endif
  324. #ifdef BADVPN_USE_KEVENT
  325. struct timespec ts;
  326. if (have_timeout) {
  327. if (timeout_rel_trunc > 86400000) {
  328. timeout_rel_trunc = 86400000;
  329. }
  330. ts.tv_sec = timeout_rel_trunc / 1000;
  331. ts.tv_nsec = (timeout_rel_trunc % 1000) * 1000000;
  332. }
  333. BLog(BLOG_DEBUG, "Calling kevent");
  334. int waitres = kevent(bsys->kqueue_fd, NULL, 0, bsys->kevent_results, BSYSTEM_MAX_RESULTS, (have_timeout ? &ts : NULL));
  335. if (waitres < 0) {
  336. int error = errno;
  337. if (error == EINTR) {
  338. BLog(BLOG_DEBUG, "kevent interrupted");
  339. goto try_again;
  340. }
  341. perror("kevent");
  342. ASSERT_FORCE(0)
  343. }
  344. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  345. ASSERT_FORCE(waitres <= BSYSTEM_MAX_RESULTS)
  346. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  347. if (waitres != 0) {
  348. BLog(BLOG_DEBUG, "kevent returned %d events", waitres);
  349. bsys->kevent_results_num = waitres;
  350. set_kevent_fd_pointers(bsys);
  351. } else {
  352. BLog(BLOG_DEBUG, "kevent timed out");
  353. move_first_timers(bsys);
  354. }
  355. break;
  356. }
  357. #endif
  358. #ifdef BADVPN_USE_POLL
  359. if (have_timeout) {
  360. if (timeout_rel_trunc > INT_MAX) {
  361. timeout_rel_trunc = INT_MAX;
  362. }
  363. }
  364. ASSERT(bsys->poll_num_enabled_fds >= 0)
  365. ASSERT(bsys->poll_num_enabled_fds <= BSYSTEM_MAX_POLL_FDS)
  366. int num_fds = 0;
  367. LinkedList1Node *list_node = LinkedList1_GetFirst(&bsys->poll_enabled_fds_list);
  368. while (list_node) {
  369. BFileDescriptor *bfd = UPPER_OBJECT(list_node, BFileDescriptor, poll_enabled_fds_list_node);
  370. ASSERT(bfd->active)
  371. ASSERT(bfd->poll_returned_index == -1)
  372. // calculate poll events
  373. int pevents = 0;
  374. if ((bfd->waitEvents & BREACTOR_READ)) {
  375. pevents |= POLLIN;
  376. }
  377. if ((bfd->waitEvents & BREACTOR_WRITE)) {
  378. pevents |= POLLOUT;
  379. }
  380. // write pollfd entry
  381. struct pollfd *pfd = &bsys->poll_results_pollfds[num_fds];
  382. pfd->fd = bfd->fd;
  383. pfd->events = pevents;
  384. pfd->revents = 0;
  385. // write BFileDescriptor reference entry
  386. bsys->poll_results_bfds[num_fds] = bfd;
  387. // increment number of fds in array
  388. num_fds++;
  389. list_node = LinkedList1Node_Next(list_node);
  390. }
  391. BLog(BLOG_DEBUG, "Calling poll");
  392. int waitres = poll(bsys->poll_results_pollfds, num_fds, (have_timeout ? timeout_rel_trunc : -1));
  393. if (waitres < 0) {
  394. int error = errno;
  395. if (error == EINTR) {
  396. BLog(BLOG_DEBUG, "poll interrupted");
  397. goto try_again;
  398. }
  399. perror("poll");
  400. ASSERT_FORCE(0)
  401. }
  402. ASSERT_FORCE(!(waitres == 0) || have_timeout)
  403. if (waitres != 0 || timeout_rel_trunc == timeout_rel) {
  404. if (waitres != 0) {
  405. BLog(BLOG_DEBUG, "poll returned %d file descriptors", waitres);
  406. bsys->poll_results_num = num_fds;
  407. bsys->poll_results_pos = 0;
  408. set_poll_fd_pointers(bsys);
  409. } else {
  410. BLog(BLOG_DEBUG, "poll timed out");
  411. move_first_timers(bsys);
  412. }
  413. break;
  414. }
  415. #endif
  416. try_again:
  417. if (have_timeout) {
  418. // get current time
  419. now = btime_gettime();
  420. // check if we already reached the time we're waiting for
  421. if (now >= timeout_abs) {
  422. BLog(BLOG_DEBUG, "already timed out while trying again");
  423. move_first_timers(bsys);
  424. break;
  425. }
  426. }
  427. }
  428. // reset limit objects
  429. LinkedList1Node *list_node;
  430. while (list_node = LinkedList1_GetFirst(&bsys->active_limits_list)) {
  431. BReactorLimit *limit = UPPER_OBJECT(list_node, BReactorLimit, active_limits_list_node);
  432. ASSERT(limit->count > 0)
  433. limit->count = 0;
  434. LinkedList1_Remove(&bsys->active_limits_list, &limit->active_limits_list_node);
  435. }
  436. }
  437. #ifndef BADVPN_USE_WINAPI
  438. void BFileDescriptor_Init (BFileDescriptor *bs, int fd, BFileDescriptor_handler handler, void *user)
  439. {
  440. bs->fd = fd;
  441. bs->handler = handler;
  442. bs->user = user;
  443. bs->active = 0;
  444. }
  445. #endif
  446. void BTimer_Init (BTimer *bt, btime_t msTime, BTimer_handler handler, void *handler_pointer)
  447. {
  448. bt->msTime = msTime;
  449. bt->handler = handler;
  450. bt->handler_pointer = handler_pointer;
  451. bt->active = 0;
  452. }
  453. int BTimer_IsRunning (BTimer *bt)
  454. {
  455. ASSERT(bt->active == 0 || bt->active == 1)
  456. return bt->active;
  457. }
  458. int BReactor_Init (BReactor *bsys)
  459. {
  460. BLog(BLOG_DEBUG, "Reactor initializing");
  461. // set not exiting
  462. bsys->exiting = 0;
  463. // init jobs
  464. BPendingGroup_Init(&bsys->pending_jobs);
  465. // init timers
  466. BHeap_Init(&bsys->timers_heap, OFFSET_DIFF(BTimer, absTime, heap_node), (BHeap_comparator)timer_comparator, NULL);
  467. LinkedList1_Init(&bsys->timers_expired_list);
  468. // init limits
  469. LinkedList1_Init(&bsys->active_limits_list);
  470. #ifdef BADVPN_USE_WINAPI
  471. // init IOCP list
  472. LinkedList1_Init(&bsys->iocp_list);
  473. // init IOCP handle
  474. if (!(bsys->iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1))) {
  475. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  476. goto fail0;
  477. }
  478. // init IOCP ready list
  479. LinkedList1_Init(&bsys->iocp_ready_list);
  480. #endif
  481. #ifdef BADVPN_USE_EPOLL
  482. // create epoll fd
  483. if ((bsys->efd = epoll_create(10)) < 0) {
  484. BLog(BLOG_ERROR, "epoll_create failed");
  485. goto fail0;
  486. }
  487. // init results array
  488. bsys->epoll_results_num = 0;
  489. bsys->epoll_results_pos = 0;
  490. #endif
  491. #ifdef BADVPN_USE_KEVENT
  492. // create kqueue fd
  493. if ((bsys->kqueue_fd = kqueue()) < 0) {
  494. BLog(BLOG_ERROR, "kqueue failed");
  495. goto fail0;
  496. }
  497. // init results array
  498. bsys->kevent_results_num = 0;
  499. bsys->kevent_results_pos = 0;
  500. #endif
  501. #ifdef BADVPN_USE_POLL
  502. // init enabled fds list
  503. LinkedList1_Init(&bsys->poll_enabled_fds_list);
  504. // set zero enabled fds
  505. bsys->poll_num_enabled_fds = 0;
  506. // allocate results arrays
  507. if (!(bsys->poll_results_pollfds = BAllocArray(BSYSTEM_MAX_POLL_FDS, sizeof(bsys->poll_results_pollfds[0])))) {
  508. BLog(BLOG_ERROR, "BAllocArray failed");
  509. goto fail0;
  510. }
  511. if (!(bsys->poll_results_bfds = BAllocArray(BSYSTEM_MAX_POLL_FDS, sizeof(bsys->poll_results_bfds[0])))) {
  512. BLog(BLOG_ERROR, "BAllocArray failed");
  513. goto fail1;
  514. }
  515. // init results array
  516. bsys->poll_results_num = 0;
  517. bsys->poll_results_pos = 0;
  518. #endif
  519. DebugObject_Init(&bsys->d_obj);
  520. #ifndef BADVPN_USE_WINAPI
  521. DebugCounter_Init(&bsys->d_fds_counter);
  522. #endif
  523. #ifdef BADVPN_USE_KEVENT
  524. DebugCounter_Init(&bsys->d_kevent_ctr);
  525. #endif
  526. DebugCounter_Init(&bsys->d_limits_ctr);
  527. return 1;
  528. #ifdef BADVPN_USE_POLL
  529. fail1:
  530. BFree(bsys->poll_results_pollfds);
  531. #endif
  532. fail0:
  533. BPendingGroup_Free(&bsys->pending_jobs);
  534. BLog(BLOG_ERROR, "Reactor failed to initialize");
  535. return 0;
  536. }
  537. void BReactor_Free (BReactor *bsys)
  538. {
  539. DebugObject_Access(&bsys->d_obj);
  540. #ifdef BADVPN_USE_WINAPI
  541. while (!LinkedList1_IsEmpty(&bsys->iocp_list)) {
  542. BReactorIOCPOverlapped *olap = UPPER_OBJECT(LinkedList1_GetLast(&bsys->iocp_list), BReactorIOCPOverlapped, iocp_list_node);
  543. ASSERT(olap->reactor == bsys)
  544. olap->handler(olap->user, BREACTOR_IOCP_EVENT_EXITING, 0);
  545. }
  546. #endif
  547. // {pending group has no BPending objects}
  548. ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs))
  549. ASSERT(!BHeap_GetFirst(&bsys->timers_heap))
  550. ASSERT(LinkedList1_IsEmpty(&bsys->timers_expired_list))
  551. ASSERT(LinkedList1_IsEmpty(&bsys->active_limits_list))
  552. DebugObject_Free(&bsys->d_obj);
  553. #ifdef BADVPN_USE_WINAPI
  554. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_ready_list))
  555. ASSERT(LinkedList1_IsEmpty(&bsys->iocp_list))
  556. #endif
  557. #ifndef BADVPN_USE_WINAPI
  558. DebugCounter_Free(&bsys->d_fds_counter);
  559. #endif
  560. #ifdef BADVPN_USE_KEVENT
  561. DebugCounter_Free(&bsys->d_kevent_ctr);
  562. #endif
  563. DebugCounter_Free(&bsys->d_limits_ctr);
  564. #ifdef BADVPN_USE_POLL
  565. ASSERT(bsys->poll_num_enabled_fds == 0)
  566. ASSERT(LinkedList1_IsEmpty(&bsys->poll_enabled_fds_list))
  567. #endif
  568. BLog(BLOG_DEBUG, "Reactor freeing");
  569. #ifdef BADVPN_USE_WINAPI
  570. // close IOCP handle
  571. ASSERT_FORCE(CloseHandle(bsys->iocp_handle))
  572. #endif
  573. #ifdef BADVPN_USE_EPOLL
  574. // close epoll fd
  575. ASSERT_FORCE(close(bsys->efd) == 0)
  576. #endif
  577. #ifdef BADVPN_USE_KEVENT
  578. // close kqueue fd
  579. ASSERT_FORCE(close(bsys->kqueue_fd) == 0)
  580. #endif
  581. #ifdef BADVPN_USE_POLL
  582. // free results arrays
  583. BFree(bsys->poll_results_bfds);
  584. BFree(bsys->poll_results_pollfds);
  585. #endif
  586. // free jobs
  587. BPendingGroup_Free(&bsys->pending_jobs);
  588. }
  589. int BReactor_Exec (BReactor *bsys)
  590. {
  591. BLog(BLOG_DEBUG, "Entering event loop");
  592. while (!bsys->exiting) {
  593. // dispatch job
  594. if (BPendingGroup_HasJobs(&bsys->pending_jobs)) {
  595. BPendingGroup_ExecuteJob(&bsys->pending_jobs);
  596. continue;
  597. }
  598. // dispatch timer
  599. LinkedList1Node *list_node = LinkedList1_GetFirst(&bsys->timers_expired_list);
  600. if (list_node) {
  601. BTimer *timer = UPPER_OBJECT(list_node, BTimer, list_node);
  602. ASSERT(timer->active)
  603. ASSERT(timer->expired)
  604. // remove from expired list
  605. LinkedList1_Remove(&bsys->timers_expired_list, &timer->list_node);
  606. // set inactive
  607. timer->active = 0;
  608. // call handler
  609. BLog(BLOG_DEBUG, "Dispatching timer");
  610. timer->handler(timer->handler_pointer);
  611. continue;
  612. }
  613. #ifdef BADVPN_USE_WINAPI
  614. if (!LinkedList1_IsEmpty(&bsys->iocp_ready_list)) {
  615. BReactorIOCPOverlapped *olap = UPPER_OBJECT(LinkedList1_GetFirst(&bsys->iocp_ready_list), BReactorIOCPOverlapped, ready_list_node);
  616. ASSERT(olap->is_ready)
  617. ASSERT(olap->handler)
  618. // remove from ready list
  619. LinkedList1_Remove(&bsys->iocp_ready_list, &olap->ready_list_node);
  620. // set not ready
  621. olap->is_ready = 0;
  622. int event = (olap->ready_succeeded ? BREACTOR_IOCP_EVENT_SUCCEEDED : BREACTOR_IOCP_EVENT_FAILED);
  623. // call handler
  624. olap->handler(olap->user, event, olap->ready_bytes);
  625. continue;
  626. }
  627. #endif
  628. #ifdef BADVPN_USE_EPOLL
  629. // dispatch file descriptor
  630. if (bsys->epoll_results_pos < bsys->epoll_results_num) {
  631. // grab event
  632. struct epoll_event *event = &bsys->epoll_results[bsys->epoll_results_pos];
  633. bsys->epoll_results_pos++;
  634. // check if the BFileDescriptor was removed
  635. if (!event->data.ptr) {
  636. continue;
  637. }
  638. // get BFileDescriptor
  639. BFileDescriptor *bfd = (BFileDescriptor *)event->data.ptr;
  640. ASSERT(bfd->active)
  641. ASSERT(bfd->epoll_returned_ptr == (BFileDescriptor **)&event->data.ptr)
  642. // zero pointer to the epoll entry
  643. bfd->epoll_returned_ptr = NULL;
  644. // calculate events to report
  645. int events = 0;
  646. if ((bfd->waitEvents&BREACTOR_READ) && (event->events&EPOLLIN)) {
  647. events |= BREACTOR_READ;
  648. }
  649. if ((bfd->waitEvents&BREACTOR_WRITE) && (event->events&EPOLLOUT)) {
  650. events |= BREACTOR_WRITE;
  651. }
  652. if ((event->events&EPOLLERR) || (event->events&EPOLLHUP)) {
  653. events |= BREACTOR_ERROR;
  654. }
  655. if (!events) {
  656. BLog(BLOG_ERROR, "no events detected?");
  657. continue;
  658. }
  659. // call handler
  660. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  661. bfd->handler(bfd->user, events);
  662. continue;
  663. }
  664. #endif
  665. #ifdef BADVPN_USE_KEVENT
  666. // dispatch kevent
  667. if (bsys->kevent_results_pos < bsys->kevent_results_num) {
  668. // grab event
  669. struct kevent *event = &bsys->kevent_results[bsys->kevent_results_pos];
  670. bsys->kevent_results_pos++;
  671. // check if the event was removed
  672. if (!event->udata) {
  673. continue;
  674. }
  675. // check tag
  676. int *tag = event->udata;
  677. switch (*tag) {
  678. case KEVENT_TAG_FD: {
  679. // get BFileDescriptor
  680. BFileDescriptor *bfd = UPPER_OBJECT(tag, BFileDescriptor, kevent_tag);
  681. ASSERT(bfd->active)
  682. ASSERT(bfd->kevent_returned_ptr == (int **)&event->udata)
  683. // zero pointer to the kevent entry
  684. bfd->kevent_returned_ptr = NULL;
  685. // calculate event to report
  686. int events = 0;
  687. if ((bfd->waitEvents&BREACTOR_READ) && event->filter == EVFILT_READ) {
  688. events |= BREACTOR_READ;
  689. }
  690. if ((bfd->waitEvents&BREACTOR_WRITE) && event->filter == EVFILT_WRITE) {
  691. events |= BREACTOR_WRITE;
  692. }
  693. if (!events) {
  694. BLog(BLOG_ERROR, "no events detected?");
  695. continue;
  696. }
  697. // call handler
  698. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  699. bfd->handler(bfd->user, events);
  700. continue;
  701. } break;
  702. case KEVENT_TAG_KEVENT: {
  703. // get BReactorKEvent
  704. BReactorKEvent *kev = UPPER_OBJECT(tag, BReactorKEvent, kevent_tag);
  705. ASSERT(kev->reactor == bsys)
  706. ASSERT(kev->kevent_returned_ptr == (int **)&event->udata)
  707. // zero pointer to the kevent entry
  708. kev->kevent_returned_ptr = NULL;
  709. // call handler
  710. BLog(BLOG_DEBUG, "Dispatching kevent");
  711. kev->handler(kev->user, event->fflags, event->data);
  712. continue;
  713. } break;
  714. default:
  715. ASSERT(0);
  716. }
  717. }
  718. #endif
  719. #ifdef BADVPN_USE_POLL
  720. if (bsys->poll_results_pos < bsys->poll_results_num) {
  721. // grab event
  722. struct pollfd *pfd = &bsys->poll_results_pollfds[bsys->poll_results_pos];
  723. BFileDescriptor *bfd = bsys->poll_results_bfds[bsys->poll_results_pos];
  724. bsys->poll_results_pos++;
  725. // skip removed entry
  726. if (!bfd) {
  727. continue;
  728. }
  729. ASSERT(bfd->active)
  730. ASSERT(bfd->poll_returned_index == bsys->poll_results_pos - 1)
  731. // remove result reference
  732. bfd->poll_returned_index = -1;
  733. // calculate events to report
  734. int events = 0;
  735. if ((bfd->waitEvents & BREACTOR_READ) && (pfd->revents & POLLIN)) {
  736. events |= BREACTOR_READ;
  737. }
  738. if ((bfd->waitEvents & BREACTOR_WRITE) && (pfd->revents & POLLOUT)) {
  739. events |= BREACTOR_WRITE;
  740. }
  741. if ((pfd->revents & POLLERR) || (pfd->revents & POLLHUP)) {
  742. events |= BREACTOR_ERROR;
  743. }
  744. if (!events) {
  745. continue;
  746. }
  747. // call handler
  748. BLog(BLOG_DEBUG, "Dispatching file descriptor");
  749. bfd->handler(bfd->user, events);
  750. continue;
  751. }
  752. #endif
  753. wait_for_events(bsys);
  754. }
  755. BLog(BLOG_DEBUG, "Exiting event loop, exit code %d", bsys->exit_code);
  756. return bsys->exit_code;
  757. }
  758. void BReactor_Quit (BReactor *bsys, int code)
  759. {
  760. bsys->exiting = 1;
  761. bsys->exit_code = code;
  762. }
  763. void BReactor_SetTimer (BReactor *bsys, BTimer *bt)
  764. {
  765. BReactor_SetTimerAfter(bsys, bt, bt->msTime);
  766. }
  767. void BReactor_SetTimerAfter (BReactor *bsys, BTimer *bt, btime_t after)
  768. {
  769. BReactor_SetTimerAbsolute(bsys, bt, btime_add(btime_gettime(), after));
  770. }
  771. void BReactor_SetTimerAbsolute (BReactor *bsys, BTimer *bt, btime_t time)
  772. {
  773. // unlink it if it's already in the list
  774. BReactor_RemoveTimer(bsys, bt);
  775. // initialize timer
  776. bt->active = 1;
  777. bt->expired = 0;
  778. bt->absTime = time;
  779. // insert to running timers heap
  780. BHeap_Insert(&bsys->timers_heap, &bt->heap_node);
  781. }
  782. void BReactor_RemoveTimer (BReactor *bsys, BTimer *bt)
  783. {
  784. if (!bt->active) {
  785. return;
  786. }
  787. if (bt->expired) {
  788. // remove from expired list
  789. LinkedList1_Remove(&bsys->timers_expired_list, &bt->list_node);
  790. } else {
  791. // remove from running heap
  792. BHeap_Remove(&bsys->timers_heap, &bt->heap_node);
  793. }
  794. // set inactive
  795. bt->active = 0;
  796. }
  797. BPendingGroup * BReactor_PendingGroup (BReactor *bsys)
  798. {
  799. return &bsys->pending_jobs;
  800. }
  801. int BReactor_Synchronize (BReactor *bsys, BPending *ref)
  802. {
  803. ASSERT(ref)
  804. while (!bsys->exiting) {
  805. ASSERT(BPendingGroup_HasJobs(&bsys->pending_jobs))
  806. if (BPendingGroup_PeekJob(&bsys->pending_jobs) == ref) {
  807. return 1;
  808. }
  809. BPendingGroup_ExecuteJob(&bsys->pending_jobs);
  810. }
  811. return 0;
  812. }
  813. #ifndef BADVPN_USE_WINAPI
  814. int BReactor_AddFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
  815. {
  816. ASSERT(!bs->active)
  817. #ifdef BADVPN_USE_EPOLL
  818. // add epoll entry
  819. struct epoll_event event;
  820. memset(&event, 0, sizeof(event));
  821. event.events = 0;
  822. event.data.ptr = bs;
  823. if (epoll_ctl(bsys->efd, EPOLL_CTL_ADD, bs->fd, &event) < 0) {
  824. int error = errno;
  825. BLog(BLOG_ERROR, "epoll_ctl failed: %d", error);
  826. return 0;
  827. }
  828. // set epoll returned pointer
  829. bs->epoll_returned_ptr = NULL;
  830. #endif
  831. #ifdef BADVPN_USE_KEVENT
  832. // set kevent tag
  833. bs->kevent_tag = KEVENT_TAG_FD;
  834. // set kevent returned pointer
  835. bs->kevent_returned_ptr = NULL;
  836. #endif
  837. #ifdef BADVPN_USE_POLL
  838. if (bsys->poll_num_enabled_fds == BSYSTEM_MAX_POLL_FDS) {
  839. BLog(BLOG_ERROR, "too many fds");
  840. return 0;
  841. }
  842. // append to enabled fds list
  843. LinkedList1_Append(&bsys->poll_enabled_fds_list, &bs->poll_enabled_fds_list_node);
  844. bsys->poll_num_enabled_fds++;
  845. // set not returned
  846. bs->poll_returned_index = -1;
  847. #endif
  848. bs->active = 1;
  849. bs->waitEvents = 0;
  850. DebugCounter_Increment(&bsys->d_fds_counter);
  851. return 1;
  852. }
  853. void BReactor_RemoveFileDescriptor (BReactor *bsys, BFileDescriptor *bs)
  854. {
  855. ASSERT(bs->active)
  856. DebugCounter_Decrement(&bsys->d_fds_counter);
  857. bs->active = 0;
  858. #ifdef BADVPN_USE_EPOLL
  859. // delete epoll entry
  860. struct epoll_event event;
  861. memset(&event, 0, sizeof(event));
  862. ASSERT_FORCE(epoll_ctl(bsys->efd, EPOLL_CTL_DEL, bs->fd, &event) == 0)
  863. // write through epoll returned pointer
  864. if (bs->epoll_returned_ptr) {
  865. *bs->epoll_returned_ptr = NULL;
  866. }
  867. #endif
  868. #ifdef BADVPN_USE_KEVENT
  869. // delete kevents
  870. update_kevent_fd_events(bsys, bs, 0);
  871. // write through kevent returned pointer
  872. if (bs->kevent_returned_ptr) {
  873. *bs->kevent_returned_ptr = NULL;
  874. }
  875. #endif
  876. #ifdef BADVPN_USE_POLL
  877. // invalidate results entry
  878. if (bs->poll_returned_index != -1) {
  879. ASSERT(bs->poll_returned_index >= bsys->poll_results_pos)
  880. ASSERT(bs->poll_returned_index < bsys->poll_results_num)
  881. ASSERT(bsys->poll_results_bfds[bs->poll_returned_index] == bs)
  882. bsys->poll_results_bfds[bs->poll_returned_index] = NULL;
  883. }
  884. // remove from enabled fds list
  885. LinkedList1_Remove(&bsys->poll_enabled_fds_list, &bs->poll_enabled_fds_list_node);
  886. bsys->poll_num_enabled_fds--;
  887. #endif
  888. }
  889. void BReactor_SetFileDescriptorEvents (BReactor *bsys, BFileDescriptor *bs, int events)
  890. {
  891. ASSERT(bs->active)
  892. ASSERT(!(events&~(BREACTOR_READ|BREACTOR_WRITE)))
  893. if (bs->waitEvents == events) {
  894. return;
  895. }
  896. #ifdef BADVPN_USE_EPOLL
  897. // calculate epoll events
  898. int eevents = 0;
  899. if ((events & BREACTOR_READ)) {
  900. eevents |= EPOLLIN;
  901. }
  902. if ((events & BREACTOR_WRITE)) {
  903. eevents |= EPOLLOUT;
  904. }
  905. // update epoll entry
  906. struct epoll_event event;
  907. memset(&event, 0, sizeof(event));
  908. event.events = eevents;
  909. event.data.ptr = bs;
  910. ASSERT_FORCE(epoll_ctl(bsys->efd, EPOLL_CTL_MOD, bs->fd, &event) == 0)
  911. #endif
  912. #ifdef BADVPN_USE_KEVENT
  913. update_kevent_fd_events(bsys, bs, events);
  914. #endif
  915. // update events
  916. bs->waitEvents = events;
  917. }
  918. #endif
  919. void BReactorLimit_Init (BReactorLimit *o, BReactor *reactor, int limit)
  920. {
  921. DebugObject_Access(&reactor->d_obj);
  922. ASSERT(limit > 0)
  923. // init arguments
  924. o->reactor = reactor;
  925. o->limit = limit;
  926. // set count zero
  927. o->count = 0;
  928. DebugCounter_Increment(&reactor->d_limits_ctr);
  929. DebugObject_Init(&o->d_obj);
  930. }
  931. void BReactorLimit_Free (BReactorLimit *o)
  932. {
  933. BReactor *reactor = o->reactor;
  934. DebugObject_Free(&o->d_obj);
  935. DebugCounter_Decrement(&reactor->d_limits_ctr);
  936. // remove from active limits list
  937. if (o->count > 0) {
  938. LinkedList1_Remove(&reactor->active_limits_list, &o->active_limits_list_node);
  939. }
  940. }
  941. int BReactorLimit_Increment (BReactorLimit *o)
  942. {
  943. BReactor *reactor = o->reactor;
  944. DebugObject_Access(&o->d_obj);
  945. // check count against limit
  946. if (o->count >= o->limit) {
  947. return 0;
  948. }
  949. // increment count
  950. o->count++;
  951. // if limit was zero, add to active limits list
  952. if (o->count == 1) {
  953. LinkedList1_Append(&reactor->active_limits_list, &o->active_limits_list_node);
  954. }
  955. return 1;
  956. }
  957. void BReactorLimit_SetLimit (BReactorLimit *o, int limit)
  958. {
  959. DebugObject_Access(&o->d_obj);
  960. ASSERT(limit > 0)
  961. // set limit
  962. o->limit = limit;
  963. }
  964. #ifdef BADVPN_USE_KEVENT
  965. int BReactorKEvent_Init (BReactorKEvent *o, BReactor *reactor, BReactorKEvent_handler handler, void *user, uintptr_t ident, short filter, u_int fflags, intptr_t data)
  966. {
  967. DebugObject_Access(&reactor->d_obj);
  968. // init arguments
  969. o->reactor = reactor;
  970. o->handler = handler;
  971. o->user = user;
  972. o->ident = ident;
  973. o->filter = filter;
  974. // add kevent
  975. struct kevent event;
  976. memset(&event, 0, sizeof(event));
  977. event.ident = o->ident;
  978. event.filter = o->filter;
  979. event.flags = EV_ADD;
  980. event.fflags = fflags;
  981. event.data = data;
  982. event.udata = &o->kevent_tag;
  983. if (kevent(o->reactor->kqueue_fd, &event, 1, NULL, 0, NULL) < 0) {
  984. return 0;
  985. }
  986. // set kevent tag
  987. o->kevent_tag = KEVENT_TAG_KEVENT;
  988. // set kevent returned pointer
  989. o->kevent_returned_ptr = NULL;
  990. DebugObject_Init(&o->d_obj);
  991. DebugCounter_Increment(&o->reactor->d_kevent_ctr);
  992. return 1;
  993. }
  994. void BReactorKEvent_Free (BReactorKEvent *o)
  995. {
  996. DebugObject_Free(&o->d_obj);
  997. DebugCounter_Decrement(&o->reactor->d_kevent_ctr);
  998. // write through kevent returned pointer
  999. if (o->kevent_returned_ptr) {
  1000. *o->kevent_returned_ptr = NULL;
  1001. }
  1002. // delete kevent
  1003. struct kevent event;
  1004. memset(&event, 0, sizeof(event));
  1005. event.ident = o->ident;
  1006. event.filter = o->filter;
  1007. event.flags = EV_DELETE;
  1008. ASSERT_FORCE(kevent(o->reactor->kqueue_fd, &event, 1, NULL, 0, NULL) == 0)
  1009. }
  1010. #endif
  1011. #ifdef BADVPN_USE_WINAPI
  1012. HANDLE BReactor_GetIOCPHandle (BReactor *reactor)
  1013. {
  1014. DebugObject_Access(&reactor->d_obj);
  1015. return reactor->iocp_handle;
  1016. }
  1017. void BReactorIOCPOverlapped_Init (BReactorIOCPOverlapped *o, BReactor *reactor, void *user, BReactorIOCPOverlapped_handler handler)
  1018. {
  1019. DebugObject_Access(&reactor->d_obj);
  1020. // init arguments
  1021. o->reactor = reactor;
  1022. o->user = user;
  1023. o->handler = handler;
  1024. // zero overlapped
  1025. memset(&o->olap, 0, sizeof(o->olap));
  1026. // append to IOCP list
  1027. LinkedList1_Append(&reactor->iocp_list, &o->iocp_list_node);
  1028. // set not ready
  1029. o->is_ready = 0;
  1030. DebugObject_Init(&o->d_obj);
  1031. }
  1032. void BReactorIOCPOverlapped_Free (BReactorIOCPOverlapped *o)
  1033. {
  1034. BReactor *reactor = o->reactor;
  1035. DebugObject_Free(&o->d_obj);
  1036. // remove from IOCP ready list
  1037. if (o->is_ready) {
  1038. LinkedList1_Remove(&reactor->iocp_ready_list, &o->ready_list_node);
  1039. }
  1040. // remove from IOCP list
  1041. LinkedList1_Remove(&reactor->iocp_list, &o->iocp_list_node);
  1042. }
  1043. void BReactorIOCPOverlapped_Wait (BReactorIOCPOverlapped *o, int *out_succeeded, DWORD *out_bytes)
  1044. {
  1045. BReactor *reactor = o->reactor;
  1046. DebugObject_Access(&o->d_obj);
  1047. // wait for IOCP events until we get an event for this olap
  1048. while (!o->is_ready) {
  1049. DWORD bytes = 0;
  1050. ULONG_PTR key;
  1051. BReactorIOCPOverlapped *olap = NULL;
  1052. BOOL res = GetQueuedCompletionStatus(reactor->iocp_handle, &bytes, &key, (OVERLAPPED **)&olap, INFINITE);
  1053. ASSERT_FORCE(olap)
  1054. DebugObject_Access(&olap->d_obj);
  1055. ASSERT(olap->reactor == reactor)
  1056. // regular I/O should be done synchronously, so we shoudln't ever get a second completion before an
  1057. // existing one is dispatched. If however PostQueuedCompletionStatus is being used to signal events,
  1058. // just discard any excess events.
  1059. if (!olap->is_ready) {
  1060. set_iocp_ready(olap, (res == TRUE), bytes);
  1061. }
  1062. }
  1063. // remove from IOCP ready list
  1064. LinkedList1_Remove(&reactor->iocp_ready_list, &o->ready_list_node);
  1065. // set not ready
  1066. o->is_ready = 0;
  1067. if (out_succeeded) {
  1068. *out_succeeded = o->ready_succeeded;
  1069. }
  1070. if (out_bytes) {
  1071. *out_bytes = o->ready_bytes;
  1072. }
  1073. }
  1074. #endif