|
|
@@ -1183,33 +1183,6 @@ int client_init_io (struct client_data *client)
|
|
|
// initialize error domain
|
|
|
FlowErrorDomain_Init(&client->domain, (FlowErrorDomain_handler)client_error_handler, client);
|
|
|
|
|
|
- // init input
|
|
|
-
|
|
|
- // init source
|
|
|
- StreamRecvInterface *source_interface;
|
|
|
- if (options.ssl) {
|
|
|
- PRStreamSource_Init(&client->input_source.ssl, FlowErrorReporter_Create(&client->domain, COMPONENT_SOURCE), &client->ssl_bprfd);
|
|
|
- source_interface = PRStreamSource_GetOutput(&client->input_source.ssl);
|
|
|
- } else {
|
|
|
- StreamSocketSource_Init(&client->input_source.plain, FlowErrorReporter_Create(&client->domain, COMPONENT_SOURCE), &client->sock);
|
|
|
- source_interface = StreamSocketSource_GetOutput(&client->input_source.plain);
|
|
|
- }
|
|
|
-
|
|
|
- // init interface
|
|
|
- PacketPassInterface_Init(&client->input_interface, SC_MAX_ENC, (PacketPassInterface_handler_send)client_input_handler_send, client);
|
|
|
-
|
|
|
- // init decoder
|
|
|
- if (!PacketProtoDecoder_Init(
|
|
|
- &client->input_decoder,
|
|
|
- FlowErrorReporter_Create(&client->domain, COMPONENT_DECODER),
|
|
|
- source_interface,
|
|
|
- &client->input_interface,
|
|
|
- BReactor_PendingGroup(&ss)
|
|
|
- )) {
|
|
|
- client_log(client, BLOG_ERROR, "PacketProtoDecoder_Init failed");
|
|
|
- goto fail0;
|
|
|
- }
|
|
|
-
|
|
|
// init output common
|
|
|
|
|
|
// init sink
|
|
|
@@ -1242,7 +1215,7 @@ int client_init_io (struct client_data *client)
|
|
|
BReactor_PendingGroup(&ss)
|
|
|
)) {
|
|
|
client_log(client, BLOG_ERROR, "PacketProtoFlow_Init failed");
|
|
|
- goto fail1;
|
|
|
+ goto fail0;
|
|
|
}
|
|
|
client->output_control_input = PacketBufferAsyncInput_GetInput(&client->output_control_oflow.ainput);
|
|
|
client->output_control_packet_len = -1;
|
|
|
@@ -1259,10 +1232,52 @@ int client_init_io (struct client_data *client)
|
|
|
// init list of flows
|
|
|
LinkedList2_Init(&client->output_peers_flows);
|
|
|
|
|
|
+ // init input
|
|
|
+ // NOTE: input must be initialized after output is, otherwise we might receive a packet from the client
|
|
|
+ // before output was started via the jobs system, and fail to send a response (because pending jobs are executed in
|
|
|
+ // the order they are registered).
|
|
|
+
|
|
|
+ // init source
|
|
|
+ StreamRecvInterface *source_interface;
|
|
|
+ if (options.ssl) {
|
|
|
+ PRStreamSource_Init(&client->input_source.ssl, FlowErrorReporter_Create(&client->domain, COMPONENT_SOURCE), &client->ssl_bprfd);
|
|
|
+ source_interface = PRStreamSource_GetOutput(&client->input_source.ssl);
|
|
|
+ } else {
|
|
|
+ StreamSocketSource_Init(&client->input_source.plain, FlowErrorReporter_Create(&client->domain, COMPONENT_SOURCE), &client->sock);
|
|
|
+ source_interface = StreamSocketSource_GetOutput(&client->input_source.plain);
|
|
|
+ }
|
|
|
+
|
|
|
+ // init interface
|
|
|
+ PacketPassInterface_Init(&client->input_interface, SC_MAX_ENC, (PacketPassInterface_handler_send)client_input_handler_send, client);
|
|
|
+
|
|
|
+ // init decoder
|
|
|
+ if (!PacketProtoDecoder_Init(
|
|
|
+ &client->input_decoder,
|
|
|
+ FlowErrorReporter_Create(&client->domain, COMPONENT_DECODER),
|
|
|
+ source_interface,
|
|
|
+ &client->input_interface,
|
|
|
+ BReactor_PendingGroup(&ss)
|
|
|
+ )) {
|
|
|
+ client_log(client, BLOG_ERROR, "PacketProtoDecoder_Init failed");
|
|
|
+ goto fail1;
|
|
|
+ }
|
|
|
+
|
|
|
return 1;
|
|
|
|
|
|
- // free output control flow
|
|
|
+ // free input
|
|
|
fail1:
|
|
|
+ PacketPassInterface_Free(&client->input_interface);
|
|
|
+ if (options.ssl) {
|
|
|
+ PRStreamSource_Free(&client->input_source.ssl);
|
|
|
+ } else {
|
|
|
+ StreamSocketSource_Free(&client->input_source.plain);
|
|
|
+ }
|
|
|
+ // free output peers flow
|
|
|
+ PacketPassFairQueue_Free(&client->output_peers_fairqueue);
|
|
|
+ PacketPassPriorityQueueFlow_Free(&client->output_peers_qflow);
|
|
|
+ // free output control flow
|
|
|
+ PacketProtoFlow_Free(&client->output_control_oflow);
|
|
|
+fail0:
|
|
|
PacketPassPriorityQueueFlow_Free(&client->output_control_qflow);
|
|
|
// free output common
|
|
|
PacketPassPriorityQueue_Free(&client->output_priorityqueue);
|
|
|
@@ -1272,20 +1287,20 @@ fail1:
|
|
|
} else {
|
|
|
StreamSocketSink_Free(&client->output_sink.plain);
|
|
|
}
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+void client_dealloc_io (struct client_data *client)
|
|
|
+{
|
|
|
// free input
|
|
|
PacketProtoDecoder_Free(&client->input_decoder);
|
|
|
-fail0:
|
|
|
PacketPassInterface_Free(&client->input_interface);
|
|
|
if (options.ssl) {
|
|
|
PRStreamSource_Free(&client->input_source.ssl);
|
|
|
} else {
|
|
|
StreamSocketSource_Free(&client->input_source.plain);
|
|
|
}
|
|
|
- return 0;
|
|
|
-}
|
|
|
-
|
|
|
-void client_dealloc_io (struct client_data *client)
|
|
|
-{
|
|
|
+
|
|
|
// allow freeing fair queue flows
|
|
|
PacketPassFairQueue_PrepareFree(&client->output_peers_fairqueue);
|
|
|
|
|
|
@@ -1316,15 +1331,6 @@ void client_dealloc_io (struct client_data *client)
|
|
|
} else {
|
|
|
StreamSocketSink_Free(&client->output_sink.plain);
|
|
|
}
|
|
|
-
|
|
|
- // free input
|
|
|
- PacketProtoDecoder_Free(&client->input_decoder);
|
|
|
- PacketPassInterface_Free(&client->input_interface);
|
|
|
- if (options.ssl) {
|
|
|
- PRStreamSource_Free(&client->input_source.ssl);
|
|
|
- } else {
|
|
|
- StreamSocketSource_Free(&client->input_source.plain);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
void client_error_handler (struct client_data *client, int component, const void *data)
|