Просмотр исходного кода

client: move chat out of server_flow into peer

ambrop7 14 лет назад
Родитель
Сommit
681c939284
2 измененных файлов с 80 добавлено и 85 удалено
  1. 70 79
      client/client.c
  2. 10 6
      client/client.h

+ 70 - 79
client/client.c

@@ -334,12 +334,10 @@ static void server_handler_message (void *user, peerid_t peer_id, uint8_t *data,
 static void peer_job_send_seed_after_binding (struct peer_data *peer);
 static void peer_job_send_seed_after_binding (struct peer_data *peer);
 static void peer_job_init (struct peer_data *peer);
 static void peer_job_init (struct peer_data *peer);
 
 
-static struct server_flow * server_flow_init (peerid_t peer_id);
+static struct server_flow * server_flow_init (peerid_t peer_id, PacketRecvInterface *input);
 static void server_flow_free (struct server_flow *flow);
 static void server_flow_free (struct server_flow *flow);
 static void server_flow_die (struct server_flow *flow);
 static void server_flow_die (struct server_flow *flow);
 static void server_flow_qflow_handler_busy (struct server_flow *flow);
 static void server_flow_qflow_handler_busy (struct server_flow *flow);
-static int server_flow_start_message (struct server_flow *flow, uint8_t **buf, int len);
-static void server_flow_end_message (struct server_flow *flow);
 
 
 int main (int argc, char *argv[])
 int main (int argc, char *argv[])
 {
 {
@@ -1279,12 +1277,30 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
     // set init job (must be before initing server flow so we can send)
     // set init job (must be before initing server flow so we can send)
     BPending_Set(&peer->job_init);
     BPending_Set(&peer->job_init);
     
     
+    // init chat
+    if (!PeerChat_Init(&peer->chat, peer->id, BReactor_PendingGroup(&ss), NULL, NULL)) {
+        peer_log(peer, BLOG_ERROR, "PeerChat_Init failed");
+        goto fail1;
+    }
+    
     // init server flow
     // init server flow
-    if (!(peer->server_flow = server_flow_init(peer->id))) {
+    if (!(peer->chat_send_flow = server_flow_init(peer->id, PeerChat_GetSendOutput(&peer->chat)))) {
         peer_log(peer, BLOG_ERROR, "server_flow_init failed");
         peer_log(peer, BLOG_ERROR, "server_flow_init failed");
-        goto fail1;
+        goto fail1a;
+    }
+    
+    // init chat send writer
+    BufferWriter_Init(&peer->chat_send_writer, SC_MAX_MSGLEN, BReactor_PendingGroup(&ss));
+    
+    // init chat send buffer
+    if (!PacketBuffer_Init(&peer->chat_send_buffer, BufferWriter_GetOutput(&peer->chat_send_writer), PeerChat_GetSendInput(&peer->chat), SERVER_BUFFER_MIN_PACKETS, BReactor_PendingGroup(&ss))) {
+        peer_log(peer, BLOG_ERROR, "PacketBuffer_Init failed");
+        goto fail1b;
     }
     }
     
     
+    // set no message
+    peer->chat_send_msg_len = -1;
+    
     if (options.ssl) {
     if (options.ssl) {
         // remember certificate
         // remember certificate
         memcpy(peer->cert, cert, cert_len);
         memcpy(peer->cert, cert, cert_len);
@@ -1294,7 +1310,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         // in which case following workaroud wouldn't help
         // in which case following workaroud wouldn't help
         if (!(cert_len > 0 && (cert[0] & 0x1f) == 0x10)) {
         if (!(cert_len > 0 && (cert[0] & 0x1f) == 0x10)) {
             peer_log(peer, BLOG_ERROR, "certificate does not look like DER");
             peer_log(peer, BLOG_ERROR, "certificate does not look like DER");
-            goto fail1a;
+            goto fail1c;
         }
         }
         
         
         // copy the certificate and append it a good load of zero bytes,
         // copy the certificate and append it a good load of zero bytes,
@@ -1303,7 +1319,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         uint8_t *certbuf = malloc(cert_len + 100);
         uint8_t *certbuf = malloc(cert_len + 100);
         if (!certbuf) {
         if (!certbuf) {
             peer_log(peer, BLOG_ERROR, "malloc failed");
             peer_log(peer, BLOG_ERROR, "malloc failed");
-            goto fail1a;
+            goto fail1c;
         }
         }
         memcpy(certbuf, cert, cert_len);
         memcpy(certbuf, cert, cert_len);
         memset(certbuf + cert_len, 0, 100);
         memset(certbuf + cert_len, 0, 100);
@@ -1313,7 +1329,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         if (!nsscert) {
         if (!nsscert) {
             peer_log(peer, BLOG_ERROR, "CERT_DecodeCertFromPackage failed (%d)", PORT_GetError());
             peer_log(peer, BLOG_ERROR, "CERT_DecodeCertFromPackage failed (%d)", PORT_GetError());
             free(certbuf);
             free(certbuf);
-            goto fail1a;
+            goto fail1c;
         }
         }
         
         
         free(certbuf);
         free(certbuf);
@@ -1322,7 +1338,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
         if (!(peer->common_name = CERT_GetCommonName(&nsscert->subject))) {
         if (!(peer->common_name = CERT_GetCommonName(&nsscert->subject))) {
             peer_log(peer, BLOG_ERROR, "CERT_GetCommonName failed");
             peer_log(peer, BLOG_ERROR, "CERT_GetCommonName failed");
             CERT_DestroyCertificate(nsscert);
             CERT_DestroyCertificate(nsscert);
-            goto fail1a;
+            goto fail1c;
         }
         }
         
         
         CERT_DestroyCertificate(nsscert);
         CERT_DestroyCertificate(nsscert);
@@ -1375,8 +1391,13 @@ fail2:
     if (peer->common_name) {
     if (peer->common_name) {
         PORT_Free(peer->common_name);
         PORT_Free(peer->common_name);
     }
     }
+fail1c:
+    PacketBuffer_Free(&peer->chat_send_buffer);
+fail1b:
+    BufferWriter_Free(&peer->chat_send_writer);
+    server_flow_free(peer->chat_send_flow);
 fail1a:
 fail1a:
-    server_flow_free(peer->server_flow);
+    PeerChat_Free(&peer->chat);
 fail1:
 fail1:
     BPending_Free(&peer->job_init);
     BPending_Free(&peer->job_init);
     BPending_Free(&peer->job_send_seed_after_binding);
     BPending_Free(&peer->job_send_seed_after_binding);
@@ -1423,13 +1444,22 @@ void peer_remove (struct peer_data *peer, int exiting)
     // free local flow
     // free local flow
     DataProtoFlow_Free(&peer->local_dpflow);
     DataProtoFlow_Free(&peer->local_dpflow);
     
     
+    // free chat send buffer
+    PacketBuffer_Free(&peer->chat_send_buffer);
+    
+    // free chat send writer
+    BufferWriter_Free(&peer->chat_send_writer);
+    
     // free/die server flow
     // free/die server flow
-    if (exiting || !PacketPassFairQueueFlow_IsBusy(&peer->server_flow->qflow)) {
-        server_flow_free(peer->server_flow);
+    if (exiting || !PacketPassFairQueueFlow_IsBusy(&peer->chat_send_flow->qflow)) {
+        server_flow_free(peer->chat_send_flow);
     } else {
     } else {
-        server_flow_die(peer->server_flow);
+        server_flow_die(peer->chat_send_flow);
     }
     }
     
     
+    // free chat
+    PeerChat_Free(&peer->chat);
+    
     // free jobs
     // free jobs
     BPending_Free(&peer->job_init);
     BPending_Free(&peer->job_init);
     BPending_Free(&peer->job_send_seed_after_binding);
     BPending_Free(&peer->job_send_seed_after_binding);
@@ -2257,30 +2287,41 @@ static int peer_start_msg (struct peer_data *peer, void **data, int type, int le
     ASSERT(len >= 0)
     ASSERT(len >= 0)
     ASSERT(len <= MSG_MAX_PAYLOAD)
     ASSERT(len <= MSG_MAX_PAYLOAD)
     ASSERT(!(len > 0) || data)
     ASSERT(!(len > 0) || data)
+    ASSERT(peer->chat_send_msg_len == -1)
     
     
+    // obtain buffer location
     uint8_t *packet;
     uint8_t *packet;
-    if (!server_flow_start_message(peer->server_flow, &packet, msg_SIZEtype + msg_SIZEpayload(len))) {
+    if (!BufferWriter_StartPacket(&peer->chat_send_writer, &packet)) {
         BLog(BLOG_ERROR, "out of peer server buffer, exiting");
         BLog(BLOG_ERROR, "out of peer server buffer, exiting");
         terminate();
         terminate();
         return -1;
         return -1;
     }
     }
     
     
+    // write fields
     msgWriter writer;
     msgWriter writer;
     msgWriter_Init(&writer, packet);
     msgWriter_Init(&writer, packet);
     msgWriter_Addtype(&writer, type);
     msgWriter_Addtype(&writer, type);
     uint8_t *payload_dst = msgWriter_Addpayload(&writer, len);
     uint8_t *payload_dst = msgWriter_Addpayload(&writer, len);
     msgWriter_Finish(&writer);
     msgWriter_Finish(&writer);
     
     
+    // set have message
+    peer->chat_send_msg_len = len;
+    
     if (data) {
     if (data) {
         *data = payload_dst;
         *data = payload_dst;
     }
     }
-    
     return 0;
     return 0;
 }
 }
 
 
 static void peer_end_msg (struct peer_data *peer)
 static void peer_end_msg (struct peer_data *peer)
 {
 {
-    server_flow_end_message(peer->server_flow);
+    ASSERT(peer->chat_send_msg_len >= 0)
+    
+    // submit packet to buffer
+    BufferWriter_EndPacket(&peer->chat_send_writer, msg_SIZEtype + msg_SIZEpayload(peer->chat_send_msg_len));
+    
+    // set no message
+    peer->chat_send_msg_len = -1;
 }
 }
 
 
 void peer_send_simple (struct peer_data *peer, int msgid)
 void peer_send_simple (struct peer_data *peer, int msgid)
@@ -2675,7 +2716,7 @@ void peer_job_init (struct peer_data *peer)
     }
     }
 }
 }
 
 
-struct server_flow * server_flow_init (peerid_t peer_id)
+struct server_flow * server_flow_init (peerid_t peer_id, PacketRecvInterface *input)
 {
 {
     ASSERT(server_ready)
     ASSERT(server_ready)
     
     
@@ -2689,38 +2730,22 @@ struct server_flow * server_flow_init (peerid_t peer_id)
     // init queue flow
     // init queue flow
     PacketPassFairQueueFlow_Init(&flow->qflow, &server_queue);
     PacketPassFairQueueFlow_Init(&flow->qflow, &server_queue);
     
     
-    // init sender
-    if (!PeerChat_Init(&flow->sender, peer_id, BReactor_PendingGroup(&ss), NULL, NULL)) {
-        BLog(BLOG_ERROR, "PeerChat_Init failed");
-        goto fail1;
-    }
-    
-    // init writer
-    BufferWriter_Init(&flow->writer, SC_MAX_MSGLEN, BReactor_PendingGroup(&ss));
+    // init connector
+    PacketRecvConnector_Init(&flow->connector, sizeof(struct packetproto_header) + SC_MAX_ENC, BReactor_PendingGroup(&ss));
     
     
     // init encoder buffer
     // init encoder buffer
-    if (!SinglePacketBuffer_Init(&flow->encoder_buffer, PeerChat_GetSendOutput(&flow->sender), PacketPassFairQueueFlow_GetInput(&flow->qflow), BReactor_PendingGroup(&ss))) {
+    if (!SinglePacketBuffer_Init(&flow->encoder_buffer, PacketRecvConnector_GetOutput(&flow->connector), PacketPassFairQueueFlow_GetInput(&flow->qflow), BReactor_PendingGroup(&ss))) {
         BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
         BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
-        goto fail2;
-    }
-    
-    // init buffer
-    if (!PacketBuffer_Init(&flow->buffer, BufferWriter_GetOutput(&flow->writer), PeerChat_GetSendInput(&flow->sender), SERVER_BUFFER_MIN_PACKETS, BReactor_PendingGroup(&ss))) {
-        BLog(BLOG_ERROR, "PacketBuffer_Init failed");
-        goto fail3;
+        goto fail1;
     }
     }
     
     
-    // set no message
-    flow->msg_len = -1;
+    // connect input
+    PacketRecvConnector_ConnectInput(&flow->connector, input);
     
     
     return flow;
     return flow;
     
     
-fail3:
-    SinglePacketBuffer_Free(&flow->encoder_buffer);
-fail2:
-    BufferWriter_Free(&flow->writer);
-    PeerChat_Free(&flow->sender);
 fail1:
 fail1:
+    PacketRecvConnector_Free(&flow->connector);
     PacketPassFairQueueFlow_Free(&flow->qflow);
     PacketPassFairQueueFlow_Free(&flow->qflow);
     free(flow);
     free(flow);
 fail0:
 fail0:
@@ -2736,17 +2761,11 @@ void server_flow_free (struct server_flow *flow)
         dying_server_flow = NULL;
         dying_server_flow = NULL;
     }
     }
     
     
-    // free buffer
-    PacketBuffer_Free(&flow->buffer);
-    
     // free encoder buffer
     // free encoder buffer
     SinglePacketBuffer_Free(&flow->encoder_buffer);
     SinglePacketBuffer_Free(&flow->encoder_buffer);
     
     
-    // free writer
-    BufferWriter_Free(&flow->writer);
-    
-    // free sender
-    PeerChat_Free(&flow->sender);
+    // free connector
+    PacketRecvConnector_Free(&flow->connector);
     
     
     // free queue flow
     // free queue flow
     PacketPassFairQueueFlow_Free(&flow->qflow);
     PacketPassFairQueueFlow_Free(&flow->qflow);
@@ -2760,6 +2779,9 @@ void server_flow_die (struct server_flow *flow)
     ASSERT(PacketPassFairQueueFlow_IsBusy(&flow->qflow))
     ASSERT(PacketPassFairQueueFlow_IsBusy(&flow->qflow))
     ASSERT(!dying_server_flow)
     ASSERT(!dying_server_flow)
     
     
+    // disconnect input
+    PacketRecvConnector_DisconnectInput(&flow->connector);
+    
     // request notification when flow is done
     // request notification when flow is done
     PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)server_flow_qflow_handler_busy, flow);
     PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)server_flow_qflow_handler_busy, flow);
     
     
@@ -2775,34 +2797,3 @@ void server_flow_qflow_handler_busy (struct server_flow *flow)
     // finally free flow
     // finally free flow
     server_flow_free(flow);
     server_flow_free(flow);
 }
 }
-
-int server_flow_start_message (struct server_flow *flow, uint8_t **buf, int len)
-{
-    ASSERT(flow != dying_server_flow)
-    ASSERT(flow->msg_len == -1)
-    ASSERT(len >= 0)
-    ASSERT(len <= SC_MAX_MSGLEN)
-    
-    // obtain buffer location
-    if (!BufferWriter_StartPacket(&flow->writer, buf)) {
-        BLog(BLOG_ERROR, "BufferWriter_StartPacket failed");
-        return 0;
-    }
-    
-    // set have message
-    flow->msg_len = len;
-    
-    return 1;
-}
-
-void server_flow_end_message (struct server_flow *flow)
-{
-    ASSERT(flow != dying_server_flow)
-    ASSERT(flow->msg_len >= 0)
-    
-    // submit packet to buffer
-    BufferWriter_EndPacket(&flow->writer, flow->msg_len);
-    
-    // set no message
-    flow->msg_len = -1;
-}

+ 10 - 6
client/client.h

@@ -29,6 +29,7 @@
 #include <flow/PacketBuffer.h>
 #include <flow/PacketBuffer.h>
 #include <flow/BufferWriter.h>
 #include <flow/BufferWriter.h>
 #include <flow/SinglePacketBuffer.h>
 #include <flow/SinglePacketBuffer.h>
+#include <flow/PacketRecvConnector.h>
 #include <client/DatagramPeerIO.h>
 #include <client/DatagramPeerIO.h>
 #include <client/StreamPeerIO.h>
 #include <client/StreamPeerIO.h>
 #include <client/DataProto.h>
 #include <client/DataProto.h>
@@ -89,10 +90,7 @@
 struct server_flow {
 struct server_flow {
     PacketPassFairQueueFlow qflow;
     PacketPassFairQueueFlow qflow;
     SinglePacketBuffer encoder_buffer;
     SinglePacketBuffer encoder_buffer;
-    PeerChat sender;
-    PacketBuffer buffer;
-    BufferWriter writer;
-    int msg_len;
+    PacketRecvConnector connector;
 };
 };
 
 
 struct peer_data {
 struct peer_data {
@@ -111,8 +109,14 @@ struct peer_data {
     BPending job_send_seed_after_binding;
     BPending job_send_seed_after_binding;
     BPending job_init;
     BPending job_init;
     
     
-    // server flow
-    struct server_flow *server_flow;
+    // chat
+    PeerChat chat;
+    
+    // chat sending
+    struct server_flow *chat_send_flow;
+    PacketBuffer chat_send_buffer;
+    BufferWriter chat_send_writer;
+    int chat_send_msg_len;
     
     
     // local flow
     // local flow
     DataProtoFlow local_dpflow;
     DataProtoFlow local_dpflow;