BThreadWork.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdint.h>
  23. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  24. #include <unistd.h>
  25. #include <errno.h>
  26. #include <fcntl.h>
  27. #endif
  28. #include <misc/offset.h>
  29. #include <system/BLog.h>
  30. #include <generated/blog_channel_BThreadWork.h>
  31. #include <threadwork/BThreadWork.h>
  32. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  33. static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
  34. {
  35. BThreadWorkDispatcher *o = t->d;
  36. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  37. while (1) {
  38. // exit if requested
  39. if (o->cancel) {
  40. break;
  41. }
  42. if (LinkedList2_IsEmpty(&o->pending_list)) {
  43. // wait for event
  44. ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
  45. continue;
  46. }
  47. // grab the work
  48. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
  49. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  50. LinkedList2_Remove(&o->pending_list, &w->list_node);
  51. t->running_work = w;
  52. w->state = BTHREADWORK_STATE_RUNNING;
  53. // do the work
  54. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  55. w->work_func(w->work_func_user);
  56. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  57. // release the work
  58. t->running_work = NULL;
  59. LinkedList2_Append(&o->finished_list, &w->list_node);
  60. w->state = BTHREADWORK_STATE_FINISHED;
  61. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  62. // write to pipe
  63. uint8_t b = 0;
  64. int res = write(o->pipe[1], &b, sizeof(b));
  65. if (res < 0) {
  66. int error = errno;
  67. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  68. }
  69. }
  70. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  71. }
  72. static void dispatch_job (BThreadWorkDispatcher *o)
  73. {
  74. ASSERT(o->num_threads > 0)
  75. // lock
  76. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  77. // check for finished job
  78. if (LinkedList2_IsEmpty(&o->finished_list)) {
  79. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  80. return;
  81. }
  82. // grab finished job
  83. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
  84. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  85. LinkedList2_Remove(&o->finished_list, &w->list_node);
  86. // schedule more
  87. if (!LinkedList2_IsEmpty(&o->finished_list)) {
  88. BPending_Set(&o->more_job);
  89. }
  90. // set state forgotten
  91. w->state = BTHREADWORK_STATE_FORGOTTEN;
  92. // unlock
  93. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  94. // call handler
  95. w->handler_done(w->user);
  96. return;
  97. }
  98. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  99. {
  100. ASSERT(o->num_threads > 0)
  101. DebugObject_Access(&o->d_obj);
  102. // read data from pipe
  103. uint8_t b[64];
  104. int res = read(o->pipe[0], b, sizeof(b));
  105. if (res < 0) {
  106. int error = errno;
  107. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  108. } else {
  109. ASSERT(res > 0)
  110. }
  111. dispatch_job(o);
  112. return;
  113. }
  114. static void more_job_handler (BThreadWorkDispatcher *o)
  115. {
  116. ASSERT(o->num_threads > 0)
  117. DebugObject_Access(&o->d_obj);
  118. dispatch_job(o);
  119. return;
  120. }
  121. static void stop_threads (BThreadWorkDispatcher *o)
  122. {
  123. // set cancelling
  124. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  125. o->cancel = 1;
  126. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  127. while (o->num_threads > 0) {
  128. struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
  129. // wake up thread
  130. ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
  131. // wait for thread to exit
  132. ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
  133. // free condition variable
  134. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  135. o->num_threads--;
  136. }
  137. }
  138. #endif
  139. static void work_job_handler (BThreadWork *o)
  140. {
  141. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  142. ASSERT(o->d->num_threads == 0)
  143. #endif
  144. DebugObject_Access(&o->d_obj);
  145. // do the work
  146. o->work_func(o->work_func_user);
  147. // call handler
  148. o->handler_done(o->user);
  149. return;
  150. }
  151. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  152. {
  153. // init arguments
  154. o->reactor = reactor;
  155. if (num_threads_hint < 0) {
  156. num_threads_hint = 2;
  157. }
  158. if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
  159. num_threads_hint = BTHREADWORK_MAX_THREADS;
  160. }
  161. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  162. if (num_threads_hint > 0) {
  163. // init pending list
  164. LinkedList2_Init(&o->pending_list);
  165. // init finished list
  166. LinkedList2_Init(&o->finished_list);
  167. // init mutex
  168. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  169. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  170. goto fail0;
  171. }
  172. // init pipe
  173. if (pipe(o->pipe) < 0) {
  174. BLog(BLOG_ERROR, "pipe failed");
  175. goto fail1;
  176. }
  177. // set read end non-blocking
  178. if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
  179. BLog(BLOG_ERROR, "fcntl failed");
  180. goto fail2;
  181. }
  182. // set write end non-blocking
  183. if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
  184. BLog(BLOG_ERROR, "fcntl failed");
  185. goto fail2;
  186. }
  187. // init BFileDescriptor
  188. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  189. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  190. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  191. goto fail2;
  192. }
  193. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  194. // init more job
  195. BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
  196. // set not cancelling
  197. o->cancel = 0;
  198. // init threads
  199. o->num_threads = 0;
  200. for (int i = 0; i < num_threads_hint; i++) {
  201. struct BThreadWorkDispatcher_thread *t = &o->threads[i];
  202. // set parent pointer
  203. t->d = o;
  204. // set no running work
  205. t->running_work = NULL;
  206. // init condition variable
  207. if (pthread_cond_init(&t->new_cond, NULL) != 0) {
  208. BLog(BLOG_ERROR, "pthread_cond_init failed");
  209. goto fail3;
  210. }
  211. // init thread
  212. if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
  213. BLog(BLOG_ERROR, "pthread_create failed");
  214. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  215. goto fail3;
  216. }
  217. o->num_threads++;
  218. }
  219. }
  220. #endif
  221. DebugObject_Init(&o->d_obj);
  222. DebugCounter_Init(&o->d_ctr);
  223. return 1;
  224. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  225. fail3:
  226. stop_threads(o);
  227. BPending_Free(&o->more_job);
  228. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  229. fail2:
  230. ASSERT_FORCE(close(o->pipe[0]) == 0)
  231. ASSERT_FORCE(close(o->pipe[1]) == 0)
  232. fail1:
  233. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  234. fail0:
  235. return 0;
  236. #endif
  237. }
  238. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  239. {
  240. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  241. if (o->num_threads > 0) {
  242. ASSERT(LinkedList2_IsEmpty(&o->pending_list))
  243. for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
  244. ASSERT(LinkedList2_IsEmpty(&o->finished_list))
  245. }
  246. #endif
  247. DebugObject_Free(&o->d_obj);
  248. DebugCounter_Free(&o->d_ctr);
  249. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  250. if (o->num_threads > 0) {
  251. // stop threads
  252. stop_threads(o);
  253. // free more job
  254. BPending_Free(&o->more_job);
  255. // free BFileDescriptor
  256. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  257. // free pipe
  258. ASSERT_FORCE(close(o->pipe[0]) == 0)
  259. ASSERT_FORCE(close(o->pipe[1]) == 0)
  260. // free mutex
  261. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  262. }
  263. #endif
  264. }
  265. int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
  266. {
  267. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  268. return (o->num_threads > 0);
  269. #else
  270. return 0;
  271. #endif
  272. }
  273. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  274. {
  275. DebugObject_Access(&d->d_obj);
  276. // init arguments
  277. o->d = d;
  278. o->handler_done = handler_done;
  279. o->user = user;
  280. o->work_func = work_func;
  281. o->work_func_user = work_func_user;
  282. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  283. if (d->num_threads > 0) {
  284. // set state
  285. o->state = BTHREADWORK_STATE_PENDING;
  286. // init finished semaphore
  287. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  288. // post work
  289. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  290. LinkedList2_Append(&d->pending_list, &o->list_node);
  291. for (int i = 0; i < d->num_threads; i++) {
  292. if (!d->threads[i].running_work) {
  293. ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
  294. break;
  295. }
  296. }
  297. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  298. } else {
  299. #endif
  300. // schedule job
  301. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  302. BPending_Set(&o->job);
  303. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  304. }
  305. #endif
  306. DebugObject_Init(&o->d_obj);
  307. DebugCounter_Increment(&d->d_ctr);
  308. }
  309. void BThreadWork_Free (BThreadWork *o)
  310. {
  311. BThreadWorkDispatcher *d = o->d;
  312. DebugObject_Free(&o->d_obj);
  313. DebugCounter_Decrement(&d->d_ctr);
  314. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  315. if (d->num_threads > 0) {
  316. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  317. switch (o->state) {
  318. case BTHREADWORK_STATE_PENDING: {
  319. BLog(BLOG_DEBUG, "remove pending work");
  320. // remove from pending list
  321. LinkedList2_Remove(&d->pending_list, &o->list_node);
  322. } break;
  323. case BTHREADWORK_STATE_RUNNING: {
  324. BLog(BLOG_DEBUG, "remove running work");
  325. // wait for the work to finish running
  326. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  327. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  328. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  329. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  330. // remove from finished list
  331. LinkedList2_Remove(&d->finished_list, &o->list_node);
  332. } break;
  333. case BTHREADWORK_STATE_FINISHED: {
  334. BLog(BLOG_DEBUG, "remove finished work");
  335. // remove from finished list
  336. LinkedList2_Remove(&d->finished_list, &o->list_node);
  337. } break;
  338. case BTHREADWORK_STATE_FORGOTTEN: {
  339. BLog(BLOG_DEBUG, "remove forgotten work");
  340. } break;
  341. default:
  342. ASSERT(0);
  343. }
  344. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  345. // free finished semaphore
  346. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  347. } else {
  348. #endif
  349. BPending_Free(&o->job);
  350. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  351. }
  352. #endif
  353. }