|
|
@@ -112,6 +112,7 @@ struct {
|
|
|
int max_groups;
|
|
|
int igmp_group_membership_interval;
|
|
|
int igmp_last_member_query_time;
|
|
|
+ int allow_peer_talk_without_ssl;
|
|
|
} options;
|
|
|
|
|
|
// bind addresses
|
|
|
@@ -222,6 +223,8 @@ static void peer_log (struct peer_data *peer, int level, const char *fmt, ...);
|
|
|
// see if we are the master relative to this peer
|
|
|
static int peer_am_master (struct peer_data *peer);
|
|
|
|
|
|
+static void peer_free_chat (struct peer_data *peer);
|
|
|
+
|
|
|
// initializes the link
|
|
|
static int peer_init_link (struct peer_data *peer);
|
|
|
|
|
|
@@ -255,7 +258,8 @@ static void peer_unregister_need_relay (struct peer_data *peer);
|
|
|
// handle a link setup failure
|
|
|
static void peer_reset (struct peer_data *peer);
|
|
|
|
|
|
-// handle incoming peer messages
|
|
|
+// chat handlers
|
|
|
+static void peer_chat_handler_error (struct peer_data *peer);
|
|
|
static void peer_chat_handler_message (struct peer_data *peer, uint8_t *data, int data_len);
|
|
|
|
|
|
// handlers for different message types
|
|
|
@@ -334,10 +338,12 @@ static void server_handler_message (void *user, peerid_t peer_id, uint8_t *data,
|
|
|
static void peer_job_send_seed_after_binding (struct peer_data *peer);
|
|
|
static void peer_job_init (struct peer_data *peer);
|
|
|
|
|
|
-static struct server_flow * server_flow_init (peerid_t peer_id, PacketRecvInterface *input);
|
|
|
+static struct server_flow * server_flow_init (peerid_t peer_id);
|
|
|
static void server_flow_free (struct server_flow *flow);
|
|
|
static void server_flow_die (struct server_flow *flow);
|
|
|
static void server_flow_qflow_handler_busy (struct server_flow *flow);
|
|
|
+static void server_flow_connect (struct server_flow *flow, PacketRecvInterface *input);
|
|
|
+static void server_flow_disconnect (struct server_flow *flow);
|
|
|
|
|
|
int main (int argc, char *argv[])
|
|
|
{
|
|
|
@@ -667,6 +673,7 @@ void print_help (const char *name)
|
|
|
" [--max-groups <num>]\n"
|
|
|
" [--igmp-group-membership-interval <ms>]\n"
|
|
|
" [--igmp-last-member-query-time <ms>]\n"
|
|
|
+ " [--allow-peer-talk-without-ssl]\n"
|
|
|
"Address format is a.b.c.d:port (IPv4) or [addr]:port (IPv6).\n",
|
|
|
name
|
|
|
);
|
|
|
@@ -716,6 +723,7 @@ int parse_arguments (int argc, char *argv[])
|
|
|
options.max_groups = PEER_DEFAULT_MAX_GROUPS;
|
|
|
options.igmp_group_membership_interval = DEFAULT_IGMP_GROUP_MEMBERSHIP_INTERVAL;
|
|
|
options.igmp_last_member_query_time = DEFAULT_IGMP_LAST_MEMBER_QUERY_TIME;
|
|
|
+ options.allow_peer_talk_without_ssl = 0;
|
|
|
|
|
|
int have_fragmentation_latency = 0;
|
|
|
|
|
|
@@ -1089,6 +1097,9 @@ int parse_arguments (int argc, char *argv[])
|
|
|
}
|
|
|
i++;
|
|
|
}
|
|
|
+ else if (!strcmp(arg, "--allow-peer-talk-without-ssl")) {
|
|
|
+ options.allow_peer_talk_without_ssl = 1;
|
|
|
+ }
|
|
|
else {
|
|
|
fprintf(stderr, "unknown option: %s\n", arg);
|
|
|
return 0;
|
|
|
@@ -1270,39 +1281,6 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
// set no common name
|
|
|
peer->common_name = NULL;
|
|
|
|
|
|
- // init jobs
|
|
|
- BPending_Init(&peer->job_send_seed_after_binding, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_send_seed_after_binding, peer);
|
|
|
- BPending_Init(&peer->job_init, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_init, peer);
|
|
|
-
|
|
|
- // set init job (must be before initing server flow so we can send)
|
|
|
- BPending_Set(&peer->job_init);
|
|
|
-
|
|
|
- // init chat
|
|
|
- if (!PeerChat_Init(&peer->chat, peer->id, BReactor_PendingGroup(&ss), peer, NULL,
|
|
|
- (PeerChat_handler_message)peer_chat_handler_message
|
|
|
- )) {
|
|
|
- peer_log(peer, BLOG_ERROR, "PeerChat_Init failed");
|
|
|
- goto fail1;
|
|
|
- }
|
|
|
-
|
|
|
- // init server flow
|
|
|
- if (!(peer->chat_send_flow = server_flow_init(peer->id, PeerChat_GetSendOutput(&peer->chat)))) {
|
|
|
- peer_log(peer, BLOG_ERROR, "server_flow_init failed");
|
|
|
- goto fail1a;
|
|
|
- }
|
|
|
-
|
|
|
- // init chat send writer
|
|
|
- BufferWriter_Init(&peer->chat_send_writer, SC_MAX_MSGLEN, BReactor_PendingGroup(&ss));
|
|
|
-
|
|
|
- // init chat send buffer
|
|
|
- if (!PacketBuffer_Init(&peer->chat_send_buffer, BufferWriter_GetOutput(&peer->chat_send_writer), PeerChat_GetSendInput(&peer->chat), SERVER_BUFFER_MIN_PACKETS, BReactor_PendingGroup(&ss))) {
|
|
|
- peer_log(peer, BLOG_ERROR, "PacketBuffer_Init failed");
|
|
|
- goto fail1b;
|
|
|
- }
|
|
|
-
|
|
|
- // set no message
|
|
|
- peer->chat_send_msg_len = -1;
|
|
|
-
|
|
|
if (options.ssl) {
|
|
|
// remember certificate
|
|
|
memcpy(peer->cert, cert, cert_len);
|
|
|
@@ -1312,7 +1290,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
// in which case following workaroud wouldn't help
|
|
|
if (!(cert_len > 0 && (cert[0] & 0x1f) == 0x10)) {
|
|
|
peer_log(peer, BLOG_ERROR, "certificate does not look like DER");
|
|
|
- goto fail1c;
|
|
|
+ goto fail1;
|
|
|
}
|
|
|
|
|
|
// copy the certificate and append it a good load of zero bytes,
|
|
|
@@ -1321,7 +1299,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
uint8_t *certbuf = malloc(cert_len + 100);
|
|
|
if (!certbuf) {
|
|
|
peer_log(peer, BLOG_ERROR, "malloc failed");
|
|
|
- goto fail1c;
|
|
|
+ goto fail1;
|
|
|
}
|
|
|
memcpy(certbuf, cert, cert_len);
|
|
|
memset(certbuf + cert_len, 0, 100);
|
|
|
@@ -1331,7 +1309,7 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
if (!nsscert) {
|
|
|
peer_log(peer, BLOG_ERROR, "CERT_DecodeCertFromPackage failed (%d)", PORT_GetError());
|
|
|
free(certbuf);
|
|
|
- goto fail1c;
|
|
|
+ goto fail1;
|
|
|
}
|
|
|
|
|
|
free(certbuf);
|
|
|
@@ -1340,22 +1318,93 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
if (!(peer->common_name = CERT_GetCommonName(&nsscert->subject))) {
|
|
|
peer_log(peer, BLOG_ERROR, "CERT_GetCommonName failed");
|
|
|
CERT_DestroyCertificate(nsscert);
|
|
|
- goto fail1c;
|
|
|
+ goto fail1;
|
|
|
}
|
|
|
|
|
|
CERT_DestroyCertificate(nsscert);
|
|
|
}
|
|
|
|
|
|
+ // init jobs
|
|
|
+ BPending_Init(&peer->job_send_seed_after_binding, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_send_seed_after_binding, peer);
|
|
|
+ BPending_Init(&peer->job_init, BReactor_PendingGroup(&ss), (BPending_handler)peer_job_init, peer);
|
|
|
+
|
|
|
+ // set init job (must be before initing server flow so we can send)
|
|
|
+ BPending_Set(&peer->job_init);
|
|
|
+
|
|
|
+ // init server flow
|
|
|
+ if (!(peer->server_flow = server_flow_init(peer->id))) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "server_flow_init failed");
|
|
|
+ goto fail2;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((peer->flags & SCID_NEWCLIENT_FLAG_SSL) && !options.ssl) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "peer requires talking with SSL, but we're not using SSL!?");
|
|
|
+ goto fail3;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!(peer->flags & SCID_NEWCLIENT_FLAG_SSL) && !options.allow_peer_talk_without_ssl) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "peer requires talking without SSL, but we don't allow that");
|
|
|
+ goto fail3;
|
|
|
+ }
|
|
|
+
|
|
|
+ // choose chat SSL mode
|
|
|
+ int chat_ssl_mode = PEERCHAT_SSL_NONE;
|
|
|
+ if ((peer->flags & SCID_NEWCLIENT_FLAG_SSL)) {
|
|
|
+ chat_ssl_mode = (peer_am_master(peer) ? PEERCHAT_SSL_SERVER : PEERCHAT_SSL_CLIENT);
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (chat_ssl_mode) {
|
|
|
+ case PEERCHAT_SSL_NONE:
|
|
|
+ peer_log(peer, BLOG_INFO, "talking to peer in plaintext mode");
|
|
|
+ break;
|
|
|
+ case PEERCHAT_SSL_CLIENT:
|
|
|
+ peer_log(peer, BLOG_INFO, "talking to peer in SSL client mode");
|
|
|
+ break;
|
|
|
+ case PEERCHAT_SSL_SERVER:
|
|
|
+ peer_log(peer, BLOG_INFO, "talking to peer in SSL server mode");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // init chat
|
|
|
+ if (!PeerChat_Init(&peer->chat, peer->id, chat_ssl_mode, client_cert, client_key, peer->cert, peer->cert_len, BReactor_PendingGroup(&ss), peer,
|
|
|
+ (PeerChat_handler_error)peer_chat_handler_error,
|
|
|
+ (PeerChat_handler_message)peer_chat_handler_message
|
|
|
+ )) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "PeerChat_Init failed");
|
|
|
+ goto fail3;
|
|
|
+ }
|
|
|
+
|
|
|
+ // init chat send writer
|
|
|
+ BufferWriter_Init(&peer->chat_send_writer, SC_MAX_MSGLEN, BReactor_PendingGroup(&ss));
|
|
|
+
|
|
|
+ // init chat send buffer
|
|
|
+ if (!PacketBuffer_Init(&peer->chat_send_buffer, BufferWriter_GetOutput(&peer->chat_send_writer), PeerChat_GetSendInput(&peer->chat), SERVER_BUFFER_MIN_PACKETS, BReactor_PendingGroup(&ss))) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "PacketBuffer_Init failed");
|
|
|
+ goto fail4;
|
|
|
+ }
|
|
|
+
|
|
|
+ // set no message
|
|
|
+ peer->chat_send_msg_len = -1;
|
|
|
+
|
|
|
+ // connect server flow to chat
|
|
|
+ server_flow_connect(peer->server_flow, PeerChat_GetSendOutput(&peer->chat));
|
|
|
+
|
|
|
+ // set have chat
|
|
|
+ peer->have_chat = 1;
|
|
|
+
|
|
|
+ // set have no resetpeer
|
|
|
+ peer->have_resetpeer = 0;
|
|
|
+
|
|
|
// init local flow
|
|
|
if (!DataProtoFlow_Init(&peer->local_dpflow, &device_input_dpd, my_id, peer->id, options.send_buffer_size, -1, NULL, NULL)) {
|
|
|
peer_log(peer, BLOG_ERROR, "DataProtoFlow_Init failed");
|
|
|
- goto fail2;
|
|
|
+ goto fail5;
|
|
|
}
|
|
|
|
|
|
// init frame decider peer
|
|
|
if (!FrameDeciderPeer_Init(&peer->decider_peer, &frame_decider)) {
|
|
|
peer_log(peer, BLOG_ERROR, "FrameDeciderPeer_Init failed");
|
|
|
- goto fail3;
|
|
|
+ goto fail6;
|
|
|
}
|
|
|
|
|
|
// init receive peer
|
|
|
@@ -1387,22 +1436,23 @@ void peer_add (peerid_t id, int flags, const uint8_t *cert, int cert_len)
|
|
|
|
|
|
return;
|
|
|
|
|
|
-fail3:
|
|
|
+fail6:
|
|
|
DataProtoFlow_Free(&peer->local_dpflow);
|
|
|
-fail2:
|
|
|
- if (peer->common_name) {
|
|
|
- PORT_Free(peer->common_name);
|
|
|
- }
|
|
|
-fail1c:
|
|
|
+fail5:
|
|
|
+ server_flow_disconnect(peer->server_flow);
|
|
|
PacketBuffer_Free(&peer->chat_send_buffer);
|
|
|
-fail1b:
|
|
|
+fail4:
|
|
|
BufferWriter_Free(&peer->chat_send_writer);
|
|
|
- server_flow_free(peer->chat_send_flow);
|
|
|
-fail1a:
|
|
|
PeerChat_Free(&peer->chat);
|
|
|
-fail1:
|
|
|
+fail3:
|
|
|
+ server_flow_free(peer->server_flow);
|
|
|
+fail2:
|
|
|
BPending_Free(&peer->job_init);
|
|
|
BPending_Free(&peer->job_send_seed_after_binding);
|
|
|
+ if (peer->common_name) {
|
|
|
+ PORT_Free(peer->common_name);
|
|
|
+ }
|
|
|
+fail1:
|
|
|
free(peer);
|
|
|
fail0:;
|
|
|
}
|
|
|
@@ -1446,22 +1496,27 @@ void peer_remove (struct peer_data *peer, int exiting)
|
|
|
// free local flow
|
|
|
DataProtoFlow_Free(&peer->local_dpflow);
|
|
|
|
|
|
- // free chat send buffer
|
|
|
- PacketBuffer_Free(&peer->chat_send_buffer);
|
|
|
+ // free chat
|
|
|
+ if (peer->have_chat) {
|
|
|
+ peer_free_chat(peer);
|
|
|
+ }
|
|
|
|
|
|
- // free chat send writer
|
|
|
- BufferWriter_Free(&peer->chat_send_writer);
|
|
|
+ // free resetpeer
|
|
|
+ if (peer->have_resetpeer) {
|
|
|
+ // disconnect resetpeer source from server flow
|
|
|
+ server_flow_disconnect(peer->server_flow);
|
|
|
+
|
|
|
+ // free resetpeer source
|
|
|
+ SinglePacketSource_Free(&peer->resetpeer_source);
|
|
|
+ }
|
|
|
|
|
|
// free/die server flow
|
|
|
- if (exiting || !PacketPassFairQueueFlow_IsBusy(&peer->chat_send_flow->qflow)) {
|
|
|
- server_flow_free(peer->chat_send_flow);
|
|
|
+ if (exiting || !PacketPassFairQueueFlow_IsBusy(&peer->server_flow->qflow)) {
|
|
|
+ server_flow_free(peer->server_flow);
|
|
|
} else {
|
|
|
- server_flow_die(peer->chat_send_flow);
|
|
|
+ server_flow_die(peer->server_flow);
|
|
|
}
|
|
|
|
|
|
- // free chat
|
|
|
- PeerChat_Free(&peer->chat);
|
|
|
-
|
|
|
// free jobs
|
|
|
BPending_Free(&peer->job_init);
|
|
|
BPending_Free(&peer->job_send_seed_after_binding);
|
|
|
@@ -1493,6 +1548,26 @@ int peer_am_master (struct peer_data *peer)
|
|
|
return (my_id > peer->id);
|
|
|
}
|
|
|
|
|
|
+void peer_free_chat (struct peer_data *peer)
|
|
|
+{
|
|
|
+ ASSERT(peer->have_chat)
|
|
|
+
|
|
|
+ // disconnect chat from server flow
|
|
|
+ server_flow_disconnect(peer->server_flow);
|
|
|
+
|
|
|
+ // free chat send buffer
|
|
|
+ PacketBuffer_Free(&peer->chat_send_buffer);
|
|
|
+
|
|
|
+ // free chat send writer
|
|
|
+ BufferWriter_Free(&peer->chat_send_writer);
|
|
|
+
|
|
|
+ // free chat
|
|
|
+ PeerChat_Free(&peer->chat);
|
|
|
+
|
|
|
+ // set have no chat
|
|
|
+ peer->have_chat = 0;
|
|
|
+}
|
|
|
+
|
|
|
int peer_init_link (struct peer_data *peer)
|
|
|
{
|
|
|
ASSERT(!peer->have_link)
|
|
|
@@ -1794,8 +1869,37 @@ void peer_reset (struct peer_data *peer)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void peer_chat_handler_error (struct peer_data *peer)
|
|
|
+{
|
|
|
+ ASSERT(peer->have_chat)
|
|
|
+ ASSERT(!peer->have_resetpeer)
|
|
|
+
|
|
|
+ peer_log(peer, BLOG_ERROR, "chat error, resetting peer");
|
|
|
+
|
|
|
+ // free chat
|
|
|
+ peer_free_chat(peer);
|
|
|
+
|
|
|
+ // build resetpeer packet
|
|
|
+ struct packetproto_header *pp_header = (struct packetproto_header *)peer->resetpeer_packet;
|
|
|
+ pp_header->len = htol16(sizeof(struct sc_header) + sizeof(struct sc_client_resetpeer));
|
|
|
+ struct sc_header *sc_header = (struct sc_header *)(pp_header + 1);
|
|
|
+ sc_header->type = htol8(SCID_RESETPEER);
|
|
|
+ struct sc_client_resetpeer *sc_resetpeer = (struct sc_client_resetpeer *)(sc_header + 1);
|
|
|
+ sc_resetpeer->clientid = htol16(peer->id);
|
|
|
+
|
|
|
+ // init resetpeer sourse
|
|
|
+ SinglePacketSource_Init(&peer->resetpeer_source, peer->resetpeer_packet, sizeof(peer->resetpeer_packet), BReactor_PendingGroup(&ss));
|
|
|
+
|
|
|
+ // connect server flow to resetpeer source
|
|
|
+ server_flow_connect(peer->server_flow, SinglePacketSource_GetOutput(&peer->resetpeer_source));
|
|
|
+
|
|
|
+ // set have resetpeer
|
|
|
+ peer->have_resetpeer = 1;
|
|
|
+}
|
|
|
+
|
|
|
void peer_chat_handler_message (struct peer_data *peer, uint8_t *data, int data_len)
|
|
|
{
|
|
|
+ ASSERT(peer->have_chat)
|
|
|
ASSERT(data_len >= 0)
|
|
|
ASSERT(data_len <= SC_MAX_MSGLEN)
|
|
|
|
|
|
@@ -2291,12 +2395,17 @@ static int peer_start_msg (struct peer_data *peer, void **data, int type, int le
|
|
|
ASSERT(!(len > 0) || data)
|
|
|
ASSERT(peer->chat_send_msg_len == -1)
|
|
|
|
|
|
+ // make sure we have chat
|
|
|
+ if (!peer->have_chat) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "cannot send message, chat is down");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
// obtain buffer location
|
|
|
uint8_t *packet;
|
|
|
if (!BufferWriter_StartPacket(&peer->chat_send_writer, &packet)) {
|
|
|
- BLog(BLOG_ERROR, "out of peer server buffer, exiting");
|
|
|
- terminate();
|
|
|
- return -1;
|
|
|
+ peer_log(peer, BLOG_ERROR, "cannot send message, out of buffer");
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
// write fields
|
|
|
@@ -2312,12 +2421,13 @@ static int peer_start_msg (struct peer_data *peer, void **data, int type, int le
|
|
|
if (data) {
|
|
|
*data = payload_dst;
|
|
|
}
|
|
|
- return 0;
|
|
|
+ return 1;
|
|
|
}
|
|
|
|
|
|
static void peer_end_msg (struct peer_data *peer)
|
|
|
{
|
|
|
ASSERT(peer->chat_send_msg_len >= 0)
|
|
|
+ ASSERT(peer->have_chat)
|
|
|
|
|
|
// submit packet to buffer
|
|
|
BufferWriter_EndPacket(&peer->chat_send_writer, msg_SIZEtype + msg_SIZEpayload(peer->chat_send_msg_len));
|
|
|
@@ -2328,7 +2438,7 @@ static void peer_end_msg (struct peer_data *peer)
|
|
|
|
|
|
void peer_send_simple (struct peer_data *peer, int msgid)
|
|
|
{
|
|
|
- if (peer_start_msg(peer, NULL, msgid, 0) < 0) {
|
|
|
+ if (!peer_start_msg(peer, NULL, msgid, 0)) {
|
|
|
return;
|
|
|
}
|
|
|
peer_end_msg(peer);
|
|
|
@@ -2378,7 +2488,7 @@ void peer_send_conectinfo (struct peer_data *peer, int addr_index, int port_adju
|
|
|
|
|
|
// start message
|
|
|
uint8_t *msg;
|
|
|
- if (peer_start_msg(peer, (void **)&msg, MSGID_YOUCONNECT, msg_len) < 0) {
|
|
|
+ if (!peer_start_msg(peer, (void **)&msg, MSGID_YOUCONNECT, msg_len)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -2457,7 +2567,7 @@ void peer_generate_and_send_seed (struct peer_data *peer)
|
|
|
// send seed to the peer
|
|
|
int msg_len = msg_seed_SIZEseed_id + msg_seed_SIZEkey(key_len) + msg_seed_SIZEiv(iv_len);
|
|
|
uint8_t *msg;
|
|
|
- if (peer_start_msg(peer, (void **)&msg, MSGID_SEED, msg_len) < 0) {
|
|
|
+ if (!peer_start_msg(peer, (void **)&msg, MSGID_SEED, msg_len)) {
|
|
|
return;
|
|
|
}
|
|
|
msg_seedWriter writer;
|
|
|
@@ -2479,7 +2589,7 @@ void peer_send_confirmseed (struct peer_data *peer, uint16_t seed_id)
|
|
|
// send confirmation
|
|
|
int msg_len = msg_confirmseed_SIZEseed_id;
|
|
|
uint8_t *msg;
|
|
|
- if (peer_start_msg(peer, (void **)&msg, MSGID_CONFIRMSEED, msg_len) < 0) {
|
|
|
+ if (!peer_start_msg(peer, (void **)&msg, MSGID_CONFIRMSEED, msg_len)) {
|
|
|
return;
|
|
|
}
|
|
|
msg_confirmseedWriter writer;
|
|
|
@@ -2695,6 +2805,12 @@ void server_handler_message (void *user, peerid_t peer_id, uint8_t *data, int da
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ // make sure we have chat
|
|
|
+ if (!peer->have_chat) {
|
|
|
+ peer_log(peer, BLOG_ERROR, "cannot process message, chat is down");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// pass message to chat
|
|
|
PeerChat_InputReceived(&peer->chat, data, data_len);
|
|
|
}
|
|
|
@@ -2717,7 +2833,7 @@ void peer_job_init (struct peer_data *peer)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-struct server_flow * server_flow_init (peerid_t peer_id, PacketRecvInterface *input)
|
|
|
+struct server_flow * server_flow_init (peerid_t peer_id)
|
|
|
{
|
|
|
ASSERT(server_ready)
|
|
|
|
|
|
@@ -2740,8 +2856,8 @@ struct server_flow * server_flow_init (peerid_t peer_id, PacketRecvInterface *in
|
|
|
goto fail1;
|
|
|
}
|
|
|
|
|
|
- // connect input
|
|
|
- PacketRecvConnector_ConnectInput(&flow->connector, input);
|
|
|
+ // set not connected
|
|
|
+ flow->connected = 0;
|
|
|
|
|
|
return flow;
|
|
|
|
|
|
@@ -2756,6 +2872,7 @@ fail0:
|
|
|
void server_flow_free (struct server_flow *flow)
|
|
|
{
|
|
|
PacketPassFairQueueFlow_AssertFree(&flow->qflow);
|
|
|
+ ASSERT(!flow->connected)
|
|
|
|
|
|
// remove dying flow reference
|
|
|
if (flow == dying_server_flow) {
|
|
|
@@ -2778,11 +2895,9 @@ void server_flow_free (struct server_flow *flow)
|
|
|
void server_flow_die (struct server_flow *flow)
|
|
|
{
|
|
|
ASSERT(PacketPassFairQueueFlow_IsBusy(&flow->qflow))
|
|
|
+ ASSERT(!flow->connected)
|
|
|
ASSERT(!dying_server_flow)
|
|
|
|
|
|
- // disconnect input
|
|
|
- PacketRecvConnector_DisconnectInput(&flow->connector);
|
|
|
-
|
|
|
// request notification when flow is done
|
|
|
PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)server_flow_qflow_handler_busy, flow);
|
|
|
|
|
|
@@ -2793,8 +2908,33 @@ void server_flow_die (struct server_flow *flow)
|
|
|
void server_flow_qflow_handler_busy (struct server_flow *flow)
|
|
|
{
|
|
|
ASSERT(flow == dying_server_flow)
|
|
|
+ ASSERT(!flow->connected)
|
|
|
PacketPassFairQueueFlow_AssertFree(&flow->qflow);
|
|
|
|
|
|
// finally free flow
|
|
|
server_flow_free(flow);
|
|
|
}
|
|
|
+
|
|
|
+void server_flow_connect (struct server_flow *flow, PacketRecvInterface *input)
|
|
|
+{
|
|
|
+ ASSERT(!flow->connected)
|
|
|
+ ASSERT(flow != dying_server_flow)
|
|
|
+
|
|
|
+ // connect input
|
|
|
+ PacketRecvConnector_ConnectInput(&flow->connector, input);
|
|
|
+
|
|
|
+ // set connected
|
|
|
+ flow->connected = 1;
|
|
|
+}
|
|
|
+
|
|
|
+void server_flow_disconnect (struct server_flow *flow)
|
|
|
+{
|
|
|
+ ASSERT(flow->connected)
|
|
|
+ ASSERT(flow != dying_server_flow)
|
|
|
+
|
|
|
+ // disconnect input
|
|
|
+ PacketRecvConnector_DisconnectInput(&flow->connector);
|
|
|
+
|
|
|
+ // set not connected
|
|
|
+ flow->connected = 0;
|
|
|
+}
|