BThreadWork.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdint.h>
  23. #include <stddef.h>
  24. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  25. #include <unistd.h>
  26. #include <errno.h>
  27. #include <fcntl.h>
  28. #endif
  29. #include <misc/offset.h>
  30. #include <base/BLog.h>
  31. #include <generated/blog_channel_BThreadWork.h>
  32. #include <threadwork/BThreadWork.h>
  33. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  34. static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
  35. {
  36. BThreadWorkDispatcher *o = t->d;
  37. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  38. while (1) {
  39. // exit if requested
  40. if (o->cancel) {
  41. break;
  42. }
  43. if (LinkedList2_IsEmpty(&o->pending_list)) {
  44. // wait for event
  45. ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
  46. continue;
  47. }
  48. // grab the work
  49. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
  50. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  51. LinkedList2_Remove(&o->pending_list, &w->list_node);
  52. t->running_work = w;
  53. w->state = BTHREADWORK_STATE_RUNNING;
  54. // do the work
  55. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  56. w->work_func(w->work_func_user);
  57. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  58. // release the work
  59. t->running_work = NULL;
  60. LinkedList2_Append(&o->finished_list, &w->list_node);
  61. w->state = BTHREADWORK_STATE_FINISHED;
  62. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  63. // write to pipe
  64. uint8_t b = 0;
  65. int res = write(o->pipe[1], &b, sizeof(b));
  66. if (res < 0) {
  67. int error = errno;
  68. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  69. }
  70. }
  71. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  72. return NULL;
  73. }
  74. static void dispatch_job (BThreadWorkDispatcher *o)
  75. {
  76. ASSERT(o->num_threads > 0)
  77. // lock
  78. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  79. // check for finished job
  80. if (LinkedList2_IsEmpty(&o->finished_list)) {
  81. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  82. return;
  83. }
  84. // grab finished job
  85. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
  86. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  87. LinkedList2_Remove(&o->finished_list, &w->list_node);
  88. // schedule more
  89. if (!LinkedList2_IsEmpty(&o->finished_list)) {
  90. BPending_Set(&o->more_job);
  91. }
  92. // set state forgotten
  93. w->state = BTHREADWORK_STATE_FORGOTTEN;
  94. // unlock
  95. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  96. // call handler
  97. w->handler_done(w->user);
  98. return;
  99. }
  100. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  101. {
  102. ASSERT(o->num_threads > 0)
  103. DebugObject_Access(&o->d_obj);
  104. // read data from pipe
  105. uint8_t b[64];
  106. int res = read(o->pipe[0], b, sizeof(b));
  107. if (res < 0) {
  108. int error = errno;
  109. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  110. } else {
  111. ASSERT(res > 0)
  112. }
  113. dispatch_job(o);
  114. return;
  115. }
  116. static void more_job_handler (BThreadWorkDispatcher *o)
  117. {
  118. ASSERT(o->num_threads > 0)
  119. DebugObject_Access(&o->d_obj);
  120. dispatch_job(o);
  121. return;
  122. }
  123. static void stop_threads (BThreadWorkDispatcher *o)
  124. {
  125. // set cancelling
  126. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  127. o->cancel = 1;
  128. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  129. while (o->num_threads > 0) {
  130. struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
  131. // wake up thread
  132. ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
  133. // wait for thread to exit
  134. ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
  135. // free condition variable
  136. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  137. o->num_threads--;
  138. }
  139. }
  140. #endif
  141. static void work_job_handler (BThreadWork *o)
  142. {
  143. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  144. ASSERT(o->d->num_threads == 0)
  145. #endif
  146. DebugObject_Access(&o->d_obj);
  147. // do the work
  148. o->work_func(o->work_func_user);
  149. // call handler
  150. o->handler_done(o->user);
  151. return;
  152. }
  153. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  154. {
  155. // init arguments
  156. o->reactor = reactor;
  157. if (num_threads_hint < 0) {
  158. num_threads_hint = 2;
  159. }
  160. if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
  161. num_threads_hint = BTHREADWORK_MAX_THREADS;
  162. }
  163. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  164. if (num_threads_hint > 0) {
  165. // init pending list
  166. LinkedList2_Init(&o->pending_list);
  167. // init finished list
  168. LinkedList2_Init(&o->finished_list);
  169. // init mutex
  170. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  171. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  172. goto fail0;
  173. }
  174. // init pipe
  175. if (pipe(o->pipe) < 0) {
  176. BLog(BLOG_ERROR, "pipe failed");
  177. goto fail1;
  178. }
  179. // set read end non-blocking
  180. if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
  181. BLog(BLOG_ERROR, "fcntl failed");
  182. goto fail2;
  183. }
  184. // set write end non-blocking
  185. if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
  186. BLog(BLOG_ERROR, "fcntl failed");
  187. goto fail2;
  188. }
  189. // init BFileDescriptor
  190. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  191. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  192. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  193. goto fail2;
  194. }
  195. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  196. // init more job
  197. BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
  198. // set not cancelling
  199. o->cancel = 0;
  200. // init threads
  201. o->num_threads = 0;
  202. for (int i = 0; i < num_threads_hint; i++) {
  203. struct BThreadWorkDispatcher_thread *t = &o->threads[i];
  204. // set parent pointer
  205. t->d = o;
  206. // set no running work
  207. t->running_work = NULL;
  208. // init condition variable
  209. if (pthread_cond_init(&t->new_cond, NULL) != 0) {
  210. BLog(BLOG_ERROR, "pthread_cond_init failed");
  211. goto fail3;
  212. }
  213. // init thread
  214. if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
  215. BLog(BLOG_ERROR, "pthread_create failed");
  216. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  217. goto fail3;
  218. }
  219. o->num_threads++;
  220. }
  221. }
  222. #endif
  223. DebugObject_Init(&o->d_obj);
  224. DebugCounter_Init(&o->d_ctr);
  225. return 1;
  226. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  227. fail3:
  228. stop_threads(o);
  229. BPending_Free(&o->more_job);
  230. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  231. fail2:
  232. ASSERT_FORCE(close(o->pipe[0]) == 0)
  233. ASSERT_FORCE(close(o->pipe[1]) == 0)
  234. fail1:
  235. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  236. fail0:
  237. return 0;
  238. #endif
  239. }
  240. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  241. {
  242. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  243. if (o->num_threads > 0) {
  244. ASSERT(LinkedList2_IsEmpty(&o->pending_list))
  245. for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
  246. ASSERT(LinkedList2_IsEmpty(&o->finished_list))
  247. }
  248. #endif
  249. DebugObject_Free(&o->d_obj);
  250. DebugCounter_Free(&o->d_ctr);
  251. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  252. if (o->num_threads > 0) {
  253. // stop threads
  254. stop_threads(o);
  255. // free more job
  256. BPending_Free(&o->more_job);
  257. // free BFileDescriptor
  258. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  259. // free pipe
  260. ASSERT_FORCE(close(o->pipe[0]) == 0)
  261. ASSERT_FORCE(close(o->pipe[1]) == 0)
  262. // free mutex
  263. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  264. }
  265. #endif
  266. }
  267. int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
  268. {
  269. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  270. return (o->num_threads > 0);
  271. #else
  272. return 0;
  273. #endif
  274. }
  275. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  276. {
  277. DebugObject_Access(&d->d_obj);
  278. // init arguments
  279. o->d = d;
  280. o->handler_done = handler_done;
  281. o->user = user;
  282. o->work_func = work_func;
  283. o->work_func_user = work_func_user;
  284. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  285. if (d->num_threads > 0) {
  286. // set state
  287. o->state = BTHREADWORK_STATE_PENDING;
  288. // init finished semaphore
  289. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  290. // post work
  291. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  292. LinkedList2_Append(&d->pending_list, &o->list_node);
  293. for (int i = 0; i < d->num_threads; i++) {
  294. if (!d->threads[i].running_work) {
  295. ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
  296. break;
  297. }
  298. }
  299. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  300. } else {
  301. #endif
  302. // schedule job
  303. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  304. BPending_Set(&o->job);
  305. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  306. }
  307. #endif
  308. DebugObject_Init(&o->d_obj);
  309. DebugCounter_Increment(&d->d_ctr);
  310. }
  311. void BThreadWork_Free (BThreadWork *o)
  312. {
  313. BThreadWorkDispatcher *d = o->d;
  314. DebugObject_Free(&o->d_obj);
  315. DebugCounter_Decrement(&d->d_ctr);
  316. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  317. if (d->num_threads > 0) {
  318. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  319. switch (o->state) {
  320. case BTHREADWORK_STATE_PENDING: {
  321. BLog(BLOG_DEBUG, "remove pending work");
  322. // remove from pending list
  323. LinkedList2_Remove(&d->pending_list, &o->list_node);
  324. } break;
  325. case BTHREADWORK_STATE_RUNNING: {
  326. BLog(BLOG_DEBUG, "remove running work");
  327. // wait for the work to finish running
  328. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  329. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  330. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  331. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  332. // remove from finished list
  333. LinkedList2_Remove(&d->finished_list, &o->list_node);
  334. } break;
  335. case BTHREADWORK_STATE_FINISHED: {
  336. BLog(BLOG_DEBUG, "remove finished work");
  337. // remove from finished list
  338. LinkedList2_Remove(&d->finished_list, &o->list_node);
  339. } break;
  340. case BTHREADWORK_STATE_FORGOTTEN: {
  341. BLog(BLOG_DEBUG, "remove forgotten work");
  342. } break;
  343. default:
  344. ASSERT(0);
  345. }
  346. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  347. // free finished semaphore
  348. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  349. } else {
  350. #endif
  351. BPending_Free(&o->job);
  352. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  353. }
  354. #endif
  355. }