Преглед изворни кода

ncd: modules: spawn: implement _caller in called process, implement spawn::join(), port to new instance allocation
method

ambrop7 пре 13 година
родитељ
комит
0e337dfb8a
1 измењених фајлова са 264 додато и 35 уклоњено
  1. 264 35
      ncd/modules/spawn.c

+ 264 - 35
ncd/modules/spawn.c

@@ -31,14 +31,32 @@
  * Module which starts a process from a process template on initialization, and
  * Module which starts a process from a process template on initialization, and
  * stops it on deinitialization.
  * stops it on deinitialization.
  * 
  * 
- * Synopsis: spawn(string template_name, list args)
- * Description: on initialization, creates a new process from the template named
- *   template_name, with arguments args. On deinitialization, initiates termination
- *   of the process and waits for it to terminate.
+ * Synopsis:
+ *   spawn(string template_name, list args)
+ * 
+ * Description:
+ *   On initialization, creates a new process from the template named
+ *   'template_name', with arguments 'args'. On deinitialization, initiates termination
+ *   of the process and waits for it to terminate. The process can access objects
+ *   as seen from 'spawn' via the _caller special object.
+ * 
+ * Synopsis:
+ *   spawn::join()
+ * 
+ * Description:
+ *   A join() on a spawn() is like a depend() on a provide() which is located at the
+ *   end of the spawned process.
+ * 
+ * Variables:
+ *   Exposes objects from the spawned process.
  */
  */
 
 
 #include <stdlib.h>
 #include <stdlib.h>
+#include <string.h>
 
 
+#include <misc/offset.h>
+#include <misc/debug.h>
+#include <structure/LinkedList0.h>
 #include <ncd/NCDModule.h>
 #include <ncd/NCDModule.h>
 
 
 #include <generated/blog_channel_ncd_spawn.h>
 #include <generated/blog_channel_ncd_spawn.h>
@@ -47,38 +65,80 @@
 
 
 #define STATE_WORKING 1
 #define STATE_WORKING 1
 #define STATE_UP 2
 #define STATE_UP 2
-#define STATE_TERMINATING 3
+#define STATE_WAITING 3
+#define STATE_WAITING_TERMINATING 4
+#define STATE_TERMINATING 5
 
 
 struct instance {
 struct instance {
     NCDModuleInst *i;
     NCDModuleInst *i;
     NCDModuleProcess process;
     NCDModuleProcess process;
+    LinkedList0 clean_list;
+    LinkedList0 dirty_list;
     int state;
     int state;
 };
 };
 
 
+struct join_instance {
+    NCDModuleInst *i;
+    struct instance *spawn;
+    LinkedList0Node list_node;
+    int is_dirty;
+};
+
+static void assert_dirty_state (struct instance *o);
+static void process_handler_event (struct instance *o, int event);
+static int process_func_getspecialobj (struct instance *o, const char *name, NCDObject *out_object);
+static int caller_obj_func_getobj (struct instance *o, const char *name, NCDObject *out_object);
+static void bring_joins_down (struct instance *o);
+static void continue_working (struct instance *o);
+static void continue_terminating (struct instance *o);
 static void instance_free (struct instance *o);
 static void instance_free (struct instance *o);
 
 
+static void assert_dirty_state (struct instance *o)
+{
+    ASSERT(!LinkedList0_IsEmpty(&o->dirty_list) == (o->state == STATE_WAITING || o->state == STATE_WAITING_TERMINATING))
+}
+
 static void process_handler_event (struct instance *o, int event)
 static void process_handler_event (struct instance *o, int event)
 {
 {
+    assert_dirty_state(o);
+    
     switch (event) {
     switch (event) {
         case NCDMODULEPROCESS_EVENT_UP: {
         case NCDMODULEPROCESS_EVENT_UP: {
             ASSERT(o->state == STATE_WORKING)
             ASSERT(o->state == STATE_WORKING)
+            ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
             
             
             // set state up
             // set state up
             o->state = STATE_UP;
             o->state = STATE_UP;
+            
+            // bring joins up
+            for (LinkedList0Node *ln = LinkedList0_GetFirst(&o->clean_list); ln; ln = LinkedList0Node_Next(ln)) {
+                struct join_instance *join = UPPER_OBJECT(ln, struct join_instance, list_node);
+                ASSERT(join->spawn == o)
+                ASSERT(!join->is_dirty)
+                NCDModuleInst_Backend_Up(join->i);
+            }
         } break;
         } break;
         
         
         case NCDMODULEPROCESS_EVENT_DOWN: {
         case NCDMODULEPROCESS_EVENT_DOWN: {
             ASSERT(o->state == STATE_UP)
             ASSERT(o->state == STATE_UP)
+            ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
+            
+            // bring joins down, moving them to the dirty list
+            bring_joins_down(o);
             
             
-            // process went down; allow it to continue immediately
-            NCDModuleProcess_Continue(&o->process);
+            // set state waiting
+            o->state = STATE_WAITING;
             
             
-            // set state working
-            o->state = STATE_WORKING;
+            // if we have no joins, continue immediately
+            if (LinkedList0_IsEmpty(&o->dirty_list)) {
+                continue_working(o);
+                return;
+            }
         } break;
         } break;
         
         
         case NCDMODULEPROCESS_EVENT_TERMINATED: {
         case NCDMODULEPROCESS_EVENT_TERMINATED: {
             ASSERT(o->state == STATE_TERMINATING)
             ASSERT(o->state == STATE_TERMINATING)
+            ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
             
             
             // die finally
             // die finally
             instance_free(o);
             instance_free(o);
@@ -89,27 +149,74 @@ static void process_handler_event (struct instance *o, int event)
     }
     }
 }
 }
 
 
-static void func_new (NCDModuleInst *i)
+static int process_func_getspecialobj (struct instance *o, const char *name, NCDObject *out_object)
 {
 {
-    // allocate instance
-    struct instance *o = malloc(sizeof(*o));
-    if (!o) {
-        ModuleLog(i, BLOG_ERROR, "failed to allocate instance");
-        goto fail0;
+    if (!strcmp(name, "_caller")) {
+        *out_object = NCDObject_Build(NULL, o, NULL, (NCDObject_func_getobj)caller_obj_func_getobj);
+        return 1;
     }
     }
+    
+    return 0;
+}
+
+static int caller_obj_func_getobj (struct instance *o, const char *name, NCDObject *out_object)
+{
+    return NCDModuleInst_Backend_GetObj(o->i, name, out_object);
+}
+
+static void bring_joins_down (struct instance *o)
+{
+    LinkedList0Node *ln;
+    while (ln = LinkedList0_GetFirst(&o->clean_list)) {
+        struct join_instance *join = UPPER_OBJECT(ln, struct join_instance, list_node);
+        ASSERT(join->spawn == o)
+        ASSERT(!join->is_dirty)
+        NCDModuleInst_Backend_Down(join->i);
+        LinkedList0_Remove(&o->clean_list, &join->list_node);
+        LinkedList0_Prepend(&o->dirty_list, &join->list_node);
+        join->is_dirty = 1;
+    }
+}
+
+static void continue_working (struct instance *o)
+{
+    ASSERT(o->state == STATE_WAITING)
+    ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
+    
+    // continue process
+    NCDModuleProcess_Continue(&o->process);
+    
+    // set state working
+    o->state = STATE_WORKING;
+}
+
+static void continue_terminating (struct instance *o)
+{
+    ASSERT(o->state == STATE_WAITING_TERMINATING)
+    ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
+    
+    // request process to terminate
+    NCDModuleProcess_Terminate(&o->process);
+    
+    // set state terminating
+    o->state = STATE_TERMINATING;
+}
+
+static void func_new (void *vo, NCDModuleInst *i)
+{
+    struct instance *o = vo;
     o->i = i;
     o->i = i;
-    NCDModuleInst_Backend_SetUser(i, o);
     
     
     // check arguments
     // check arguments
     NCDValRef template_name_arg;
     NCDValRef template_name_arg;
     NCDValRef args_arg;
     NCDValRef args_arg;
-    if (!NCDVal_ListRead(o->i->args, 2, &template_name_arg, &args_arg)) {
+    if (!NCDVal_ListRead(i->args, 2, &template_name_arg, &args_arg)) {
         ModuleLog(o->i, BLOG_ERROR, "wrong arity");
         ModuleLog(o->i, BLOG_ERROR, "wrong arity");
-        goto fail1;
+        goto fail0;
     }
     }
     if (!NCDVal_IsStringNoNulls(template_name_arg) || !NCDVal_IsList(args_arg)) {
     if (!NCDVal_IsStringNoNulls(template_name_arg) || !NCDVal_IsList(args_arg)) {
         ModuleLog(o->i, BLOG_ERROR, "wrong type");
         ModuleLog(o->i, BLOG_ERROR, "wrong type");
-        goto fail1;
+        goto fail0;
     }
     }
     
     
     // signal up.
     // signal up.
@@ -119,15 +226,20 @@ static void func_new (NCDModuleInst *i)
     // create process
     // create process
     if (!NCDModuleProcess_Init(&o->process, o->i, NCDVal_StringValue(template_name_arg), args_arg, o, (NCDModuleProcess_handler_event)process_handler_event)) {
     if (!NCDModuleProcess_Init(&o->process, o->i, NCDVal_StringValue(template_name_arg), args_arg, o, (NCDModuleProcess_handler_event)process_handler_event)) {
         ModuleLog(o->i, BLOG_ERROR, "NCDModuleProcess_Init failed");
         ModuleLog(o->i, BLOG_ERROR, "NCDModuleProcess_Init failed");
-        goto fail1;
+        goto fail0;
     }
     }
     
     
+    // set object resolution function
+    NCDModuleProcess_SetSpecialFuncs(&o->process, (NCDModuleProcess_func_getspecialobj)process_func_getspecialobj);
+    
+    // init lists
+    LinkedList0_Init(&o->clean_list);
+    LinkedList0_Init(&o->dirty_list);
+    
     // set state working
     // set state working
     o->state = STATE_WORKING;
     o->state = STATE_WORKING;
     return;
     return;
     
     
-fail1:
-    free(o);
 fail0:
 fail0:
     NCDModuleInst_Backend_SetError(i);
     NCDModuleInst_Backend_SetError(i);
     NCDModuleInst_Backend_Dead(i);
     NCDModuleInst_Backend_Dead(i);
@@ -135,38 +247,155 @@ fail0:
 
 
 void instance_free (struct instance *o)
 void instance_free (struct instance *o)
 {
 {
-    NCDModuleInst *i = o->i;
+    ASSERT(LinkedList0_IsEmpty(&o->dirty_list))
+    
+    // unlink joins
+    LinkedList0Node *ln;
+    while (ln = LinkedList0_GetFirst(&o->clean_list)) {
+        struct join_instance *join = UPPER_OBJECT(ln, struct join_instance, list_node);
+        ASSERT(join->spawn == o)
+        ASSERT(!join->is_dirty)
+        LinkedList0_Remove(&o->clean_list, &join->list_node);
+        join->spawn = NULL;
+    }
     
     
     // free process
     // free process
     NCDModuleProcess_Free(&o->process);
     NCDModuleProcess_Free(&o->process);
     
     
-    // free instance
-    free(o);
-    
-    NCDModuleInst_Backend_Dead(i);
+    NCDModuleInst_Backend_Dead(o->i);
 }
 }
 
 
 static void func_die (void *vo)
 static void func_die (void *vo)
 {
 {
     struct instance *o = vo;
     struct instance *o = vo;
+    ASSERT(o->state != STATE_WAITING_TERMINATING)
     ASSERT(o->state != STATE_TERMINATING)
     ASSERT(o->state != STATE_TERMINATING)
+    assert_dirty_state(o);
     
     
-    // request process to terminate
-    NCDModuleProcess_Terminate(&o->process);
+    // bring joins down
+    if (o->state == STATE_UP) {
+        bring_joins_down(o);
+    }
     
     
-    // set state terminating
-    o->state = STATE_TERMINATING;
+    // set state waiting terminating
+    o->state = STATE_WAITING_TERMINATING;
+    
+    // start terminating now if possible
+    if (LinkedList0_IsEmpty(&o->dirty_list)) {
+        continue_terminating(o);
+        return;
+    }
+}
+
+static void join_func_new (void *vo, NCDModuleInst *i)
+{
+    struct join_instance *o = vo;
+    o->i = i;
+    
+    if (!NCDVal_ListRead(i->args, 0)) {
+        ModuleLog(o->i, BLOG_ERROR, "wrong arity");
+        goto fail0;
+    }
+    
+    o->spawn = NCDModuleInst_Backend_GetUser((NCDModuleInst *)i->method_user);
+    assert_dirty_state(o->spawn);
+    
+    LinkedList0_Prepend(&o->spawn->clean_list, &o->list_node);
+    o->is_dirty = 0;
+    
+    if (o->spawn->state == STATE_UP) {
+        NCDModuleInst_Backend_Up(i);
+    }
+    
+    return;
+    
+fail0:
+    NCDModuleInst_Backend_SetError(i);
+    NCDModuleInst_Backend_Dead(i);
+}
+
+static void join_func_die (void *vo)
+{
+    struct join_instance *o = vo;
+    
+    if (o->spawn) {
+        assert_dirty_state(o->spawn);
+        
+        // remove from list
+        if (o->is_dirty) {
+            LinkedList0_Remove(&o->spawn->dirty_list, &o->list_node);
+        } else {
+            LinkedList0_Remove(&o->spawn->clean_list, &o->list_node);
+        }
+        
+        if (o->is_dirty && LinkedList0_IsEmpty(&o->spawn->dirty_list)) {
+            ASSERT(o->spawn->state == STATE_WAITING || o->spawn->state == STATE_WAITING_TERMINATING)
+            
+            if (o->spawn->state == STATE_WAITING) {
+                continue_working(o->spawn);
+            } else {
+                continue_terminating(o->spawn);
+            }
+        }
+    }
+    
+    NCDModuleInst_Backend_Dead(o->i);
+}
+
+static int join_func_getobj (void *vo, const char *name, NCDObject *out)
+{
+    struct join_instance *o = vo;
+    
+    if (!o->spawn) {
+        return 0;
+    }
+    
+    return NCDModuleProcess_GetObj(&o->spawn->process, name, out);
+}
+
+static void join_func_clean (void *vo)
+{
+    struct join_instance *o = vo;
+    
+    if (!(o->spawn && o->is_dirty)) {
+        return;
+    }
+    
+    assert_dirty_state(o->spawn);
+    ASSERT(o->spawn->state == STATE_WAITING || o->spawn->state == STATE_WAITING_TERMINATING)
+    
+    LinkedList0_Remove(&o->spawn->dirty_list, &o->list_node);
+    LinkedList0_Prepend(&o->spawn->clean_list, &o->list_node);
+    o->is_dirty = 0;
+    
+    if (LinkedList0_IsEmpty(&o->spawn->dirty_list)) {
+        if (o->spawn->state == STATE_WAITING) {
+            continue_working(o->spawn);
+        } else {
+            continue_terminating(o->spawn);
+        }
+    }
 }
 }
 
 
 static const struct NCDModule modules[] = {
 static const struct NCDModule modules[] = {
     {
     {
         .type = "spawn",
         .type = "spawn",
-        .func_new = func_new,
-        .func_die = func_die
+        .func_new2 = func_new,
+        .func_die = func_die,
+        .alloc_size = sizeof(struct instance)
     }, {
     }, {
         .type = "synchronous_process", // deprecated name
         .type = "synchronous_process", // deprecated name
-        .func_new = func_new,
-        .func_die = func_die
+        .func_new2 = func_new,
+        .func_die = func_die,
+        .alloc_size = sizeof(struct instance)
+    }, {
+        .type = "spawn::join",
+        .func_new2 = join_func_new,
+        .func_die = join_func_die,
+        .func_getobj = join_func_getobj,
+        .func_clean = join_func_clean,
+        .alloc_size = sizeof(struct join_instance),
+        .flags = NCDMODULE_FLAG_CAN_RESOLVE_WHEN_DOWN
     }, {
     }, {
         .type = NULL
         .type = NULL
     }
     }