ambrop7 пре 15 година
родитељ
комит
48963af6f9

+ 4 - 0
CMakeLists.txt

@@ -53,6 +53,9 @@ if (WIN32)
         add_definitions(-DBADVPN_USE_CUSTOM_MSWSOCK_H)
     endif ()
 else ()
+    set(BADVPN_THREADWORK_USE_PTHREAD 1)
+    add_definitions(-DBADVPN_THREADWORK_USE_PTHREAD)
+
     link_libraries(rt)
 
     if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
@@ -118,6 +121,7 @@ add_subdirectory(socksclient)
 add_subdirectory(lwip)
 add_subdirectory(dhcpclient)
 add_subdirectory(ncdconfig)
+add_subdirectory(threadwork)
 if (NOT WIN32)
     add_subdirectory(ipc)
     add_subdirectory(process)

+ 1 - 0
blog_channels.txt

@@ -45,3 +45,4 @@ PRStreamSource 4
 BSocketPRFileDesc 4
 PacketProtoDecoder 4
 DPRelay 4
+BThreadWork 4

+ 4 - 0
generated/blog_channel_BThreadWork.h

@@ -0,0 +1,4 @@
+#ifdef BLOG_CURRENT_CHANNEL
+#undef BLOG_CURRENT_CHANNEL
+#endif
+#define BLOG_CURRENT_CHANNEL BLOG_CHANNEL_BThreadWork

+ 2 - 1
generated/blog_channels_defines.h

@@ -45,4 +45,5 @@
 #define BLOG_CHANNEL_BSocketPRFileDesc 44
 #define BLOG_CHANNEL_PacketProtoDecoder 45
 #define BLOG_CHANNEL_DPRelay 46
-#define BLOG_NUM_CHANNELS 47
+#define BLOG_CHANNEL_BThreadWork 47
+#define BLOG_NUM_CHANNELS 48

+ 1 - 0
generated/blog_channels_list.h

@@ -45,3 +45,4 @@
 {.name = "BSocketPRFileDesc", .loglevel = 4},
 {.name = "PacketProtoDecoder", .loglevel = 4},
 {.name = "DPRelay", .loglevel = 4},
+{.name = "BThreadWork", .loglevel = 4},

+ 340 - 0
threadwork/BThreadWork.c

@@ -0,0 +1,340 @@
+/**
+ * @file BThreadWork.c
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <stdint.h>
+
+#ifdef BADVPN_THREADWORK_USE_PTHREAD
+    #include <unistd.h>
+    #include <errno.h>
+#endif
+
+#include <misc/offset.h>
+#include <system/BLog.h>
+
+#include <generated/blog_channel_BThreadWork.h>
+
+#include <threadwork/BThreadWork.h>
+
+#ifdef BADVPN_THREADWORK_USE_PTHREAD
+
+static void * dispatcher_thread (BThreadWorkDispatcher *o)
+{
+    while (1) {
+        // wait for a work
+        ASSERT_FORCE(sem_wait(&o->new_sem) == 0)
+        
+        // grab the work
+        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+        if (LinkedList2_IsEmpty(&o->pending_list)) {
+            ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+            continue;
+        }
+        BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->pending_list), BThreadWork, list_node);
+        ASSERT(w->state == BTHREADWORK_STATE_PENDING)
+        LinkedList2_Remove(&o->pending_list, &w->list_node);
+        o->running_work = w;
+        w->state = BTHREADWORK_STATE_RUNNING;
+        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+        
+        // do the work
+        w->work_func(w->work_func_user);
+        
+        // release the work
+        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+        o->running_work = NULL;
+        LinkedList2_Append(&o->finished_list, &w->list_node);
+        w->state = BTHREADWORK_STATE_FINISHED;
+        ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
+        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+        
+        // write to pipe
+        uint8_t b = 0;
+        ASSERT_FORCE(write(o->pipe[1], &b, sizeof(b)) == sizeof(b))
+    }
+}
+
+static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
+{
+    ASSERT(o->num_threads > 0)
+    DebugObject_Access(&o->d_obj);
+    
+    // read from pipe
+    uint8_t b;
+    int res = read(o->pipe[0], &b, sizeof(b));
+    if (res < 0) {
+        int error = errno;
+        ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
+        return;
+    }
+    ASSERT(res == sizeof(b))
+    ASSERT(b == 0)
+    
+    // grab a finished work
+    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
+    if (LinkedList2_IsEmpty(&o->finished_list)) {
+        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+        return;
+    }
+    BThreadWork *w = UPPER_OBJECT(LinkedList2_GetFirst(&o->finished_list), BThreadWork, list_node);
+    ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
+    LinkedList2_Remove(&o->finished_list, &w->list_node);
+    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
+    
+    // set state forgotten
+    w->state = BTHREADWORK_STATE_FORGOTTEN;
+    
+    // call handler
+    w->handler_done(w->user);
+    return;
+}
+
+#endif
+
+static void work_job_handler (BThreadWork *o)
+{
+    ASSERT(o->d->num_threads == 0)
+    DebugObject_Access(&o->d_obj);
+    
+    // do the work
+    o->work_func(o->work_func_user);
+    
+    // call handler
+    o->handler_done(o->user);
+    return;
+}
+
+int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
+{
+    // init arguments
+    o->reactor = reactor;
+    
+    // set num threads
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    if (num_threads_hint < 0) {
+        o->num_threads = 2;
+    } else {
+        o->num_threads = num_threads_hint;
+    }
+    #else
+    o->num_threads = 0;
+    #endif
+    
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    
+    if (o->num_threads > 0) {
+        // init pending list
+        LinkedList2_Init(&o->pending_list);
+        
+        // set no running work
+        o->running_work = NULL;
+        
+        // init finished list
+        LinkedList2_Init(&o->finished_list);
+        
+        // init mutex
+        if (pthread_mutex_init(&o->mutex, NULL) != 0) {
+            BLog(BLOG_ERROR, "pthread_mutex_init failed");
+            goto fail0;
+        }
+        
+        // init semaphore
+        if (sem_init(&o->new_sem, 0, 0) != 0) {
+            BLog(BLOG_ERROR, "sem_init failed");
+            goto fail1;
+        }
+        
+        // init pipe
+        if (pipe(o->pipe) < 0) {
+            BLog(BLOG_ERROR, "pipe failed");
+            goto fail2;
+        }
+        
+        // init BFileDescriptor
+        BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
+        if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
+            BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
+            goto fail3;
+        }
+        BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
+        
+        // init thread
+        if (pthread_create(&o->thread, NULL, (void * (*) (void *))dispatcher_thread, o) != 0) {
+            BLog(BLOG_ERROR, "pthread_create failed");
+            goto fail4;
+        }
+    }
+    
+    #endif
+    
+    DebugObject_Init(&o->d_obj);
+    DebugCounter_Init(&o->d_ctr);
+    return 1;
+    
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+fail4:
+    BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
+fail3:
+    ASSERT_FORCE(close(o->pipe[0]) == 0)
+    ASSERT_FORCE(close(o->pipe[1]) == 0)
+fail2:
+    ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
+fail1:
+    ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
+    #endif
+fail0:
+    return 0;
+}
+
+void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
+{
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    if (o->num_threads > 0) {
+        ASSERT(LinkedList2_IsEmpty(&o->pending_list))
+        ASSERT(!o->running_work)
+        ASSERT(LinkedList2_IsEmpty(&o->finished_list))
+    }
+    #endif
+    DebugObject_Free(&o->d_obj);
+    DebugCounter_Free(&o->d_ctr);
+    
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    
+    if (o->num_threads > 0) {
+        // stop thread
+        ASSERT_FORCE(pthread_cancel(o->thread) == 0)
+        void *retval;
+        ASSERT_FORCE(pthread_join(o->thread, &retval) == 0)
+        
+        // free BFileDescriptor
+        BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
+        
+        // free pipe
+        ASSERT_FORCE(close(o->pipe[0]) == 0)
+        ASSERT_FORCE(close(o->pipe[1]) == 0)
+        
+        // free semaphore
+        ASSERT_FORCE(sem_destroy(&o->new_sem) == 0)
+        
+        // free mutex
+        ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
+    }
+    
+    #endif
+}
+
+void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
+{
+    DebugObject_Access(&d->d_obj);
+    
+    // init arguments
+    o->d = d;
+    o->handler_done = handler_done;
+    o->user = user;
+    o->work_func = work_func;
+    o->work_func_user = work_func_user;
+    
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    if (d->num_threads > 0) {
+        // set state
+        o->state = BTHREADWORK_STATE_PENDING;
+        
+        // init finished semaphore
+        ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
+        
+        // insert to pending list
+        ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
+        LinkedList2_Append(&d->pending_list, &o->list_node);
+        ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
+        
+        // post to new semaphore
+        ASSERT_FORCE(sem_post(&d->new_sem) == 0)
+    } else {
+    #endif
+        // schedule job
+        BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
+        BPending_Set(&o->job);
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    }
+    #endif
+    
+    DebugObject_Init(&o->d_obj);
+    DebugCounter_Increment(&d->d_ctr);
+}
+
+void BThreadWork_Free (BThreadWork *o)
+{
+    BThreadWorkDispatcher *d = o->d;
+    DebugObject_Free(&o->d_obj);
+    DebugCounter_Decrement(&d->d_ctr);
+    
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    if (d->num_threads > 0) {
+        ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
+        
+        switch (o->state) {
+            case BTHREADWORK_STATE_PENDING: {
+                BLog(BLOG_DEBUG, "remove pending work");
+                
+                // remove from pending list
+                LinkedList2_Remove(&d->pending_list, &o->list_node);
+            } break;
+            
+            case BTHREADWORK_STATE_RUNNING: {
+                BLog(BLOG_DEBUG, "remove running work");
+                
+                // wait for the work to finish running
+                ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
+                ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
+                ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
+                
+                ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
+                
+                // remove from finished list
+                LinkedList2_Remove(&d->finished_list, &o->list_node);
+            } break;
+            
+            case BTHREADWORK_STATE_FINISHED: {
+                BLog(BLOG_DEBUG, "remove finished work");
+                
+                // remove from finished list
+                LinkedList2_Remove(&d->finished_list, &o->list_node);
+            } break;
+            
+            case BTHREADWORK_STATE_FORGOTTEN: {
+                BLog(BLOG_DEBUG, "remove forgotten work");
+            } break;
+            
+            default:
+                ASSERT(0);
+        }
+        
+        ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
+        
+        // free finished semaphore
+        ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
+    } else {
+    #endif
+        BPending_Free(&o->job);
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    }
+    #endif
+}

+ 144 - 0
threadwork/BThreadWork.h

@@ -0,0 +1,144 @@
+/**
+ * @file BThreadWork.h
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ * 
+ * @section DESCRIPTION
+ * 
+ * System for performing computations (possibly) in parallel with the event loop
+ * in a different thread.
+ */
+
+#ifndef BADVPN_BTHREADWORK_BTHREADWORK_H
+#define BADVPN_BTHREADWORK_BTHREADWORK_H
+
+#ifdef BADVPN_THREADWORK_USE_PTHREAD
+    #include <pthread.h>
+    #include <semaphore.h>
+#endif
+
+#include <misc/debug.h>
+#include <structure/LinkedList2.h>
+#include <system/DebugObject.h>
+#include <system/BReactor.h>
+
+#define BTHREADWORK_STATE_PENDING 1
+#define BTHREADWORK_STATE_RUNNING 2
+#define BTHREADWORK_STATE_FINISHED 3
+#define BTHREADWORK_STATE_FORGOTTEN 4
+
+struct BThreadWork_s;
+
+/**
+ * Function called to do the work for a {@link BThreadWork}.
+ * The function may be called in another thread, in parallel with the event loop.
+ * 
+ * @param user as work_func_user in {@link BThreadWork_Init}
+ */
+typedef void (*BThreadWork_work_func) (void *user);
+
+/**
+ * Handler called when a {@link BThreadWork} work is done.
+ * 
+ * @param user as in {@link BThreadWork_Init}
+ */
+typedef void (*BThreadWork_handler_done) (void *user);
+
+typedef struct {
+    BReactor *reactor;
+    int num_threads;
+    #ifdef BADVPN_THREADWORK_USE_PTHREAD
+    LinkedList2 pending_list;
+    struct BThreadWork_s *running_work;
+    LinkedList2 finished_list;
+    pthread_mutex_t mutex;
+    sem_t new_sem;
+    int pipe[2];
+    BFileDescriptor bfd;
+    pthread_t thread;
+    #endif
+    DebugObject d_obj;
+    DebugCounter d_ctr;
+} BThreadWorkDispatcher;
+
+typedef struct BThreadWork_s {
+    BThreadWorkDispatcher *d;
+    BThreadWork_handler_done handler_done;
+    void *user;
+    BThreadWork_work_func work_func;
+    void *work_func_user;
+    union {
+        #ifdef BADVPN_THREADWORK_USE_PTHREAD
+        struct {
+            LinkedList2Node list_node;
+            int state;
+            sem_t finished_sem;
+        };
+        #endif
+        struct {
+            BPending job;
+        };
+    };
+    DebugObject d_obj;
+} BThreadWork;
+
+/**
+ * Initializes the work dispatcher.
+ * Works may be started using {@link BThreadWork_Init}.
+ * 
+ * @param o the object
+ * @param reactor reactor we live in
+ * @param num_threads_hint hint for the number of threads to use:
+ *                         <0 - A choice will be made automatically, probably based on the number of CPUs.
+ *                         0 - No additional threads will be used, and computations will be performed directly
+ *                             in the event loop in job handlers.
+ * @return 1 on success, 0 on failure
+ */
+int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint) WARN_UNUSED;
+
+/**
+ * Frees the work dispatcher.
+ * There must be no {@link BThreadWork}'s with this dispatcher.
+ * 
+ * @param o the object
+ */
+void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o);
+
+/**
+ * Initializes the work.
+ * 
+ * @param o the object
+ * @param d work dispatcher
+ * @param handler_done handler to call when the work is done
+ * @param user argument to handler
+ * @param work_func function that will do the work, possibly from another thread
+ * @param work_func_user argument to work_func
+ */
+void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user);
+
+/**
+ * Frees the work.
+ * After this function returns, the work function will either have fully executed,
+ * or not called at all, and never will be.
+ * 
+ * @param o the object
+ */
+void BThreadWork_Free (BThreadWork *o);
+
+#endif

+ 7 - 0
threadwork/CMakeLists.txt

@@ -0,0 +1,7 @@
+set(BADVPN_THREADWORK_EXTRA_LIBS)
+if (BADVPN_THREADWORK_USE_PTHREAD)
+    list(APPEND BADVPN_THREADWORK_EXTRA_LIBS pthread)
+endif ()
+
+add_library(threadwork BThreadWork.c)
+target_link_libraries(threadwork system ${BADVPN_THREADWORK_EXTRA_LIBS})