BConnection_win.c 25 KB


  1. /**
  2. * @file BConnection_win.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdlib.h>
  30. #include <limits.h>
  31. #include <base/BLog.h>
  32. #include "BConnection.h"
  33. #include <generated/blog_channel_BConnection.h>
  34. #define LISTEN_BACKLOG 128
  35. struct sys_addr {
  36. int len;
  37. union {
  38. struct sockaddr generic;
  39. struct sockaddr_in ipv4;
  40. struct sockaddr_in6 ipv6;
  41. } addr;
  42. };
  43. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  44. static void addr_any_to_sys (struct sys_addr *out, int family);
  45. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  46. static void listener_next_job_handler (BListener *o);
  47. static void listener_olap_handler (BListener *o, int event, DWORD bytes);
  48. static void connector_olap_handler (BConnector *o, int event, DWORD bytes);
  49. static void connector_abort (BConnector *o);
  50. static void connection_report_error (BConnection *o);
  51. static void connection_abort (BConnection *o);
  52. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len);
  53. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len);
  54. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes);
  55. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes);
  56. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  57. {
  58. switch (addr.type) {
  59. case BADDR_TYPE_IPV4: {
  60. out->len = sizeof(out->addr.ipv4);
  61. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  62. out->addr.ipv4.sin_family = AF_INET;
  63. out->addr.ipv4.sin_port = addr.ipv4.port;
  64. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  65. } break;
  66. case BADDR_TYPE_IPV6: {
  67. out->len = sizeof(out->addr.ipv6);
  68. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  69. out->addr.ipv6.sin6_family = AF_INET6;
  70. out->addr.ipv6.sin6_port = addr.ipv6.port;
  71. out->addr.ipv6.sin6_flowinfo = 0;
  72. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  73. out->addr.ipv6.sin6_scope_id = 0;
  74. } break;
  75. default: ASSERT(0);
  76. }
  77. }
  78. static void addr_any_to_sys (struct sys_addr *out, int family)
  79. {
  80. switch (family) {
  81. case BADDR_TYPE_IPV4: {
  82. out->len = sizeof(out->addr.ipv4);
  83. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  84. out->addr.ipv4.sin_family = AF_INET;
  85. out->addr.ipv4.sin_port = 0;
  86. out->addr.ipv4.sin_addr.s_addr = INADDR_ANY;
  87. } break;
  88. case BADDR_TYPE_IPV6: {
  89. out->len = sizeof(out->addr.ipv6);
  90. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  91. out->addr.ipv6.sin6_family = AF_INET6;
  92. out->addr.ipv6.sin6_port = 0;
  93. out->addr.ipv6.sin6_flowinfo = 0;
  94. struct in6_addr any = IN6ADDR_ANY_INIT;
  95. out->addr.ipv6.sin6_addr = any;
  96. out->addr.ipv6.sin6_scope_id = 0;
  97. } break;
  98. default: ASSERT(0);
  99. }
  100. }
  101. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  102. {
  103. switch (addr.addr.generic.sa_family) {
  104. case AF_INET: {
  105. ASSERT(addr.len == sizeof(struct sockaddr_in))
  106. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  107. } break;
  108. case AF_INET6: {
  109. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  110. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  111. } break;
  112. default: {
  113. BAddr_InitNone(out);
  114. } break;
  115. }
  116. }
  117. static void listener_next_job_handler (BListener *o)
  118. {
  119. DebugObject_Access(&o->d_obj);
  120. ASSERT(!o->busy)
  121. // free ready socket
  122. if (o->ready) {
  123. BLog(BLOG_ERROR, "discarding connection");
  124. // close new socket
  125. if (closesocket(o->newsock) == SOCKET_ERROR) {
  126. BLog(BLOG_ERROR, "closesocket failed");
  127. }
  128. // set not ready
  129. o->ready = 0;
  130. }
  131. // create new socket
  132. if ((o->newsock = WSASocket(o->sys_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  133. BLog(BLOG_ERROR, "WSASocket failed");
  134. goto fail0;
  135. }
  136. // start accept operation
  137. while (1) {
  138. memset(&o->olap.olap, 0, sizeof(o->olap.olap));
  139. DWORD bytes;
  140. BOOL res = o->fnAcceptEx(o->sock, o->newsock, o->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub), &bytes, &o->olap.olap);
  141. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  142. BLog(BLOG_ERROR, "AcceptEx failed");
  143. continue;
  144. }
  145. break;
  146. }
  147. // set busy
  148. o->busy = 1;
  149. return;
  150. fail0:
  151. return;
  152. }
  153. static void listener_olap_handler (BListener *o, int event, DWORD bytes)
  154. {
  155. DebugObject_Access(&o->d_obj);
  156. ASSERT(o->busy)
  157. ASSERT(!o->ready)
  158. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  159. // set not busy
  160. o->busy = 0;
  161. // schedule next accept
  162. BPending_Set(&o->next_job);
  163. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  164. BLog(BLOG_ERROR, "accepting failed");
  165. // close new socket
  166. if (closesocket(o->newsock) == SOCKET_ERROR) {
  167. BLog(BLOG_ERROR, "closesocket failed");
  168. }
  169. return;
  170. }
  171. BLog(BLOG_INFO, "connection accepted");
  172. // set ready
  173. o->ready = 1;
  174. // call handler
  175. o->handler(o->user);
  176. return;
  177. }
  178. static void connector_olap_handler (BConnector *o, int event, DWORD bytes)
  179. {
  180. DebugObject_Access(&o->d_obj);
  181. ASSERT(o->sock != INVALID_SOCKET)
  182. ASSERT(o->busy)
  183. ASSERT(!o->ready)
  184. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  185. // set not busy
  186. o->busy = 0;
  187. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  188. BLog(BLOG_ERROR, "connection failed");
  189. } else {
  190. // set ready
  191. o->ready = 1;
  192. }
  193. // call handler
  194. o->handler(o->user, !o->ready);
  195. return;
  196. }
  197. static void connector_abort (BConnector *o)
  198. {
  199. if (o->sock != INVALID_SOCKET) {
  200. // cancel I/O
  201. if (o->busy) {
  202. if (!CancelIo((HANDLE)o->sock)) {
  203. BLog(BLOG_ERROR, "CancelIo failed");
  204. }
  205. }
  206. // close socket
  207. if (closesocket(o->sock) == SOCKET_ERROR) {
  208. BLog(BLOG_ERROR, "closesocket failed");
  209. }
  210. }
  211. // wait for connect operation to finish
  212. if (o->busy) {
  213. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  214. }
  215. // free olap
  216. BReactorIOCPOverlapped_Free(&o->olap);
  217. }
  218. static void connection_report_error (BConnection *o)
  219. {
  220. DebugError_AssertNoError(&o->d_err);
  221. ASSERT(o->handler)
  222. // report error
  223. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  224. return;
  225. }
  226. static void connection_abort (BConnection *o)
  227. {
  228. ASSERT(!o->aborted)
  229. // cancel I/O
  230. if ((o->recv.inited && o->recv.busy) || (o->send.inited && o->send.busy)) {
  231. if (!CancelIo((HANDLE)o->sock)) {
  232. BLog(BLOG_ERROR, "CancelIo failed");
  233. }
  234. }
  235. // close socket
  236. if (closesocket(o->sock) == SOCKET_ERROR) {
  237. BLog(BLOG_ERROR, "closesocket failed");
  238. }
  239. // wait for receiving to complete
  240. if (o->recv.inited && o->recv.busy) {
  241. BReactorIOCPOverlapped_Wait(&o->recv.olap, NULL, NULL);
  242. }
  243. // wait for sending to complete
  244. if (o->send.inited && o->send.busy) {
  245. BReactorIOCPOverlapped_Wait(&o->send.olap, NULL, NULL);
  246. }
  247. // free recv olap
  248. BReactorIOCPOverlapped_Free(&o->recv.olap);
  249. // free send olap
  250. BReactorIOCPOverlapped_Free(&o->send.olap);
  251. // set aborted
  252. o->aborted = 1;
  253. }
  254. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len)
  255. {
  256. DebugObject_Access(&o->d_obj);
  257. DebugError_AssertNoError(&o->d_err);
  258. ASSERT(!o->aborted)
  259. ASSERT(o->send.inited)
  260. ASSERT(!o->send.busy)
  261. ASSERT(data_len > 0)
  262. if (data_len > ULONG_MAX) {
  263. data_len = ULONG_MAX;
  264. }
  265. WSABUF buf;
  266. buf.buf = (char *)data;
  267. buf.len = data_len;
  268. memset(&o->send.olap.olap, 0, sizeof(o->send.olap.olap));
  269. // send
  270. int res = WSASend(o->sock, &buf, 1, NULL, 0, &o->send.olap.olap, NULL);
  271. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  272. BLog(BLOG_ERROR, "WSASend failed (%d)", WSAGetLastError());
  273. connection_report_error(o);
  274. return;
  275. }
  276. // set busy
  277. o->send.busy = 1;
  278. o->send.busy_data_len = data_len;
  279. }
  280. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len)
  281. {
  282. DebugObject_Access(&o->d_obj);
  283. DebugError_AssertNoError(&o->d_err);
  284. ASSERT(!o->recv.closed)
  285. ASSERT(!o->aborted)
  286. ASSERT(o->recv.inited)
  287. ASSERT(!o->recv.busy)
  288. ASSERT(data_len > 0)
  289. if (data_len > ULONG_MAX) {
  290. data_len = ULONG_MAX;
  291. }
  292. WSABUF buf;
  293. buf.buf = (char *)data;
  294. buf.len = data_len;
  295. memset(&o->recv.olap.olap, 0, sizeof(o->recv.olap.olap));
  296. // recv
  297. DWORD flags = 0;
  298. int res = WSARecv(o->sock, &buf, 1, NULL, &flags, &o->recv.olap.olap, NULL);
  299. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  300. BLog(BLOG_ERROR, "WSARecv failed (%d)", WSAGetLastError());
  301. connection_report_error(o);
  302. return;
  303. }
  304. // set busy
  305. o->recv.busy = 1;
  306. o->recv.busy_data_len = data_len;
  307. }
  308. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes)
  309. {
  310. DebugObject_Access(&o->d_obj);
  311. DebugError_AssertNoError(&o->d_err);
  312. ASSERT(!o->aborted)
  313. ASSERT(o->send.inited)
  314. ASSERT(o->send.busy)
  315. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  316. // set not busy
  317. o->send.busy = 0;
  318. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  319. BLog(BLOG_ERROR, "sending failed");
  320. connection_report_error(o);
  321. return;
  322. }
  323. ASSERT(bytes > 0)
  324. ASSERT(bytes <= o->send.busy_data_len)
  325. // done
  326. StreamPassInterface_Done(&o->send.iface, bytes);
  327. }
  328. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes)
  329. {
  330. DebugObject_Access(&o->d_obj);
  331. DebugError_AssertNoError(&o->d_err);
  332. ASSERT(!o->recv.closed)
  333. ASSERT(!o->aborted)
  334. ASSERT(o->recv.inited)
  335. ASSERT(o->recv.busy)
  336. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  337. // set not busy
  338. o->recv.busy = 0;
  339. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  340. BLog(BLOG_ERROR, "receiving failed");
  341. connection_report_error(o);
  342. return;
  343. }
  344. if (bytes == 0) {
  345. // set closed
  346. o->recv.closed = 1;
  347. // report recv closed
  348. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  349. return;
  350. }
  351. ASSERT(bytes > 0)
  352. ASSERT(bytes <= o->recv.busy_data_len)
  353. // done
  354. StreamRecvInterface_Done(&o->recv.iface, bytes);
  355. }
  356. int BConnection_AddressSupported (BAddr addr)
  357. {
  358. BAddr_Assert(&addr);
  359. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  360. }
  361. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  362. BListener_handler handler)
  363. {
  364. ASSERT(handler)
  365. BNetwork_Assert();
  366. // init arguments
  367. o->reactor = reactor;
  368. o->user = user;
  369. o->handler = handler;
  370. // check address
  371. if (!BConnection_AddressSupported(addr)) {
  372. BLog(BLOG_ERROR, "address not supported");
  373. goto fail0;
  374. }
  375. // convert address
  376. struct sys_addr sysaddr;
  377. addr_socket_to_sys(&sysaddr, addr);
  378. // remember family
  379. o->sys_family = sysaddr.addr.generic.sa_family;
  380. // init socket
  381. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  382. BLog(BLOG_ERROR, "WSASocket failed");
  383. goto fail0;
  384. }
  385. // associate with IOCP
  386. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  387. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  388. goto fail1;
  389. }
  390. // bind
  391. if (bind(o->sock, &sysaddr.addr.generic, sysaddr.len) < 0) {
  392. BLog(BLOG_ERROR, "bind failed");
  393. goto fail1;
  394. }
  395. // listen
  396. if (listen(o->sock, LISTEN_BACKLOG) < 0) {
  397. BLog(BLOG_ERROR, "listen failed");
  398. goto fail1;
  399. }
  400. DWORD out_bytes;
  401. // obtain AcceptEx
  402. GUID guid1 = WSAID_ACCEPTEX;
  403. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, sizeof(guid1), &o->fnAcceptEx, sizeof(o->fnAcceptEx), &out_bytes, NULL, NULL) != 0) {
  404. BLog(BLOG_ERROR, "faild to obtain AcceptEx");
  405. goto fail1;
  406. }
  407. // obtain GetAcceptExSockaddrs
  408. GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS;
  409. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, sizeof(guid2), &o->fnGetAcceptExSockaddrs, sizeof(o->fnGetAcceptExSockaddrs), &out_bytes, NULL, NULL) != 0) {
  410. BLog(BLOG_ERROR, "faild to obtain GetAcceptExSockaddrs");
  411. goto fail1;
  412. }
  413. // init olap
  414. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)listener_olap_handler);
  415. // init next job
  416. BPending_Init(&o->next_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_next_job_handler, o);
  417. // set not busy
  418. o->busy = 0;
  419. // set not ready
  420. o->ready = 0;
  421. // set next job
  422. BPending_Set(&o->next_job);
  423. DebugObject_Init(&o->d_obj);
  424. return 1;
  425. fail1:
  426. if (closesocket(o->sock) == SOCKET_ERROR) {
  427. BLog(BLOG_ERROR, "closesocket failed");
  428. }
  429. fail0:
  430. return 0;
  431. }
  432. void BListener_Free (BListener *o)
  433. {
  434. DebugObject_Free(&o->d_obj);
  435. // cancel I/O
  436. if (o->busy) {
  437. if (!CancelIo((HANDLE)o->sock)) {
  438. BLog(BLOG_ERROR, "CancelIo failed");
  439. }
  440. }
  441. // close socket
  442. if (closesocket(o->sock) == SOCKET_ERROR) {
  443. BLog(BLOG_ERROR, "closesocket failed");
  444. }
  445. // wait for accept operation to finish
  446. if (o->busy) {
  447. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  448. }
  449. // close new socket
  450. if (o->busy || o->ready) {
  451. if (closesocket(o->newsock) == SOCKET_ERROR) {
  452. BLog(BLOG_ERROR, "closesocket failed");
  453. }
  454. }
  455. // free next job
  456. BPending_Free(&o->next_job);
  457. // free olap
  458. BReactorIOCPOverlapped_Free(&o->olap);
  459. }
  460. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  461. BConnector_handler handler)
  462. {
  463. ASSERT(handler)
  464. BNetwork_Assert();
  465. // init arguments
  466. o->reactor = reactor;
  467. o->user = user;
  468. o->handler = handler;
  469. // check address
  470. if (!BConnection_AddressSupported(addr)) {
  471. BLog(BLOG_ERROR, "address not supported");
  472. goto fail0;
  473. }
  474. // convert address
  475. struct sys_addr sysaddr;
  476. addr_socket_to_sys(&sysaddr, addr);
  477. // create local any address
  478. struct sys_addr local_sysaddr;
  479. addr_any_to_sys(&local_sysaddr, addr.type);
  480. // init socket
  481. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  482. BLog(BLOG_ERROR, "WSASocket failed");
  483. goto fail0;
  484. }
  485. // associate with IOCP
  486. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  487. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  488. goto fail1;
  489. }
  490. // bind socket
  491. if (bind(o->sock, &local_sysaddr.addr.generic, local_sysaddr.len) < 0) {
  492. BLog(BLOG_ERROR, "bind failed");
  493. goto fail1;
  494. }
  495. // obtain ConnectEx
  496. GUID guid = WSAID_CONNECTEX;
  497. DWORD out_bytes;
  498. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnConnectEx, sizeof(o->fnConnectEx), &out_bytes, NULL, NULL) != 0) {
  499. BLog(BLOG_ERROR, "faild to get ConnectEx");
  500. goto fail1;
  501. }
  502. // init olap
  503. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connector_olap_handler);
  504. // start connect operation
  505. BOOL res = o->fnConnectEx(o->sock, &sysaddr.addr.generic, sysaddr.len, NULL, 0, NULL, &o->olap.olap);
  506. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  507. BLog(BLOG_ERROR, "ConnectEx failed (%d)", WSAGetLastError());
  508. goto fail2;
  509. }
  510. // set busy
  511. o->busy = 1;
  512. // set not ready
  513. o->ready = 0;
  514. DebugObject_Init(&o->d_obj);
  515. return 1;
  516. fail2:
  517. BReactorIOCPOverlapped_Free(&o->olap);
  518. fail1:
  519. if (closesocket(o->sock) == SOCKET_ERROR) {
  520. BLog(BLOG_ERROR, "closesocket failed");
  521. }
  522. fail0:
  523. return 0;
  524. }
  525. void BConnector_Free (BConnector *o)
  526. {
  527. DebugObject_Free(&o->d_obj);
  528. if (o->sock != INVALID_SOCKET) {
  529. connector_abort(o);
  530. }
  531. }
  532. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  533. BConnection_handler handler)
  534. {
  535. switch (source.type) {
  536. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  537. BListener *listener = source.u.listener.listener;
  538. DebugObject_Access(&listener->d_obj);
  539. ASSERT(BPending_IsSet(&listener->next_job))
  540. ASSERT(!listener->busy)
  541. ASSERT(listener->ready)
  542. } break;
  543. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  544. BConnector *connector = source.u.connector.connector;
  545. DebugObject_Access(&connector->d_obj);
  546. ASSERT(connector->reactor == reactor)
  547. ASSERT(connector->sock != INVALID_SOCKET)
  548. ASSERT(!connector->busy)
  549. ASSERT(connector->ready)
  550. } break;
  551. default: ASSERT(0);
  552. }
  553. ASSERT(handler)
  554. BNetwork_Assert();
  555. // init arguments
  556. o->reactor = reactor;
  557. o->user = user;
  558. o->handler = handler;
  559. switch (source.type) {
  560. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  561. BListener *listener = source.u.listener.listener;
  562. // grab new socket from listener
  563. o->sock = listener->newsock;
  564. listener->ready = 0;
  565. // associate with IOCP
  566. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  567. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  568. goto fail1;
  569. }
  570. // return address
  571. if (source.u.listener.out_addr) {
  572. struct sockaddr *addr_local;
  573. struct sockaddr *addr_remote;
  574. int len_local;
  575. int len_remote;
  576. listener->fnGetAcceptExSockaddrs(listener->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub),
  577. &addr_local, &len_local, &addr_remote, &len_remote);
  578. struct sys_addr sysaddr;
  579. ASSERT_FORCE(len_remote >= 0)
  580. ASSERT_FORCE(len_remote <= sizeof(sysaddr.addr))
  581. memcpy((uint8_t *)&sysaddr.addr, (uint8_t *)addr_remote, len_remote);
  582. sysaddr.len = len_remote;
  583. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  584. }
  585. } break;
  586. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  587. BConnector *connector = source.u.connector.connector;
  588. // grab fd from connector
  589. o->sock = connector->sock;
  590. connector->sock = INVALID_SOCKET;
  591. // release connector resources
  592. connector_abort(connector);
  593. } break;
  594. }
  595. // set not aborted
  596. o->aborted = 0;
  597. // init send olap
  598. BReactorIOCPOverlapped_Init(&o->send.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_send_olap_handler);
  599. // set send not inited
  600. o->send.inited = 0;
  601. // init recv olap
  602. BReactorIOCPOverlapped_Init(&o->recv.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_recv_olap_handler);
  603. // set recv not closed
  604. o->recv.closed = 0;
  605. // set recv not inited
  606. o->recv.inited = 0;
  607. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  608. DebugObject_Init(&o->d_obj);
  609. return 1;
  610. fail1:
  611. if (closesocket(o->sock) == SOCKET_ERROR) {
  612. BLog(BLOG_ERROR, "closesocket failed");
  613. }
  614. return 0;
  615. }
  616. void BConnection_Free (BConnection *o)
  617. {
  618. DebugObject_Free(&o->d_obj);
  619. DebugError_Free(&o->d_err);
  620. ASSERT(!o->recv.inited)
  621. ASSERT(!o->send.inited)
  622. if (!o->aborted) {
  623. connection_abort(o);
  624. }
  625. }
  626. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  627. {
  628. DebugObject_Access(&o->d_obj);
  629. // set handlers
  630. o->user = user;
  631. o->handler = handler;
  632. }
  633. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  634. {
  635. DebugObject_Access(&o->d_obj);
  636. if (setsockopt(o->sock, SOL_SOCKET, SO_SNDBUF, (char *)&buf_size, sizeof(buf_size)) < 0) {
  637. BLog(BLOG_ERROR, "setsockopt failed");
  638. return 0;
  639. }
  640. return 1;
  641. }
  642. void BConnection_SendAsync_Init (BConnection *o)
  643. {
  644. DebugObject_Access(&o->d_obj);
  645. DebugError_AssertNoError(&o->d_err);
  646. ASSERT(!o->aborted)
  647. ASSERT(!o->send.inited)
  648. // init interface
  649. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_iface_handler_send, o, BReactor_PendingGroup(o->reactor));
  650. // set not busy
  651. o->send.busy = 0;
  652. // set inited
  653. o->send.inited = 1;
  654. }
  655. void BConnection_SendAsync_Free (BConnection *o)
  656. {
  657. DebugObject_Access(&o->d_obj);
  658. ASSERT(o->send.inited)
  659. // abort if busy
  660. if (o->send.busy && !o->aborted) {
  661. connection_abort(o);
  662. }
  663. // free interface
  664. StreamPassInterface_Free(&o->send.iface);
  665. // set not inited
  666. o->send.inited = 0;
  667. }
  668. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  669. {
  670. DebugObject_Access(&o->d_obj);
  671. ASSERT(o->send.inited)
  672. return &o->send.iface;
  673. }
  674. void BConnection_RecvAsync_Init (BConnection *o)
  675. {
  676. DebugObject_Access(&o->d_obj);
  677. DebugError_AssertNoError(&o->d_err);
  678. ASSERT(!o->recv.closed)
  679. ASSERT(!o->aborted)
  680. ASSERT(!o->recv.inited)
  681. // init interface
  682. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_iface_handler_recv, o, BReactor_PendingGroup(o->reactor));
  683. // set not busy
  684. o->recv.busy = 0;
  685. // set inited
  686. o->recv.inited = 1;
  687. }
  688. void BConnection_RecvAsync_Free (BConnection *o)
  689. {
  690. DebugObject_Access(&o->d_obj);
  691. ASSERT(o->recv.inited)
  692. // abort if busy
  693. if (o->recv.busy && !o->aborted) {
  694. connection_abort(o);
  695. }
  696. // free interface
  697. StreamRecvInterface_Free(&o->recv.iface);
  698. // set not inited
  699. o->recv.inited = 0;
  700. }
  701. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  702. {
  703. DebugObject_Access(&o->d_obj);
  704. ASSERT(o->recv.inited)
  705. return &o->recv.iface;
  706. }