浏览代码

BThreadWork: support multiple worker threads

ambrop7 15 年之前
父节点
当前提交
eca5b73fd6
共有 2 个文件被更改,包括 91 次插入51 次删除
  1. 76 46
      threadwork/BThreadWork.c
  2. 15 5
      threadwork/BThreadWork.h

+ 76 - 46
threadwork/BThreadWork.c

@@ -37,8 +37,10 @@
 
 
 #ifdef BADVPN_THREADWORK_USE_PTHREAD
 #ifdef BADVPN_THREADWORK_USE_PTHREAD
 
 
-static void * dispatcher_thread (BThreadWorkDispatcher *o)
+static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
 {
 {
+    BThreadWorkDispatcher *o = t->d;
+    
     ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
     ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
     
     
     while (1) {
     while (1) {
@@ -49,7 +51,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
         
         
         if (LinkedList2_IsEmpty(&o->pending_list)) {
         if (LinkedList2_IsEmpty(&o->pending_list)) {
             // wait for event
             // wait for event
-            ASSERT_FORCE(pthread_cond_wait(&o->new_cond, &o->mutex) == 0)
+            ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
             continue;
             continue;
         }
         }
         
         
@@ -57,7 +59,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
         BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
         BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
         ASSERT(w->state == BTHREADWORK_STATE_PENDING)
         ASSERT(w->state == BTHREADWORK_STATE_PENDING)
         LinkedList2_Remove(&o->pending_list, &w->list_node);
         LinkedList2_Remove(&o->pending_list, &w->list_node);
-        o->running_work = w;
+        t->running_work = w;
         w->state = BTHREADWORK_STATE_RUNNING;
         w->state = BTHREADWORK_STATE_RUNNING;
         
         
         // do the work
         // do the work
@@ -66,7 +68,7 @@ static void * dispatcher_thread (BThreadWorkDispatcher *o)
         ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
         ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
         
         
         // release the work
         // release the work
-        o->running_work = NULL;
+        t->running_work = NULL;
         LinkedList2_Append(&o->finished_list, &w->list_node);
         LinkedList2_Append(&o->finished_list, &w->list_node);
         w->state = BTHREADWORK_STATE_FINISHED;
         w->state = BTHREADWORK_STATE_FINISHED;
         ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
         ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
@@ -145,6 +147,29 @@ static void more_job_handler (BThreadWorkDispatcher *o)
     return;
     return;
 }
 }
 
 
+static void stop_threads (BThreadWorkDispatcher *o)
+{
+    // set cancelling
+    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+    o->cancel = 1;
+    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+    
+    while (o->num_threads > 0) {
+        struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
+        
+        // wake up thread
+        ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
+        
+        // wait for thread to exit
+        ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
+        
+        // free condition variable
+        ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
+        
+        o->num_threads--;
+    }
+}
+
 #endif
 #endif
 
 
 static void work_job_handler (BThreadWork *o)
 static void work_job_handler (BThreadWork *o)
@@ -167,24 +192,19 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
     // init arguments
     // init arguments
     o->reactor = reactor;
     o->reactor = reactor;
     
     
-    // set num threads
-    #ifdef BADVPN_THREADWORK_USE_PTHREAD
     if (num_threads_hint < 0) {
     if (num_threads_hint < 0) {
-        o->num_threads = 2;
-    } else {
-        o->num_threads = num_threads_hint;
+        num_threads_hint = 2;
+    }
+    if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
+        num_threads_hint = BTHREADWORK_MAX_THREADS;
     }
     }
-    #endif
     
     
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     
     
-    if (o->num_threads > 0) {
+    if (num_threads_hint > 0) {
         // init pending list
         // init pending list
         LinkedList2_Init(&o->pending_list);
         LinkedList2_Init(&o->pending_list);
         
         
-        // set no running work
-        o->running_work = NULL;
-        
         // init finished list
         // init finished list
         LinkedList2_Init(&o->finished_list);
         LinkedList2_Init(&o->finished_list);
         
         
@@ -194,35 +214,29 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
             goto fail0;
             goto fail0;
         }
         }
         
         
-        // init condition variable
-        if (pthread_cond_init(&o->new_cond, NULL) != 0) {
-            BLog(BLOG_ERROR, "pthread_cond_init failed");
-            goto fail1;
-        }
-        
         // init pipe
         // init pipe
         if (pipe(o->pipe) < 0) {
         if (pipe(o->pipe) < 0) {
             BLog(BLOG_ERROR, "pipe failed");
             BLog(BLOG_ERROR, "pipe failed");
-            goto fail2;
+            goto fail1;
         }
         }
         
         
         // set read end non-blocking
         // set read end non-blocking
         if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
         if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
             BLog(BLOG_ERROR, "fcntl failed");
             BLog(BLOG_ERROR, "fcntl failed");
-            goto fail3;
+            goto fail2;
         }
         }
         
         
         // set write end non-blocking
         // set write end non-blocking
         if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
         if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
             BLog(BLOG_ERROR, "fcntl failed");
             BLog(BLOG_ERROR, "fcntl failed");
-            goto fail3;
+            goto fail2;
         }
         }
         
         
         // init BFileDescriptor
         // init BFileDescriptor
         BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
         BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
         if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
         if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
             BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
             BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
-            goto fail3;
+            goto fail2;
         }
         }
         BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
         BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
         
         
@@ -232,10 +246,31 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
         // set not cancelling
         // set not cancelling
         o->cancel = 0;
         o->cancel = 0;
         
         
-        // init thread
-        if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
-            BLog(BLOG_ERROR, "pthread_create failed");
-            goto fail4;
+        // init threads
+        o->num_threads = 0;
+        for (int i = 0; i < num_threads_hint; i++) {
+            struct BThreadWorkDispatcher_thread *t = &o->threads[i];
+            
+            // set parent pointer
+            t->d = o;
+            
+            // set no running work
+            t->running_work = NULL;
+            
+            // init condition variable
+            if (pthread_cond_init(&t->new_cond, NULL) != 0) {
+                BLog(BLOG_ERROR, "pthread_cond_init failed");
+                goto fail3;
+            }
+            
+            // init thread
+            if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
+                BLog(BLOG_ERROR, "pthread_create failed");
+                ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
+                goto fail3;
+            }
+        
+            o->num_threads++;
         }
         }
     }
     }
     
     
@@ -246,19 +281,18 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
     return 1;
     return 1;
     
     
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
-fail4:
+fail3:
+    stop_threads(o);
     BPending_Free(&o->more_job);
     BPending_Free(&o->more_job);
     BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
     BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
-fail3:
+fail2:
     ASSERT_FORCE(close(o->pipe[0]) == 0)
     ASSERT_FORCE(close(o->pipe[0]) == 0)
     ASSERT_FORCE(close(o->pipe[1]) == 0)
     ASSERT_FORCE(close(o->pipe[1]) == 0)
-fail2:
-    ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
 fail1:
 fail1:
     ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
     ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
-    #endif
 fail0:
 fail0:
     return 0;
     return 0;
+    #endif
 }
 }
 
 
 void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
 void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
@@ -266,7 +300,7 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     if (o->num_threads > 0) {
     if (o->num_threads > 0) {
         ASSERT(LinkedList2_IsEmpty(&o->pending_list))
         ASSERT(LinkedList2_IsEmpty(&o->pending_list))
-        ASSERT(!o->running_work)
+        for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
         ASSERT(LinkedList2_IsEmpty(&o->finished_list))
         ASSERT(LinkedList2_IsEmpty(&o->finished_list))
     }
     }
     #endif
     #endif
@@ -276,14 +310,8 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     
     
     if (o->num_threads > 0) {
     if (o->num_threads > 0) {
-        // post termination request
-        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
-        o->cancel = 1;
-        ASSERT_FORCE(pthread_cond_signal(&o->new_cond) == 0)
-        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
-        
-        // wait for thread to exit
-        ASSERT_FORCE(pthread_join(o->thread, NULL) == 0)
+        // stop threads
+        stop_threads(o);
         
         
         // free more job
         // free more job
         BPending_Free(&o->more_job);
         BPending_Free(&o->more_job);
@@ -295,9 +323,6 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
         ASSERT_FORCE(close(o->pipe[0]) == 0)
         ASSERT_FORCE(close(o->pipe[0]) == 0)
         ASSERT_FORCE(close(o->pipe[1]) == 0)
         ASSERT_FORCE(close(o->pipe[1]) == 0)
         
         
-        // free condition variable
-        ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
-        
         // free mutex
         // free mutex
         ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
         ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
     }
     }
@@ -336,7 +361,12 @@ void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_han
         // post work
         // post work
         ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
         ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
         LinkedList2_Append(&d->pending_list, &o->list_node);
         LinkedList2_Append(&d->pending_list, &o->list_node);
-        ASSERT_FORCE(pthread_cond_signal(&d->new_cond) == 0)
+        for (int i = 0; i < d->num_threads; i++) {
+            if (!d->threads[i].running_work) {
+                ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
+                break;
+            }
+        }
         ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
         ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
     } else {
     } else {
     #endif
     #endif

+ 15 - 5
threadwork/BThreadWork.h

@@ -43,7 +43,10 @@
 #define BTHREADWORK_STATE_FINISHED 3
 #define BTHREADWORK_STATE_FINISHED 3
 #define BTHREADWORK_STATE_FORGOTTEN 4
 #define BTHREADWORK_STATE_FORGOTTEN 4
 
 
+#define BTHREADWORK_MAX_THREADS 8
+
 struct BThreadWork_s;
 struct BThreadWork_s;
+struct BThreadWorkDispatcher_s;
 
 
 /**
 /**
  * Function called to do the work for a {@link BThreadWork}.
  * Function called to do the work for a {@link BThreadWork}.
@@ -60,20 +63,27 @@ typedef void (*BThreadWork_work_func) (void *user);
  */
  */
 typedef void (*BThreadWork_handler_done) (void *user);
 typedef void (*BThreadWork_handler_done) (void *user);
 
 
-typedef struct {
+#ifdef BADVPN_THREADWORK_USE_PTHREAD
+struct BThreadWorkDispatcher_thread {
+    struct BThreadWorkDispatcher_s *d;
+    struct BThreadWork_s *running_work;
+    pthread_cond_t new_cond;
+    pthread_t thread;
+};
+#endif
+
+typedef struct BThreadWorkDispatcher_s {
     BReactor *reactor;
     BReactor *reactor;
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
-    int num_threads;
     LinkedList2 pending_list;
     LinkedList2 pending_list;
-    struct BThreadWork_s *running_work;
     LinkedList2 finished_list;
     LinkedList2 finished_list;
     pthread_mutex_t mutex;
     pthread_mutex_t mutex;
-    pthread_cond_t new_cond;
     int pipe[2];
     int pipe[2];
     BFileDescriptor bfd;
     BFileDescriptor bfd;
     BPending more_job;
     BPending more_job;
     int cancel;
     int cancel;
-    pthread_t thread;
+    int num_threads;
+    struct BThreadWorkDispatcher_thread threads[BTHREADWORK_MAX_THREADS];
     #endif
     #endif
     DebugObject d_obj;
     DebugObject d_obj;
     DebugCounter d_ctr;
     DebugCounter d_ctr;