浏览代码

BThreadWork: use a condition variable instead of a semaphore to pass works to the thread. Also use it
to terminate the thread instead of cancelling.

ambrop7 15 年之前
父节点
当前提交
fb953a3050
共有 2 个文件被更改,包括 97 次插入40 次删除
  1. 94 39
      threadwork/BThreadWork.c
  2. 3 1
      threadwork/BThreadWork.h

+ 94 - 39
threadwork/BThreadWork.c

@@ -39,75 +39,112 @@
 
 static void * dispatcher_thread (BThreadWorkDispatcher *o)
 {
+    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+    
     while (1) {
-        // wait for a work
-        ASSERT_FORCE(sem_wait(&o->new_sem) == 0)
+        // exit if requested
+        if (o->cancel) {
+            break;
+        }
         
-        // grab the work
-        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
         if (LinkedList2_IsEmpty(&o->pending_list)) {
-            ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+            // wait for event
+            ASSERT_FORCE(pthread_cond_wait(&o->new_cond, &o->mutex) == 0)
             continue;
         }
+        
+        // grab the work
         BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
         ASSERT(w->state == BTHREADWORK_STATE_PENDING)
         LinkedList2_Remove(&o->pending_list, &w->list_node);
         o->running_work = w;
         w->state = BTHREADWORK_STATE_RUNNING;
-        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
         
         // do the work
+        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
         w->work_func(w->work_func_user);
+        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
         
         // release the work
-        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
         o->running_work = NULL;
         LinkedList2_Append(&o->finished_list, &w->list_node);
         w->state = BTHREADWORK_STATE_FINISHED;
         ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
-        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
         
         // write to pipe
         uint8_t b = 0;
-        ASSERT_FORCE(write(o->pipe[1], &b, sizeof(b)) == sizeof(b))
+        int res = write(o->pipe[1], &b, sizeof(b));
+        if (res < 0) {
+            int error = errno;
+            ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
+        }
     }
+    
+    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
 }
 
-static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
+static void dispatch_job (BThreadWorkDispatcher *o)
 {
     ASSERT(o->num_threads > 0)
-    DebugObject_Access(&o->d_obj);
-    
-    // read from pipe
-    uint8_t b;
-    int res = read(o->pipe[0], &b, sizeof(b));
-    if (res < 0) {
-        int error = errno;
-        ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
-        return;
-    }
-    ASSERT(res == sizeof(b))
-    ASSERT(b == 0)
     
-    // grab a finished work
+    // lock
     ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+    
+    // check for finished job
     if (LinkedList2_IsEmpty(&o->finished_list)) {
         ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
         return;
     }
+    
+    // grab finished job
     BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
     ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
     LinkedList2_Remove(&o->finished_list, &w->list_node);
-    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+    
+    // schedule more
+    if (!LinkedList2_IsEmpty(&o->finished_list)) {
+        BPending_Set(&o->more_job);
+    }
     
     // set state forgotten
     w->state = BTHREADWORK_STATE_FORGOTTEN;
     
+    // unlock
+    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+    
     // call handler
     w->handler_done(w->user);
     return;
 }
 
+static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
+{
+    ASSERT(o->num_threads > 0)
+    DebugObject_Access(&o->d_obj);
+    
+    // read data from pipe
+    uint8_t b[64];
+    int res = read(o->pipe[0], b, sizeof(b));
+    if (res < 0) {
+        int error = errno;
+        ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
+    } else {
+        ASSERT(res > 0)
+    }
+    
+    dispatch_job(o);
+    return;
+}
+
+static void more_job_handler (BThreadWorkDispatcher *o)
+{
+    ASSERT(o->num_threads > 0)
+    DebugObject_Access(&o->d_obj);
+    
+    dispatch_job(o);
+    return;
+}
+
 #endif
 
 static void work_job_handler (BThreadWork *o)
@@ -157,9 +194,9 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
             goto fail0;
         }
         
-        // init semaphore
-        if (sem_init(&o->new_sem, 0, 0) != 0) {
-            BLog(BLOG_ERROR, "sem_init failed");
+        // init condition variable
+        if (pthread_cond_init(&o->new_cond, NULL) != 0) {
+            BLog(BLOG_ERROR, "pthread_cond_init failed");
             goto fail1;
         }
         
@@ -171,7 +208,13 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
         
         // set read end non-blocking
         if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
-            BLog(BLOG_ERROR, "pipe failed");
+            BLog(BLOG_ERROR, "fcntl failed");
+            goto fail3;
+        }
+        
+        // set write end non-blocking
+        if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
+            BLog(BLOG_ERROR, "fcntl failed");
             goto fail3;
         }
         
@@ -183,6 +226,12 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
         }
         BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
         
+        // init more job
+        BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
+        
+        // set not cancelling
+        o->cancel = 0;
+        
         // init thread
         if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
             BLog(BLOG_ERROR, "pthread_create failed");
@@ -198,12 +247,13 @@ int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int
     
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
 fail4:
+    BPending_Free(&o->more_job);
     BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
 fail3:
     ASSERT_FORCE(close(o->pipe[0]) == 0)
     ASSERT_FORCE(close(o->pipe[1]) == 0)
 fail2:
-    ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
+    ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
 fail1:
     ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
     #endif
@@ -226,10 +276,17 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
     #ifdef BADVPN_THREADWORK_USE_PTHREAD
     
     if (o->num_threads > 0) {
-        // stop thread
-        ASSERT_FORCE(pthread_cancel(o->thread) == 0)
-        void *retval;
-        ASSERT_FORCE(pthread_join(o->thread, &retval) == 0)
+        // post termination request
+        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+        o->cancel = 1;
+        ASSERT_FORCE(pthread_cond_signal(&o->new_cond) == 0)
+        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+        
+        // wait for thread to exit
+        ASSERT_FORCE(pthread_join(o->thread, NULL) == 0)
+        
+        // free more job
+        BPending_Free(&o->more_job);
         
         // free BFileDescriptor
         BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
@@ -238,8 +295,8 @@ void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
         ASSERT_FORCE(close(o->pipe[0]) == 0)
         ASSERT_FORCE(close(o->pipe[1]) == 0)
         
-        // free semaphore
-        ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
+        // free condition variable
+        ASSERT_FORCE(pthread_cond_destroy(&o->new_cond) == 0)
         
         // free mutex
         ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
@@ -267,13 +324,11 @@ void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_han
         // init finished semaphore
         ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
         
-        // insert to pending list
+        // post work
         ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
         LinkedList2_Append(&d->pending_list, &o->list_node);
+        ASSERT_FORCE(pthread_cond_signal(&d->new_cond) == 0)
         ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
-        
-        // post to new semaphore
-        ASSERT_FORCE(sem_post(&d->new_sem) == 0)
     } else {
     #endif
         // schedule job

+ 3 - 1
threadwork/BThreadWork.h

@@ -68,9 +68,11 @@ typedef struct {
     struct BThreadWork_s *running_work;
     LinkedList2 finished_list;
     pthread_mutex_t mutex;
-    sem_t new_sem;
+    pthread_cond_t new_cond;
     int pipe[2];
     BFileDescriptor bfd;
+    BPending more_job;
+    int cancel;
     pthread_t thread;
     #endif
     DebugObject d_obj;