|
|
@@ -221,12 +221,18 @@ static void process_packet_outmsg (struct client_data *client, uint8_t *data, in
|
|
|
// processes resetpeer packets from clients
|
|
|
static void process_packet_resetpeer (struct client_data *client, uint8_t *data, int data_len);
|
|
|
|
|
|
+// processes acceptpeer packets from clients
|
|
|
+static void process_packet_acceptpeer (struct client_data *client, uint8_t *data, int data_len);
|
|
|
+
|
|
|
// creates a peer flow
|
|
|
static struct peer_flow * peer_flow_create (struct client_data *src_client, struct client_data *dest_client);
|
|
|
|
|
|
// deallocates a peer flow
|
|
|
static void peer_flow_dealloc (struct peer_flow *flow);
|
|
|
|
|
|
+static int peer_flow_init_io (struct peer_flow *flow);
|
|
|
+static void peer_flow_free_io (struct peer_flow *flow);
|
|
|
+
|
|
|
// disconnects the source client from a peer flow
|
|
|
static void peer_flow_disconnect (struct peer_flow *flow);
|
|
|
|
|
|
@@ -239,8 +245,10 @@ static void peer_flow_end_packet (struct peer_flow *flow, uint8_t type);
|
|
|
// handler called by the queue when a peer flow can be freed after its source has gone away
|
|
|
static void peer_flow_handler_canremove (struct peer_flow *flow);
|
|
|
|
|
|
-// schedules resetting of clients knowledge
|
|
|
-static void peer_flow_schedule_reset (struct peer_flow *flow);
|
|
|
+static void peer_flow_start_reset (struct peer_flow *flow);
|
|
|
+static void peer_flow_drive_reset (struct peer_flow *flow);
|
|
|
+
|
|
|
+static void peer_flow_reset_qflow_handler_busy (struct peer_flow *flow);
|
|
|
|
|
|
// resets clients knowledge after the timer expires
|
|
|
static void peer_flow_reset_timer_handler (struct peer_flow *flow);
|
|
|
@@ -291,7 +299,7 @@ static void know_inform_job_handler (struct peer_know *k);
|
|
|
static void uninform_know (struct peer_know *k);
|
|
|
static void know_uninform_job_handler (struct peer_know *k);
|
|
|
|
|
|
-static int create_know_pair (struct peer_flow *flow_to);
|
|
|
+static int launch_pair (struct peer_flow *flow_to);
|
|
|
|
|
|
// find flow from a client to some client
|
|
|
static struct peer_flow * find_flow (struct client_data *client, peerid_t dest_id);
|
|
|
@@ -1092,7 +1100,7 @@ void client_remove (struct client_data *client)
|
|
|
ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->dest_client->dying)
|
|
|
|
|
|
- if (PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
|
|
|
+ if (flow->have_io && PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
|
|
|
client_log(client, BLOG_DEBUG, "removing flow to %d later", (int)flow->dest_client->id);
|
|
|
peer_flow_disconnect(flow);
|
|
|
} else {
|
|
|
@@ -1398,6 +1406,9 @@ void client_input_handler_send (struct client_data *client, uint8_t *data, int d
|
|
|
case SCID_RESETPEER:
|
|
|
process_packet_resetpeer(client, data, data_len);
|
|
|
return;
|
|
|
+ case SCID_ACCEPTPEER:
|
|
|
+ process_packet_acceptpeer(client, data, data_len);
|
|
|
+ return;
|
|
|
default:
|
|
|
client_log(client, BLOG_NOTICE, "unknown packet type %d, removing", (int)type);
|
|
|
client_remove(client);
|
|
|
@@ -1463,9 +1474,9 @@ void process_packet_hello (struct client_data *client, uint8_t *data, int data_l
|
|
|
flow_to->opposite = flow_from;
|
|
|
flow_from->opposite = flow_to;
|
|
|
|
|
|
- // create knows
|
|
|
- if (!create_know_pair(flow_to)) {
|
|
|
- goto fail;
|
|
|
+ // launch pair
|
|
|
+ if (!launch_pair(flow_to)) {
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1518,10 +1529,15 @@ void process_packet_outmsg (struct client_data *client, uint8_t *data, int data_
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // if the flow has reset scheduled, ignore packet. Clients expect messages
|
|
|
- // to be reliable, so we can't deliver subsequent messages after we dropped one.
|
|
|
- if (BTimer_IsRunning(&flow->reset_timer)) {
|
|
|
- client_log(client, BLOG_INFO, "flow is resetting; not forwarding message to %d", (int)id);
|
|
|
+ // if pair is resetting, ignore message
|
|
|
+ if (flow->resetting || flow->opposite->resetting) {
|
|
|
+ client_log(client, BLOG_INFO, "pair is resetting; not forwarding message to %d", (int)id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // if sending client hasn't accepted yet, ignore message
|
|
|
+ if (!flow->accepted) {
|
|
|
+ client_log(client, BLOG_INFO, "client hasn't accepted; not forwarding message to %d", (int)id);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -1529,8 +1545,8 @@ void process_packet_outmsg (struct client_data *client, uint8_t *data, int data_
|
|
|
uint8_t x;
|
|
|
BRandom_randomize(&x, sizeof(x));
|
|
|
if (x < SIMULATE_OUT_OF_FLOW_BUFFER) {
|
|
|
- client_log(client, BLOG_WARNING, "simulating error; scheduling reset to %d", (int)flow->dest_client->id);
|
|
|
- peer_flow_schedule_reset(flow);
|
|
|
+ client_log(client, BLOG_WARNING, "simulating error; resetting to %d", (int)flow->dest_client->id);
|
|
|
+ peer_flow_start_reset(flow);
|
|
|
return;
|
|
|
}
|
|
|
#endif
|
|
|
@@ -1539,8 +1555,8 @@ void process_packet_outmsg (struct client_data *client, uint8_t *data, int data_
|
|
|
struct sc_server_inmsg *pack;
|
|
|
if (!peer_flow_start_packet(flow, (void **)&pack, sizeof(struct sc_server_inmsg) + payload_size)) {
|
|
|
// out of buffer, reset these two clients
|
|
|
- client_log(client, BLOG_WARNING, "out of buffer; scheduling reset to %d", (int)flow->dest_client->id);
|
|
|
- peer_flow_schedule_reset(flow);
|
|
|
+ client_log(client, BLOG_WARNING, "out of buffer; resetting to %d", (int)flow->dest_client->id);
|
|
|
+ peer_flow_start_reset(flow);
|
|
|
return;
|
|
|
}
|
|
|
pack->clientid = htol16(client->id);
|
|
|
@@ -1568,14 +1584,74 @@ void process_packet_resetpeer (struct client_data *client, uint8_t *data, int da
|
|
|
// lookup flow to destination client
|
|
|
struct peer_flow *flow = find_flow(client, id);
|
|
|
if (!flow) {
|
|
|
- client_log(client, BLOG_INFO, "resetpeer: no flow to %d", (int)id);
|
|
|
+ client_log(client, BLOG_INFO, "no flow for reset to %d", (int)id);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- client_log(client, BLOG_WARNING, "resetpeer: scheduling reset to %d", (int)flow->dest_client->id);
|
|
|
+ // if pair is resetting, ignore message
|
|
|
+ if (flow->resetting || flow->opposite->resetting) {
|
|
|
+ client_log(client, BLOG_INFO, "pair is resetting; not resetting to %d", (int)id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // if sending client hasn't accepted yet, ignore message
|
|
|
+ if (!flow->accepted) {
|
|
|
+ client_log(client, BLOG_INFO, "client hasn't accepted; not resetting to %d", (int)id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ client_log(client, BLOG_WARNING, "resetting to %d", (int)flow->dest_client->id);
|
|
|
|
|
|
// reset clients
|
|
|
- peer_flow_schedule_reset(flow);
|
|
|
+ peer_flow_start_reset(flow);
|
|
|
+}
|
|
|
+
|
|
|
+void process_packet_acceptpeer (struct client_data *client, uint8_t *data, int data_len)
|
|
|
+{
|
|
|
+ if (client->initstatus != INITSTATUS_COMPLETE) {
|
|
|
+ client_log(client, BLOG_NOTICE, "acceptpeer: not expected");
|
|
|
+ client_remove(client);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (data_len != sizeof(struct sc_client_acceptpeer)) {
|
|
|
+ client_log(client, BLOG_NOTICE, "acceptpeer: wrong size");
|
|
|
+ client_remove(client);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ struct sc_client_acceptpeer *msg = (struct sc_client_acceptpeer *)data;
|
|
|
+ peerid_t id = ltoh16(msg->clientid);
|
|
|
+
|
|
|
+ // lookup flow to destination client
|
|
|
+ struct peer_flow *flow = find_flow(client, id);
|
|
|
+ if (!flow) {
|
|
|
+ // the specified client has probably gone away but the sending client didn't know
|
|
|
+ // that yet; this is expected
|
|
|
+ client_log(client, BLOG_INFO, "acceptpeer: no flow to %d", (int)id);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // client can only accept once
|
|
|
+ if (flow->accepted) {
|
|
|
+ // the previous accept is probably from an old client with the same ID as this one;
|
|
|
+ // this is bad, disconnect client
|
|
|
+ client_log(client, BLOG_ERROR, "acceptpeer: already accepted to %d", (int)id);
|
|
|
+ client_remove(client);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ client_log(client, BLOG_INFO, "accepted %d", (int)id);
|
|
|
+
|
|
|
+ // set accepted
|
|
|
+ flow->accepted = 1;
|
|
|
+
|
|
|
+ // if pair is resetting, continue
|
|
|
+ if (flow->resetting) {
|
|
|
+ peer_flow_drive_reset(flow);
|
|
|
+ } else if (flow->opposite->resetting) {
|
|
|
+ peer_flow_drive_reset(flow->opposite);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
struct peer_flow * peer_flow_create (struct client_data *src_client, struct client_data *dest_client)
|
|
|
@@ -1589,6 +1665,7 @@ struct peer_flow * peer_flow_create (struct client_data *src_client, struct clie
|
|
|
// allocate flow structure
|
|
|
struct peer_flow *flow = malloc(sizeof(*flow));
|
|
|
if (!flow) {
|
|
|
+ BLog(BLOG_ERROR, "malloc failed");
|
|
|
goto fail0;
|
|
|
}
|
|
|
|
|
|
@@ -1604,43 +1681,29 @@ struct peer_flow * peer_flow_create (struct client_data *src_client, struct clie
|
|
|
// add to destination client list
|
|
|
LinkedList2_Append(&flow->dest_client->output_peers_flows, &flow->dest_list_node);
|
|
|
|
|
|
- // initialize I/O
|
|
|
- PacketPassFairQueueFlow_Init(&flow->qflow, &flow->dest_client->output_peers_fairqueue);
|
|
|
- if (!PacketProtoFlow_Init(
|
|
|
- &flow->oflow, SC_MAX_ENC, CLIENT_PEER_FLOW_BUFFER_MIN_PACKETS,
|
|
|
- PacketPassFairQueueFlow_GetInput(&flow->qflow), BReactor_PendingGroup(&ss)
|
|
|
- )) {
|
|
|
- BLog(BLOG_ERROR, "PacketProtoFlow_Init failed");
|
|
|
- goto fail1;
|
|
|
- }
|
|
|
- flow->input = PacketProtoFlow_GetInput(&flow->oflow);
|
|
|
- flow->packet_len = -1;
|
|
|
+ // have no I/O
|
|
|
+ flow->have_io = 0;
|
|
|
|
|
|
// init reset timer
|
|
|
BTimer_Init(&flow->reset_timer, CLIENT_RESET_TIME, (BTimer_handler)peer_flow_reset_timer_handler, flow);
|
|
|
|
|
|
return flow;
|
|
|
|
|
|
-fail1:
|
|
|
- PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
- LinkedList2_Remove(&flow->dest_client->output_peers_flows, &flow->dest_list_node);
|
|
|
- BAVL_Remove(&flow->src_client->peer_out_flows_tree, &flow->src_tree_node);
|
|
|
- LinkedList2_Remove(&flow->src_client->peer_out_flows_list, &flow->src_list_node);
|
|
|
- free(flow);
|
|
|
fail0:
|
|
|
return NULL;
|
|
|
}
|
|
|
|
|
|
void peer_flow_dealloc (struct peer_flow *flow)
|
|
|
{
|
|
|
- PacketPassFairQueueFlow_AssertFree(&flow->qflow);
|
|
|
+ if (flow->have_io) { PacketPassFairQueueFlow_AssertFree(&flow->qflow); }
|
|
|
|
|
|
// free reset timer
|
|
|
BReactor_RemoveTimer(&ss, &flow->reset_timer);
|
|
|
|
|
|
// free I/O
|
|
|
- PacketProtoFlow_Free(&flow->oflow);
|
|
|
- PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
+ if (flow->have_io) {
|
|
|
+ peer_flow_free_io(flow);
|
|
|
+ }
|
|
|
|
|
|
// remove from destination client list
|
|
|
LinkedList2_Remove(&flow->dest_client->output_peers_flows, &flow->dest_list_node);
|
|
|
@@ -1655,9 +1718,61 @@ void peer_flow_dealloc (struct peer_flow *flow)
|
|
|
free(flow);
|
|
|
}
|
|
|
|
|
|
+int peer_flow_init_io (struct peer_flow *flow)
|
|
|
+{
|
|
|
+ ASSERT(!flow->have_io)
|
|
|
+
|
|
|
+ // init queue flow
|
|
|
+ PacketPassFairQueueFlow_Init(&flow->qflow, &flow->dest_client->output_peers_fairqueue);
|
|
|
+
|
|
|
+ // init PacketProtoFlow
|
|
|
+ if (!PacketProtoFlow_Init(
|
|
|
+ &flow->oflow, SC_MAX_ENC, CLIENT_PEER_FLOW_BUFFER_MIN_PACKETS,
|
|
|
+ PacketPassFairQueueFlow_GetInput(&flow->qflow), BReactor_PendingGroup(&ss)
|
|
|
+ )) {
|
|
|
+ BLog(BLOG_ERROR, "PacketProtoFlow_Init failed");
|
|
|
+ goto fail1;
|
|
|
+ }
|
|
|
+ flow->input = PacketProtoFlow_GetInput(&flow->oflow);
|
|
|
+
|
|
|
+ // set no packet
|
|
|
+ flow->packet_len = -1;
|
|
|
+
|
|
|
+ // set have I/O
|
|
|
+ flow->have_io = 1;
|
|
|
+
|
|
|
+ return 1;
|
|
|
+
|
|
|
+fail1:
|
|
|
+ PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+void peer_flow_free_io (struct peer_flow *flow)
|
|
|
+{
|
|
|
+ ASSERT(flow->have_io)
|
|
|
+ PacketPassFairQueueFlow_AssertFree(&flow->qflow);
|
|
|
+
|
|
|
+ // free PacketProtoFlow
|
|
|
+ PacketProtoFlow_Free(&flow->oflow);
|
|
|
+
|
|
|
+ // free queue flow
|
|
|
+ PacketPassFairQueueFlow_Free(&flow->qflow);
|
|
|
+
|
|
|
+ // set have no I/O
|
|
|
+ flow->have_io = 0;
|
|
|
+}
|
|
|
+
|
|
|
void peer_flow_disconnect (struct peer_flow *flow)
|
|
|
{
|
|
|
ASSERT(flow->src_client)
|
|
|
+ ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
+ ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(flow->have_io)
|
|
|
+ ASSERT(PacketPassFairQueueFlow_IsBusy(&flow->qflow))
|
|
|
+
|
|
|
+ // stop reset timer
|
|
|
+ BReactor_RemoveTimer(&ss, &flow->reset_timer);
|
|
|
|
|
|
// remove from source list and hash table
|
|
|
BAVL_Remove(&flow->src_client->peer_out_flows_tree, &flow->src_tree_node);
|
|
|
@@ -1668,38 +1783,39 @@ void peer_flow_disconnect (struct peer_flow *flow)
|
|
|
|
|
|
// set busy handler
|
|
|
PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)peer_flow_handler_canremove, flow);
|
|
|
-
|
|
|
- // stop reset timer
|
|
|
- BReactor_RemoveTimer(&ss, &flow->reset_timer);
|
|
|
}
|
|
|
|
|
|
int peer_flow_start_packet (struct peer_flow *flow, void **data, int len)
|
|
|
{
|
|
|
- ASSERT(len >= 0)
|
|
|
- ASSERT(len <= SC_MAX_PAYLOAD)
|
|
|
- ASSERT(!(len > 0) || data)
|
|
|
ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->dest_client->dying)
|
|
|
ASSERT(flow->src_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->src_client->dying)
|
|
|
+ ASSERT(!flow->resetting)
|
|
|
+ ASSERT(!flow->opposite->resetting)
|
|
|
+ ASSERT(flow->have_io)
|
|
|
ASSERT(flow->packet_len == -1)
|
|
|
+ ASSERT(len >= 0)
|
|
|
+ ASSERT(len <= SC_MAX_PAYLOAD)
|
|
|
+ ASSERT(!(len > 0) || data)
|
|
|
|
|
|
// obtain location for writing the packet
|
|
|
if (!BufferWriter_StartPacket(flow->input, &flow->packet)) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ // remember packet length
|
|
|
flow->packet_len = len;
|
|
|
|
|
|
if (data) {
|
|
|
*data = flow->packet + sizeof(struct sc_header);
|
|
|
}
|
|
|
-
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
void peer_flow_end_packet (struct peer_flow *flow, uint8_t type)
|
|
|
{
|
|
|
+ ASSERT(flow->have_io)
|
|
|
ASSERT(flow->packet_len >= 0)
|
|
|
ASSERT(flow->packet_len <= SC_MAX_PAYLOAD)
|
|
|
|
|
|
@@ -1710,6 +1826,7 @@ void peer_flow_end_packet (struct peer_flow *flow, uint8_t type)
|
|
|
// finish writing packet
|
|
|
BufferWriter_EndPacket(flow->input, sizeof(struct sc_header) + flow->packet_len);
|
|
|
|
|
|
+ // set have no packet
|
|
|
flow->packet_len = -1;
|
|
|
}
|
|
|
|
|
|
@@ -1718,6 +1835,8 @@ void peer_flow_handler_canremove (struct peer_flow *flow)
|
|
|
ASSERT(!flow->src_client)
|
|
|
ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(flow->have_io)
|
|
|
+ PacketPassFairQueueFlow_AssertFree(&flow->qflow);
|
|
|
|
|
|
client_log(flow->dest_client, BLOG_DEBUG, "removing old flow");
|
|
|
|
|
|
@@ -1725,45 +1844,105 @@ void peer_flow_handler_canremove (struct peer_flow *flow)
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-void peer_flow_schedule_reset (struct peer_flow *flow)
|
|
|
+void peer_flow_start_reset (struct peer_flow *flow)
|
|
|
+{
|
|
|
+ ASSERT(flow->src_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
+ ASSERT(!flow->src_client->dying)
|
|
|
+ ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
+ ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(!flow->resetting)
|
|
|
+ ASSERT(!flow->opposite->resetting)
|
|
|
+ ASSERT(flow->have_io)
|
|
|
+ ASSERT(flow->opposite->have_io)
|
|
|
+
|
|
|
+ client_log(flow->src_client, BLOG_INFO, "starting reset to %d", (int)flow->dest_client->id);
|
|
|
+
|
|
|
+ // set resetting
|
|
|
+ flow->resetting = 1;
|
|
|
+
|
|
|
+ peer_flow_drive_reset(flow);
|
|
|
+}
|
|
|
+
|
|
|
+void peer_flow_drive_reset (struct peer_flow *flow)
|
|
|
{
|
|
|
ASSERT(flow->src_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->src_client->dying)
|
|
|
ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(flow->resetting)
|
|
|
+ ASSERT(!flow->opposite->resetting)
|
|
|
+ ASSERT(!BTimer_IsRunning(&flow->reset_timer))
|
|
|
+
|
|
|
+ // try to free I/O
|
|
|
+ if (flow->have_io) {
|
|
|
+ if (PacketPassFairQueueFlow_IsBusy(&flow->qflow)) {
|
|
|
+ PacketPassFairQueueFlow_SetBusyHandler(&flow->qflow, (PacketPassFairQueue_handler_busy)peer_flow_reset_qflow_handler_busy, flow);
|
|
|
+ } else {
|
|
|
+ peer_flow_free_io(flow);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // try to free opposite I/O
|
|
|
+ if (flow->opposite->have_io) {
|
|
|
+ if (PacketPassFairQueueFlow_IsBusy(&flow->opposite->qflow)) {
|
|
|
+ PacketPassFairQueueFlow_SetBusyHandler(&flow->opposite->qflow, (PacketPassFairQueue_handler_busy)peer_flow_reset_qflow_handler_busy, flow->opposite);
|
|
|
+ } else {
|
|
|
+ peer_flow_free_io(flow->opposite);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // if we still got some I/O, or some client hasn't accepted yet, wait
|
|
|
+ if (flow->have_io || flow->opposite->have_io || !flow->accepted || !flow->opposite->accepted) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
// set reset timer
|
|
|
BReactor_SetTimer(&ss, &flow->reset_timer);
|
|
|
}
|
|
|
|
|
|
-void peer_flow_reset_timer_handler (struct peer_flow *flow)
|
|
|
+void peer_flow_reset_qflow_handler_busy (struct peer_flow *flow)
|
|
|
{
|
|
|
ASSERT(flow->src_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->src_client->dying)
|
|
|
ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(flow->resetting || flow->opposite->resetting)
|
|
|
+ ASSERT(flow->have_io)
|
|
|
+ ASSERT(!PacketPassFairQueueFlow_IsBusy(&flow->qflow))
|
|
|
|
|
|
- client_log(flow->src_client, BLOG_WARNING, "resetting to %d", (int)flow->dest_client->id);
|
|
|
+ if (flow->resetting) {
|
|
|
+ peer_flow_drive_reset(flow);
|
|
|
+ } else {
|
|
|
+ peer_flow_drive_reset(flow->opposite);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void peer_flow_reset_timer_handler (struct peer_flow *flow)
|
|
|
+{
|
|
|
+ ASSERT(flow->src_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
+ ASSERT(!flow->src_client->dying)
|
|
|
+ ASSERT(flow->dest_client->initstatus == INITSTATUS_COMPLETE)
|
|
|
+ ASSERT(!flow->dest_client->dying)
|
|
|
+ ASSERT(flow->resetting)
|
|
|
+ ASSERT(!flow->opposite->resetting)
|
|
|
+ ASSERT(!flow->have_io)
|
|
|
+ ASSERT(!flow->opposite->have_io)
|
|
|
+ ASSERT(flow->accepted)
|
|
|
+ ASSERT(flow->opposite->accepted)
|
|
|
|
|
|
- // stop opposite reset timer
|
|
|
- BReactor_RemoveTimer(&ss, &flow->opposite->reset_timer);
|
|
|
+ client_log(flow->src_client, BLOG_INFO, "finally resetting to %d", (int)flow->dest_client->id);
|
|
|
|
|
|
struct peer_know *know = flow->know;
|
|
|
struct peer_know *know_opposite = flow->opposite->know;
|
|
|
|
|
|
- // create new knows
|
|
|
- if (!create_know_pair(flow)) {
|
|
|
- goto fail;
|
|
|
+ // launch pair
|
|
|
+ if (!launch_pair(flow)) {
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
// remove old knows
|
|
|
uninform_know(know);
|
|
|
uninform_know(know_opposite);
|
|
|
-
|
|
|
- return;
|
|
|
-
|
|
|
-fail:
|
|
|
- client_remove(flow->src_client);
|
|
|
}
|
|
|
|
|
|
peerid_t new_client_id (void)
|
|
|
@@ -2015,7 +2194,7 @@ void know_uninform_job_handler (struct peer_know *k)
|
|
|
client_send_endclient(from, to->id);
|
|
|
}
|
|
|
|
|
|
-int create_know_pair (struct peer_flow *flow_to)
|
|
|
+int launch_pair (struct peer_flow *flow_to)
|
|
|
{
|
|
|
struct client_data *client = flow_to->src_client;
|
|
|
struct client_data *client2 = flow_to->dest_client;
|
|
|
@@ -2023,6 +2202,20 @@ int create_know_pair (struct peer_flow *flow_to)
|
|
|
ASSERT(!client->dying)
|
|
|
ASSERT(client2->initstatus == INITSTATUS_COMPLETE)
|
|
|
ASSERT(!client2->dying)
|
|
|
+ ASSERT(!flow_to->have_io)
|
|
|
+ ASSERT(!flow_to->opposite->have_io)
|
|
|
+ ASSERT(!BTimer_IsRunning(&flow_to->reset_timer))
|
|
|
+ ASSERT(!BTimer_IsRunning(&flow_to->opposite->reset_timer))
|
|
|
+
|
|
|
+ // init I/O
|
|
|
+ if (!peer_flow_init_io(flow_to)) {
|
|
|
+ goto fail;
|
|
|
+ }
|
|
|
+
|
|
|
+ // init opposite I/O
|
|
|
+ if (!peer_flow_init_io(flow_to->opposite)) {
|
|
|
+ goto fail;
|
|
|
+ }
|
|
|
|
|
|
// determine relay relations
|
|
|
int relay_to = relay_allowed(client, client2);
|
|
|
@@ -2046,9 +2239,18 @@ int create_know_pair (struct peer_flow *flow_to)
|
|
|
flow_to->know = know_to;
|
|
|
flow_to->opposite->know = know_from;
|
|
|
|
|
|
+ // set not accepted
|
|
|
+ flow_to->accepted = 0;
|
|
|
+ flow_to->opposite->accepted = 0;
|
|
|
+
|
|
|
+ // set not resetting
|
|
|
+ flow_to->resetting = 0;
|
|
|
+ flow_to->opposite->resetting = 0;
|
|
|
+
|
|
|
return 1;
|
|
|
|
|
|
fail:
|
|
|
+ client_remove(client);
|
|
|
return 0;
|
|
|
}
|
|
|
|