BThreadWork.c 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdint.h>
  23. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  24. #include <unistd.h>
  25. #include <errno.h>
  26. #endif
  27. #include <misc/offset.h>
  28. #include <system/BLog.h>
  29. #include <generated/blog_channel_BThreadWork.h>
  30. #include <threadwork/BThreadWork.h>
  31. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  32. static void * dispatcher_thread (BThreadWorkDispatcher *o)
  33. {
  34. while (1) {
  35. // wait for a work
  36. ASSERT_FORCE(sem_wait(&o->new_sem) == 0)
  37. // grab the work
  38. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  39. if (LinkedList2_IsEmpty(&o->pending_list)) {
  40. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  41. continue;
  42. }
  43. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
  44. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  45. LinkedList2_Remove(&o->pending_list, &w->list_node);
  46. o->running_work = w;
  47. w->state = BTHREADWORK_STATE_RUNNING;
  48. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  49. // do the work
  50. w->work_func(w->work_func_user);
  51. // release the work
  52. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  53. o->running_work = NULL;
  54. LinkedList2_Append(&o->finished_list, &w->list_node);
  55. w->state = BTHREADWORK_STATE_FINISHED;
  56. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  57. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  58. // write to pipe
  59. uint8_t b = 0;
  60. ASSERT_FORCE(write(o->pipe[1], &b, sizeof(b)) == sizeof(b))
  61. }
  62. }
  63. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  64. {
  65. ASSERT(o->num_threads > 0)
  66. DebugObject_Access(&o->d_obj);
  67. // read from pipe
  68. uint8_t b;
  69. int res = read(o->pipe[0], &b, sizeof(b));
  70. if (res < 0) {
  71. int error = errno;
  72. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  73. return;
  74. }
  75. ASSERT(res == sizeof(b))
  76. ASSERT(b == 0)
  77. // grab a finished work
  78. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  79. if (LinkedList2_IsEmpty(&o->finished_list)) {
  80. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  81. return;
  82. }
  83. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
  84. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  85. LinkedList2_Remove(&o->finished_list, &w->list_node);
  86. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  87. // set state forgotten
  88. w->state = BTHREADWORK_STATE_FORGOTTEN;
  89. // call handler
  90. w->handler_done(w->user);
  91. return;
  92. }
  93. #endif
  94. static void work_job_handler (BThreadWork *o)
  95. {
  96. ASSERT(o->d->num_threads == 0)
  97. DebugObject_Access(&o->d_obj);
  98. // do the work
  99. o->work_func(o->work_func_user);
  100. // call handler
  101. o->handler_done(o->user);
  102. return;
  103. }
  104. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  105. {
  106. // init arguments
  107. o->reactor = reactor;
  108. // set num threads
  109. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  110. if (num_threads_hint < 0) {
  111. o->num_threads = 2;
  112. } else {
  113. o->num_threads = num_threads_hint;
  114. }
  115. #else
  116. o->num_threads = 0;
  117. #endif
  118. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  119. if (o->num_threads > 0) {
  120. // init pending list
  121. LinkedList2_Init(&o->pending_list);
  122. // set no running work
  123. o->running_work = NULL;
  124. // init finished list
  125. LinkedList2_Init(&o->finished_list);
  126. // init mutex
  127. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  128. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  129. goto fail0;
  130. }
  131. // init semaphore
  132. if (sem_init(&o->new_sem, 0, 0) != 0) {
  133. BLog(BLOG_ERROR, "sem_init failed");
  134. goto fail1;
  135. }
  136. // init pipe
  137. if (pipe(o->pipe) < 0) {
  138. BLog(BLOG_ERROR, "pipe failed");
  139. goto fail2;
  140. }
  141. // init BFileDescriptor
  142. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  143. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  144. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  145. goto fail3;
  146. }
  147. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  148. // init thread
  149. if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
  150. BLog(BLOG_ERROR, "pthread_create failed");
  151. goto fail4;
  152. }
  153. }
  154. #endif
  155. DebugObject_Init(&o->d_obj);
  156. DebugCounter_Init(&o->d_ctr);
  157. return 1;
  158. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  159. fail4:
  160. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  161. fail3:
  162. ASSERT_FORCE(close(o->pipe[0]) == 0)
  163. ASSERT_FORCE(close(o->pipe[1]) == 0)
  164. fail2:
  165. ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
  166. fail1:
  167. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  168. #endif
  169. fail0:
  170. return 0;
  171. }
  172. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  173. {
  174. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  175. if (o->num_threads > 0) {
  176. ASSERT(LinkedList2_IsEmpty(&o->pending_list))
  177. ASSERT(!o->running_work)
  178. ASSERT(LinkedList2_IsEmpty(&o->finished_list))
  179. }
  180. #endif
  181. DebugObject_Free(&o->d_obj);
  182. DebugCounter_Free(&o->d_ctr);
  183. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  184. if (o->num_threads > 0) {
  185. // stop thread
  186. ASSERT_FORCE(pthread_cancel(o->thread) == 0)
  187. void *retval;
  188. ASSERT_FORCE(pthread_join(o->thread, &retval) == 0)
  189. // free BFileDescriptor
  190. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  191. // free pipe
  192. ASSERT_FORCE(close(o->pipe[0]) == 0)
  193. ASSERT_FORCE(close(o->pipe[1]) == 0)
  194. // free semaphore
  195. ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
  196. // free mutex
  197. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  198. }
  199. #endif
  200. }
  201. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  202. {
  203. DebugObject_Access(&d->d_obj);
  204. // init arguments
  205. o->d = d;
  206. o->handler_done = handler_done;
  207. o->user = user;
  208. o->work_func = work_func;
  209. o->work_func_user = work_func_user;
  210. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  211. if (d->num_threads > 0) {
  212. // set state
  213. o->state = BTHREADWORK_STATE_PENDING;
  214. // init finished semaphore
  215. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  216. // insert to pending list
  217. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  218. LinkedList2_Append(&d->pending_list, &o->list_node);
  219. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  220. // post to new semaphore
  221. ASSERT_FORCE(sem_post(&d->new_sem) == 0)
  222. } else {
  223. #endif
  224. // schedule job
  225. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  226. BPending_Set(&o->job);
  227. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  228. }
  229. #endif
  230. DebugObject_Init(&o->d_obj);
  231. DebugCounter_Increment(&d->d_ctr);
  232. }
  233. void BThreadWork_Free (BThreadWork *o)
  234. {
  235. BThreadWorkDispatcher *d = o->d;
  236. DebugObject_Free(&o->d_obj);
  237. DebugCounter_Decrement(&d->d_ctr);
  238. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  239. if (d->num_threads > 0) {
  240. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  241. switch (o->state) {
  242. case BTHREADWORK_STATE_PENDING: {
  243. BLog(BLOG_DEBUG, "remove pending work");
  244. // remove from pending list
  245. LinkedList2_Remove(&d->pending_list, &o->list_node);
  246. } break;
  247. case BTHREADWORK_STATE_RUNNING: {
  248. BLog(BLOG_DEBUG, "remove running work");
  249. // wait for the work to finish running
  250. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  251. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  252. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  253. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  254. // remove from finished list
  255. LinkedList2_Remove(&d->finished_list, &o->list_node);
  256. } break;
  257. case BTHREADWORK_STATE_FINISHED: {
  258. BLog(BLOG_DEBUG, "remove finished work");
  259. // remove from finished list
  260. LinkedList2_Remove(&d->finished_list, &o->list_node);
  261. } break;
  262. case BTHREADWORK_STATE_FORGOTTEN: {
  263. BLog(BLOG_DEBUG, "remove forgotten work");
  264. } break;
  265. default:
  266. ASSERT(0);
  267. }
  268. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  269. // free finished semaphore
  270. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  271. } else {
  272. #endif
  273. BPending_Free(&o->job);
  274. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  275. }
  276. #endif
  277. }