BConnection_win.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. /**
  2. * @file BConnection_win.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdlib.h>
  30. #include <limits.h>
  31. #include <base/BLog.h>
  32. #include "BConnection.h"
  33. #include <generated/blog_channel_BConnection.h>
  34. #define LISTEN_BACKLOG 128
  /**
   * A native socket address together with its length.
   * The union lets the same storage be accessed as a generic sockaddr
   * or as the concrete IPv4 / IPv6 structure.
   */
  struct sys_addr {
      // size in bytes of the address currently stored in addr
      int len;
      union {
          // family-agnostic view, used to read sa_family and to pass to socket calls
          struct sockaddr generic;
          // AF_INET form
          struct sockaddr_in ipv4;
          // AF_INET6 form
          struct sockaddr_in6 ipv6;
      } addr;
  };
  43. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  44. static void addr_any_to_sys (struct sys_addr *out, int family);
  45. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  46. static void listener_next_job_handler (BListener *o);
  47. static void listener_olap_handler (BListener *o, int event, DWORD bytes);
  48. static void connector_olap_handler (BConnector *o, int event, DWORD bytes);
  49. static void connector_abort (BConnector *o);
  50. static void connection_report_error (BConnection *o);
  51. static void connection_abort (BConnection *o);
  52. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len);
  53. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len);
  54. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes);
  55. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes);
  56. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  57. {
  58. switch (addr.type) {
  59. case BADDR_TYPE_IPV4: {
  60. out->len = sizeof(out->addr.ipv4);
  61. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  62. out->addr.ipv4.sin_family = AF_INET;
  63. out->addr.ipv4.sin_port = addr.ipv4.port;
  64. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  65. } break;
  66. case BADDR_TYPE_IPV6: {
  67. out->len = sizeof(out->addr.ipv6);
  68. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  69. out->addr.ipv6.sin6_family = AF_INET6;
  70. out->addr.ipv6.sin6_port = addr.ipv6.port;
  71. out->addr.ipv6.sin6_flowinfo = 0;
  72. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  73. out->addr.ipv6.sin6_scope_id = 0;
  74. } break;
  75. default: ASSERT(0);
  76. }
  77. }
  78. static void addr_any_to_sys (struct sys_addr *out, int family)
  79. {
  80. switch (family) {
  81. case BADDR_TYPE_IPV4: {
  82. out->len = sizeof(out->addr.ipv4);
  83. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  84. out->addr.ipv4.sin_family = AF_INET;
  85. out->addr.ipv4.sin_port = 0;
  86. out->addr.ipv4.sin_addr.s_addr = INADDR_ANY;
  87. } break;
  88. case BADDR_TYPE_IPV6: {
  89. out->len = sizeof(out->addr.ipv6);
  90. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  91. out->addr.ipv6.sin6_family = AF_INET6;
  92. out->addr.ipv6.sin6_port = 0;
  93. out->addr.ipv6.sin6_flowinfo = 0;
  94. out->addr.ipv6.sin6_addr = (struct in6_addr)IN6ADDR_ANY_INIT;
  95. out->addr.ipv6.sin6_scope_id = 0;
  96. } break;
  97. default: ASSERT(0);
  98. }
  99. }
  100. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  101. {
  102. switch (addr.addr.generic.sa_family) {
  103. case AF_INET: {
  104. ASSERT(addr.len == sizeof(struct sockaddr_in))
  105. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  106. } break;
  107. case AF_INET6: {
  108. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  109. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  110. } break;
  111. default: {
  112. BAddr_InitNone(out);
  113. } break;
  114. }
  115. }
  116. static void listener_next_job_handler (BListener *o)
  117. {
  118. DebugObject_Access(&o->d_obj);
  119. ASSERT(!o->busy)
  120. // free ready socket
  121. if (o->ready) {
  122. BLog(BLOG_ERROR, "discarding connection");
  123. // close new socket
  124. if (closesocket(o->newsock) == SOCKET_ERROR) {
  125. BLog(BLOG_ERROR, "closesocket failed");
  126. }
  127. // set not ready
  128. o->ready = 0;
  129. }
  130. // create new socket
  131. if ((o->newsock = WSASocket(o->sys_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  132. BLog(BLOG_ERROR, "WSASocket failed");
  133. goto fail0;
  134. }
  135. // start accept operation
  136. while (1) {
  137. memset(&o->olap.olap, 0, sizeof(o->olap.olap));
  138. DWORD bytes;
  139. BOOL res = o->fnAcceptEx(o->sock, o->newsock, o->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub), &bytes, &o->olap.olap);
  140. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  141. BLog(BLOG_ERROR, "AcceptEx failed");
  142. continue;
  143. }
  144. break;
  145. }
  146. // set busy
  147. o->busy = 1;
  148. return;
  149. fail0:
  150. return;
  151. }
  152. static void listener_olap_handler (BListener *o, int event, DWORD bytes)
  153. {
  154. DebugObject_Access(&o->d_obj);
  155. ASSERT(o->busy)
  156. ASSERT(!o->ready)
  157. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  158. // set not busy
  159. o->busy = 0;
  160. // schedule next accept
  161. BPending_Set(&o->next_job);
  162. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  163. BLog(BLOG_ERROR, "accepting failed");
  164. // close new socket
  165. if (closesocket(o->newsock) == SOCKET_ERROR) {
  166. BLog(BLOG_ERROR, "closesocket failed");
  167. }
  168. return;
  169. }
  170. BLog(BLOG_INFO, "connection accepted");
  171. // set ready
  172. o->ready = 1;
  173. // call handler
  174. o->handler(o->user);
  175. return;
  176. }
  177. static void connector_olap_handler (BConnector *o, int event, DWORD bytes)
  178. {
  179. DebugObject_Access(&o->d_obj);
  180. ASSERT(o->sock != INVALID_SOCKET)
  181. ASSERT(o->busy)
  182. ASSERT(!o->ready)
  183. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  184. // set not busy
  185. o->busy = 0;
  186. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  187. BLog(BLOG_ERROR, "connection failed");
  188. } else {
  189. // set ready
  190. o->ready = 1;
  191. }
  192. // call handler
  193. o->handler(o->user, !o->ready);
  194. return;
  195. }
  196. static void connector_abort (BConnector *o)
  197. {
  198. if (o->sock != INVALID_SOCKET) {
  199. // cancel I/O
  200. if (o->busy) {
  201. if (!CancelIo((HANDLE)o->sock)) {
  202. BLog(BLOG_ERROR, "CancelIo failed");
  203. }
  204. }
  205. // close socket
  206. if (closesocket(o->sock) == SOCKET_ERROR) {
  207. BLog(BLOG_ERROR, "closesocket failed");
  208. }
  209. }
  210. // wait for connect operation to finish
  211. if (o->busy) {
  212. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  213. }
  214. // free olap
  215. BReactorIOCPOverlapped_Free(&o->olap);
  216. }
  217. static void connection_report_error (BConnection *o)
  218. {
  219. DebugError_AssertNoError(&o->d_err);
  220. ASSERT(o->handler)
  221. // report error
  222. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  223. return;
  224. }
  225. static void connection_abort (BConnection *o)
  226. {
  227. ASSERT(!o->aborted)
  228. // cancel I/O
  229. if ((o->recv.inited && o->recv.busy) || (o->send.inited && o->send.busy)) {
  230. if (!CancelIo((HANDLE)o->sock)) {
  231. BLog(BLOG_ERROR, "CancelIo failed");
  232. }
  233. }
  234. // close socket
  235. if (closesocket(o->sock) == SOCKET_ERROR) {
  236. BLog(BLOG_ERROR, "closesocket failed");
  237. }
  238. // wait for receiving to complete
  239. if (o->recv.inited && o->recv.busy) {
  240. BReactorIOCPOverlapped_Wait(&o->recv.olap, NULL, NULL);
  241. }
  242. // wait for sending to complete
  243. if (o->send.inited && o->send.busy) {
  244. BReactorIOCPOverlapped_Wait(&o->send.olap, NULL, NULL);
  245. }
  246. // free recv olap
  247. BReactorIOCPOverlapped_Free(&o->recv.olap);
  248. // free send olap
  249. BReactorIOCPOverlapped_Free(&o->send.olap);
  250. // set aborted
  251. o->aborted = 1;
  252. }
  253. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len)
  254. {
  255. DebugObject_Access(&o->d_obj);
  256. DebugError_AssertNoError(&o->d_err);
  257. ASSERT(!o->aborted)
  258. ASSERT(o->send.inited)
  259. ASSERT(!o->send.busy)
  260. ASSERT(data_len > 0)
  261. if (data_len > ULONG_MAX) {
  262. data_len = ULONG_MAX;
  263. }
  264. WSABUF buf;
  265. buf.buf = data;
  266. buf.len = data_len;
  267. memset(&o->send.olap.olap, 0, sizeof(o->send.olap.olap));
  268. // send
  269. int res = WSASend(o->sock, &buf, 1, NULL, 0, &o->send.olap.olap, NULL);
  270. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  271. BLog(BLOG_ERROR, "WSASend failed (%d)", WSAGetLastError());
  272. connection_report_error(o);
  273. return;
  274. }
  275. // set busy
  276. o->send.busy = 1;
  277. o->send.busy_data_len = data_len;
  278. }
  279. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len)
  280. {
  281. DebugObject_Access(&o->d_obj);
  282. DebugError_AssertNoError(&o->d_err);
  283. ASSERT(!o->recv.closed)
  284. ASSERT(!o->aborted)
  285. ASSERT(o->recv.inited)
  286. ASSERT(!o->recv.busy)
  287. ASSERT(data_len > 0)
  288. if (data_len > ULONG_MAX) {
  289. data_len = ULONG_MAX;
  290. }
  291. WSABUF buf;
  292. buf.buf = data;
  293. buf.len = data_len;
  294. memset(&o->recv.olap.olap, 0, sizeof(o->recv.olap.olap));
  295. // recv
  296. DWORD flags = 0;
  297. int res = WSARecv(o->sock, &buf, 1, NULL, &flags, &o->recv.olap.olap, NULL);
  298. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  299. BLog(BLOG_ERROR, "WSARecv failed (%d)", WSAGetLastError());
  300. connection_report_error(o);
  301. return;
  302. }
  303. // set busy
  304. o->recv.busy = 1;
  305. o->recv.busy_data_len = data_len;
  306. }
  307. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes)
  308. {
  309. DebugObject_Access(&o->d_obj);
  310. DebugError_AssertNoError(&o->d_err);
  311. ASSERT(!o->aborted)
  312. ASSERT(o->send.inited)
  313. ASSERT(o->send.busy)
  314. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  315. // set not busy
  316. o->send.busy = 0;
  317. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  318. BLog(BLOG_ERROR, "sending failed");
  319. connection_report_error(o);
  320. return;
  321. }
  322. ASSERT(bytes > 0)
  323. ASSERT(bytes <= o->send.busy_data_len)
  324. // done
  325. StreamPassInterface_Done(&o->send.iface, bytes);
  326. }
  327. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes)
  328. {
  329. DebugObject_Access(&o->d_obj);
  330. DebugError_AssertNoError(&o->d_err);
  331. ASSERT(!o->recv.closed)
  332. ASSERT(!o->aborted)
  333. ASSERT(o->recv.inited)
  334. ASSERT(o->recv.busy)
  335. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  336. // set not busy
  337. o->recv.busy = 0;
  338. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  339. BLog(BLOG_ERROR, "receiving failed");
  340. connection_report_error(o);
  341. return;
  342. }
  343. if (bytes == 0) {
  344. // set closed
  345. o->recv.closed = 1;
  346. // report recv closed
  347. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  348. return;
  349. }
  350. ASSERT(bytes > 0)
  351. ASSERT(bytes <= o->recv.busy_data_len)
  352. // done
  353. StreamRecvInterface_Done(&o->recv.iface, bytes);
  354. }
  355. int BConnection_AddressSupported (BAddr addr)
  356. {
  357. BAddr_Assert(&addr);
  358. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  359. }
  360. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  361. BListener_handler handler)
  362. {
  363. ASSERT(BConnection_AddressSupported(addr))
  364. ASSERT(handler)
  365. BNetwork_Assert();
  366. // init arguments
  367. o->reactor = reactor;
  368. o->user = user;
  369. o->handler = handler;
  370. // convert address
  371. struct sys_addr sysaddr;
  372. addr_socket_to_sys(&sysaddr, addr);
  373. // remember family
  374. o->sys_family = sysaddr.addr.generic.sa_family;
  375. // init socket
  376. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  377. BLog(BLOG_ERROR, "WSASocket failed");
  378. goto fail0;
  379. }
  380. // associate with IOCP
  381. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  382. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  383. goto fail1;
  384. }
  385. // bind
  386. if (bind(o->sock, &sysaddr.addr.generic, sysaddr.len) < 0) {
  387. BLog(BLOG_ERROR, "bind failed");
  388. goto fail1;
  389. }
  390. // listen
  391. if (listen(o->sock, LISTEN_BACKLOG) < 0) {
  392. BLog(BLOG_ERROR, "listen failed");
  393. goto fail1;
  394. }
  395. GUID guid;
  396. DWORD out_bytes;
  397. // obtain AcceptEx
  398. guid = (GUID)WSAID_ACCEPTEX;
  399. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnAcceptEx, sizeof(o->fnAcceptEx), &out_bytes, NULL, NULL) != 0) {
  400. BLog(BLOG_ERROR, "faild to obtain AcceptEx");
  401. goto fail1;
  402. }
  403. // obtain GetAcceptExSockaddrs
  404. guid = (GUID)WSAID_GETACCEPTEXSOCKADDRS;
  405. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnGetAcceptExSockaddrs, sizeof(o->fnGetAcceptExSockaddrs), &out_bytes, NULL, NULL) != 0) {
  406. BLog(BLOG_ERROR, "faild to obtain GetAcceptExSockaddrs");
  407. goto fail1;
  408. }
  409. // init olap
  410. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)listener_olap_handler);
  411. // init next job
  412. BPending_Init(&o->next_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_next_job_handler, o);
  413. // set not busy
  414. o->busy = 0;
  415. // set not ready
  416. o->ready = 0;
  417. // set next job
  418. BPending_Set(&o->next_job);
  419. DebugObject_Init(&o->d_obj);
  420. return 1;
  421. fail1:
  422. if (closesocket(o->sock) == SOCKET_ERROR) {
  423. BLog(BLOG_ERROR, "closesocket failed");
  424. }
  425. fail0:
  426. return 0;
  427. }
  428. void BListener_Free (BListener *o)
  429. {
  430. DebugObject_Free(&o->d_obj);
  431. // cancel I/O
  432. if (o->busy) {
  433. if (!CancelIo((HANDLE)o->sock)) {
  434. BLog(BLOG_ERROR, "CancelIo failed");
  435. }
  436. }
  437. // close socket
  438. if (closesocket(o->sock) == SOCKET_ERROR) {
  439. BLog(BLOG_ERROR, "closesocket failed");
  440. }
  441. // wait for accept operation to finish
  442. if (o->busy) {
  443. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  444. }
  445. // close new socket
  446. if (o->busy || o->ready) {
  447. if (closesocket(o->newsock) == SOCKET_ERROR) {
  448. BLog(BLOG_ERROR, "closesocket failed");
  449. }
  450. }
  451. // free next job
  452. BPending_Free(&o->next_job);
  453. // free olap
  454. BReactorIOCPOverlapped_Free(&o->olap);
  455. }
  456. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  457. BConnector_handler handler)
  458. {
  459. ASSERT(BConnection_AddressSupported(addr))
  460. ASSERT(handler)
  461. BNetwork_Assert();
  462. // init arguments
  463. o->reactor = reactor;
  464. o->user = user;
  465. o->handler = handler;
  466. // convert address
  467. struct sys_addr sysaddr;
  468. addr_socket_to_sys(&sysaddr, addr);
  469. // create local any address
  470. struct sys_addr local_sysaddr;
  471. addr_any_to_sys(&local_sysaddr, addr.type);
  472. // init socket
  473. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  474. BLog(BLOG_ERROR, "WSASocket failed");
  475. goto fail0;
  476. }
  477. // associate with IOCP
  478. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  479. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  480. goto fail1;
  481. }
  482. // bind socket
  483. if (bind(o->sock, &local_sysaddr.addr.generic, local_sysaddr.len) < 0) {
  484. BLog(BLOG_ERROR, "bind failed");
  485. goto fail1;
  486. }
  487. // obtain ConnectEx
  488. GUID guid = WSAID_CONNECTEX;
  489. DWORD out_bytes;
  490. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnConnectEx, sizeof(o->fnConnectEx), &out_bytes, NULL, NULL) != 0) {
  491. BLog(BLOG_ERROR, "faild to get ConnectEx");
  492. goto fail1;
  493. }
  494. // init olap
  495. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connector_olap_handler);
  496. // start connect operation
  497. BOOL res = o->fnConnectEx(o->sock, &sysaddr.addr.generic, sysaddr.len, NULL, 0, NULL, &o->olap.olap);
  498. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  499. BLog(BLOG_ERROR, "ConnectEx failed (%d)", WSAGetLastError());
  500. goto fail2;
  501. }
  502. // set busy
  503. o->busy = 1;
  504. // set not ready
  505. o->ready = 0;
  506. DebugObject_Init(&o->d_obj);
  507. return 1;
  508. fail2:
  509. BReactorIOCPOverlapped_Free(&o->olap);
  510. fail1:
  511. if (closesocket(o->sock) == SOCKET_ERROR) {
  512. BLog(BLOG_ERROR, "closesocket failed");
  513. }
  514. fail0:
  515. return 0;
  516. }
  517. void BConnector_Free (BConnector *o)
  518. {
  519. DebugObject_Free(&o->d_obj);
  520. if (o->sock != INVALID_SOCKET) {
  521. connector_abort(o);
  522. }
  523. }
  524. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  525. BConnection_handler handler)
  526. {
  527. switch (source.type) {
  528. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  529. BListener *listener = source.u.listener.listener;
  530. DebugObject_Access(&listener->d_obj);
  531. ASSERT(BPending_IsSet(&listener->next_job))
  532. ASSERT(!listener->busy)
  533. ASSERT(listener->ready)
  534. } break;
  535. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  536. BConnector *connector = source.u.connector.connector;
  537. DebugObject_Access(&connector->d_obj);
  538. ASSERT(connector->reactor == reactor)
  539. ASSERT(connector->sock != INVALID_SOCKET)
  540. ASSERT(!connector->busy)
  541. ASSERT(connector->ready)
  542. } break;
  543. default: ASSERT(0);
  544. }
  545. ASSERT(handler)
  546. BNetwork_Assert();
  547. // init arguments
  548. o->reactor = reactor;
  549. o->user = user;
  550. o->handler = handler;
  551. switch (source.type) {
  552. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  553. BListener *listener = source.u.listener.listener;
  554. // grab new socket from listener
  555. o->sock = listener->newsock;
  556. listener->ready = 0;
  557. // associate with IOCP
  558. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  559. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  560. goto fail1;
  561. }
  562. // return address
  563. if (source.u.listener.out_addr) {
  564. struct sockaddr *addr_local;
  565. struct sockaddr *addr_remote;
  566. int len_local;
  567. int len_remote;
  568. listener->fnGetAcceptExSockaddrs(listener->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub),
  569. &addr_local, &len_local, &addr_remote, &len_remote);
  570. struct sys_addr sysaddr;
  571. ASSERT_FORCE(len_remote >= 0)
  572. ASSERT_FORCE(len_remote <= sizeof(sysaddr.addr))
  573. memcpy((uint8_t *)&sysaddr.addr, (uint8_t *)addr_remote, len_remote);
  574. sysaddr.len = len_remote;
  575. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  576. }
  577. } break;
  578. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  579. BConnector *connector = source.u.connector.connector;
  580. // grab fd from connector
  581. o->sock = connector->sock;
  582. connector->sock = INVALID_SOCKET;
  583. // release connector resources
  584. connector_abort(connector);
  585. } break;
  586. }
  587. // set not aborted
  588. o->aborted = 0;
  589. // init send olap
  590. BReactorIOCPOverlapped_Init(&o->send.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_send_olap_handler);
  591. // set send not inited
  592. o->send.inited = 0;
  593. // init recv olap
  594. BReactorIOCPOverlapped_Init(&o->recv.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_recv_olap_handler);
  595. // set recv not closed
  596. o->recv.closed = 0;
  597. // set recv not inited
  598. o->recv.inited = 0;
  599. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  600. DebugObject_Init(&o->d_obj);
  601. return 1;
  602. fail1:
  603. if (closesocket(o->sock) == SOCKET_ERROR) {
  604. BLog(BLOG_ERROR, "closesocket failed");
  605. }
  606. return 0;
  607. }
  608. void BConnection_Free (BConnection *o)
  609. {
  610. DebugObject_Free(&o->d_obj);
  611. DebugError_Free(&o->d_err);
  612. ASSERT(!o->recv.inited)
  613. ASSERT(!o->send.inited)
  614. if (!o->aborted) {
  615. connection_abort(o);
  616. }
  617. }
  618. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  619. {
  620. DebugObject_Access(&o->d_obj);
  621. // set handlers
  622. o->user = user;
  623. o->handler = handler;
  624. }
  625. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  626. {
  627. DebugObject_Access(&o->d_obj);
  628. if (setsockopt(o->sock, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  629. BLog(BLOG_ERROR, "setsockopt failed");
  630. return 0;
  631. }
  632. return 1;
  633. }
  634. void BConnection_SendAsync_Init (BConnection *o)
  635. {
  636. DebugObject_Access(&o->d_obj);
  637. DebugError_AssertNoError(&o->d_err);
  638. ASSERT(!o->aborted)
  639. ASSERT(!o->send.inited)
  640. // init interface
  641. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_iface_handler_send, o, BReactor_PendingGroup(o->reactor));
  642. // set not busy
  643. o->send.busy = 0;
  644. // set inited
  645. o->send.inited = 1;
  646. }
  647. void BConnection_SendAsync_Free (BConnection *o)
  648. {
  649. DebugObject_Access(&o->d_obj);
  650. ASSERT(o->send.inited)
  651. // abort if busy
  652. if (o->send.busy && !o->aborted) {
  653. connection_abort(o);
  654. }
  655. // free interface
  656. StreamPassInterface_Free(&o->send.iface);
  657. // set not inited
  658. o->send.inited = 0;
  659. }
  660. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  661. {
  662. DebugObject_Access(&o->d_obj);
  663. ASSERT(o->send.inited)
  664. return &o->send.iface;
  665. }
  666. void BConnection_RecvAsync_Init (BConnection *o)
  667. {
  668. DebugObject_Access(&o->d_obj);
  669. DebugError_AssertNoError(&o->d_err);
  670. ASSERT(!o->recv.closed)
  671. ASSERT(!o->aborted)
  672. ASSERT(!o->recv.inited)
  673. // init interface
  674. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_iface_handler_recv, o, BReactor_PendingGroup(o->reactor));
  675. // set not busy
  676. o->recv.busy = 0;
  677. // set inited
  678. o->recv.inited = 1;
  679. }
  680. void BConnection_RecvAsync_Free (BConnection *o)
  681. {
  682. DebugObject_Access(&o->d_obj);
  683. ASSERT(o->recv.inited)
  684. // abort if busy
  685. if (o->recv.busy && !o->aborted) {
  686. connection_abort(o);
  687. }
  688. // free interface
  689. StreamRecvInterface_Free(&o->recv.iface);
  690. // set not inited
  691. o->recv.inited = 0;
  692. }
  693. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  694. {
  695. DebugObject_Access(&o->d_obj);
  696. ASSERT(o->recv.inited)
  697. return &o->recv.iface;
  698. }