BThreadWork.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdint.h>
  30. #include <stddef.h>
  31. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  32. #include <unistd.h>
  33. #include <errno.h>
  34. #include <fcntl.h>
  35. #endif
  36. #include <misc/offset.h>
  37. #include <base/BLog.h>
  38. #include <generated/blog_channel_BThreadWork.h>
  39. #include <threadwork/BThreadWork.h>
  40. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  41. static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
  42. {
  43. BThreadWorkDispatcher *o = t->d;
  44. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  45. while (1) {
  46. // exit if requested
  47. if (o->cancel) {
  48. break;
  49. }
  50. if (LinkedList1_IsEmpty(&o->pending_list)) {
  51. // wait for event
  52. ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
  53. continue;
  54. }
  55. // grab the work
  56. BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->pending_list), BThreadWork, list_node);
  57. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  58. LinkedList1_Remove(&o->pending_list, &w->list_node);
  59. t->running_work = w;
  60. w->state = BTHREADWORK_STATE_RUNNING;
  61. // do the work
  62. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  63. w->work_func(w->work_func_user);
  64. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  65. // release the work
  66. t->running_work = NULL;
  67. LinkedList1_Append(&o->finished_list, &w->list_node);
  68. w->state = BTHREADWORK_STATE_FINISHED;
  69. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  70. // write to pipe
  71. uint8_t b = 0;
  72. int res = write(o->pipe[1], &b, sizeof(b));
  73. if (res < 0) {
  74. int error = errno;
  75. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  76. }
  77. }
  78. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  79. return NULL;
  80. }
  81. static void dispatch_job (BThreadWorkDispatcher *o)
  82. {
  83. ASSERT(o->num_threads > 0)
  84. // lock
  85. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  86. // check for finished job
  87. if (LinkedList1_IsEmpty(&o->finished_list)) {
  88. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  89. return;
  90. }
  91. // grab finished job
  92. BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->finished_list), BThreadWork, list_node);
  93. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  94. LinkedList1_Remove(&o->finished_list, &w->list_node);
  95. // schedule more
  96. if (!LinkedList1_IsEmpty(&o->finished_list)) {
  97. BPending_Set(&o->more_job);
  98. }
  99. // set state forgotten
  100. w->state = BTHREADWORK_STATE_FORGOTTEN;
  101. // unlock
  102. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  103. // call handler
  104. w->handler_done(w->user);
  105. return;
  106. }
  107. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  108. {
  109. ASSERT(o->num_threads > 0)
  110. DebugObject_Access(&o->d_obj);
  111. // read data from pipe
  112. uint8_t b[64];
  113. int res = read(o->pipe[0], b, sizeof(b));
  114. if (res < 0) {
  115. int error = errno;
  116. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  117. } else {
  118. ASSERT(res > 0)
  119. }
  120. dispatch_job(o);
  121. return;
  122. }
  123. static void more_job_handler (BThreadWorkDispatcher *o)
  124. {
  125. ASSERT(o->num_threads > 0)
  126. DebugObject_Access(&o->d_obj);
  127. dispatch_job(o);
  128. return;
  129. }
  130. static void stop_threads (BThreadWorkDispatcher *o)
  131. {
  132. // set cancelling
  133. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  134. o->cancel = 1;
  135. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  136. while (o->num_threads > 0) {
  137. struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
  138. // wake up thread
  139. ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
  140. // wait for thread to exit
  141. ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
  142. // free condition variable
  143. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  144. o->num_threads--;
  145. }
  146. }
  147. #endif
  148. static void work_job_handler (BThreadWork *o)
  149. {
  150. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  151. ASSERT(o->d->num_threads == 0)
  152. #endif
  153. DebugObject_Access(&o->d_obj);
  154. // do the work
  155. o->work_func(o->work_func_user);
  156. // call handler
  157. o->handler_done(o->user);
  158. return;
  159. }
  160. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  161. {
  162. // init arguments
  163. o->reactor = reactor;
  164. if (num_threads_hint < 0) {
  165. num_threads_hint = 2;
  166. }
  167. if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
  168. num_threads_hint = BTHREADWORK_MAX_THREADS;
  169. }
  170. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  171. if (num_threads_hint > 0) {
  172. // init pending list
  173. LinkedList1_Init(&o->pending_list);
  174. // init finished list
  175. LinkedList1_Init(&o->finished_list);
  176. // init mutex
  177. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  178. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  179. goto fail0;
  180. }
  181. // init pipe
  182. if (pipe(o->pipe) < 0) {
  183. BLog(BLOG_ERROR, "pipe failed");
  184. goto fail1;
  185. }
  186. // set read end non-blocking
  187. if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
  188. BLog(BLOG_ERROR, "fcntl failed");
  189. goto fail2;
  190. }
  191. // set write end non-blocking
  192. if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
  193. BLog(BLOG_ERROR, "fcntl failed");
  194. goto fail2;
  195. }
  196. // init BFileDescriptor
  197. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  198. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  199. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  200. goto fail2;
  201. }
  202. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  203. // init more job
  204. BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
  205. // set not cancelling
  206. o->cancel = 0;
  207. // init threads
  208. o->num_threads = 0;
  209. for (int i = 0; i < num_threads_hint; i++) {
  210. struct BThreadWorkDispatcher_thread *t = &o->threads[i];
  211. // set parent pointer
  212. t->d = o;
  213. // set no running work
  214. t->running_work = NULL;
  215. // init condition variable
  216. if (pthread_cond_init(&t->new_cond, NULL) != 0) {
  217. BLog(BLOG_ERROR, "pthread_cond_init failed");
  218. goto fail3;
  219. }
  220. // init thread
  221. if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
  222. BLog(BLOG_ERROR, "pthread_create failed");
  223. ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
  224. goto fail3;
  225. }
  226. o->num_threads++;
  227. }
  228. }
  229. #endif
  230. DebugObject_Init(&o->d_obj);
  231. DebugCounter_Init(&o->d_ctr);
  232. return 1;
  233. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  234. fail3:
  235. stop_threads(o);
  236. BPending_Free(&o->more_job);
  237. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  238. fail2:
  239. ASSERT_FORCE(close(o->pipe[0]) == 0)
  240. ASSERT_FORCE(close(o->pipe[1]) == 0)
  241. fail1:
  242. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  243. fail0:
  244. return 0;
  245. #endif
  246. }
  247. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  248. {
  249. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  250. if (o->num_threads > 0) {
  251. ASSERT(LinkedList1_IsEmpty(&o->pending_list))
  252. for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
  253. ASSERT(LinkedList1_IsEmpty(&o->finished_list))
  254. }
  255. #endif
  256. DebugObject_Free(&o->d_obj);
  257. DebugCounter_Free(&o->d_ctr);
  258. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  259. if (o->num_threads > 0) {
  260. // stop threads
  261. stop_threads(o);
  262. // free more job
  263. BPending_Free(&o->more_job);
  264. // free BFileDescriptor
  265. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  266. // free pipe
  267. ASSERT_FORCE(close(o->pipe[0]) == 0)
  268. ASSERT_FORCE(close(o->pipe[1]) == 0)
  269. // free mutex
  270. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  271. }
  272. #endif
  273. }
  274. int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
  275. {
  276. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  277. return (o->num_threads > 0);
  278. #else
  279. return 0;
  280. #endif
  281. }
  282. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  283. {
  284. DebugObject_Access(&d->d_obj);
  285. // init arguments
  286. o->d = d;
  287. o->handler_done = handler_done;
  288. o->user = user;
  289. o->work_func = work_func;
  290. o->work_func_user = work_func_user;
  291. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  292. if (d->num_threads > 0) {
  293. // set state
  294. o->state = BTHREADWORK_STATE_PENDING;
  295. // init finished semaphore
  296. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  297. // post work
  298. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  299. LinkedList1_Append(&d->pending_list, &o->list_node);
  300. for (int i = 0; i < d->num_threads; i++) {
  301. if (!d->threads[i].running_work) {
  302. ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
  303. break;
  304. }
  305. }
  306. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  307. } else {
  308. #endif
  309. // schedule job
  310. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  311. BPending_Set(&o->job);
  312. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  313. }
  314. #endif
  315. DebugObject_Init(&o->d_obj);
  316. DebugCounter_Increment(&d->d_ctr);
  317. }
  318. void BThreadWork_Free (BThreadWork *o)
  319. {
  320. BThreadWorkDispatcher *d = o->d;
  321. DebugObject_Free(&o->d_obj);
  322. DebugCounter_Decrement(&d->d_ctr);
  323. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  324. if (d->num_threads > 0) {
  325. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  326. switch (o->state) {
  327. case BTHREADWORK_STATE_PENDING: {
  328. BLog(BLOG_DEBUG, "remove pending work");
  329. // remove from pending list
  330. LinkedList1_Remove(&d->pending_list, &o->list_node);
  331. } break;
  332. case BTHREADWORK_STATE_RUNNING: {
  333. BLog(BLOG_DEBUG, "remove running work");
  334. // wait for the work to finish running
  335. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  336. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  337. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  338. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  339. // remove from finished list
  340. LinkedList1_Remove(&d->finished_list, &o->list_node);
  341. } break;
  342. case BTHREADWORK_STATE_FINISHED: {
  343. BLog(BLOG_DEBUG, "remove finished work");
  344. // remove from finished list
  345. LinkedList1_Remove(&d->finished_list, &o->list_node);
  346. } break;
  347. case BTHREADWORK_STATE_FORGOTTEN: {
  348. BLog(BLOG_DEBUG, "remove forgotten work");
  349. } break;
  350. default:
  351. ASSERT(0);
  352. }
  353. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  354. // free finished semaphore
  355. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  356. } else {
  357. #endif
  358. BPending_Free(&o->job);
  359. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  360. }
  361. #endif
  362. }