BThreadWork.c 12 KB


  1. /**
  2. * @file BThreadWork.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <stdint.h>
  23. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  24. #include <unistd.h>
  25. #include <errno.h>
  26. #include <fcntl.h>
  27. #endif
  28. #include <misc/offset.h>
  29. #include <system/BLog.h>
  30. #include <generated/blog_channel_BThreadWork.h>
  31. #include <threadwork/BThreadWork.h>
  32. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  33. static void * dispatcher_thread (BThreadWorkDispatcher *o)
  34. {
  35. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  36. while (1) {
  37. // exit if requested
  38. if (o->cancel) {
  39. break;
  40. }
  41. if (LinkedList2_IsEmpty(&o->pending_list)) {
  42. // wait for event
  43. ASSERT_FORCE(pthread_cond_wait(&o->new_cond, &o->mutex) == 0)
  44. continue;
  45. }
  46. // grab the work
  47. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
  48. ASSERT(w->state == BTHREADWORK_STATE_PENDING)
  49. LinkedList2_Remove(&o->pending_list, &w->list_node);
  50. o->running_work = w;
  51. w->state = BTHREADWORK_STATE_RUNNING;
  52. // do the work
  53. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  54. w->work_func(w->work_func_user);
  55. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  56. // release the work
  57. o->running_work = NULL;
  58. LinkedList2_Append(&o->finished_list, &w->list_node);
  59. w->state = BTHREADWORK_STATE_FINISHED;
  60. ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
  61. // write to pipe
  62. uint8_t b = 0;
  63. int res = write(o->pipe[1], &b, sizeof(b));
  64. if (res < 0) {
  65. int error = errno;
  66. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  67. }
  68. }
  69. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  70. }
  71. static void dispatch_job (BThreadWorkDispatcher *o)
  72. {
  73. ASSERT(o->num_threads > 0)
  74. // lock
  75. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  76. // check for finished job
  77. if (LinkedList2_IsEmpty(&o->finished_list)) {
  78. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  79. return;
  80. }
  81. // grab finished job
  82. BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
  83. ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
  84. LinkedList2_Remove(&o->finished_list, &w->list_node);
  85. // schedule more
  86. if (!LinkedList2_IsEmpty(&o->finished_list)) {
  87. BPending_Set(&o->more_job);
  88. }
  89. // set state forgotten
  90. w->state = BTHREADWORK_STATE_FORGOTTEN;
  91. // unlock
  92. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  93. // call handler
  94. w->handler_done(w->user);
  95. return;
  96. }
  97. static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
  98. {
  99. ASSERT(o->num_threads > 0)
  100. DebugObject_Access(&o->d_obj);
  101. // read data from pipe
  102. uint8_t b[64];
  103. int res = read(o->pipe[0], b, sizeof(b));
  104. if (res < 0) {
  105. int error = errno;
  106. ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
  107. } else {
  108. ASSERT(res > 0)
  109. }
  110. dispatch_job(o);
  111. return;
  112. }
  113. static void more_job_handler (BThreadWorkDispatcher *o)
  114. {
  115. ASSERT(o->num_threads > 0)
  116. DebugObject_Access(&o->d_obj);
  117. dispatch_job(o);
  118. return;
  119. }
  120. #endif
  121. static void work_job_handler (BThreadWork *o)
  122. {
  123. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  124. ASSERT(o->d->num_threads == 0)
  125. #endif
  126. DebugObject_Access(&o->d_obj);
  127. // do the work
  128. o->work_func(o->work_func_user);
  129. // call handler
  130. o->handler_done(o->user);
  131. return;
  132. }
  133. int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
  134. {
  135. // init arguments
  136. o->reactor = reactor;
  137. // set num threads
  138. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  139. if (num_threads_hint < 0) {
  140. o->num_threads = 2;
  141. } else {
  142. o->num_threads = num_threads_hint;
  143. }
  144. #endif
  145. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  146. if (o->num_threads > 0) {
  147. // init pending list
  148. LinkedList2_Init(&o->pending_list);
  149. // set no running work
  150. o->running_work = NULL;
  151. // init finished list
  152. LinkedList2_Init(&o->finished_list);
  153. // init mutex
  154. if (pthread_mutex_init(&o->mutex, NULL) != 0) {
  155. BLog(BLOG_ERROR, "pthread_mutex_init failed");
  156. goto fail0;
  157. }
  158. // init condition variable
  159. if (pthread_cond_init(&o->new_cond, NULL) != 0) {
  160. BLog(BLOG_ERROR, "pthread_cond_init failed");
  161. goto fail1;
  162. }
  163. // init pipe
  164. if (pipe(o->pipe) < 0) {
  165. BLog(BLOG_ERROR, "pipe failed");
  166. goto fail2;
  167. }
  168. // set read end non-blocking
  169. if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
  170. BLog(BLOG_ERROR, "fcntl failed");
  171. goto fail3;
  172. }
  173. // set write end non-blocking
  174. if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
  175. BLog(BLOG_ERROR, "fcntl failed");
  176. goto fail3;
  177. }
  178. // init BFileDescriptor
  179. BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
  180. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  181. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  182. goto fail3;
  183. }
  184. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  185. // init more job
  186. BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
  187. // set not cancelling
  188. o->cancel = 0;
  189. // init thread
  190. if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
  191. BLog(BLOG_ERROR, "pthread_create failed");
  192. goto fail4;
  193. }
  194. }
  195. #endif
  196. DebugObject_Init(&o->d_obj);
  197. DebugCounter_Init(&o->d_ctr);
  198. return 1;
  199. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  200. fail4:
  201. BPending_Free(&o->more_job);
  202. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  203. fail3:
  204. ASSERT_FORCE(close(o->pipe[0]) == 0)
  205. ASSERT_FORCE(close(o->pipe[1]) == 0)
  206. fail2:
  207. ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
  208. fail1:
  209. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  210. #endif
  211. fail0:
  212. return 0;
  213. }
  214. void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
  215. {
  216. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  217. if (o->num_threads > 0) {
  218. ASSERT(LinkedList2_IsEmpty(&o->pending_list))
  219. ASSERT(!o->running_work)
  220. ASSERT(LinkedList2_IsEmpty(&o->finished_list))
  221. }
  222. #endif
  223. DebugObject_Free(&o->d_obj);
  224. DebugCounter_Free(&o->d_ctr);
  225. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  226. if (o->num_threads > 0) {
  227. // post termination request
  228. ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
  229. o->cancel = 1;
  230. ASSERT_FORCE(pthread_cond_signal(&o->new_cond) == 0)
  231. ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
  232. // wait for thread to exit
  233. ASSERT_FORCE(pthread_join(o->thread, NULL) == 0)
  234. // free more job
  235. BPending_Free(&o->more_job);
  236. // free BFileDescriptor
  237. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  238. // free pipe
  239. ASSERT_FORCE(close(o->pipe[0]) == 0)
  240. ASSERT_FORCE(close(o->pipe[1]) == 0)
  241. // free condition variable
  242. ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
  243. // free mutex
  244. ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
  245. }
  246. #endif
  247. }
  248. int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
  249. {
  250. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  251. return (o->num_threads > 0);
  252. #else
  253. return 0;
  254. #endif
  255. }
  256. void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
  257. {
  258. DebugObject_Access(&d->d_obj);
  259. // init arguments
  260. o->d = d;
  261. o->handler_done = handler_done;
  262. o->user = user;
  263. o->work_func = work_func;
  264. o->work_func_user = work_func_user;
  265. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  266. if (d->num_threads > 0) {
  267. // set state
  268. o->state = BTHREADWORK_STATE_PENDING;
  269. // init finished semaphore
  270. ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
  271. // post work
  272. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  273. LinkedList2_Append(&d->pending_list, &o->list_node);
  274. ASSERT_FORCE(pthread_cond_signal(&d->new_cond) == 0)
  275. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  276. } else {
  277. #endif
  278. // schedule job
  279. BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
  280. BPending_Set(&o->job);
  281. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  282. }
  283. #endif
  284. DebugObject_Init(&o->d_obj);
  285. DebugCounter_Increment(&d->d_ctr);
  286. }
  287. void BThreadWork_Free (BThreadWork *o)
  288. {
  289. BThreadWorkDispatcher *d = o->d;
  290. DebugObject_Free(&o->d_obj);
  291. DebugCounter_Decrement(&d->d_ctr);
  292. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  293. if (d->num_threads > 0) {
  294. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  295. switch (o->state) {
  296. case BTHREADWORK_STATE_PENDING: {
  297. BLog(BLOG_DEBUG, "remove pending work");
  298. // remove from pending list
  299. LinkedList2_Remove(&d->pending_list, &o->list_node);
  300. } break;
  301. case BTHREADWORK_STATE_RUNNING: {
  302. BLog(BLOG_DEBUG, "remove running work");
  303. // wait for the work to finish running
  304. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  305. ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
  306. ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
  307. ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
  308. // remove from finished list
  309. LinkedList2_Remove(&d->finished_list, &o->list_node);
  310. } break;
  311. case BTHREADWORK_STATE_FINISHED: {
  312. BLog(BLOG_DEBUG, "remove finished work");
  313. // remove from finished list
  314. LinkedList2_Remove(&d->finished_list, &o->list_node);
  315. } break;
  316. case BTHREADWORK_STATE_FORGOTTEN: {
  317. BLog(BLOG_DEBUG, "remove forgotten work");
  318. } break;
  319. default:
  320. ASSERT(0);
  321. }
  322. ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
  323. // free finished semaphore
  324. ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
  325. } else {
  326. #endif
  327. BPending_Free(&o->job);
  328. #ifdef BADVPN_THREADWORK_USE_PTHREAD
  329. }
  330. #endif
  331. }