Selaa lähdekoodia

ncd: improve requestproto such that it is possible for a client to abort a request. Reimplement request client
abstraction to be able to use a single connection for multiple requests.

ambrop7 14 vuotta sitten
vanhempi
sitoutus
d5509cefb3

+ 2 - 0
blog_channels.txt

@@ -108,3 +108,5 @@ ncd_try 4
 ncd_sys_request_server 4
 NCDRequest 4
 ncd_net_ipv6_wait_dynamic_addr 4
+NCDRequestClient 4
+ncd_request 4

+ 4 - 0
generated/blog_channel_NCDRequestClient.h

@@ -0,0 +1,4 @@
+#ifdef BLOG_CURRENT_CHANNEL
+#undef BLOG_CURRENT_CHANNEL
+#endif
+#define BLOG_CURRENT_CHANNEL BLOG_CHANNEL_NCDRequestClient

+ 4 - 0
generated/blog_channel_ncd_request.h

@@ -0,0 +1,4 @@
+#ifdef BLOG_CURRENT_CHANNEL
+#undef BLOG_CURRENT_CHANNEL
+#endif
+#define BLOG_CURRENT_CHANNEL BLOG_CHANNEL_ncd_request

+ 3 - 1
generated/blog_channels_defines.h

@@ -108,4 +108,6 @@
 #define BLOG_CHANNEL_ncd_sys_request_server 107
 #define BLOG_CHANNEL_NCDRequest 108
 #define BLOG_CHANNEL_ncd_net_ipv6_wait_dynamic_addr 109
-#define BLOG_NUM_CHANNELS 110
+#define BLOG_CHANNEL_NCDRequestClient 110
+#define BLOG_CHANNEL_ncd_request 111
+#define BLOG_NUM_CHANNELS 112

+ 2 - 0
generated/blog_channels_list.h

@@ -108,3 +108,5 @@
 {.name = "ncd_sys_request_server", .loglevel = 4},
 {.name = "NCDRequest", .loglevel = 4},
 {.name = "ncd_net_ipv6_wait_dynamic_addr", .loglevel = 4},
+{.name = "NCDRequestClient", .loglevel = 4},
+{.name = "ncd_request", .loglevel = 4},

+ 60 - 23
ncd-request/ncd-request.c

@@ -38,14 +38,22 @@
 #include <system/BReactor.h>
 #include <ncd/NCDValueParser.h>
 #include <ncd/NCDValueGenerator.h>
-#include <ncd/NCDRequest.h>
+#include <ncd/NCDRequestClient.h>
 
-static void request_handler_finished (void *user, int is_error);
+#include <generated/blog_channel_ncd_request.h>
+
+static void client_handler_error (void *user);
+static void client_handler_connected (void *user);
+static void request_handler_sent (void *user);
 static void request_handler_reply (void *user, NCDValue reply_data);
+static void request_handler_finished (void *user, int is_error);
 static int write_all (int fd, const uint8_t *data, size_t len);
 
+NCDValue request_payload;
 BReactor reactor;
-NCDRequest request;
+NCDRequestClient client;
+NCDRequestClientRequest request;
+int have_request;
 
 int main (int argc, char *argv[])
 {
@@ -63,34 +71,38 @@ int main (int argc, char *argv[])
     
     BTime_Init();
     
-    if (!BNetwork_GlobalInit()) {
-        BLog(BLOG_ERROR, "BNetwork_Init failed");
+    if (!NCDValueParser_Parse(request_payload_string, strlen(request_payload_string), &request_payload)) {
+        BLog(BLOG_ERROR, "BReactor_Init failed");
         goto fail1;
     }
     
-    if (!BReactor_Init(&reactor)) {
-        BLog(BLOG_ERROR, "BReactor_Init failed");
-        goto fail1;
+    if (!BNetwork_GlobalInit()) {
+        BLog(BLOG_ERROR, "BNetwork_Init failed");
+        goto fail2;
     }
     
-    NCDValue request_payload;
-    if (!NCDValueParser_Parse(request_payload_string, strlen(request_payload_string), &request_payload)) {
+    if (!BReactor_Init(&reactor)) {
         BLog(BLOG_ERROR, "BReactor_Init failed");
         goto fail2;
     }
     
-    if (!NCDRequest_Init(&request, socket_path, &request_payload, &reactor, NULL, request_handler_finished, request_handler_reply)) {
-        BLog(BLOG_ERROR, "NCDRequest_Init failed");
-        NCDValue_Free(&request_payload);
-        goto fail2;
+    if (!NCDRequestClient_Init(&client, socket_path, &reactor, NULL, client_handler_error, client_handler_connected)) {
+        BLog(BLOG_ERROR, "NCDRequestClient_Init failed");
+        goto fail3;
     }
-    NCDValue_Free(&request_payload);
+    
+    have_request = 0;
     
     res = BReactor_Exec(&reactor);
     
-    NCDRequest_Free(&request);
-fail2:
+    if (have_request) {
+        NCDRequestClientRequest_Free(&request);
+    }
+    NCDRequestClient_Free(&client);
+fail3:
     BReactor_Free(&reactor);
+fail2:
+    NCDValue_Free(&request_payload);
 fail1:
     BLog_Free();
 fail0:
@@ -98,19 +110,35 @@ fail0:
     return res;
 }
 
-static void request_handler_finished (void *user, int is_error)
+static void client_handler_error (void *user)
 {
-    if (is_error) {
-        BLog(BLOG_ERROR, "error");
+    BLog(BLOG_ERROR, "client error");
+    
+    BReactor_Quit(&reactor, 1);
+}
+
+static void client_handler_connected (void *user)
+{
+    ASSERT(!have_request)
+    
+    if (!NCDRequestClientRequest_Init(&request, &client, &request_payload, NULL, request_handler_sent, request_handler_reply, request_handler_finished)) {
+        BLog(BLOG_ERROR, "NCDRequestClientRequest_Init failed");
         BReactor_Quit(&reactor, 1);
         return;
     }
     
-    BReactor_Quit(&reactor, 0);
+    have_request = 1;
+}
+
+static void request_handler_sent (void *user)
+{
+    ASSERT(have_request)
 }
 
 static void request_handler_reply (void *user, NCDValue reply_data)
 {
+    ASSERT(have_request)
+    
     char *str = NCDValueGenerator_Generate(&reply_data);
     if (!str) {
         BLog(BLOG_ERROR, "NCDValueGenerator_Generate failed");
@@ -124,8 +152,6 @@ static void request_handler_reply (void *user, NCDValue reply_data)
         goto fail1;
     }
     
-    NCDRequest_Next(&request);
-    
     free(str);
     NCDValue_Free(&reply_data);
     return;
@@ -137,6 +163,17 @@ fail0:
     BReactor_Quit(&reactor, 1);
 }
 
+static void request_handler_finished (void *user, int is_error)
+{
+    if (is_error) {
+        BLog(BLOG_ERROR, "request error");
+        BReactor_Quit(&reactor, 1);
+        return;
+    }
+    
+    BReactor_Quit(&reactor, 0);
+}
+
 static int write_all (int fd, const uint8_t *data, size_t len)
 {
     while (len > 0) {

+ 2 - 2
ncd/CMakeLists.txt

@@ -36,7 +36,7 @@ add_library(ncdvalue
 target_link_libraries(ncdvalue ncdconfig)
 
 add_library(ncdrequest
-    NCDRequest.c
+    NCDRequestClient.c
 )
 target_link_libraries(ncdrequest base system ncdvalue)
 
@@ -104,7 +104,7 @@ add_executable(badvpn-ncd
     modules/net_ipv6_wait_dynamic_addr.c
     ${NCD_ADDITIONAL_SOURCES}
 )
-target_link_libraries(badvpn-ncd system flow flowextra dhcpclient arpprobe ncdconfig ncdvalue udevmonitor ncdinterfacemonitor)
+target_link_libraries(badvpn-ncd system flow flowextra dhcpclient arpprobe ncdconfig ncdvalue udevmonitor ncdinterfacemonitor ncdrequest)
 
 if (BADVPN_USE_LINUX_INPUT)
     string(REPLACE " " ";" FLAGS_LIST "${CMAKE_C_FLAGS}")

+ 0 - 325
ncd/NCDRequest.c

@@ -1,325 +0,0 @@
-/**
- * @file NCDRequest.c
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * 1. Redistributions of source code must retain the above copyright
- *    notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * 3. Neither the name of the author nor the
- *    names of its contributors may be used to endorse or promote products
- *    derived from this software without specific prior written permission.
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <stddef.h>
-#include <stdint.h>
-#include <limits.h>
-
-#include <misc/byteorder.h>
-#include <misc/expstring.h>
-#include <protocol/packetproto.h>
-#include <protocol/requestproto.h>
-#include <base/BLog.h>
-
-#include "NCDRequest.h"
-
-#include <generated/blog_channel_NCDRequest.h>
-
-#define SEND_PAYLOAD_MTU 32768
-#define RECV_PAYLOAD_MTU 32768
-
-#define SEND_MTU (SEND_PAYLOAD_MTU + sizeof(struct requestproto_header))
-#define RECV_MTU (RECV_PAYLOAD_MTU + sizeof(struct requestproto_header))
-
-#define STATE_CONNECTING 1
-#define STATE_CONNECTED 2
-
-static int build_requestproto_packet (uint32_t request_id, uint32_t flags, NCDValue *payload_value, uint8_t **out_data, int *out_len);
-static void report_finished (NCDRequest *o, int is_error);
-static void connector_handler (NCDRequest *o, int is_error);
-static void connection_handler (NCDRequest *o, int event);
-static void decoder_handler_error (NCDRequest *o);
-static void recv_if_handler_send (NCDRequest *o, uint8_t *data, int data_len);
-static void send_sender_iface_handler_done (NCDRequest *o);
-
-static int build_requestproto_packet (uint32_t request_id, uint32_t flags, NCDValue *payload_value, uint8_t **out_data, int *out_len)
-{
-    struct header {
-        struct packetproto_header pp;
-        struct requestproto_header rp;
-    };
-    
-    ExpString str;
-    if (!ExpString_Init(&str)) {
-        BLog(BLOG_ERROR, "ExpString_Init failed");
-        goto fail0;
-    }
-    
-    if (!ExpString_AppendZeros(&str, sizeof(struct header))) {
-        BLog(BLOG_ERROR, "ExpString_AppendBinary failed");
-        goto fail1;
-    }
-    
-    if (payload_value && !NCDValueGenerator_AppendGenerate(payload_value, &str)) {
-        BLog(BLOG_ERROR, "NCDValueGenerator_AppendGenerate failed");
-        goto fail1;
-    }
-    
-    size_t len = ExpString_Length(&str);
-    if (len > INT_MAX || len > PACKETPROTO_ENCLEN(SEND_MTU) || len - sizeof(struct packetproto_header) > UINT16_MAX) {
-        BLog(BLOG_ERROR, "reply is too long");
-        goto fail1;
-    }
-    
-    uint8_t *packet = ExpString_Get(&str);
-    
-    struct header *header = (void *)packet;
-    header->pp.len = htol16(len - sizeof(struct packetproto_header));
-    header->rp.request_id = htol32(request_id);
-    header->rp.flags = htol32(flags);
-    
-    *out_data = packet;
-    *out_len = len;
-    return 1;
-    
-fail1:
-    ExpString_Free(&str);
-fail0:
-    return 0;
-}
-
-static void report_finished (NCDRequest *o, int is_error)
-{
-    DEBUGERROR(&o->d_err, o->handler_finished(o->user, is_error))
-}
-
-static void connector_handler (NCDRequest *o, int is_error)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTING)
-    
-    // check error
-    if (is_error) {
-        BLog(BLOG_ERROR, "failed to connect to socket");
-        goto fail0;
-    }
-    
-    BPendingGroup *pg = BReactor_PendingGroup(o->reactor);
-    
-    // init connection
-    if (!BConnection_Init(&o->con, BCONNECTION_SOURCE_CONNECTOR(&o->connector), o->reactor, o, (BConnection_handler)connection_handler)) {
-        BLog(BLOG_ERROR, "BConnection_Init failed");
-        goto fail0;
-    }
-    
-    // init connection interfaces
-    BConnection_SendAsync_Init(&o->con);
-    BConnection_RecvAsync_Init(&o->con);
-    StreamPassInterface *con_send_if = BConnection_SendAsync_GetIf(&o->con);
-    StreamRecvInterface *con_recv_if = BConnection_RecvAsync_GetIf(&o->con);
-    
-    // init receive interface
-    PacketPassInterface_Init(&o->recv_if, RECV_MTU, (PacketPassInterface_handler_send)recv_if_handler_send, o, pg);
-    
-    // init receive decoder
-    if (!PacketProtoDecoder_Init(&o->recv_decoder, con_recv_if, &o->recv_if, pg, o, (PacketProtoDecoder_handler_error)decoder_handler_error)) {
-        BLog(BLOG_ERROR, "PacketProtoDecoder_Init failed");
-        goto fail1;
-    }
-    
-    // init send sender
-    PacketStreamSender_Init(&o->send_sender, con_send_if, PACKETPROTO_ENCLEN(SEND_MTU), pg);
-    o->send_sender_iface = PacketStreamSender_GetInput(&o->send_sender);
-    
-    // init send interface
-    PacketPassInterface_Sender_Init(o->send_sender_iface, (PacketPassInterface_handler_done)send_sender_iface_handler_done, o);
-    
-    // send request
-    PacketPassInterface_Sender_Send(o->send_sender_iface, o->request_data, o->request_len);
-    
-    // set state connected
-    o->state = STATE_CONNECTED;
-    return;
-    
-fail1:
-    PacketPassInterface_Free(&o->recv_if);
-    BConnection_RecvAsync_Free(&o->con);
-    BConnection_SendAsync_Free(&o->con);
-    BConnection_Free(&o->con);
-fail0:
-    report_finished(o, 1);
-}
-
-static void connection_handler (NCDRequest *o, int event)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTED)
-    
-    BLog(BLOG_ERROR, "connection error");
-    
-    report_finished(o, 1);
-}
-
-static void decoder_handler_error (NCDRequest *o)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTED)
-    
-    BLog(BLOG_ERROR, "decoder error");
-    
-    report_finished(o, 1);
-}
-
-static void recv_if_handler_send (NCDRequest *o, uint8_t *data, int data_len)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTED)
-    ASSERT(!o->processing)
-    ASSERT(data_len >= 0)
-    ASSERT(data_len <= RECV_MTU)
-    
-    if (data_len < sizeof(struct requestproto_header)) {
-        BLog(BLOG_ERROR, "missing requestproto header");
-        goto fail;
-    }
-    
-    struct requestproto_header *header = (struct requestproto_header *)data;
-    uint32_t request_id = ltoh32(header->request_id);
-    uint32_t flags = ltoh32(header->flags);
-    
-    uint8_t *payload = data + sizeof(*header);
-    int payload_len = data_len - sizeof(*header);
-    
-    if (request_id != o->request_id) {
-        BLog(BLOG_ERROR, "invalid request ID");
-        goto fail;
-    }
-    
-    if (flags == REQUESTPROTO_REPLY_FLAG_DATA) {
-        NCDValue value;
-        if (!NCDValueParser_Parse(payload, payload_len, &value)) {
-            BLog(BLOG_ERROR, "NCDValueParser_Parse failed");
-            goto fail;
-        }
-        
-        // set processing
-        o->processing = 1;
-        
-        // call reply handler
-        o->handler_reply(o->user, value);
-        return;
-    }
-    
-    if (flags == REQUESTPROTO_REPLY_FLAG_END) {
-        if (payload_len != 0) {
-            BLog(BLOG_ERROR, "end reply has non-empty payload");
-            goto fail;
-        }
-        
-        // call finished handler
-        report_finished(o, 0);
-        return;
-    }
-    
-    BLog(BLOG_ERROR, "invalid requestproto flags");
-    
-fail:
-    report_finished(o, 1);
-}
-
-static void send_sender_iface_handler_done (NCDRequest *o)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTED)
-}
-
-int NCDRequest_Init (NCDRequest *o, const char *socket_path, NCDValue *payload_value, BReactor *reactor, void *user, NCDRequest_handler_finished handler_finished, NCDRequest_handler_reply handler_reply)
-{
-    ASSERT(socket_path)
-    NCDValue_Type(payload_value);
-    ASSERT(handler_finished)
-    ASSERT(handler_reply)
-    
-    // init arguments
-    o->reactor = reactor;
-    o->user = user;
-    o->handler_finished = handler_finished;
-    o->handler_reply = handler_reply;
-    
-    // choose request ID
-    o->request_id = 175;
-    
-    // build request
-    if (!build_requestproto_packet(o->request_id, REQUESTPROTO_REQUEST_FLAG, payload_value, &o->request_data, &o->request_len)) {
-        BLog(BLOG_ERROR, "failed to build request");
-        goto fail0;
-    }
-    
-    // init connector
-    if (!BConnector_InitUnix(&o->connector, socket_path, reactor, o, (BConnector_handler)connector_handler)) {
-        BLog(BLOG_ERROR, "BConnector_InitUnix failed");
-        goto fail1;
-    }
-    
-    // set state connecting
-    o->state = STATE_CONNECTING;
-    
-    // set not processing
-    o->processing = 0;
-    
-    DebugError_Init(&o->d_err, BReactor_PendingGroup(reactor));
-    DebugObject_Init(&o->d_obj);
-    return 1;
-    
-fail1:
-    free(o->request_data);
-fail0:
-    return 0;
-}
-
-void NCDRequest_Free (NCDRequest *o)
-{
-    DebugObject_Free(&o->d_obj);
-    DebugError_Free(&o->d_err);
-    
-    if (o->state == STATE_CONNECTED) {
-        PacketStreamSender_Free(&o->send_sender);
-        PacketProtoDecoder_Free(&o->recv_decoder);
-        PacketPassInterface_Free(&o->recv_if);
-        BConnection_RecvAsync_Free(&o->con);
-        BConnection_SendAsync_Free(&o->con);
-        BConnection_Free(&o->con);
-    }
-    
-    BConnector_Free(&o->connector);
-    free(o->request_data);
-}
-
-void NCDRequest_Next (NCDRequest *o)
-{
-    DebugObject_Access(&o->d_obj);
-    ASSERT(o->state == STATE_CONNECTED)
-    ASSERT(o->processing)
-    
-    // set not processing
-    o->processing = 0;
-    
-    // accept received packet
-    PacketPassInterface_Done(&o->recv_if);
-}

+ 0 - 71
ncd/NCDRequest.h

@@ -1,71 +0,0 @@
-/**
- * @file NCDRequest.h
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * 1. Redistributions of source code must retain the above copyright
- *    notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * 3. Neither the name of the author nor the
- *    names of its contributors may be used to endorse or promote products
- *    derived from this software without specific prior written permission.
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef BADVPN_NCDREQUEST_H
-#define BADVPN_NCDREQUEST_H
-
-#include <stdint.h>
-
-#include <misc/debug.h>
-#include <misc/debugerror.h>
-#include <base/DebugObject.h>
-#include <system/BConnection.h>
-#include <flow/PacketProtoDecoder.h>
-#include <flow/PacketStreamSender.h>
-#include <ncd/NCDValueGenerator.h>
-#include <ncd/NCDValueParser.h>
-
-typedef void (*NCDRequest_handler_finished) (void *user, int is_error);
-typedef void (*NCDRequest_handler_reply) (void *user, NCDValue reply_data);
-
-typedef struct {
-    BReactor *reactor;
-    void *user;
-    NCDRequest_handler_finished handler_finished;
-    NCDRequest_handler_reply handler_reply;
-    uint32_t request_id;
-    uint8_t *request_data;
-    int request_len;
-    BConnector connector;
-    BConnection con;
-    PacketPassInterface recv_if;
-    PacketProtoDecoder recv_decoder;
-    PacketStreamSender send_sender;
-    PacketPassInterface *send_sender_iface;
-    int state;
-    int processing;
-    DebugError d_err;
-    DebugObject d_obj;
-} NCDRequest;
-
-int NCDRequest_Init (NCDRequest *o, const char *socket_path, NCDValue *payload_value, BReactor *reactor, void *user, NCDRequest_handler_finished handler_finished, NCDRequest_handler_reply handler_reply) WARN_UNUSED;
-void NCDRequest_Free (NCDRequest *o);
-void NCDRequest_Next (NCDRequest *o);
-
-#endif

+ 589 - 0
ncd/NCDRequestClient.c

@@ -0,0 +1,589 @@
+/**
+ * @file NCDRequestClient.c
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the author nor the
+ *    names of its contributors may be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <limits.h>
+#include <inttypes.h>
+
+#include <misc/byteorder.h>
+#include <misc/expstring.h>
+#include <misc/offset.h>
+#include <protocol/packetproto.h>
+#include <protocol/requestproto.h>
+#include <base/BLog.h>
+
+#include "NCDRequestClient.h"
+
+#include <generated/blog_channel_NCDRequestClient.h>
+
+#define SEND_PAYLOAD_MTU 32768
+#define RECV_PAYLOAD_MTU 32768
+
+#define SEND_MTU (SEND_PAYLOAD_MTU + sizeof(struct requestproto_header))
+#define RECV_MTU (RECV_PAYLOAD_MTU + sizeof(struct requestproto_header))
+
+#define CSTATE_CONNECTING 1
+#define CSTATE_CONNECTED 2
+
+#define RSTATE_SENDING_REQUEST 1
+#define RSTATE_READY 2
+#define RSTATE_SENDING_REQUEST_ABORT 3
+#define RSTATE_SENDING_ABORT 4
+#define RSTATE_WAITING_END 5
+#define RSTATE_DEAD_SENDING 6
+
+static int uint32_comparator (void *unused, void *vv1, void *vv2);
+static void report_error (NCDRequestClient *o);
+static void request_report_finished (NCDRequestClientRequest *o, int is_error);
+static void connector_handler (NCDRequestClient *o, int is_error);
+static void connection_handler (NCDRequestClient *o, int event);
+static void decoder_handler_error (NCDRequestClient *o);
+static void recv_if_handler_send (NCDRequestClient *o, uint8_t *data, int data_len);
+static struct NCDRequestClient_req * find_req (NCDRequestClient *o, uint32_t request_id);
+static int get_free_request_id (NCDRequestClient *o, uint32_t *out);
+static int build_requestproto_packet (uint32_t request_id, uint32_t type, NCDValue *payload_value, uint8_t **out_data, int *out_len);
+static void build_nodata_packet (uint32_t request_id, uint32_t type, uint8_t *data, int *out_len);
+static void req_free (struct NCDRequestClient_req *req);
+static void req_send_abort (struct NCDRequestClient_req *req);
+static void req_qflow_send_iface_handler_done (struct NCDRequestClient_req *req);
+
+static int uint32_comparator (void *unused, void *vv1, void *vv2)
+{
+    uint32_t *v1 = vv1;
+    uint32_t *v2 = vv2;
+    return (*v1 > *v2) - (*v1 < *v2);
+}
+
+static void report_error (NCDRequestClient *o)
+{
+    DEBUGERROR(&o->d_err, o->handler_error(o->user))
+}
+
+static void request_report_finished (NCDRequestClientRequest *o, int is_error)
+{
+    o->req = NULL;
+    
+    DEBUGERROR(&o->d_err, o->handler_finished(o->user, is_error))
+}
+
+static void connector_handler (NCDRequestClient *o, int is_error)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->state == CSTATE_CONNECTING)
+    
+    // check error
+    if (is_error) {
+        BLog(BLOG_ERROR, "failed to connect to socket");
+        goto fail0;
+    }
+    
+    BPendingGroup *pg = BReactor_PendingGroup(o->reactor);
+    
+    // init connection
+    if (!BConnection_Init(&o->con, BCONNECTION_SOURCE_CONNECTOR(&o->connector), o->reactor, o, (BConnection_handler)connection_handler)) {
+        BLog(BLOG_ERROR, "BConnection_Init failed");
+        goto fail0;
+    }
+    
+    // init connection interfaces
+    BConnection_SendAsync_Init(&o->con);
+    BConnection_RecvAsync_Init(&o->con);
+    StreamPassInterface *con_send_if = BConnection_SendAsync_GetIf(&o->con);
+    StreamRecvInterface *con_recv_if = BConnection_RecvAsync_GetIf(&o->con);
+    
+    // init receive interface
+    PacketPassInterface_Init(&o->recv_if, RECV_MTU, (PacketPassInterface_handler_send)recv_if_handler_send, o, pg);
+    
+    // init receive decoder
+    if (!PacketProtoDecoder_Init(&o->recv_decoder, con_recv_if, &o->recv_if, pg, o, (PacketProtoDecoder_handler_error)decoder_handler_error)) {
+        BLog(BLOG_ERROR, "PacketProtoDecoder_Init failed");
+        goto fail1;
+    }
+    
+    // init send sender
+    PacketStreamSender_Init(&o->send_sender, con_send_if, PACKETPROTO_ENCLEN(SEND_MTU), pg);
+    
+    // init send queue
+    PacketPassFifoQueue_Init(&o->send_queue, PacketStreamSender_GetInput(&o->send_sender), pg);
+    
+    // set state connected
+    o->state = CSTATE_CONNECTED;
+    
+    // call connected handler
+    o->handler_connected(o->user);
+    return;
+    
+fail1:
+    PacketPassInterface_Free(&o->recv_if);
+    BConnection_RecvAsync_Free(&o->con);
+    BConnection_SendAsync_Free(&o->con);
+    BConnection_Free(&o->con);
+fail0:
+    report_error(o);
+}
+
+static void connection_handler (NCDRequestClient *o, int event)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->state == CSTATE_CONNECTED)
+    
+    BLog(BLOG_ERROR, "connection error");
+    
+    report_error(o);
+}
+
+static void decoder_handler_error (NCDRequestClient *o)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->state == CSTATE_CONNECTED)
+    
+    BLog(BLOG_ERROR, "decoder error");
+    
+    report_error(o);
+}
+
+static void recv_if_handler_send (NCDRequestClient *o, uint8_t *data, int data_len)
+{
+    DebugObject_Access(&o->d_obj);
+    ASSERT(o->state == CSTATE_CONNECTED)
+    ASSERT(data_len >= 0)
+    ASSERT(data_len <= RECV_MTU)
+    
+    // accept packet
+    PacketPassInterface_Done(&o->recv_if);
+    
+    if (data_len < sizeof(struct requestproto_header)) {
+        BLog(BLOG_ERROR, "missing requestproto header");
+        goto fail;
+    }
+    
+    struct requestproto_header *header = (struct requestproto_header *)data;
+    uint32_t request_id = ltoh32(header->request_id);
+    uint32_t type = ltoh32(header->type);
+    
+    uint8_t *payload = data + sizeof(*header);
+    int payload_len = data_len - sizeof(*header);
+    
+    // find request
+    struct NCDRequestClient_req *req = find_req(o, request_id);
+    if (!req) {
+        BLog(BLOG_ERROR, "received packet with unknown request ID");
+        goto fail;
+    }
+    
+    switch (type) {
+        case REQUESTPROTO_TYPE_SERVER_REPLY: {
+            switch (o->state) {
+                case RSTATE_READY: {
+                    // parse payload
+                    NCDValue payload_value;
+                    if (!NCDValueParser_Parse(payload, payload_len, &payload_value)) {
+                        BLog(BLOG_ERROR, "failed to parse reply payload");
+                        goto fail;
+                    }
+                    
+                    // call reply handler
+                    req->creq->handler_reply(req->creq->user, payload_value);
+                    return;
+                } break;
+                
+                case RSTATE_SENDING_ABORT:
+                case RSTATE_WAITING_END:
+                    return;
+                
+                default:
+                    BLog(BLOG_ERROR, "received unexpected reply");
+                    goto fail;
+            }
+        } break;
+        
+        case REQUESTPROTO_TYPE_SERVER_FINISHED:
+        case REQUESTPROTO_TYPE_SERVER_ERROR: {
+            if (payload_len != 0) {
+                BLog(BLOG_ERROR, "finshed/aborted message has non-empty payload");
+                goto fail;
+            }
+            
+            switch (req->state) {
+                case RSTATE_SENDING_ABORT: {
+                    // set state dying send
+                    req->state = RSTATE_DEAD_SENDING;
+                    return;
+                } break;
+                
+                case RSTATE_WAITING_END: {
+                    // free req
+                    req_free(req);
+                    return;
+                } break;
+                
+                case RSTATE_READY: {
+                    NCDRequestClientRequest *creq = req->creq;
+                    
+                    // free req
+                    req_free(req);
+                    
+                    // report finished
+                    request_report_finished(creq, type == REQUESTPROTO_TYPE_SERVER_ERROR);
+                    return;
+                } break;
+                
+                default:
+                    BLog(BLOG_ERROR, "received unexpected finished/aborted");
+                    goto fail;
+            }
+        } break;
+        
+        default:
+            BLog(BLOG_ERROR, "received invalid message type");
+            goto fail;
+    }
+    
+    ASSERT(0)
+    
+fail:
+    report_error(o);
+}
+
+static struct NCDRequestClient_req * find_req (NCDRequestClient *o, uint32_t request_id)
+{
+    BAVLNode *tn = BAVL_LookupExact(&o->reqs_tree, &request_id);
+    if (!tn) {
+        return NULL;
+    }
+    
+    struct NCDRequestClient_req *req = UPPER_OBJECT(tn, struct NCDRequestClient_req, reqs_tree_node);
+    ASSERT(req->request_id == request_id)
+    
+    return req;
+}
+
+static int get_free_request_id (NCDRequestClient *o, uint32_t *out)
+{
+    uint32_t first = o->next_request_id;
+    
+    do {
+        if (!find_req(o, o->next_request_id)) {
+            *out = o->next_request_id;
+            return 1;
+        }
+        o->next_request_id++;
+    } while (o->next_request_id != first);
+    
+    return 0;
+}
+
+static int build_requestproto_packet (uint32_t request_id, uint32_t type, NCDValue *payload_value, uint8_t **out_data, int *out_len)
+{
+    struct header {
+        struct packetproto_header pp;
+        struct requestproto_header rp;
+    } __attribute__((packed));
+    
+    ExpString str;
+    if (!ExpString_Init(&str)) {
+        BLog(BLOG_ERROR, "ExpString_Init failed");
+        goto fail0;
+    }
+    
+    if (!ExpString_AppendZeros(&str, sizeof(struct header))) {
+        BLog(BLOG_ERROR, "ExpString_AppendBinary failed");
+        goto fail1;
+    }
+    
+    if (payload_value && !NCDValueGenerator_AppendGenerate(payload_value, &str)) {
+        BLog(BLOG_ERROR, "NCDValueGenerator_AppendGenerate failed");
+        goto fail1;
+    }
+    
+    size_t len = ExpString_Length(&str);
+    if (len > INT_MAX || len > PACKETPROTO_ENCLEN(SEND_MTU) || len - sizeof(struct packetproto_header) > UINT16_MAX) {
+        BLog(BLOG_ERROR, "reply is too long");
+        goto fail1;
+    }
+    
+    uint8_t *packet = ExpString_Get(&str);
+    
+    struct header *header = (void *)packet;
+    header->pp.len = htol16(len - sizeof(struct packetproto_header));
+    header->rp.request_id = htol32(request_id);
+    header->rp.type = htol32(type);
+    
+    *out_data = packet;
+    *out_len = len;
+    return 1;
+    
+fail1:
+    ExpString_Free(&str);
+fail0:
+    return 0;
+}
+
+static void build_nodata_packet (uint32_t request_id, uint32_t type, uint8_t *data, int *out_len)
+{
+    struct header {
+        struct packetproto_header pp;
+        struct requestproto_header rp;
+    } __attribute__((packed));
+    
+    struct header *header = (void *)data;
+    header->pp.len = htol16(sizeof(header->rp));
+    header->rp.request_id = htol32(request_id);
+    header->rp.type = htol32(type);
+}
+
+static void req_free (struct NCDRequestClient_req *req)
+{
+    NCDRequestClient *client = req->client;
+    PacketPassFifoQueueFlow_AssertFree(&req->send_qflow);
+    
+    // free queue flow
+    PacketPassFifoQueueFlow_Free(&req->send_qflow);
+    
+    // free request data
+    free(req->request_data);
+    
+    // remove from reqs tree
+    BAVL_Remove(&client->reqs_tree, &req->reqs_tree_node);
+    
+    // free structure
+    free(req);
+}
+
+static void req_send_abort (struct NCDRequestClient_req *req)
+{
+    // build packet
+    build_nodata_packet(req->request_id, REQUESTPROTO_TYPE_CLIENT_ABORT, req->request_data, &req->request_len);
+    
+    // start sending
+    PacketPassInterface_Sender_Send(req->send_qflow_iface, req->request_data, req->request_len);
+    
+    // set state sending abort
+    req->state = RSTATE_SENDING_ABORT;
+}
+
+static void req_qflow_send_iface_handler_done (struct NCDRequestClient_req *req)
+{
+    switch (req->state) {
+        case RSTATE_SENDING_REQUEST: {
+            // set state ready
+            req->state = RSTATE_READY;
+            
+            // call sent handler
+            req->creq->handler_sent(req->creq->user);
+            return;
+        } break;
+        
+        case RSTATE_SENDING_REQUEST_ABORT: {
+            // send abort
+            req_send_abort(req);
+        } break;
+        
+        case RSTATE_SENDING_ABORT: {
+            // set state waiting end
+            req->state = RSTATE_WAITING_END;
+        } break;
+        
+        case RSTATE_DEAD_SENDING: {
+            // free req
+            req_free(req);
+        } break;
+        
+        default: ASSERT(0);
+    }
+}
+
+int NCDRequestClient_Init (NCDRequestClient *o, const char *socket_path, BReactor *reactor, void *user,
+                           NCDRequestClient_handler_error handler_error,
+                           NCDRequestClient_handler_connected handler_connected)
+{
+    ASSERT(socket_path)
+    ASSERT(handler_error)
+    ASSERT(handler_connected)
+    
+    // init arguments
+    o->reactor = reactor;
+    o->user = user;
+    o->handler_error = handler_error;
+    o->handler_connected = handler_connected;
+    
+    // init connector
+    if (!BConnector_InitUnix(&o->connector, socket_path, reactor, o, (BConnector_handler)connector_handler)) {
+        BLog(BLOG_ERROR, "BConnector_InitUnix failed");
+        goto fail0;
+    }
+    
+    // init reqs tree
+    BAVL_Init(&o->reqs_tree, OFFSET_DIFF(struct NCDRequestClient_req, request_id, reqs_tree_node), uint32_comparator, NULL);
+    
+    // set next request ID
+    o->next_request_id = 0;
+    
+    // set state connecting
+    o->state = CSTATE_CONNECTING;
+    
+    DebugCounter_Init(&o->d_reqests_ctr);
+    DebugError_Init(&o->d_err, BReactor_PendingGroup(reactor));
+    DebugObject_Init(&o->d_obj);
+    return 1;
+    
+fail0:
+    return 0;
+}
+
+void NCDRequestClient_Free (NCDRequestClient *o)
+{
+    DebugObject_Free(&o->d_obj);
+    DebugError_Free(&o->d_err);
+    DebugCounter_Free(&o->d_reqests_ctr);
+    
+    if (o->state == CSTATE_CONNECTED) {
+        // allow freeing queue flow
+        PacketPassFifoQueue_PrepareFree(&o->send_queue);
+        
+        // free remaining reqs
+        BAVLNode *tn;
+        while (tn = BAVL_GetFirst(&o->reqs_tree)) {
+            struct NCDRequestClient_req *req = UPPER_OBJECT(tn, struct NCDRequestClient_req, reqs_tree_node);
+            ASSERT(req->state != RSTATE_SENDING_REQUEST)
+            ASSERT(req->state != RSTATE_READY)
+            req_free(req);
+        }
+        
+        // free connection stuff
+        PacketPassFifoQueue_Free(&o->send_queue);
+        PacketStreamSender_Free(&o->send_sender);
+        PacketProtoDecoder_Free(&o->recv_decoder);
+        PacketPassInterface_Free(&o->recv_if);
+        BConnection_RecvAsync_Free(&o->con);
+        BConnection_SendAsync_Free(&o->con);
+        BConnection_Free(&o->con);
+    }
+    
+    // free connector
+    BConnector_Free(&o->connector);
+}
+
+int NCDRequestClientRequest_Init (NCDRequestClientRequest *o, NCDRequestClient *client, NCDValue *payload_value, void *user,
+                                  NCDRequestClientRequest_handler_sent handler_sent,
+                                  NCDRequestClientRequest_handler_reply handler_reply,
+                                  NCDRequestClientRequest_handler_finished handler_finished)
+{
+    ASSERT(client->state == CSTATE_CONNECTED)
+    DebugError_AssertNoError(&client->d_err);
+    ASSERT(payload_value)
+    ASSERT(handler_sent)
+    ASSERT(handler_reply)
+    ASSERT(handler_finished)
+    
+    // init arguments
+    o->client = client;
+    o->user = user;
+    o->handler_sent = handler_sent;
+    o->handler_reply = handler_reply;
+    o->handler_finished = handler_finished;
+    
+    // allocate req structure
+    struct NCDRequestClient_req *req = malloc(sizeof(*req));
+    if (!req) {
+        BLog(BLOG_ERROR, "malloc failed");
+        goto fail0;
+    }
+    
+    // allocate request ID
+    if (!get_free_request_id(client, &req->request_id)) {
+        BLog(BLOG_ERROR, "failed to allocate request ID");
+        goto fail1;
+    }
+    
+    // insert to reqs tree
+    int res = BAVL_Insert(&client->reqs_tree, &req->reqs_tree_node, NULL);
+    ASSERT(res)
+    
+    // set pointers
+    o->req = req;
+    req->creq = o;
+    req->client = client;
+    
+    // build request
+    if (!build_requestproto_packet(req->request_id, REQUESTPROTO_TYPE_CLIENT_REQUEST, payload_value, &req->request_data, &req->request_len)) {
+        BLog(BLOG_ERROR, "failed to build request");
+        goto fail2;
+    }
+    
+    // init queue flow
+    PacketPassFifoQueueFlow_Init(&req->send_qflow, &client->send_queue);
+    
+    // init send interface
+    req->send_qflow_iface = PacketPassFifoQueueFlow_GetInput(&req->send_qflow);
+    PacketPassInterface_Sender_Init(req->send_qflow_iface, (PacketPassInterface_handler_done)req_qflow_send_iface_handler_done, req);
+    
+    // start sending request
+    PacketPassInterface_Sender_Send(req->send_qflow_iface, req->request_data, req->request_len);
+    
+    // set state sending request
+    req->state = RSTATE_SENDING_REQUEST;
+    
+    DebugCounter_Increment(&client->d_reqests_ctr);
+    DebugError_Init(&o->d_err, BReactor_PendingGroup(client->reactor));
+    DebugObject_Init(&o->d_obj);
+    return 1;
+    
+fail2:
+    BAVL_Remove(&client->reqs_tree, &req->reqs_tree_node);
+fail1:
+    free(req);
+fail0:
+    return 0;
+}
+
+void NCDRequestClientRequest_Free (NCDRequestClientRequest *o)
+{
+    NCDRequestClient *client = o->client;
+    struct NCDRequestClient_req *req = o->req;
+    DebugObject_Free(&o->d_obj);
+    DebugError_Free(&o->d_err);
+    DebugCounter_Decrement(&client->d_reqests_ctr);
+    
+    if (req) {
+        switch (req->state) {
+            case RSTATE_SENDING_REQUEST: {
+                req->state = RSTATE_SENDING_REQUEST_ABORT;
+            } break;
+            
+            case RSTATE_READY: {
+                req_send_abort(req);
+            } break;
+            
+            default: ASSERT(0);
+        }
+    }
+}

+ 108 - 0
ncd/NCDRequestClient.h

@@ -0,0 +1,108 @@
+/**
+ * @file NCDRequestClient.h
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the author nor the
+ *    names of its contributors may be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef BADVPN_NCDREQUESTCLIENT_H
+#define BADVPN_NCDREQUESTCLIENT_H
+
+#include <stdint.h>
+
+#include <misc/debug.h>
+#include <misc/debugerror.h>
+#include <misc/debugcounter.h>
+#include <structure/BAVL.h>
+#include <base/DebugObject.h>
+#include <system/BConnection.h>
+#include <flow/PacketProtoDecoder.h>
+#include <flow/PacketStreamSender.h>
+#include <flow/PacketPassFifoQueue.h>
+#include <ncd/NCDValueGenerator.h>
+#include <ncd/NCDValueParser.h>
+
+struct NCDRequestClient_req;
+
+typedef void (*NCDRequestClient_handler_error) (void *user);
+typedef void (*NCDRequestClient_handler_connected) (void *user);
+typedef void (*NCDRequestClientRequest_handler_sent) (void *user);
+typedef void (*NCDRequestClientRequest_handler_reply) (void *user, NCDValue reply_data);
+typedef void (*NCDRequestClientRequest_handler_finished) (void *user, int is_error);
+
+typedef struct {
+    BReactor *reactor;
+    void *user;
+    NCDRequestClient_handler_error handler_error;
+    NCDRequestClient_handler_connected handler_connected;
+    BConnector connector;
+    BConnection con;
+    PacketPassFifoQueue send_queue;
+    PacketStreamSender send_sender;
+    PacketProtoDecoder recv_decoder;
+    PacketPassInterface recv_if;
+    BAVL reqs_tree;
+    uint32_t next_request_id;
+    int state;
+    DebugCounter d_reqests_ctr;
+    DebugError d_err;
+    DebugObject d_obj;
+} NCDRequestClient;
+
+typedef struct {
+    NCDRequestClient *client;
+    void *user;
+    NCDRequestClientRequest_handler_sent handler_sent;
+    NCDRequestClientRequest_handler_reply handler_reply;
+    NCDRequestClientRequest_handler_finished handler_finished;
+    struct NCDRequestClient_req *req;
+    DebugError d_err;
+    DebugObject d_obj;
+} NCDRequestClientRequest;
+
+struct NCDRequestClient_req {
+    NCDRequestClientRequest *creq;
+    NCDRequestClient *client;
+    BAVLNode reqs_tree_node;
+    uint32_t request_id;
+    uint8_t *request_data;
+    int request_len;
+    PacketPassInterface *send_qflow_iface;
+    PacketPassFifoQueueFlow send_qflow;
+    int state;
+};
+
+int NCDRequestClient_Init (NCDRequestClient *o, const char *socket_path, BReactor *reactor, void *user,
+                           NCDRequestClient_handler_error handler_error,
+                           NCDRequestClient_handler_connected handler_connected) WARN_UNUSED;
+void NCDRequestClient_Free (NCDRequestClient *o);
+
+int NCDRequestClientRequest_Init (NCDRequestClientRequest *o, NCDRequestClient *client, NCDValue *payload_value, void *user,
+                                  NCDRequestClientRequest_handler_sent handler_sent,
+                                  NCDRequestClientRequest_handler_reply handler_reply,
+                                  NCDRequestClientRequest_handler_finished handler_finished) WARN_UNUSED;
+void NCDRequestClientRequest_Free (NCDRequestClientRequest *o);
+
+#endif

+ 99 - 55
ncd/modules/sys_request_server.c

@@ -115,8 +115,7 @@ struct request {
     LinkedList0Node requests_list_node;
     NCDValue request_data;
     NCDModuleProcess process;
-    BPending finish_job;
-    int finished;
+    int terminating;
 };
 
 struct reply {
@@ -136,11 +135,13 @@ static void connection_recv_decoder_handler_error (struct connection *c);
 static void connection_recv_if_handler_send (struct connection *c, uint8_t *data, int data_len);
 static int request_init (struct connection *c, uint32_t request_id, const uint8_t *data, int data_len);
 static void request_free (struct request *r);
+static struct request * find_request (struct connection *c, uint32_t request_id);
 static void request_process_handler_event (struct request *r, int event);
 static int request_process_func_getspecialobj (struct request *r, const char *name, NCDObject *out_object);
 static int request_process_request_obj_func_getvar (struct request *r, const char *name, NCDValue *out_value);
-static void request_finish_job_handler (struct request *r);
-static int reply_init (struct connection *c, uint32_t request_id, uint32_t flags, NCDValue *reply_data);
+static void request_terminate (struct request *r);
+static struct reply * reply_init (struct connection *c, uint32_t request_id, uint32_t type, NCDValue *reply_data);
+static void reply_start (struct reply *r);
 static void reply_free (struct reply *r);
 static void reply_send_qflow_if_handler_done (struct reply *r);
 static void instance_free (struct instance *o);
@@ -245,10 +246,9 @@ static void connection_terminate (struct connection *c)
     for (LinkedList0Node *ln = LinkedList0_GetFirst(&c->requests_list); ln; ln = LinkedList0Node_Next(ln)) {
         struct request *r = UPPER_OBJECT(ln, struct request, requests_list_node);
         
-        if (!r->finished) {
-            NCDModuleProcess_Terminate(&r->process);
+        if (!r->terminating) {
+            request_terminate(r);
         }
-        BPending_Unset(&r->finish_job);
     }
     
     connection_free_link(c);
@@ -276,7 +276,7 @@ static void connection_recv_decoder_handler_error (struct connection *c)
     struct instance *o = c->inst;
     ASSERT(c->state == CONNECTION_STATE_RUNNING)
     
-    ModuleLog(o->i, BLOG_INFO, "decoder error");
+    ModuleLog(o->i, BLOG_ERROR, "decoder error");
     
     connection_terminate(c);
 }
@@ -291,26 +291,62 @@ static void connection_recv_if_handler_send (struct connection *c, uint8_t *data
     PacketPassInterface_Done(&c->recv_if);
     
     if (data_len < sizeof(struct requestproto_header)) {
-        ModuleLog(o->i, BLOG_INFO, "missing requestproto header");
-        return;
+        ModuleLog(o->i, BLOG_ERROR, "missing requestproto header");
+        goto fail;
     }
     
-    struct requestproto_header *header = (struct requestproto_header *)data;
+    struct requestproto_header *header = (void *)data;
     uint32_t request_id = ltoh32(header->request_id);
-    uint32_t flags = ltoh32(header->flags);
+    uint32_t type = ltoh32(header->type);
     
-    if (flags != REQUESTPROTO_REQUEST_FLAG) {
-        ModuleLog(o->i, BLOG_INFO, "invalid requestproto flags");
-        return;
+    switch (type) {
+        case REQUESTPROTO_TYPE_CLIENT_REQUEST: {
+            if (find_request(c, request_id)) {
+                ModuleLog(o->i, BLOG_ERROR, "request with the same ID already exists");
+                goto fail;
+            }
+            
+            if (!request_init(c, request_id, data + sizeof(*header), data_len - sizeof(*header))) {
+                goto fail;
+            }
+        } break;
+        
+        case REQUESTPROTO_TYPE_CLIENT_ABORT: {
+            struct request *r = find_request(c, request_id);
+            if (!r) {
+                // this is expected if we finish before we get the abort
+                return;
+            }
+            
+            if (!r->terminating) {
+                struct reply *rpl = reply_init(c, r->request_id, REQUESTPROTO_TYPE_SERVER_ERROR, NULL);
+                if (!rpl) {
+                    ModuleLog(o->i, BLOG_ERROR, "failed to submit error");
+                    goto fail;
+                }
+                
+                // send reply first!
+                request_terminate(r);
+                reply_start(rpl);
+            }
+        } break;
+        
+        default:
+            ModuleLog(o->i, BLOG_ERROR, "invalid requestproto type");
+            goto fail;
     }
     
-    request_init(c, request_id, data + sizeof(*header), data_len - sizeof(*header));
+    return;
+    
+fail:
+    connection_terminate(c);
 }
 
 static int request_init (struct connection *c, uint32_t request_id, const uint8_t *data, int data_len)
 {
     struct instance *o = c->inst;
     ASSERT(c->state == CONNECTION_STATE_RUNNING)
+    ASSERT(!find_request(c, request_id))
     ASSERT(data_len >= 0)
     ASSERT(data_len <= RECV_PAYLOAD_MTU)
     
@@ -344,9 +380,7 @@ static int request_init (struct connection *c, uint32_t request_id, const uint8_
     
     NCDModuleProcess_SetSpecialFuncs(&r->process, (NCDModuleProcess_func_getspecialobj)request_process_func_getspecialobj);
     
-    BPending_Init(&r->finish_job, BReactor_PendingGroup(o->i->params->reactor), (BPending_handler)request_finish_job_handler, r);
-    
-    r->finished = 0;
+    r->terminating = 0;
     
     ModuleLog(o->i, BLOG_INFO, "request initialized");
     return 1;
@@ -363,14 +397,26 @@ fail0:
 static void request_free (struct request *r)
 {
     struct connection *c = r->con;
+    NCDModuleProcess_AssertFree(&r->process);
     
-    BPending_Free(&r->finish_job);
     NCDModuleProcess_Free(&r->process);
     NCDValue_Free(&r->request_data);
     LinkedList0_Remove(&c->requests_list, &r->requests_list_node);
     free(r);
 }
 
+static struct request * find_request (struct connection *c, uint32_t request_id)
+{
+    for (LinkedList0Node *ln = LinkedList0_GetFirst(&c->requests_list); ln; ln = LinkedList0Node_Next(ln)) {
+        struct request *r = UPPER_OBJECT(ln, struct request, requests_list_node);
+        if (!r->terminating && r->request_id == request_id) {
+            return r;
+        }
+    }
+    
+    return NULL;
+}
+
 static void request_process_handler_event (struct request *r, int event)
 {
     struct connection *c = r->con;
@@ -378,15 +424,17 @@ static void request_process_handler_event (struct request *r, int event)
     
     switch (event) {
         case NCDMODULEPROCESS_EVENT_UP: {
-            ASSERT(c->state == CONNECTION_STATE_RUNNING)
+            ASSERT(!r->terminating)
         } break;
         
         case NCDMODULEPROCESS_EVENT_DOWN: {
+            ASSERT(!r->terminating)
+            
             NCDModuleProcess_Continue(&r->process);
         } break;
         
         case NCDMODULEPROCESS_EVENT_TERMINATED: {
-            ASSERT(r->finished || c->state == CONNECTION_STATE_TERMINATING)
+            ASSERT(r->terminating)
             
             request_free(r);
             
@@ -429,18 +477,16 @@ static int request_process_request_obj_func_getvar (struct request *r, const cha
     return 0;
 }
 
-static void request_finish_job_handler (struct request *r)
+static void request_terminate (struct request *r)
 {
-    struct connection *c = r->con;
-    ASSERT(c->state == CONNECTION_STATE_RUNNING)
-    ASSERT(!r->finished)
+    ASSERT(!r->terminating)
     
     NCDModuleProcess_Terminate(&r->process);
     
-    r->finished = 1;
+    r->terminating = 1;
 }
 
-static int reply_init (struct connection *c, uint32_t request_id, uint32_t flags, NCDValue *reply_data)
+static struct reply * reply_init (struct connection *c, uint32_t request_id, uint32_t type, NCDValue *reply_data)
 {
     struct instance *o = c->inst;
     ASSERT(c->state == CONNECTION_STATE_RUNNING)
@@ -463,7 +509,7 @@ static int reply_init (struct connection *c, uint32_t request_id, uint32_t flags
     struct reply_header {
         struct packetproto_header pp;
         struct requestproto_header rp;
-    };
+    } __attribute__((packed));
     
     ExpString str;
     if (!ExpString_Init(&str)) {
@@ -489,13 +535,12 @@ static int reply_init (struct connection *c, uint32_t request_id, uint32_t flags
     
     r->send_buf = ExpString_Get(&str);
     
-    struct reply_header *header = (struct reply_header *)r->send_buf;
-    header->pp.len = htol16(len - sizeof(struct packetproto_header));
+    struct reply_header *header = (void *)r->send_buf;
+    header->pp.len = htol16(len - sizeof(header->pp));
     header->rp.request_id = htol32(request_id);
-    header->rp.flags = htol32(flags);
+    header->rp.type = htol32(type);
     
-    PacketPassInterface_Sender_Send(r->send_qflow_if, r->send_buf, len);
-    return 1;
+    return r;
     
 fail2:
     ExpString_Free(&str);
@@ -504,7 +549,14 @@ fail1:
     LinkedList0_Remove(&c->replies_list, &r->replies_list_node);
     free(r);
 fail0:
-    return 0;
+    return NULL;
+}
+
+static void reply_start (struct reply *r)
+{
+    int len = ltoh16(((struct packetproto_header *)r->send_buf)->len) + sizeof(struct packetproto_header);
+    
+    PacketPassInterface_Sender_Send(r->send_qflow_if, r->send_buf, len);
 }
 
 static void reply_free (struct reply *r)
@@ -640,21 +692,18 @@ static void reply_func_new (NCDModuleInst *i)
     struct request *r = i->method_user;
     struct connection *c = r->con;
     
-    if (c->state != CONNECTION_STATE_RUNNING) {
-        ModuleLog(i, BLOG_ERROR, "connection is terminating, cannot submit reply");
-        goto fail;
-    }
-    
-    if (r->finished || BPending_IsSet(&r->finish_job)) {
-        ModuleLog(i, BLOG_ERROR, "request is already finished, cannot submit reply");
+    if (r->terminating) {
+        ModuleLog(i, BLOG_ERROR, "request is dying, cannot submit reply");
         goto fail;
     }
     
-    if (!reply_init(c, r->request_id, REQUESTPROTO_REPLY_FLAG_DATA, reply_data)) {
+    struct reply *rpl = reply_init(c, r->request_id, REQUESTPROTO_TYPE_SERVER_REPLY, reply_data);
+    if (!rpl) {
         ModuleLog(i, BLOG_ERROR, "failed to submit reply");
         goto fail;
     }
     
+    reply_start(rpl);
     return;
     
 fail:
@@ -674,24 +723,19 @@ static void finish_func_new (NCDModuleInst *i)
     struct request *r = i->method_user;
     struct connection *c = r->con;
     
-    if (c->state != CONNECTION_STATE_RUNNING) {
-        ModuleLog(i, BLOG_ERROR, "connection is terminating, cannot submit reply");
+    if (r->terminating) {
+        ModuleLog(i, BLOG_ERROR, "request is dying, cannot submit finished");
         goto fail;
     }
     
-    if (r->finished || BPending_IsSet(&r->finish_job)) {
-        ModuleLog(i, BLOG_ERROR, "request is already finished, cannot submit reply");
-        goto fail;
-    }
-    
-    BPending_Set(&r->finish_job);
-    
-    if (!reply_init(c, r->request_id, REQUESTPROTO_REPLY_FLAG_END, NULL)) {
-        ModuleLog(i, BLOG_ERROR, "failed to submit reply");
-        BPending_Unset(&r->finish_job); // don't terminate request process!
+    struct reply *rpl = reply_init(c, r->request_id, REQUESTPROTO_TYPE_SERVER_FINISHED, NULL);
+    if (!rpl) {
+        ModuleLog(i, BLOG_ERROR, "failed to submit finished");
         goto fail;
     }
     
+    request_terminate(r);
+    reply_start(rpl);
     return;
     
 fail:

+ 6 - 4
protocol/requestproto.h

@@ -32,13 +32,15 @@
 
 #include <stdint.h>
 
-#define REQUESTPROTO_REQUEST_FLAG (1 << 0)
-#define REQUESTPROTO_REPLY_FLAG_DATA (1 << 1)
-#define REQUESTPROTO_REPLY_FLAG_END (1 << 2)
+#define REQUESTPROTO_TYPE_CLIENT_REQUEST 1
+#define REQUESTPROTO_TYPE_CLIENT_ABORT 2
+#define REQUESTPROTO_TYPE_SERVER_REPLY 3
+#define REQUESTPROTO_TYPE_SERVER_FINISHED 4
+#define REQUESTPROTO_TYPE_SERVER_ERROR 5
 
 struct requestproto_header {
     uint32_t request_id;
-    uint32_t flags;
+    uint32_t type;
 } __attribute__((packed));
 
 #endif