BThreadWork.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdint.h>
  23. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  24. #include <unistd.h>
  25. #include <errno.h>
  26. #include <fcntl.h>
  27. #endif
  28. #include <misc/offset.h>
  29. #include <system/BLog.h>
  30. #include <generated/blog_channel_BThreadWork.h>
  31. #include <threadwork/BThreadWork.h>
  32. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  33. static void * dispatcher_thread (BThreadWorkDispatcher *o)
  34. {
  35. while (1) {
  36. // wait for a work
  37. ASSERT_FORCE(sem_wait(&o->new_sem) == 0)
  38. // grab the work
  39. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  40. if (LinkedList2_IsEmpty(&o->pending_list)) {
  41. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  42. continue;
  43. }
  44. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
  45. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  46. LinkedList2_Remove(&o->pending_list, &w->list_node);
  47. o->running_work = w;
  48. w->state = BTHREADWORK_STATE_RUNNING;
  49. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  50. // do the work
  51. w->work_func(w->work_func_user);
  52. // release the work
  53. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  54. o->running_work = NULL;
  55. LinkedList2_Append(&o->finished_list, &w->list_node);
  56. w->state = BTHREADWORK_STATE_FINISHED;
  57. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  58. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  59. // write to pipe
  60. uint8_t b = 0;
  61. ASSERT_FORCE(write(o->pipe[1], &b, sizeof(b)) == sizeof(b))
  62. }
  63. }
  64. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  65. {
  66. ASSERT(o->num_threads > 0)
  67. DebugObject_Access(&o->d_obj);
  68. // read from pipe
  69. uint8_t b;
  70. int res = read(o->pipe[0], &b, sizeof(b));
  71. if (res < 0) {
  72. int error = errno;
  73. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  74. return;
  75. }
  76. ASSERT(res == sizeof(b))
  77. ASSERT(b == 0)
  78. // grab a finished work
  79. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  80. if (LinkedList2_IsEmpty(&o->finished_list)) {
  81. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  82. return;
  83. }
  84. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
  85. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  86. LinkedList2_Remove(&o->finished_list, &w->list_node);
  87. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  88. // set state forgotten
  89. w->state = BTHREADWORK_STATE_FORGOTTEN;
  90. // call handler
  91. w->handler_done(w->user);
  92. return;
  93. }
  94. #endif
  95. static void work_job_handler (BThreadWork *o)
  96. {
  97. ASSERT(o->d->num_threads == 0)
  98. DebugObject_Access(&o->d_obj);
  99. // do the work
  100. o->work_func(o->work_func_user);
  101. // call handler
  102. o->handler_done(o->user);
  103. return;
  104. }
  105. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  106. {
  107. // init arguments
  108. o->reactor = reactor;
  109. // set num threads
  110. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  111. if (num_threads_hint < 0) {
  112. o->num_threads = 2;
  113. } else {
  114. o->num_threads = num_threads_hint;
  115. }
  116. #else
  117. o->num_threads = 0;
  118. #endif
  119. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  120. if (o->num_threads > 0) {
  121. // init pending list
  122. LinkedList2_Init(&o->pending_list);
  123. // set no running work
  124. o->running_work = NULL;
  125. // init finished list
  126. LinkedList2_Init(&o->finished_list);
  127. // init mutex
  128. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  129. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  130. goto fail0;
  131. }
  132. // init semaphore
  133. if (sem_init(&o->new_sem, 0, 0) != 0) {
  134. BLog(BLOG_ERROR, "sem_init failed");
  135. goto fail1;
  136. }
  137. // init pipe
  138. if (pipe(o->pipe) < 0) {
  139. BLog(BLOG_ERROR, "pipe failed");
  140. goto fail2;
  141. }
  142. // set read end non-blocking
  143. if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
  144. BLog(BLOG_ERROR, "pipe failed");
  145. goto fail3;
  146. }
  147. // init BFileDescriptor
  148. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  149. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  150. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  151. goto fail3;
  152. }
  153. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  154. // init thread
  155. if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
  156. BLog(BLOG_ERROR, "pthread_create failed");
  157. goto fail4;
  158. }
  159. }
  160. #endif
  161. DebugObject_Init(&o->d_obj);
  162. DebugCounter_Init(&o->d_ctr);
  163. return 1;
  164. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  165. fail4:
  166. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  167. fail3:
  168. ASSERT_FORCE(close(o->pipe[0]) == 0)
  169. ASSERT_FORCE(close(o->pipe[1]) == 0)
  170. fail2:
  171. ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
  172. fail1:
  173. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  174. #endif
  175. fail0:
  176. return 0;
  177. }
  178. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  179. {
  180. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  181. if (o->num_threads > 0) {
  182. ASSERT(LinkedList2_IsEmpty(&o->pending_list))
  183. ASSERT(!o->running_work)
  184. ASSERT(LinkedList2_IsEmpty(&o->finished_list))
  185. }
  186. #endif
  187. DebugObject_Free(&o->d_obj);
  188. DebugCounter_Free(&o->d_ctr);
  189. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  190. if (o->num_threads > 0) {
  191. // stop thread
  192. ASSERT_FORCE(pthread_cancel(o->thread) == 0)
  193. void *retval;
  194. ASSERT_FORCE(pthread_join(o->thread, &retval) == 0)
  195. // free BFileDescriptor
  196. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  197. // free pipe
  198. ASSERT_FORCE(close(o->pipe[0]) == 0)
  199. ASSERT_FORCE(close(o->pipe[1]) == 0)
  200. // free semaphore
  201. ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
  202. // free mutex
  203. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  204. }
  205. #endif
  206. }
  207. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  208. {
  209. DebugObject_Access(&d->d_obj);
  210. // init arguments
  211. o->d = d;
  212. o->handler_done = handler_done;
  213. o->user = user;
  214. o->work_func = work_func;
  215. o->work_func_user = work_func_user;
  216. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  217. if (d->num_threads > 0) {
  218. // set state
  219. o->state = BTHREADWORK_STATE_PENDING;
  220. // init finished semaphore
  221. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  222. // insert to pending list
  223. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  224. LinkedList2_Append(&d->pending_list, &o->list_node);
  225. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  226. // post to new semaphore
  227. ASSERT_FORCE(sem_post(&d->new_sem) == 0)
  228. } else {
  229. #endif
  230. // schedule job
  231. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  232. BPending_Set(&o->job);
  233. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  234. }
  235. #endif
  236. DebugObject_Init(&o->d_obj);
  237. DebugCounter_Increment(&d->d_ctr);
  238. }
  239. void BThreadWork_Free (BThreadWork *o)
  240. {
  241. BThreadWorkDispatcher *d = o->d;
  242. DebugObject_Free(&o->d_obj);
  243. DebugCounter_Decrement(&d->d_ctr);
  244. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  245. if (d->num_threads > 0) {
  246. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  247. switch (o->state) {
  248. case BTHREADWORK_STATE_PENDING: {
  249. BLog(BLOG_DEBUG, "remove pending work");
  250. // remove from pending list
  251. LinkedList2_Remove(&d->pending_list, &o->list_node);
  252. } break;
  253. case BTHREADWORK_STATE_RUNNING: {
  254. BLog(BLOG_DEBUG, "remove running work");
  255. // wait for the work to finish running
  256. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  257. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  258. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  259. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  260. // remove from finished list
  261. LinkedList2_Remove(&d->finished_list, &o->list_node);
  262. } break;
  263. case BTHREADWORK_STATE_FINISHED: {
  264. BLog(BLOG_DEBUG, "remove finished work");
  265. // remove from finished list
  266. LinkedList2_Remove(&d->finished_list, &o->list_node);
  267. } break;
  268. case BTHREADWORK_STATE_FORGOTTEN: {
  269. BLog(BLOG_DEBUG, "remove forgotten work");
  270. } break;
  271. default:
  272. ASSERT(0);
  273. }
  274. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  275. // free finished semaphore
  276. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  277. } else {
  278. #endif
  279. BPending_Free(&o->job);
  280. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  281. }
  282. #endif
  283. }