Explorar o código

Unix sockets suck at message oriented communication. Just use a stream socket and PacketProto.

ambrop7 %!s(int64=15) %!d(string=hai) anos
pai
achega
5b8923d17f

+ 0 - 9
flow/CMakeLists.txt

@@ -1,11 +1,3 @@
-set(FLOW_ADDITIONAL_SOURCES)
-if (NOT WIN32)
-    list(APPEND FLOW_ADDITIONAL_SOURCES
-        SeqPacketSocketSink.c
-        SeqPacketSocketSource.c
-    )
-endif ()
-
 add_library(flow
     PacketPassFairQueue.c
     PacketPassPriorityQueue.c
@@ -35,6 +27,5 @@ add_library(flow
     DataProtoKeepaliveSource.c
     PacketProtoFlow.c
     SinglePacketSender.c
-    ${FLOW_ADDITIONAL_SOURCES}
 )
 target_link_libraries(flow system security)

+ 0 - 146
flow/SeqPacketSocketSink.c

@@ -1,146 +0,0 @@
-/**
- * @file SeqPacketSocketSink.c
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * This file is part of BadVPN.
- * 
- * BadVPN is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2
- * as published by the Free Software Foundation.
- * 
- * BadVPN is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#include <misc/debug.h>
-
-#include <flow/SeqPacketSocketSink.h>
-
-static void report_error (SeqPacketSocketSink *s, int error)
-{
-    #ifndef NDEBUG
-    s->in_error = 1;
-    DEAD_ENTER(s->dead)
-    #endif
-    
-    FlowErrorReporter_ReportError(&s->rep, &error);
-    
-    #ifndef NDEBUG
-    ASSERT(DEAD_KILLED)
-    DEAD_LEAVE(s->dead);
-    #endif
-}
-
-static int input_handler_send (SeqPacketSocketSink *s, uint8_t *data, int data_len)
-{
-    ASSERT(s->in_len == -1)
-    ASSERT(data_len >= 0)
-    ASSERT(!s->in_error)
-    DebugObject_Access(&s->d_obj);
-    
-    int res = BSocket_Send(s->bsock, data, data_len);
-    if (res < 0) {
-        int error = BSocket_GetError(s->bsock);
-        if (error == BSOCKET_ERROR_LATER) {
-            s->in_len = data_len;
-            s->in = data;
-            BSocket_EnableEvent(s->bsock, BSOCKET_WRITE);
-            return 0;
-        }
-        report_error(s, SEQPACKETSOCKETSINK_ERROR_BSOCKET);
-        return -1;
-    } else {
-        if (res != data_len) {
-            report_error(s, SEQPACKETSOCKETSINK_ERROR_WRONGSIZE);
-            return -1;
-        }
-    }
-    
-    return 1;
-}
-
-static void socket_handler (SeqPacketSocketSink *s, int event)
-{
-    ASSERT(s->in_len >= 0)
-    ASSERT(event == BSOCKET_WRITE)
-    ASSERT(!s->in_error)
-    DebugObject_Access(&s->d_obj);
-    
-    int res = BSocket_Send(s->bsock, s->in, s->in_len);
-    if (res < 0) {
-        int error = BSocket_GetError(s->bsock);
-        if (error == BSOCKET_ERROR_LATER) {
-            return;
-        }
-        report_error(s, SEQPACKETSOCKETSINK_ERROR_BSOCKET);
-        return;
-    } else {
-        if (res != s->in_len) {
-            report_error(s, SEQPACKETSOCKETSINK_ERROR_WRONGSIZE);
-            return;
-        }
-    }
-    
-    BSocket_DisableEvent(s->bsock, BSOCKET_WRITE);
-    s->in_len = -1;
-    
-    PacketPassInterface_Done(&s->input);
-    return;
-}
-
-void SeqPacketSocketSink_Init (SeqPacketSocketSink *s, FlowErrorReporter rep, BSocket *bsock, int mtu)
-{
-    ASSERT(mtu >= 0)
-    
-    // init arguments
-    s->rep = rep;
-    s->bsock = bsock;
-    
-    // init dead var
-    DEAD_INIT(s->dead);
-    
-    // add socket event handler
-    BSocket_AddEventHandler(s->bsock, BSOCKET_WRITE, (BSocket_handler)socket_handler, s);
-    
-    // init input
-    PacketPassInterface_Init(&s->input, mtu, (PacketPassInterface_handler_send)input_handler_send, s);
-    
-    // have no input packet
-    s->in_len = -1;
-    
-    // init debugging
-    #ifndef NDEBUG
-    s->in_error = 0;
-    #endif
-    
-    DebugObject_Init(&s->d_obj);
-}
-
-void SeqPacketSocketSink_Free (SeqPacketSocketSink *s)
-{
-    DebugObject_Free(&s->d_obj);
-
-    // free input
-    PacketPassInterface_Free(&s->input);
-    
-    // remove socket event handler
-    BSocket_RemoveEventHandler(s->bsock, BSOCKET_WRITE);
-    
-    // free dead var
-    DEAD_KILL(s->dead);
-}
-
-PacketPassInterface * SeqPacketSocketSink_GetInput (SeqPacketSocketSink *s)
-{
-    DebugObject_Access(&s->d_obj);
-    
-    return &s->input;
-}

+ 0 - 89
flow/SeqPacketSocketSink.h

@@ -1,89 +0,0 @@
-/**
- * @file SeqPacketSocketSink.h
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * This file is part of BadVPN.
- * 
- * BadVPN is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2
- * as published by the Free Software Foundation.
- * 
- * BadVPN is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- * 
- * @section DESCRIPTION
- * 
- * A {@link PacketPassInterface} sink which sends packets to a seqpacket socket.
- */
-
-#ifndef BADVPN_FLOW_SEQPACKETSOCKETSINK_H
-#define BADVPN_FLOW_SEQPACKETSOCKETSINK_H
-
-#include <stdint.h>
-
-#include <misc/dead.h>
-#include <system/DebugObject.h>
-#include <system/BSocket.h>
-#include <flow/PacketPassInterface.h>
-#include <flow/error.h>
-
-#define SEQPACKETSOCKETSINK_ERROR_BSOCKET 1
-#define SEQPACKETSOCKETSINK_ERROR_WRONGSIZE 2
-
-/**
- * A {@link PacketPassInterface} sink which sends packets to a seqpacket socket.
- */
-typedef struct {
-    DebugObject d_obj;
-    dead_t dead;
-    FlowErrorReporter rep;
-    BSocket *bsock;
-    PacketPassInterface input;
-    int in_len;
-    uint8_t *in;
-    #ifndef NDEBUG
-    int in_error;
-    #endif
-} SeqPacketSocketSink;
-
-/**
- * Initializes the sink.
- *
- * @param s the object
- * @param rep error reporting data. Error code is an int. Possible error codes:
- *              - SEQPACKETSOCKETSINK_ERROR_BSOCKET: {@link BSocket_Send} failed
- *                with an unhandled error code
- *              - SEQPACKETSOCKETSINK_ERROR_WRONGSIZE: {@link BSocket_Send} succeeded,
- *                but did not send all of the packet
- *            The object must be freed from the error handler.
- * @param bsock socket to write packets to. Registers a BSOCKET_WRITE handler which
- *              must not be registered.
- * @param mtu maximum packet size. Must be >=0.
- */
-void SeqPacketSocketSink_Init (SeqPacketSocketSink *s, FlowErrorReporter rep, BSocket *bsock, int mtu);
-
-/**
- * Frees the sink.
- *
- * @param s the object
- */
-void SeqPacketSocketSink_Free (SeqPacketSocketSink *s);
-
-/**
- * Returns the input interface.
- * The MTU of the interface will be as in {@link SeqPacketSocketSink_Init}.
- *
- * @param s the object
- * @return input interface
- */
-PacketPassInterface * SeqPacketSocketSink_GetInput (SeqPacketSocketSink *s);
-
-#endif

+ 0 - 150
flow/SeqPacketSocketSource.c

@@ -1,150 +0,0 @@
-/**
- * @file SeqPacketSocketSource.c
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * This file is part of BadVPN.
- * 
- * BadVPN is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2
- * as published by the Free Software Foundation.
- * 
- * BadVPN is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#include <stdlib.h>
-
-#include <misc/debug.h>
-
-#include <flow/SeqPacketSocketSource.h>
-
-static void report_error (SeqPacketSocketSource *s, int error)
-{
-    #ifndef NDEBUG
-    s->in_error = 1;
-    DEAD_ENTER(s->dead)
-    #endif
-    
-    FlowErrorReporter_ReportError(&s->rep, &error);
-    
-    #ifndef NDEBUG
-    ASSERT(DEAD_KILLED)
-    DEAD_LEAVE(s->dead);
-    #endif
-}
-
-static int output_handler_recv (SeqPacketSocketSource *s, uint8_t *data, int *data_len)
-{
-    ASSERT(!s->out_have)
-    ASSERT(!s->in_error)
-    DebugObject_Access(&s->d_obj);
-    
-    int res = BSocket_Recv(s->bsock, data, s->mtu);
-    if (res < 0) {
-        int error = BSocket_GetError(s->bsock);
-        if (error == BSOCKET_ERROR_LATER) {
-            s->out_have = 1;
-            s->out = data;
-            BSocket_EnableEvent(s->bsock, BSOCKET_READ);
-            return 0;
-        }
-        report_error(s, SEQPACKETSOCKETSOURCE_ERROR_BSOCKET);
-        return -1;
-    }
-    
-    if (res == 0) {
-        report_error(s, SEQPACKETSOCKETSOURCE_ERROR_CLOSED);
-        return -1;
-    }
-    
-    *data_len = res;
-    return 1;
-}
-
-static void socket_handler (SeqPacketSocketSource *s, int event)
-{
-    ASSERT(s->out_have)
-    ASSERT(event == BSOCKET_READ)
-    ASSERT(!s->in_error)
-    DebugObject_Access(&s->d_obj);
-    
-    int res = BSocket_Recv(s->bsock, s->out, s->mtu);
-    if (res < 0) {
-        int error = BSocket_GetError(s->bsock);
-        if (error == BSOCKET_ERROR_LATER) {
-            // nothing to receive, continue in socket_handler
-            return;
-        }
-        report_error(s, SEQPACKETSOCKETSOURCE_ERROR_BSOCKET);
-        return;
-    }
-    
-    if (res == 0) {
-        report_error(s, SEQPACKETSOCKETSOURCE_ERROR_CLOSED);
-        return;
-    }
-    
-    BSocket_DisableEvent(s->bsock, BSOCKET_READ);
-    s->out_have = 0;
-    
-    PacketRecvInterface_Done(&s->output, res);
-    return;
-}
-
-void SeqPacketSocketSource_Init (SeqPacketSocketSource *s, FlowErrorReporter rep, BSocket *bsock, int mtu)
-{
-    ASSERT(mtu >= 0)
-    
-    // init arguments
-    s->rep = rep;
-    s->bsock = bsock;
-    s->mtu = mtu;
-    
-    // init dead var
-    DEAD_INIT(s->dead);
-    
-    // add socket event handler
-    BSocket_AddEventHandler(s->bsock, BSOCKET_READ, (BSocket_handler)socket_handler, s);
-    
-    // init output
-    PacketRecvInterface_Init(&s->output, mtu, (PacketRecvInterface_handler_recv)output_handler_recv, s);
-    
-    // have no output packet
-    s->out_have = 0;
-    
-    // init debugging
-    #ifndef NDEBUG
-    s->in_error = 0;
-    #endif
-    
-    DebugObject_Init(&s->d_obj);
-}
-
-void SeqPacketSocketSource_Free (SeqPacketSocketSource *s)
-{
-    DebugObject_Free(&s->d_obj);
-
-    // free output
-    PacketRecvInterface_Free(&s->output);
-    
-    // remove socket event handler
-    BSocket_RemoveEventHandler(s->bsock, BSOCKET_READ);
-    
-    // free dead var
-    DEAD_KILL(s->dead);
-}
-
-PacketRecvInterface * SeqPacketSocketSource_GetOutput (SeqPacketSocketSource *s)
-{
-    DebugObject_Access(&s->d_obj);
-    
-    return &s->output;
-}

+ 0 - 87
flow/SeqPacketSocketSource.h

@@ -1,87 +0,0 @@
-/**
- * @file SeqPacketSocketSource.h
- * @author Ambroz Bizjak <ambrop7@gmail.com>
- * 
- * @section LICENSE
- * 
- * This file is part of BadVPN.
- * 
- * BadVPN is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2
- * as published by the Free Software Foundation.
- * 
- * BadVPN is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- * 
- * @section DESCRIPTION
- * 
- * A {@link PacketRecvInterface} source which receives packets from a seqpacket socket.
- */
-
-#ifndef BADVPN_FLOW_SEQPACKETSOCKETSOURCE_H
-#define BADVPN_FLOW_SEQPACKETSOCKETSOURCE_H
-
-#include <misc/dead.h>
-#include <system/DebugObject.h>
-#include <system/BSocket.h>
-#include <flow/error.h>
-#include <flow/PacketRecvInterface.h>
-
-#define SEQPACKETSOCKETSOURCE_ERROR_CLOSED 0
-#define SEQPACKETSOCKETSOURCE_ERROR_BSOCKET 1
-
-/**
- * A {@link PacketRecvInterface} source which receives packets from a seqpacket socket.
- */
-typedef struct {
-    DebugObject d_obj;
-    dead_t dead;
-    FlowErrorReporter rep;
-    BSocket *bsock;
-    int mtu;
-    PacketRecvInterface output;
-    int out_have;
-    uint8_t *out;
-    #ifndef NDEBUG
-    int in_error;
-    #endif
-} SeqPacketSocketSource;
-
-/**
- * Initializes the object.
- *
- * @param s the object
- * @param rep error reporting data. Error code is an int. Possible error codes:
- *              - SEQPACKETSOCKETSOURCE_ERROR_CLOSED: {@link BSocket_Recv} returned 0
- *              - SEQPACKETSOCKETSOURCE_ERROR_BSOCKET: {@link BSocket_Recv} failed
- *                with an unhandled error code
- *            The object must be freed from the error handler.
- * @param bsock socket to read data from. The BSOCKET_READ event must be disabled.
- * *            Takes over reading on the socket.
- * @param mtu maximum packet size. Must be >=0.
- */
-void SeqPacketSocketSource_Init (SeqPacketSocketSource *s, FlowErrorReporter rep, BSocket *bsock, int mtu);
-
-/**
- * Frees the object.
- *
- * @param s the object
- */
-void SeqPacketSocketSource_Free (SeqPacketSocketSource *s);
-
-/**
- * Returns the output interface.
- * The MTU of the interface will be as in {@link SeqPacketSocketSource_Init}.
- *
- * @param s the object
- * @return output interface
- */
-PacketRecvInterface * SeqPacketSocketSource_GetOutput (SeqPacketSocketSource *s);
-
-#endif

+ 71 - 27
ipc/BIPC.c

@@ -24,10 +24,11 @@
 
 #define COMPONENT_SOURCE 1
 #define COMPONENT_SINK 2
+#define COMPONENT_DECODER 3
 
 static void error_handler (BIPC *o, int component, const void *data)
 {
-    ASSERT(component == COMPONENT_SOURCE || component == COMPONENT_SINK)
+    ASSERT(component == COMPONENT_SOURCE || component == COMPONENT_SINK || component == COMPONENT_DECODER)
     DebugObject_Access(&o->d_obj);
     
     #ifndef NDEBUG
@@ -42,10 +43,62 @@ static void error_handler (BIPC *o, int component, const void *data)
     #endif
 }
 
+static int init_io (BIPC *o, int send_mtu, int recv_mtu, BReactor *reactor)
+{
+    // init error domain
+    FlowErrorDomain_Init(&o->domain, (FlowErrorDomain_handler)error_handler, o);
+    
+    // init sending
+    StreamSocketSink_Init(&o->send_sink, FlowErrorReporter_Create(&o->domain, COMPONENT_SINK), &o->sock);
+    PacketStreamSender_Init(&o->send_pss, StreamSocketSink_GetInput(&o->send_sink), PACKETPROTO_ENCLEN(send_mtu));
+    PacketCopier_Init(&o->send_copier, send_mtu);
+    PacketProtoEncoder_Init(&o->send_encoder, PacketCopier_GetOutput(&o->send_copier));
+    if (!SinglePacketBuffer_Init(&o->send_buf, PacketProtoEncoder_GetOutput(&o->send_encoder), PacketStreamSender_GetInput(&o->send_pss), BReactor_PendingGroup(reactor))) {
+        goto fail1;
+    }
+    
+    // init receiving
+    StreamSocketSource_Init(&o->recv_source, FlowErrorReporter_Create(&o->domain, COMPONENT_SOURCE), &o->sock);
+    PacketCopier_Init(&o->recv_copier, recv_mtu);
+    if (!PacketProtoDecoder_Init(&o->recv_decoder, FlowErrorReporter_Create(&o->domain, COMPONENT_DECODER), StreamSocketSource_GetOutput(&o->recv_source), PacketCopier_GetInput(&o->recv_copier), BReactor_PendingGroup(reactor))) {
+        goto fail2;
+    }
+    
+    return 1;
+    
+fail2:
+    PacketCopier_Free(&o->recv_copier);
+    StreamSocketSource_Free(&o->recv_source);
+    SinglePacketBuffer_Free(&o->send_buf);
+fail1:
+    PacketProtoEncoder_Free(&o->send_encoder);
+    PacketCopier_Free(&o->send_copier);
+    PacketStreamSender_Free(&o->send_pss);
+    StreamSocketSink_Free(&o->send_sink);
+    return 0;
+}
+
+static void free_io (BIPC *o)
+{
+    // free receiving
+    PacketProtoDecoder_Free(&o->recv_decoder);
+    PacketCopier_Free(&o->recv_copier);
+    StreamSocketSource_Free(&o->recv_source);
+    
+    // free sending
+    SinglePacketBuffer_Free(&o->send_buf);
+    PacketProtoEncoder_Free(&o->send_encoder);
+    PacketCopier_Free(&o->send_copier);
+    PacketStreamSender_Free(&o->send_pss);
+    StreamSocketSink_Free(&o->send_sink);
+}
+
 int BIPC_InitConnect (BIPC *o, const char *path, int send_mtu, int recv_mtu, BIPC_handler handler, void *user, BReactor *reactor)
 {
     ASSERT(send_mtu >= 0)
+    ASSERT(send_mtu <= PACKETPROTO_MAXPAYLOAD)
     ASSERT(recv_mtu >= 0)
+    ASSERT(recv_mtu <= PACKETPROTO_MAXPAYLOAD)
     
     // init arguments
     o->handler = handler;
@@ -55,7 +108,7 @@ int BIPC_InitConnect (BIPC *o, const char *path, int send_mtu, int recv_mtu, BIP
     DEAD_INIT(o->dead);
     
     // init socket
-    if (BSocket_Init(&o->sock, reactor, BADDR_TYPE_UNIX, BSOCKET_TYPE_DGRAM) < 0) {
+    if (BSocket_Init(&o->sock, reactor, BADDR_TYPE_UNIX, BSOCKET_TYPE_STREAM) < 0) {
         DEBUG("BSocket_Init failed");
         goto fail0;
     }
@@ -66,14 +119,10 @@ int BIPC_InitConnect (BIPC *o, const char *path, int send_mtu, int recv_mtu, BIP
         goto fail1;
     }
     
-    // init error domain
-    FlowErrorDomain_Init(&o->domain, (FlowErrorDomain_handler)error_handler, o);
-    
-    // init sink
-    SeqPacketSocketSink_Init(&o->sink, FlowErrorReporter_Create(&o->domain, COMPONENT_SINK), &o->sock, send_mtu);
-    
-    // init source
-    SeqPacketSocketSource_Init(&o->source, FlowErrorReporter_Create(&o->domain, COMPONENT_SOURCE), &o->sock, recv_mtu);
+    // init I/O
+    if (!init_io(o, send_mtu, recv_mtu, reactor)) {
+        goto fail1;
+    }
     
     DebugObject_Init(&o->d_obj);
     
@@ -85,7 +134,7 @@ fail0:
     return 0;
 }
 
-int BIPC_InitAccept (BIPC *o, BIPCServer *server, int send_mtu, int recv_mtu, BIPC_handler handler, void *user)
+int BIPC_InitAccept (BIPC *o, BIPCServer *server, int send_mtu, int recv_mtu, BIPC_handler handler, void *user, BReactor *reactor)
 {
     ASSERT(send_mtu >= 0)
     ASSERT(recv_mtu >= 0)
@@ -98,24 +147,22 @@ int BIPC_InitAccept (BIPC *o, BIPCServer *server, int send_mtu, int recv_mtu, BI
     DEAD_INIT(o->dead);
     
     // accept socket
-    if (Listener_Accept(&server->listener, &o->sock, NULL) < 0) {
+    if (!Listener_Accept(&server->listener, &o->sock, NULL)) {
         DEBUG("Listener_Accept failed");
         goto fail0;
     }
     
-    // init error domain
-    FlowErrorDomain_Init(&o->domain, (FlowErrorDomain_handler)error_handler, o);
-    
-    // init sink
-    SeqPacketSocketSink_Init(&o->sink, FlowErrorReporter_Create(&o->domain, COMPONENT_SINK), &o->sock, send_mtu);
-    
-    // init source
-    SeqPacketSocketSource_Init(&o->source, FlowErrorReporter_Create(&o->domain, COMPONENT_SOURCE), &o->sock, recv_mtu);
+    // init I/O
+    if (!init_io(o, send_mtu, recv_mtu, reactor)) {
+        goto fail1;
+    }
     
     DebugObject_Init(&o->d_obj);
     
     return 1;
     
+fail1:
+    BSocket_Free(&o->sock);
 fail0:
     return 0;
 }
@@ -124,11 +171,8 @@ void BIPC_Free (BIPC *o)
 {
     DebugObject_Free(&o->d_obj);
     
-    // free source
-    SeqPacketSocketSource_Free(&o->source);
-    
-    // free sink
-    SeqPacketSocketSink_Free(&o->sink);
+    // free I/O
+    free_io(o);
     
     // free socket
     BSocket_Free(&o->sock);
@@ -141,12 +185,12 @@ PacketPassInterface * BIPC_GetSendInterface (BIPC *o)
 {
     DebugObject_Access(&o->d_obj);
     
-    return SeqPacketSocketSink_GetInput(&o->sink);
+    return PacketCopier_GetInput(&o->send_copier);
 }
 
 PacketRecvInterface * BIPC_GetRecvInterface (BIPC *o)
 {
     DebugObject_Access(&o->d_obj);
     
-    return SeqPacketSocketSource_GetOutput(&o->source);
+    return PacketCopier_GetOutput(&o->recv_copier);
 }

+ 25 - 7
ipc/BIPC.h

@@ -28,12 +28,18 @@
 #ifndef BADVPN_IPC_BIPC_H
 #define BADVPN_IPC_BIPC_H
 
+#include <protocol/packetproto.h>
 #include <misc/debug.h>
 #include <misc/dead.h>
 #include <system/BSocket.h>
 #include <system/DebugObject.h>
-#include <flow/SeqPacketSocketSink.h>
-#include <flow/SeqPacketSocketSource.h>
+#include <flow/StreamSocketSink.h>
+#include <flow/StreamSocketSource.h>
+#include <flow/PacketProtoEncoder.h>
+#include <flow/PacketProtoDecoder.h>
+#include <flow/SinglePacketBuffer.h>
+#include <flow/PacketCopier.h>
+#include <flow/PacketStreamSender.h>
 #include <ipc/BIPCServer.h>
 
 /**
@@ -51,10 +57,21 @@ typedef struct {
     dead_t dead;
     BSocket sock;
     FlowErrorDomain domain;
-    SeqPacketSocketSink sink;
-    SeqPacketSocketSource source;
     BIPC_handler handler;
     void *user;
+    
+    // sending
+    PacketCopier send_copier;
+    PacketProtoEncoder send_encoder;
+    SinglePacketBuffer send_buf;
+    PacketStreamSender send_pss;
+    StreamSocketSink send_sink;
+    
+    // receiving
+    StreamSocketSource recv_source;
+    PacketProtoDecoder recv_decoder;
+    PacketCopier recv_copier;
+    
     DebugObject d_obj;
 } BIPC;
 
@@ -64,8 +81,8 @@ typedef struct {
  * @param o the object
  * @param path path of the IPC object. On *nix path of the unix socket, on Windows
  *             path of the named pipe.
- * @param send_mtu maximum packet size for sending. Must be >=0.
- * @param recv_mtu maximum packet size for receiving. Must be >=0.
+ * @param send_mtu maximum packet size for sending. Must be >=0 and <=PACKETPROTO_MAXPAYLOAD.
+ * @param recv_mtu maximum packet size for receiving. Must be >=0 and <=PACKETPROTO_MAXPAYLOAD.
  * @param handler handler function called when an error occurs
  * @param user value to pass to handler function
  * @param reactor reactor we live in
@@ -82,9 +99,10 @@ int BIPC_InitConnect (BIPC *o, const char *path, int send_mtu, int recv_mtu, BIP
  * @param recv_mtu maximum packet size for receiving. Must be >=0.
  * @param handler handler function called when an error occurs
  * @param user value to pass to handler function
+ * @param reactor reactor we live in
  * @return 1 on success, 0 on failure
  */
-int BIPC_InitAccept (BIPC *o, BIPCServer *server, int send_mtu, int recv_mtu, BIPC_handler handler, void *user) WARN_UNUSED;
+int BIPC_InitAccept (BIPC *o, BIPCServer *server, int send_mtu, int recv_mtu, BIPC_handler handler, void *user, BReactor *reactor) WARN_UNUSED;
 
 /**
  * Frees the object.

+ 1 - 1
ipc/BIPCServer.c

@@ -40,7 +40,7 @@ int BIPCServer_Init (BIPCServer *o, const char *path, BIPCServer_handler handler
     DEAD_INIT(o->dead);
     
     // init socket
-    if (BSocket_Init(&o->sock, reactor, BADDR_TYPE_UNIX, BSOCKET_TYPE_DGRAM) < 0) {
+    if (BSocket_Init(&o->sock, reactor, BADDR_TYPE_UNIX, BSOCKET_TYPE_STREAM) < 0) {
         DEBUG("BSocket_Init failed");
         goto fail0;
     }