Просмотр исходного кода

client: give each peer its own buffer for sending server messages to that peer. Add PeerChatSender and
SCOutmsgEncoder for this.

ambrop7 14 лет назад
Родитель
Сommit
06b7b694d7
7 измененных файлов с 532 добавлено и 65 удалено
  1. 2 0
      client/CMakeLists.txt
  2. 83 0
      client/PeerChatSender.c
  3. 52 0
      client/PeerChatSender.h
  4. 97 0
      client/SCOutmsgEncoder.c
  5. 46 0
      client/SCOutmsgEncoder.h
  6. 233 62
      client/client.c
  7. 19 3
      client/client.h

+ 2 - 0
client/CMakeLists.txt

@@ -12,6 +12,8 @@ add_executable(badvpn-client
     SPProtoEncoder.c
     SPProtoDecoder.c
     DataProtoKeepaliveSource.c
+    PeerChatSender.c
+    SCOutmsgEncoder.c
 )
 target_link_libraries(badvpn-client system flow flowextra tuntap server_conection security ${NSPR_LIBRARIES} ${NSS_LIBRARIES})
 

+ 83 - 0
client/PeerChatSender.c

@@ -0,0 +1,83 @@
+/**
+ * @file PeerChatSender.c
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <string.h>
+
+#include <misc/byteorder.h>
+
+#include "PeerChatSender.h"
+
+int PeerChatSender_Init (PeerChatSender *o, peerid_t peer_id, PacketPassInterface *output, BPendingGroup *pg, void *user, PeerChatSender_handler_error handler_error)
+{
+    ASSERT(PacketPassInterface_GetMTU(output) >= sizeof(struct packetproto_header) + SC_MAX_ENC)
+    
+    // init arguments
+    o->user = user;
+    o->handler_error = handler_error;
+    
+    // init copier
+    PacketCopier_Init(&o->copier, SC_MAX_MSGLEN, pg);
+    
+    // init SC encoder
+    SCOutmsgEncoder_Init(&o->sc_encoder, peer_id, PacketCopier_GetOutput(&o->copier), pg);
+    
+    // init PacketProto encoder
+    PacketProtoEncoder_Init(&o->pp_encoder, SCOutmsgEncoder_GetOutput(&o->sc_encoder), pg);
+    
+    // init buffer
+    if (!SinglePacketBuffer_Init(&o->buffer, PacketProtoEncoder_GetOutput(&o->pp_encoder), output, pg)) {
+        goto fail1;
+    }
+    
+    DebugObject_Init(&o->d_obj);
+    return 1;
+    
+fail1:
+    PacketProtoEncoder_Free(&o->pp_encoder);
+    SCOutmsgEncoder_Free(&o->sc_encoder);
+    PacketCopier_Free(&o->copier);
+    return 0;
+}
+
+void PeerChatSender_Free (PeerChatSender *o)
+{
+    DebugObject_Free(&o->d_obj);
+    
+    // free buffer
+    SinglePacketBuffer_Free(&o->buffer);
+    
+    // free PacketProto encoder
+    PacketProtoEncoder_Free(&o->pp_encoder);
+    
+    // free SC encoder
+    SCOutmsgEncoder_Free(&o->sc_encoder);
+    
+    // free copier
+    PacketCopier_Free(&o->copier);
+}
+
+PacketPassInterface * PeerChatSender_GetInput (PeerChatSender *o)
+{
+    DebugObject_Access(&o->d_obj);
+    
+    return PacketCopier_GetInput(&o->copier);
+}

+ 52 - 0
client/PeerChatSender.h

@@ -0,0 +1,52 @@
+/**
+ * @file PeerChatSender.h
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef BADVPN_PEERCHATSENDER_H
+#define BADVPN_PEERCHATSENDER_H
+
+#include <protocol/packetproto.h>
+#include <protocol/scproto.h>
+#include <misc/debug.h>
+#include <base/DebugObject.h>
+#include <flow/SinglePacketSender.h>
+#include <flow/SinglePacketBuffer.h>
+#include <flow/PacketProtoEncoder.h>
+#include <flow/PacketCopier.h>
+#include <client/SCOutmsgEncoder.h>
+
+typedef void (*PeerChatSender_handler_error) (void *user);
+
+typedef struct {
+    void *user;
+    PeerChatSender_handler_error handler_error;
+    SinglePacketBuffer buffer;
+    PacketProtoEncoder pp_encoder;
+    SCOutmsgEncoder sc_encoder;
+    PacketCopier copier;
+    DebugObject d_obj;
+} PeerChatSender;
+
+int PeerChatSender_Init (PeerChatSender *o, peerid_t peer_id, PacketPassInterface *output, BPendingGroup *pg, void *user, PeerChatSender_handler_error handler_error) WARN_UNUSED;
+void PeerChatSender_Free (PeerChatSender *o);
+PacketPassInterface * PeerChatSender_GetInput (PeerChatSender *o);
+
+#endif

+ 97 - 0
client/SCOutmsgEncoder.c

@@ -0,0 +1,97 @@
+/**
+ * @file SCOutmsgEncoder.c
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <stddef.h>
+#include <limits.h>
+
+#include <misc/balign.h>
+#include <misc/debug.h>
+#include <misc/byteorder.h>
+
+#include "SCOutmsgEncoder.h"
+
+static void output_handler_recv (SCOutmsgEncoder *enc, uint8_t *data)
+{
+    ASSERT(!enc->output_packet)
+    ASSERT(data)
+    DebugObject_Access(&enc->d_obj);
+    
+    // schedule receive
+    enc->output_packet = data;
+    PacketRecvInterface_Receiver_Recv(enc->input, enc->output_packet + SCOUTMSG_OVERHEAD);
+}
+
+static void input_handler_done (SCOutmsgEncoder *enc, int in_len)
+{
+    ASSERT(enc->output_packet)
+    DebugObject_Access(&enc->d_obj);
+    
+    // write SC header
+    struct sc_header *header = (struct sc_header *)enc->output_packet;
+    header->type = htol8(SCID_OUTMSG);
+    
+    // write outmsg
+    struct sc_client_outmsg *outmsg = (struct sc_client_outmsg *)(header + 1);
+    outmsg->clientid = htol16(enc->peer_id);
+    
+    // finish output packet
+    enc->output_packet = NULL;
+    PacketRecvInterface_Done(&enc->output, SCOUTMSG_OVERHEAD + in_len);
+}
+
+void SCOutmsgEncoder_Init (SCOutmsgEncoder *enc, peerid_t peer_id, PacketRecvInterface *input, BPendingGroup *pg)
+{
+    ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - SCOUTMSG_OVERHEAD)
+    
+    // init arguments
+    enc->peer_id = peer_id;
+    enc->input = input;
+    
+    // init input
+    PacketRecvInterface_Receiver_Init(enc->input, (PacketRecvInterface_handler_done)input_handler_done, enc);
+    
+    // init output
+    PacketRecvInterface_Init(
+        &enc->output, SCOUTMSG_OVERHEAD + PacketRecvInterface_GetMTU(enc->input),
+        (PacketRecvInterface_handler_recv)output_handler_recv, enc, pg
+    );
+    
+    // set no output packet
+    enc->output_packet = NULL;
+    
+    DebugObject_Init(&enc->d_obj);
+}
+
+void SCOutmsgEncoder_Free (SCOutmsgEncoder *enc)
+{
+    DebugObject_Free(&enc->d_obj);
+
+    // free input
+    PacketRecvInterface_Free(&enc->output);
+}
+
+PacketRecvInterface * SCOutmsgEncoder_GetOutput (SCOutmsgEncoder *enc)
+{
+    DebugObject_Access(&enc->d_obj);
+    
+    return &enc->output;
+}

+ 46 - 0
client/SCOutmsgEncoder.h

@@ -0,0 +1,46 @@
+/**
+ * @file SCOutmsgEncoder.h
+ * @author Ambroz Bizjak <ambrop7@gmail.com>
+ * 
+ * @section LICENSE
+ * 
+ * This file is part of BadVPN.
+ * 
+ * BadVPN is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2
+ * as published by the Free Software Foundation.
+ * 
+ * BadVPN is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef BADVPN_SCOUTMSGENCODER_H
+#define BADVPN_SCOUTMSGENCODER_H
+
+#include <stdint.h>
+
+#include <protocol/scproto.h>
+#include <base/DebugObject.h>
+#include <flow/PacketRecvInterface.h>
+
+#define SCOUTMSG_OVERHEAD (sizeof(struct sc_header) + sizeof(struct sc_client_outmsg))
+
+typedef struct {
+    peerid_t peer_id;
+    PacketRecvInterface *input;
+    PacketRecvInterface output;
+    uint8_t *output_packet;
+    DebugObject d_obj;
+} SCOutmsgEncoder;
+
+void SCOutmsgEncoder_Init (SCOutmsgEncoder *enc, peerid_t peer_id, PacketRecvInterface *input, BPendingGroup *pg);
+void SCOutmsgEncoder_Free (SCOutmsgEncoder *enc);
+PacketRecvInterface * SCOutmsgEncoder_GetOutput (SCOutmsgEncoder *enc);
+
+#endif

+ 233 - 62
client/client.c

@@ -183,6 +183,15 @@ ServerConnection server;
 // my ID, defined only after server_ready
 peerid_t my_id;
 
+// fair queue for sending peer messages to the server
+PacketPassFairQueue server_queue;
+
+// whether server is ready
+int server_ready;
+
+// dying server flow
+struct server_flow *dying_server_flow;
+
 // stops event processing, causing the program to exit
 static void terminate (void);
 
@@ -201,17 +210,11 @@ static int process_arguments (void);
 // handler for program termination request
 static void signal_handler (void *unused);
 
-// provides a buffer for sending a packet to the server
-static int server_start_msg (void **data, peerid_t peer_id, int type, int len);
-
-// submits a written packet to the server
-static void server_end_msg (void);
-
 // adds a new peer
 static void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len);
 
 // removes a peer
-static void peer_remove (struct peer_data *peer);
+static void peer_remove (struct peer_data *peer, int exiting);
 
 // passes a message to the logger, prepending it info about the peer
 static void peer_log (struct peer_data *peer, int level, const char *fmt, ...);
@@ -327,8 +330,16 @@ static void server_handler_newclient (void *user, peerid_t peer_id, int flags, c
 static void server_handler_endclient (void *user, peerid_t peer_id);
 static void server_handler_message (void *user, peerid_t peer_id, uint8_t *data, int data_len);
 
-// job to generate and send OTP seed after binding
+// jobs
 static void peer_job_send_seed_after_binding (struct peer_data *peer);
+static void peer_job_init (struct peer_data *peer);
+
+static struct server_flow * server_flow_init (peerid_t peer_id);
+static void server_flow_free (struct server_flow *flow);
+static void server_flow_die (struct server_flow *flow);
+static void server_flow_qflow_handler_busy (struct server_flow *flow);
+static int server_flow_start_message (struct server_flow *flow, uint8_t **buf, int len);
+static void server_flow_end_message (struct server_flow *flow);
 
 int main (int argc, char *argv[])
 {
@@ -532,17 +543,36 @@ int main (int argc, char *argv[])
         goto fail9;
     }
     
+    // set server not ready
+    server_ready = 0;
+    
+    // set no dying flow
+    dying_server_flow = NULL;
+    
     // enter event loop
     BLog(BLOG_NOTICE, "entering event loop");
     BReactor_Exec(&ss);
     
+    // allow freeing server queue flows
+    if (server_ready) {
+        PacketPassFairQueue_PrepareFree(&server_queue);
+    }
+    
     // free peers
     LinkedList2Node *node;
     while (node = LinkedList2_GetFirst(&peers)) {
         struct peer_data *peer = UPPER_OBJECT(node, struct peer_data, list_node);
-        peer_remove(peer);
+        peer_remove(peer, 1);
     }
     
+    // free dying server flow
+    if (dying_server_flow) {
+        server_flow_free(dying_server_flow);
+    }
+    
+    if (server_ready) {
+        PacketPassFairQueue_Free(&server_queue);
+    }
     ServerConnection_Free(&server);
 fail9:
     FrameDecider_Free(&frame_decider);
@@ -1217,40 +1247,6 @@ void signal_handler (void *unused)
     terminate();
 }
 
-int server_start_msg (void **data, peerid_t peer_id, int type, int len)
-{
-    ASSERT(ServerConnection_IsReady(&server))
-    ASSERT(len >= 0)
-    ASSERT(len <= MSG_MAX_PAYLOAD)
-    ASSERT(!(len > 0) || data)
-    
-    uint8_t *packet;
-    if (!ServerConnection_StartMessage(&server, &packet, peer_id, msg_SIZEtype + msg_SIZEpayload(len))) {
-        BLog(BLOG_ERROR, "out of server buffer, exiting");
-        terminate();
-        return -1;
-    }
-    
-    msgWriter writer;
-    msgWriter_Init(&writer, packet);
-    msgWriter_Addtype(&writer, type);
-    uint8_t *payload_dst = msgWriter_Addpayload(&writer, len);
-    msgWriter_Finish(&writer);
-    
-    if (data) {
-        *data = payload_dst;
-    }
-    
-    return 0;
-}
-
-void server_end_msg (void)
-{
-    ASSERT(ServerConnection_IsReady(&server))
-    
-    ServerConnection_EndMessage(&server);
-}
-
 void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
 {
     ASSERT(ServerConnection_IsReady(&server))
@@ -1276,6 +1272,19 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     // set no common name
     peer->common_name = NULL;
     
+    // init jobs
+    BPending_Init(&peer->job_send_seed_after_binding, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_send_seed_after_binding, peer);
+    BPending_Init(&peer->job_init, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_init, peer);
+    
+    // set init job (must be before initing server flow so we can send)
+    BPending_Set(&peer->job_init);
+    
+    // init server flow
+    if (!(peer->server_flow = server_flow_init(peer->id))) {
+        peer_log(peer, BLOG_ERROR, "server_flow_init failed");
+        goto fail1;
+    }
+    
     if (options.ssl) {
         // remember certificate
         memcpy(peer->cert, cert, cert_len);
@@ -1285,7 +1294,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         // in which case following workaroud wouldn't help
         if (!(cert_len > 0 && (cert[0] & 0x1f) == 0x10)) {
             peer_log(peer, BLOG_ERROR, "certificate does not look like DER");
-            goto fail1;
+            goto fail1a;
         }
         
         // copy the certificate and append it a good load of zero bytes,
@@ -1294,7 +1303,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         uint8_t *certbuf = malloc(cert_len + 100);
         if (!certbuf) {
             peer_log(peer, BLOG_ERROR, "malloc failed");
-            goto fail1;
+            goto fail1a;
         }
         memcpy(certbuf, cert, cert_len);
         memset(certbuf + cert_len, 0, 100);
@@ -1304,7 +1313,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         if (!nsscert) {
             peer_log(peer, BLOG_ERROR, "CERT_DecodeCertFromPackage failed (%d)", PORT_GetError());
             free(certbuf);
-            goto fail1;
+            goto fail1a;
         }
         
         free(certbuf);
@@ -1313,7 +1322,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         if (!(peer->common_name = CERT_GetCommonName(&nsscert->subject))) {
             peer_log(peer, BLOG_ERROR, "CERT_GetCommonName failed");
             CERT_DestroyCertificate(nsscert);
-            goto fail1;
+            goto fail1a;
         }
         
         CERT_DestroyCertificate(nsscert);
@@ -1352,20 +1361,12 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     // init binding
     peer->binding = 0;
     
-    // init send seed job
-    BPending_Init(&peer->job_send_seed_after_binding, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_send_seed_after_binding, peer);
-    
     // add to peers list
     LinkedList2_Append(&peers, &peer->list_node);
     num_peers++;
     
     peer_log(peer, BLOG_INFO, "initialized");
     
-    // start setup process
-    if (peer_am_master(peer)) {
-        peer_start_binding(peer);
-    }
-    
     return;
     
 fail3:
@@ -1374,12 +1375,16 @@ fail2:
     if (peer->common_name) {
         PORT_Free(peer->common_name);
     }
+fail1a:
+    server_flow_free(peer->server_flow);
 fail1:
+    BPending_Free(&peer->job_init);
+    BPending_Free(&peer->job_send_seed_after_binding);
     free(peer);
 fail0:;
 }
 
-void peer_remove (struct peer_data *peer)
+void peer_remove (struct peer_data *peer, int exiting)
 {
     peer_log(peer, BLOG_INFO, "removing");
     
@@ -1406,9 +1411,6 @@ void peer_remove (struct peer_data *peer)
     LinkedList2_Remove(&peers, &peer->list_node);
     num_peers--;
     
-    // free send seed job
-    BPending_Free(&peer->job_send_seed_after_binding);
-    
     // free reset timer
     BReactor_RemoveTimer(&ss, &peer->reset_timer);
     
@@ -1421,6 +1423,17 @@ void peer_remove (struct peer_data *peer)
     // free local flow
     DataProtoFlow_Free(&peer->local_dpflow);
     
+    // free/die server flow
+    if (exiting || !PacketPassFairQueueFlow_IsBusy(&peer->server_flow->qflow)) {
+        server_flow_free(peer->server_flow);
+    } else {
+        server_flow_die(peer->server_flow);
+    }
+    
+    // free jobs
+    BPending_Free(&peer->job_init);
+    BPending_Free(&peer->job_send_seed_after_binding);
+    
     // free common name
     if (peer->common_name) {
         PORT_Free(peer->common_name);
@@ -2241,12 +2254,33 @@ void peer_connect (struct peer_data *peer, BAddr addr, uint8_t* encryption_key,
 
 static int peer_start_msg (struct peer_data *peer, void **data, int type, int len)
 {
-    return server_start_msg(data, peer->id, type, len);
+    ASSERT(len >= 0)
+    ASSERT(len <= MSG_MAX_PAYLOAD)
+    ASSERT(!(len > 0) || data)
+    
+    uint8_t *packet;
+    if (!server_flow_start_message(peer->server_flow, &packet, msg_SIZEtype + msg_SIZEpayload(len))) {
+        BLog(BLOG_ERROR, "out of peer server buffer, exiting");
+        terminate();
+        return -1;
+    }
+    
+    msgWriter writer;
+    msgWriter_Init(&writer, packet);
+    msgWriter_Addtype(&writer, type);
+    uint8_t *payload_dst = msgWriter_Addpayload(&writer, len);
+    msgWriter_Finish(&writer);
+    
+    if (data) {
+        *data = payload_dst;
+    }
+    
+    return 0;
 }
 
 static void peer_end_msg (struct peer_data *peer)
 {
-    server_end_msg();
+    server_flow_end_message(peer->server_flow);
 }
 
 void peer_send_simple (struct peer_data *peer, int msgid)
@@ -2522,6 +2556,8 @@ void server_handler_error (void *user)
 
 void server_handler_ready (void *user, peerid_t param_my_id, uint32_t ext_ip)
 {
+    ASSERT(!server_ready)
+    
     // remember our ID
     my_id = param_my_id;
     
@@ -2547,6 +2583,12 @@ void server_handler_ready (void *user, peerid_t param_my_id, uint32_t ext_ip)
     // give receive device the ID
     DPReceiveDevice_SetPeerID(&device_output_dprd, my_id);
     
+    // init server queue
+    PacketPassFairQueue_Init(&server_queue, ServerConnection_GetSendInterface(&server), BReactor_PendingGroup(&ss), 0, 1);
+    
+    // set server ready
+    server_ready = 1;
+    
     BLog(BLOG_INFO, "server: ready, my ID is %d", (int)my_id);
 }
 
@@ -2594,7 +2636,7 @@ void server_handler_endclient (void *user, peerid_t peer_id)
     }
     
     // remove peer
-    peer_remove(peer);
+    peer_remove(peer, 0);
 }
 
 void server_handler_message (void *user, peerid_t peer_id, uint8_t *data, int data_len)
@@ -2624,3 +2666,132 @@ void peer_job_send_seed_after_binding (struct peer_data *peer)
     
     peer_generate_and_send_seed(peer);
 }
+
+void peer_job_init (struct peer_data *peer)
+{
+    // start setup process
+    if (peer_am_master(peer)) {
+        peer_start_binding(peer);
+    }
+}
+
+struct server_flow * server_flow_init (peerid_t peer_id)
+{
+    ASSERT(server_ready)
+    
+    // allocate structure
+    struct server_flow *flow = malloc(sizeof(*flow));
+    if (!flow) {
+        BLog(BLOG_ERROR, "malloc failed");
+        goto fail0;
+    }
+    
+    // init queue flow
+    PacketPassFairQueueFlow_Init(&flow->qflow, &server_queue);
+    
+    // init sender
+    if (!PeerChatSender_Init(&flow->sender, peer_id, PacketPassFairQueueFlow_GetInput(&flow->qflow), BReactor_PendingGroup(&ss), NULL, NULL)) {
+        BLog(BLOG_ERROR, "PeerChatSender_Init failed");
+        goto fail1;
+    }
+    
+    // init writer
+    BufferWriter_Init(&flow->writer, SC_MAX_MSGLEN, BReactor_PendingGroup(&ss));
+    
+    // init buffer
+    if (!PacketBuffer_Init(&flow->buffer, BufferWriter_GetOutput(&flow->writer), PeerChatSender_GetInput(&flow->sender), SERVER_BUFFER_MIN_PACKETS, BReactor_PendingGroup(&ss))) {
+        BLog(BLOG_ERROR, "PacketBuffer_Init failed");
+        goto fail2;
+    }
+    
+    // set no message
+    flow->msg_len = -1;
+    
+    return flow;
+    
+fail2:
+    BufferWriter_Free(&flow->writer);
+    PeerChatSender_Free(&flow->sender);
+fail1:
+    PacketPassFairQueueFlow_Free(&flow->qflow);
+    free(flow);
+fail0:
+    return NULL;
+}
+
+void server_flow_free (struct server_flow *flow)
+{
+    PacketPassFairQueueFlow_AssertFree(&flow->qflow);
+    
+    // remove dying flow reference
+    if (flow == dying_server_flow) {
+        dying_server_flow = NULL;
+    }
+    
+    // free buffer
+    PacketBuffer_Free(&flow->buffer);
+    
+    // free writer
+    BufferWriter_Free(&flow->writer);
+    
+    // free sender
+    PeerChatSender_Free(&flow->sender);
+    
+    // free queue flow
+    PacketPassFairQueueFlow_Free(&flow->qflow);
+    
+    // free structure
+    free(flow);
+}
+
+void server_flow_die (struct server_flow *flow)
+{
+    ASSERT(PacketPassFairQueueFlow_IsBusy(&flow->qflow))
+    ASSERT(!dying_server_flow)
+    
+    // request notification when flow is done
+    PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)server_flow_qflow_handler_busy, flow);
+    
+    // set dying flow
+    dying_server_flow = flow;
+}
+
+void server_flow_qflow_handler_busy (struct server_flow *flow)
+{
+    ASSERT(flow == dying_server_flow)
+    PacketPassFairQueueFlow_AssertFree(&flow->qflow);
+    
+    // finally free flow
+    server_flow_free(flow);
+}
+
+int server_flow_start_message (struct server_flow *flow, uint8_t **buf, int len)
+{
+    ASSERT(flow != dying_server_flow)
+    ASSERT(flow->msg_len == -1)
+    ASSERT(len >= 0)
+    ASSERT(len <= SC_MAX_MSGLEN)
+    
+    // obtain buffer location
+    if (!BufferWriter_StartPacket(&flow->writer, buf)) {
+        BLog(BLOG_ERROR, "BufferWriter_StartPacket failed");
+        return 0;
+    }
+    
+    // set have message
+    flow->msg_len = len;
+    
+    return 1;
+}
+
+void server_flow_end_message (struct server_flow *flow)
+{
+    ASSERT(flow != dying_server_flow)
+    ASSERT(flow->msg_len >= 0)
+    
+    // submit packet to buffer
+    BufferWriter_EndPacket(&flow->writer, flow->msg_len);
+    
+    // set no message
+    flow->msg_len = -1;
+}

+ 19 - 3
client/client.h

@@ -25,11 +25,15 @@
 
 #include <protocol/scproto.h>
 #include <structure/LinkedList2.h>
+#include <flow/PacketPassFairQueue.h>
+#include <flow/PacketBuffer.h>
+#include <flow/BufferWriter.h>
 #include <client/DatagramPeerIO.h>
 #include <client/StreamPeerIO.h>
 #include <client/DataProto.h>
 #include <client/DPReceive.h>
 #include <client/FrameDecider.h>
+#include <client/PeerChatSender.h>
 
 // NOTE: all time values are in milliseconds
 
@@ -81,6 +85,14 @@
 // maximum scopes
 #define MAX_SCOPES 8
 
+struct server_flow {
+    PacketPassFairQueueFlow qflow;
+    PeerChatSender sender;
+    PacketBuffer buffer;
+    BufferWriter writer;
+    int msg_len;
+};
+
 struct peer_data {
     // peer identifier
     peerid_t id;
@@ -93,6 +105,13 @@ struct peer_data {
     int cert_len;
     char *common_name;
     
+    // jobs
+    BPending job_send_seed_after_binding;
+    BPending job_init;
+    
+    // server flow
+    struct server_flow *server_flow;
+    
     // local flow
     DataProtoFlow local_dpflow;
     
@@ -147,9 +166,6 @@ struct peer_data {
     int binding;
     int binding_addrpos;
     
-    // jobs
-    BPending job_send_seed_after_binding;
-    
     // peers linked list node
     LinkedList2Node list_node;
 };