BConnection_win.c 25 KB


  1. /**
  2. * @file BConnection_win.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdlib.h>
  30. #include <limits.h>
  31. #include <base/BLog.h>
  32. #include "BConnection.h"
  33. #include <generated/blog_channel_BConnection.h>
  34. #define LISTEN_BACKLOG 128
  35. struct sys_addr {
  36. int len;
  37. union {
  38. struct sockaddr generic;
  39. struct sockaddr_in ipv4;
  40. struct sockaddr_in6 ipv6;
  41. } addr;
  42. };
  43. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  44. static void addr_any_to_sys (struct sys_addr *out, int family);
  45. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  46. static void listener_next_job_handler (BListener *o);
  47. static void listener_olap_handler (BListener *o, int event, DWORD bytes);
  48. static void connector_olap_handler (BConnector *o, int event, DWORD bytes);
  49. static void connector_abort (BConnector *o);
  50. static void connection_report_error (BConnection *o);
  51. static void connection_abort (BConnection *o);
  52. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len);
  53. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len);
  54. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes);
  55. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes);
  56. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  57. {
  58. switch (addr.type) {
  59. case BADDR_TYPE_IPV4: {
  60. out->len = sizeof(out->addr.ipv4);
  61. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  62. out->addr.ipv4.sin_family = AF_INET;
  63. out->addr.ipv4.sin_port = addr.ipv4.port;
  64. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  65. } break;
  66. case BADDR_TYPE_IPV6: {
  67. out->len = sizeof(out->addr.ipv6);
  68. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  69. out->addr.ipv6.sin6_family = AF_INET6;
  70. out->addr.ipv6.sin6_port = addr.ipv6.port;
  71. out->addr.ipv6.sin6_flowinfo = 0;
  72. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  73. out->addr.ipv6.sin6_scope_id = 0;
  74. } break;
  75. default: ASSERT(0);
  76. }
  77. }
  78. static void addr_any_to_sys (struct sys_addr *out, int family)
  79. {
  80. switch (family) {
  81. case BADDR_TYPE_IPV4: {
  82. out->len = sizeof(out->addr.ipv4);
  83. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  84. out->addr.ipv4.sin_family = AF_INET;
  85. out->addr.ipv4.sin_port = 0;
  86. out->addr.ipv4.sin_addr.s_addr = INADDR_ANY;
  87. } break;
  88. case BADDR_TYPE_IPV6: {
  89. out->len = sizeof(out->addr.ipv6);
  90. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  91. out->addr.ipv6.sin6_family = AF_INET6;
  92. out->addr.ipv6.sin6_port = 0;
  93. out->addr.ipv6.sin6_flowinfo = 0;
  94. out->addr.ipv6.sin6_addr = (struct in6_addr)IN6ADDR_ANY_INIT;
  95. out->addr.ipv6.sin6_scope_id = 0;
  96. } break;
  97. default: ASSERT(0);
  98. }
  99. }
  100. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  101. {
  102. switch (addr.addr.generic.sa_family) {
  103. case AF_INET: {
  104. ASSERT(addr.len == sizeof(struct sockaddr_in))
  105. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  106. } break;
  107. case AF_INET6: {
  108. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  109. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  110. } break;
  111. default: {
  112. BAddr_InitNone(out);
  113. } break;
  114. }
  115. }
  116. static void listener_next_job_handler (BListener *o)
  117. {
  118. DebugObject_Access(&o->d_obj);
  119. ASSERT(!o->busy)
  120. // free ready socket
  121. if (o->ready) {
  122. BLog(BLOG_ERROR, "discarding connection");
  123. // close new socket
  124. if (closesocket(o->newsock) == SOCKET_ERROR) {
  125. BLog(BLOG_ERROR, "closesocket failed");
  126. }
  127. // set not ready
  128. o->ready = 0;
  129. }
  130. // create new socket
  131. if ((o->newsock = WSASocket(o->sys_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  132. BLog(BLOG_ERROR, "WSASocket failed");
  133. goto fail0;
  134. }
  135. // start accept operation
  136. while (1) {
  137. memset(&o->olap.olap, 0, sizeof(o->olap.olap));
  138. DWORD bytes;
  139. BOOL res = o->fnAcceptEx(o->sock, o->newsock, o->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub), &bytes, &o->olap.olap);
  140. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  141. BLog(BLOG_ERROR, "AcceptEx failed");
  142. continue;
  143. }
  144. break;
  145. }
  146. // set busy
  147. o->busy = 1;
  148. return;
  149. fail0:
  150. return;
  151. }
  152. static void listener_olap_handler (BListener *o, int event, DWORD bytes)
  153. {
  154. DebugObject_Access(&o->d_obj);
  155. ASSERT(o->busy)
  156. ASSERT(!o->ready)
  157. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  158. // set not busy
  159. o->busy = 0;
  160. // schedule next accept
  161. BPending_Set(&o->next_job);
  162. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  163. BLog(BLOG_ERROR, "accepting failed");
  164. // close new socket
  165. if (closesocket(o->newsock) == SOCKET_ERROR) {
  166. BLog(BLOG_ERROR, "closesocket failed");
  167. }
  168. return;
  169. }
  170. BLog(BLOG_INFO, "connection accepted");
  171. // set ready
  172. o->ready = 1;
  173. // call handler
  174. o->handler(o->user);
  175. return;
  176. }
  177. static void connector_olap_handler (BConnector *o, int event, DWORD bytes)
  178. {
  179. DebugObject_Access(&o->d_obj);
  180. ASSERT(o->sock != INVALID_SOCKET)
  181. ASSERT(o->busy)
  182. ASSERT(!o->ready)
  183. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  184. // set not busy
  185. o->busy = 0;
  186. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  187. BLog(BLOG_ERROR, "connection failed");
  188. } else {
  189. // set ready
  190. o->ready = 1;
  191. }
  192. // call handler
  193. o->handler(o->user, !o->ready);
  194. return;
  195. }
  196. static void connector_abort (BConnector *o)
  197. {
  198. if (o->sock != INVALID_SOCKET) {
  199. // cancel I/O
  200. if (o->busy) {
  201. if (!CancelIo((HANDLE)o->sock)) {
  202. BLog(BLOG_ERROR, "CancelIo failed");
  203. }
  204. }
  205. // close socket
  206. if (closesocket(o->sock) == SOCKET_ERROR) {
  207. BLog(BLOG_ERROR, "closesocket failed");
  208. }
  209. }
  210. // wait for connect operation to finish
  211. if (o->busy) {
  212. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  213. }
  214. // free olap
  215. BReactorIOCPOverlapped_Free(&o->olap);
  216. }
  217. static void connection_report_error (BConnection *o)
  218. {
  219. DebugError_AssertNoError(&o->d_err);
  220. ASSERT(o->handler)
  221. // report error
  222. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  223. return;
  224. }
  225. static void connection_abort (BConnection *o)
  226. {
  227. ASSERT(!o->aborted)
  228. // cancel I/O
  229. if ((o->recv.inited && o->recv.busy) || (o->send.inited && o->send.busy)) {
  230. if (!CancelIo((HANDLE)o->sock)) {
  231. BLog(BLOG_ERROR, "CancelIo failed");
  232. }
  233. }
  234. // close socket
  235. if (closesocket(o->sock) == SOCKET_ERROR) {
  236. BLog(BLOG_ERROR, "closesocket failed");
  237. }
  238. // wait for receiving to complete
  239. if (o->recv.inited && o->recv.busy) {
  240. BReactorIOCPOverlapped_Wait(&o->recv.olap, NULL, NULL);
  241. }
  242. // wait for sending to complete
  243. if (o->send.inited && o->send.busy) {
  244. BReactorIOCPOverlapped_Wait(&o->send.olap, NULL, NULL);
  245. }
  246. // free recv olap
  247. BReactorIOCPOverlapped_Free(&o->recv.olap);
  248. // free send olap
  249. BReactorIOCPOverlapped_Free(&o->send.olap);
  250. // set aborted
  251. o->aborted = 1;
  252. }
  253. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len)
  254. {
  255. DebugObject_Access(&o->d_obj);
  256. DebugError_AssertNoError(&o->d_err);
  257. ASSERT(!o->aborted)
  258. ASSERT(o->send.inited)
  259. ASSERT(!o->send.busy)
  260. ASSERT(data_len > 0)
  261. if (data_len > ULONG_MAX) {
  262. data_len = ULONG_MAX;
  263. }
  264. WSABUF buf;
  265. buf.buf = data;
  266. buf.len = data_len;
  267. memset(&o->send.olap.olap, 0, sizeof(o->send.olap.olap));
  268. // send
  269. int res = WSASend(o->sock, &buf, 1, NULL, 0, &o->send.olap.olap, NULL);
  270. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  271. BLog(BLOG_ERROR, "WSASend failed (%d)", WSAGetLastError());
  272. connection_report_error(o);
  273. return;
  274. }
  275. // set busy
  276. o->send.busy = 1;
  277. o->send.busy_data_len = data_len;
  278. }
  279. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len)
  280. {
  281. DebugObject_Access(&o->d_obj);
  282. DebugError_AssertNoError(&o->d_err);
  283. ASSERT(!o->recv.closed)
  284. ASSERT(!o->aborted)
  285. ASSERT(o->recv.inited)
  286. ASSERT(!o->recv.busy)
  287. ASSERT(data_len > 0)
  288. if (data_len > ULONG_MAX) {
  289. data_len = ULONG_MAX;
  290. }
  291. WSABUF buf;
  292. buf.buf = data;
  293. buf.len = data_len;
  294. memset(&o->recv.olap.olap, 0, sizeof(o->recv.olap.olap));
  295. // recv
  296. DWORD flags = 0;
  297. int res = WSARecv(o->sock, &buf, 1, NULL, &flags, &o->recv.olap.olap, NULL);
  298. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  299. BLog(BLOG_ERROR, "WSARecv failed (%d)", WSAGetLastError());
  300. connection_report_error(o);
  301. return;
  302. }
  303. // set busy
  304. o->recv.busy = 1;
  305. o->recv.busy_data_len = data_len;
  306. }
  307. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes)
  308. {
  309. DebugObject_Access(&o->d_obj);
  310. DebugError_AssertNoError(&o->d_err);
  311. ASSERT(!o->aborted)
  312. ASSERT(o->send.inited)
  313. ASSERT(o->send.busy)
  314. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  315. // set not busy
  316. o->send.busy = 0;
  317. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  318. BLog(BLOG_ERROR, "sending failed");
  319. connection_report_error(o);
  320. return;
  321. }
  322. ASSERT(bytes > 0)
  323. ASSERT(bytes <= o->send.busy_data_len)
  324. // done
  325. StreamPassInterface_Done(&o->send.iface, bytes);
  326. }
  327. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes)
  328. {
  329. DebugObject_Access(&o->d_obj);
  330. DebugError_AssertNoError(&o->d_err);
  331. ASSERT(!o->recv.closed)
  332. ASSERT(!o->aborted)
  333. ASSERT(o->recv.inited)
  334. ASSERT(o->recv.busy)
  335. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  336. // set not busy
  337. o->recv.busy = 0;
  338. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  339. BLog(BLOG_ERROR, "receiving failed");
  340. connection_report_error(o);
  341. return;
  342. }
  343. if (bytes == 0) {
  344. // set closed
  345. o->recv.closed = 1;
  346. // report recv closed
  347. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  348. return;
  349. }
  350. ASSERT(bytes > 0)
  351. ASSERT(bytes <= o->recv.busy_data_len)
  352. // done
  353. StreamRecvInterface_Done(&o->recv.iface, bytes);
  354. }
  355. int BConnection_AddressSupported (BAddr addr)
  356. {
  357. BAddr_Assert(&addr);
  358. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  359. }
  360. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  361. BListener_handler handler)
  362. {
  363. ASSERT(handler)
  364. BNetwork_Assert();
  365. // init arguments
  366. o->reactor = reactor;
  367. o->user = user;
  368. o->handler = handler;
  369. // check address
  370. if (!BConnection_AddressSupported(addr)) {
  371. BLog(BLOG_ERROR, "address not supported");
  372. goto fail0;
  373. }
  374. // convert address
  375. struct sys_addr sysaddr;
  376. addr_socket_to_sys(&sysaddr, addr);
  377. // remember family
  378. o->sys_family = sysaddr.addr.generic.sa_family;
  379. // init socket
  380. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  381. BLog(BLOG_ERROR, "WSASocket failed");
  382. goto fail0;
  383. }
  384. // associate with IOCP
  385. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  386. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  387. goto fail1;
  388. }
  389. // bind
  390. if (bind(o->sock, &sysaddr.addr.generic, sysaddr.len) < 0) {
  391. BLog(BLOG_ERROR, "bind failed");
  392. goto fail1;
  393. }
  394. // listen
  395. if (listen(o->sock, LISTEN_BACKLOG) < 0) {
  396. BLog(BLOG_ERROR, "listen failed");
  397. goto fail1;
  398. }
  399. GUID guid;
  400. DWORD out_bytes;
  401. // obtain AcceptEx
  402. guid = (GUID)WSAID_ACCEPTEX;
  403. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnAcceptEx, sizeof(o->fnAcceptEx), &out_bytes, NULL, NULL) != 0) {
  404. BLog(BLOG_ERROR, "faild to obtain AcceptEx");
  405. goto fail1;
  406. }
  407. // obtain GetAcceptExSockaddrs
  408. guid = (GUID)WSAID_GETACCEPTEXSOCKADDRS;
  409. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnGetAcceptExSockaddrs, sizeof(o->fnGetAcceptExSockaddrs), &out_bytes, NULL, NULL) != 0) {
  410. BLog(BLOG_ERROR, "faild to obtain GetAcceptExSockaddrs");
  411. goto fail1;
  412. }
  413. // init olap
  414. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)listener_olap_handler);
  415. // init next job
  416. BPending_Init(&o->next_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_next_job_handler, o);
  417. // set not busy
  418. o->busy = 0;
  419. // set not ready
  420. o->ready = 0;
  421. // set next job
  422. BPending_Set(&o->next_job);
  423. DebugObject_Init(&o->d_obj);
  424. return 1;
  425. fail1:
  426. if (closesocket(o->sock) == SOCKET_ERROR) {
  427. BLog(BLOG_ERROR, "closesocket failed");
  428. }
  429. fail0:
  430. return 0;
  431. }
  432. void BListener_Free (BListener *o)
  433. {
  434. DebugObject_Free(&o->d_obj);
  435. // cancel I/O
  436. if (o->busy) {
  437. if (!CancelIo((HANDLE)o->sock)) {
  438. BLog(BLOG_ERROR, "CancelIo failed");
  439. }
  440. }
  441. // close socket
  442. if (closesocket(o->sock) == SOCKET_ERROR) {
  443. BLog(BLOG_ERROR, "closesocket failed");
  444. }
  445. // wait for accept operation to finish
  446. if (o->busy) {
  447. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  448. }
  449. // close new socket
  450. if (o->busy || o->ready) {
  451. if (closesocket(o->newsock) == SOCKET_ERROR) {
  452. BLog(BLOG_ERROR, "closesocket failed");
  453. }
  454. }
  455. // free next job
  456. BPending_Free(&o->next_job);
  457. // free olap
  458. BReactorIOCPOverlapped_Free(&o->olap);
  459. }
  460. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  461. BConnector_handler handler)
  462. {
  463. ASSERT(handler)
  464. BNetwork_Assert();
  465. // init arguments
  466. o->reactor = reactor;
  467. o->user = user;
  468. o->handler = handler;
  469. // check address
  470. if (!BConnection_AddressSupported(addr)) {
  471. BLog(BLOG_ERROR, "address not supported");
  472. goto fail0;
  473. }
  474. // convert address
  475. struct sys_addr sysaddr;
  476. addr_socket_to_sys(&sysaddr, addr);
  477. // create local any address
  478. struct sys_addr local_sysaddr;
  479. addr_any_to_sys(&local_sysaddr, addr.type);
  480. // init socket
  481. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  482. BLog(BLOG_ERROR, "WSASocket failed");
  483. goto fail0;
  484. }
  485. // associate with IOCP
  486. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  487. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  488. goto fail1;
  489. }
  490. // bind socket
  491. if (bind(o->sock, &local_sysaddr.addr.generic, local_sysaddr.len) < 0) {
  492. BLog(BLOG_ERROR, "bind failed");
  493. goto fail1;
  494. }
  495. // obtain ConnectEx
  496. GUID guid = WSAID_CONNECTEX;
  497. DWORD out_bytes;
  498. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnConnectEx, sizeof(o->fnConnectEx), &out_bytes, NULL, NULL) != 0) {
  499. BLog(BLOG_ERROR, "faild to get ConnectEx");
  500. goto fail1;
  501. }
  502. // init olap
  503. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connector_olap_handler);
  504. // start connect operation
  505. BOOL res = o->fnConnectEx(o->sock, &sysaddr.addr.generic, sysaddr.len, NULL, 0, NULL, &o->olap.olap);
  506. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  507. BLog(BLOG_ERROR, "ConnectEx failed (%d)", WSAGetLastError());
  508. goto fail2;
  509. }
  510. // set busy
  511. o->busy = 1;
  512. // set not ready
  513. o->ready = 0;
  514. DebugObject_Init(&o->d_obj);
  515. return 1;
  516. fail2:
  517. BReactorIOCPOverlapped_Free(&o->olap);
  518. fail1:
  519. if (closesocket(o->sock) == SOCKET_ERROR) {
  520. BLog(BLOG_ERROR, "closesocket failed");
  521. }
  522. fail0:
  523. return 0;
  524. }
  525. void BConnector_Free (BConnector *o)
  526. {
  527. DebugObject_Free(&o->d_obj);
  528. if (o->sock != INVALID_SOCKET) {
  529. connector_abort(o);
  530. }
  531. }
  532. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  533. BConnection_handler handler)
  534. {
  535. switch (source.type) {
  536. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  537. BListener *listener = source.u.listener.listener;
  538. DebugObject_Access(&listener->d_obj);
  539. ASSERT(BPending_IsSet(&listener->next_job))
  540. ASSERT(!listener->busy)
  541. ASSERT(listener->ready)
  542. } break;
  543. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  544. BConnector *connector = source.u.connector.connector;
  545. DebugObject_Access(&connector->d_obj);
  546. ASSERT(connector->reactor == reactor)
  547. ASSERT(connector->sock != INVALID_SOCKET)
  548. ASSERT(!connector->busy)
  549. ASSERT(connector->ready)
  550. } break;
  551. default: ASSERT(0);
  552. }
  553. ASSERT(handler)
  554. BNetwork_Assert();
  555. // init arguments
  556. o->reactor = reactor;
  557. o->user = user;
  558. o->handler = handler;
  559. switch (source.type) {
  560. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  561. BListener *listener = source.u.listener.listener;
  562. // grab new socket from listener
  563. o->sock = listener->newsock;
  564. listener->ready = 0;
  565. // associate with IOCP
  566. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  567. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  568. goto fail1;
  569. }
  570. // return address
  571. if (source.u.listener.out_addr) {
  572. struct sockaddr *addr_local;
  573. struct sockaddr *addr_remote;
  574. int len_local;
  575. int len_remote;
  576. listener->fnGetAcceptExSockaddrs(listener->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub),
  577. &addr_local, &len_local, &addr_remote, &len_remote);
  578. struct sys_addr sysaddr;
  579. ASSERT_FORCE(len_remote >= 0)
  580. ASSERT_FORCE(len_remote <= sizeof(sysaddr.addr))
  581. memcpy((uint8_t *)&sysaddr.addr, (uint8_t *)addr_remote, len_remote);
  582. sysaddr.len = len_remote;
  583. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  584. }
  585. } break;
  586. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  587. BConnector *connector = source.u.connector.connector;
  588. // grab fd from connector
  589. o->sock = connector->sock;
  590. connector->sock = INVALID_SOCKET;
  591. // release connector resources
  592. connector_abort(connector);
  593. } break;
  594. }
  595. // set not aborted
  596. o->aborted = 0;
  597. // init send olap
  598. BReactorIOCPOverlapped_Init(&o->send.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_send_olap_handler);
  599. // set send not inited
  600. o->send.inited = 0;
  601. // init recv olap
  602. BReactorIOCPOverlapped_Init(&o->recv.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_recv_olap_handler);
  603. // set recv not closed
  604. o->recv.closed = 0;
  605. // set recv not inited
  606. o->recv.inited = 0;
  607. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  608. DebugObject_Init(&o->d_obj);
  609. return 1;
  610. fail1:
  611. if (closesocket(o->sock) == SOCKET_ERROR) {
  612. BLog(BLOG_ERROR, "closesocket failed");
  613. }
  614. return 0;
  615. }
  616. void BConnection_Free (BConnection *o)
  617. {
  618. DebugObject_Free(&o->d_obj);
  619. DebugError_Free(&o->d_err);
  620. ASSERT(!o->recv.inited)
  621. ASSERT(!o->send.inited)
  622. if (!o->aborted) {
  623. connection_abort(o);
  624. }
  625. }
  626. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  627. {
  628. DebugObject_Access(&o->d_obj);
  629. // set handlers
  630. o->user = user;
  631. o->handler = handler;
  632. }
  633. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  634. {
  635. DebugObject_Access(&o->d_obj);
  636. if (setsockopt(o->sock, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  637. BLog(BLOG_ERROR, "setsockopt failed");
  638. return 0;
  639. }
  640. return 1;
  641. }
  642. void BConnection_SendAsync_Init (BConnection *o)
  643. {
  644. DebugObject_Access(&o->d_obj);
  645. DebugError_AssertNoError(&o->d_err);
  646. ASSERT(!o->aborted)
  647. ASSERT(!o->send.inited)
  648. // init interface
  649. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_iface_handler_send, o, BReactor_PendingGroup(o->reactor));
  650. // set not busy
  651. o->send.busy = 0;
  652. // set inited
  653. o->send.inited = 1;
  654. }
  655. void BConnection_SendAsync_Free (BConnection *o)
  656. {
  657. DebugObject_Access(&o->d_obj);
  658. ASSERT(o->send.inited)
  659. // abort if busy
  660. if (o->send.busy && !o->aborted) {
  661. connection_abort(o);
  662. }
  663. // free interface
  664. StreamPassInterface_Free(&o->send.iface);
  665. // set not inited
  666. o->send.inited = 0;
  667. }
  668. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  669. {
  670. DebugObject_Access(&o->d_obj);
  671. ASSERT(o->send.inited)
  672. return &o->send.iface;
  673. }
  674. void BConnection_RecvAsync_Init (BConnection *o)
  675. {
  676. DebugObject_Access(&o->d_obj);
  677. DebugError_AssertNoError(&o->d_err);
  678. ASSERT(!o->recv.closed)
  679. ASSERT(!o->aborted)
  680. ASSERT(!o->recv.inited)
  681. // init interface
  682. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_iface_handler_recv, o, BReactor_PendingGroup(o->reactor));
  683. // set not busy
  684. o->recv.busy = 0;
  685. // set inited
  686. o->recv.inited = 1;
  687. }
  688. void BConnection_RecvAsync_Free (BConnection *o)
  689. {
  690. DebugObject_Access(&o->d_obj);
  691. ASSERT(o->recv.inited)
  692. // abort if busy
  693. if (o->recv.busy && !o->aborted) {
  694. connection_abort(o);
  695. }
  696. // free interface
  697. StreamRecvInterface_Free(&o->recv.iface);
  698. // set not inited
  699. o->recv.inited = 0;
  700. }
  701. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  702. {
  703. DebugObject_Access(&o->d_obj);
  704. ASSERT(o->recv.inited)
  705. return &o->recv.iface;
  706. }