BConnection_win.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. /**
  2. * @file BConnection_win.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <stdlib.h>
  30. #include <limits.h>
  31. #include <base/BLog.h>
  32. #include "BConnection.h"
  33. #include <generated/blog_channel_BConnection.h>
  34. #define LISTEN_BACKLOG 128
  35. struct sys_addr {
  36. int len;
  37. union {
  38. struct sockaddr generic;
  39. struct sockaddr_in ipv4;
  40. struct sockaddr_in6 ipv6;
  41. } addr;
  42. };
  43. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  44. static void addr_any_to_sys (struct sys_addr *out, int family);
  45. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  46. static void listener_next_job_handler (BListener *o);
  47. static void listener_olap_handler (BListener *o, int event, DWORD bytes);
  48. static void connector_olap_handler (BConnector *o, int event, DWORD bytes);
  49. static void connector_abort (BConnector *o);
  50. static void connection_report_error (BConnection *o);
  51. static void connection_abort (BConnection *o);
  52. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len);
  53. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len);
  54. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes);
  55. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes);
  56. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  57. {
  58. switch (addr.type) {
  59. case BADDR_TYPE_IPV4: {
  60. out->len = sizeof(out->addr.ipv4);
  61. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  62. out->addr.ipv4.sin_family = AF_INET;
  63. out->addr.ipv4.sin_port = addr.ipv4.port;
  64. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  65. } break;
  66. case BADDR_TYPE_IPV6: {
  67. out->len = sizeof(out->addr.ipv6);
  68. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  69. out->addr.ipv6.sin6_family = AF_INET6;
  70. out->addr.ipv6.sin6_port = addr.ipv6.port;
  71. out->addr.ipv6.sin6_flowinfo = 0;
  72. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  73. out->addr.ipv6.sin6_scope_id = 0;
  74. } break;
  75. default: ASSERT(0);
  76. }
  77. }
  78. static void addr_any_to_sys (struct sys_addr *out, int family)
  79. {
  80. switch (family) {
  81. case BADDR_TYPE_IPV4: {
  82. out->len = sizeof(out->addr.ipv4);
  83. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  84. out->addr.ipv4.sin_family = AF_INET;
  85. out->addr.ipv4.sin_port = 0;
  86. out->addr.ipv4.sin_addr.s_addr = INADDR_ANY;
  87. } break;
  88. case BADDR_TYPE_IPV6: {
  89. out->len = sizeof(out->addr.ipv6);
  90. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  91. out->addr.ipv6.sin6_family = AF_INET6;
  92. out->addr.ipv6.sin6_port = 0;
  93. out->addr.ipv6.sin6_flowinfo = 0;
  94. struct in6_addr any = IN6ADDR_ANY_INIT;
  95. out->addr.ipv6.sin6_addr = any;
  96. out->addr.ipv6.sin6_scope_id = 0;
  97. } break;
  98. default: ASSERT(0);
  99. }
  100. }
  101. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  102. {
  103. switch (addr.addr.generic.sa_family) {
  104. case AF_INET: {
  105. ASSERT(addr.len == sizeof(struct sockaddr_in))
  106. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  107. } break;
  108. case AF_INET6: {
  109. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  110. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  111. } break;
  112. default: {
  113. BAddr_InitNone(out);
  114. } break;
  115. }
  116. }
  117. static void listener_next_job_handler (BListener *o)
  118. {
  119. DebugObject_Access(&o->d_obj);
  120. ASSERT(!o->busy)
  121. // free ready socket
  122. if (o->ready) {
  123. BLog(BLOG_ERROR, "discarding connection");
  124. // close new socket
  125. if (closesocket(o->newsock) == SOCKET_ERROR) {
  126. BLog(BLOG_ERROR, "closesocket failed");
  127. }
  128. // set not ready
  129. o->ready = 0;
  130. }
  131. // create new socket
  132. if ((o->newsock = WSASocket(o->sys_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  133. BLog(BLOG_ERROR, "WSASocket failed");
  134. goto fail0;
  135. }
  136. // start accept operation
  137. while (1) {
  138. memset(&o->olap.olap, 0, sizeof(o->olap.olap));
  139. DWORD bytes;
  140. BOOL res = o->fnAcceptEx(o->sock, o->newsock, o->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub), &bytes, &o->olap.olap);
  141. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  142. BLog(BLOG_ERROR, "AcceptEx failed");
  143. continue;
  144. }
  145. break;
  146. }
  147. // set busy
  148. o->busy = 1;
  149. return;
  150. fail0:
  151. return;
  152. }
  153. static void listener_olap_handler (BListener *o, int event, DWORD bytes)
  154. {
  155. DebugObject_Access(&o->d_obj);
  156. ASSERT(o->busy)
  157. ASSERT(!o->ready)
  158. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  159. // set not busy
  160. o->busy = 0;
  161. // schedule next accept
  162. BPending_Set(&o->next_job);
  163. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  164. BLog(BLOG_ERROR, "accepting failed");
  165. // close new socket
  166. if (closesocket(o->newsock) == SOCKET_ERROR) {
  167. BLog(BLOG_ERROR, "closesocket failed");
  168. }
  169. return;
  170. }
  171. BLog(BLOG_INFO, "connection accepted");
  172. // set ready
  173. o->ready = 1;
  174. // call handler
  175. o->handler(o->user);
  176. return;
  177. }
  178. static void connector_olap_handler (BConnector *o, int event, DWORD bytes)
  179. {
  180. DebugObject_Access(&o->d_obj);
  181. ASSERT(o->sock != INVALID_SOCKET)
  182. ASSERT(o->busy)
  183. ASSERT(!o->ready)
  184. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  185. // set not busy
  186. o->busy = 0;
  187. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  188. BLog(BLOG_ERROR, "connection failed");
  189. } else {
  190. // set ready
  191. o->ready = 1;
  192. }
  193. // call handler
  194. o->handler(o->user, !o->ready);
  195. return;
  196. }
  197. static void connector_abort (BConnector *o)
  198. {
  199. if (o->sock != INVALID_SOCKET) {
  200. // cancel I/O
  201. if (o->busy) {
  202. if (!CancelIo((HANDLE)o->sock)) {
  203. BLog(BLOG_ERROR, "CancelIo failed");
  204. }
  205. }
  206. // close socket
  207. if (closesocket(o->sock) == SOCKET_ERROR) {
  208. BLog(BLOG_ERROR, "closesocket failed");
  209. }
  210. }
  211. // wait for connect operation to finish
  212. if (o->busy) {
  213. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  214. }
  215. // free olap
  216. BReactorIOCPOverlapped_Free(&o->olap);
  217. }
  218. static void connection_report_error (BConnection *o)
  219. {
  220. DebugError_AssertNoError(&o->d_err);
  221. ASSERT(o->handler)
  222. // report error
  223. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  224. return;
  225. }
  226. static void connection_abort (BConnection *o)
  227. {
  228. ASSERT(!o->aborted)
  229. // cancel I/O
  230. if ((o->recv.inited && o->recv.busy) || (o->send.inited && o->send.busy)) {
  231. if (!CancelIo((HANDLE)o->sock)) {
  232. BLog(BLOG_ERROR, "CancelIo failed");
  233. }
  234. }
  235. // close socket
  236. if (closesocket(o->sock) == SOCKET_ERROR) {
  237. BLog(BLOG_ERROR, "closesocket failed");
  238. }
  239. // wait for receiving to complete
  240. if (o->recv.inited && o->recv.busy) {
  241. BReactorIOCPOverlapped_Wait(&o->recv.olap, NULL, NULL);
  242. }
  243. // wait for sending to complete
  244. if (o->send.inited && o->send.busy) {
  245. BReactorIOCPOverlapped_Wait(&o->send.olap, NULL, NULL);
  246. }
  247. // free recv olap
  248. BReactorIOCPOverlapped_Free(&o->recv.olap);
  249. // free send olap
  250. BReactorIOCPOverlapped_Free(&o->send.olap);
  251. // set aborted
  252. o->aborted = 1;
  253. }
  254. static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len)
  255. {
  256. DebugObject_Access(&o->d_obj);
  257. DebugError_AssertNoError(&o->d_err);
  258. ASSERT(!o->aborted)
  259. ASSERT(o->send.inited)
  260. ASSERT(!o->send.busy)
  261. ASSERT(data_len > 0)
  262. if (data_len > ULONG_MAX) {
  263. data_len = ULONG_MAX;
  264. }
  265. WSABUF buf;
  266. buf.buf = (char *)data;
  267. buf.len = data_len;
  268. memset(&o->send.olap.olap, 0, sizeof(o->send.olap.olap));
  269. // send
  270. int res = WSASend(o->sock, &buf, 1, NULL, 0, &o->send.olap.olap, NULL);
  271. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  272. BLog(BLOG_ERROR, "WSASend failed (%d)", WSAGetLastError());
  273. connection_report_error(o);
  274. return;
  275. }
  276. // set busy
  277. o->send.busy = 1;
  278. o->send.busy_data_len = data_len;
  279. }
  280. static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len)
  281. {
  282. DebugObject_Access(&o->d_obj);
  283. DebugError_AssertNoError(&o->d_err);
  284. ASSERT(!o->recv.closed)
  285. ASSERT(!o->aborted)
  286. ASSERT(o->recv.inited)
  287. ASSERT(!o->recv.busy)
  288. ASSERT(data_len > 0)
  289. if (data_len > ULONG_MAX) {
  290. data_len = ULONG_MAX;
  291. }
  292. WSABUF buf;
  293. buf.buf = (char *)data;
  294. buf.len = data_len;
  295. memset(&o->recv.olap.olap, 0, sizeof(o->recv.olap.olap));
  296. // recv
  297. DWORD flags = 0;
  298. int res = WSARecv(o->sock, &buf, 1, NULL, &flags, &o->recv.olap.olap, NULL);
  299. if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  300. BLog(BLOG_ERROR, "WSARecv failed (%d)", WSAGetLastError());
  301. connection_report_error(o);
  302. return;
  303. }
  304. // set busy
  305. o->recv.busy = 1;
  306. o->recv.busy_data_len = data_len;
  307. }
  308. static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes)
  309. {
  310. DebugObject_Access(&o->d_obj);
  311. DebugError_AssertNoError(&o->d_err);
  312. ASSERT(!o->aborted)
  313. ASSERT(o->send.inited)
  314. ASSERT(o->send.busy)
  315. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  316. // set not busy
  317. o->send.busy = 0;
  318. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  319. BLog(BLOG_ERROR, "sending failed");
  320. connection_report_error(o);
  321. return;
  322. }
  323. ASSERT(bytes > 0)
  324. ASSERT(bytes <= o->send.busy_data_len)
  325. // done
  326. StreamPassInterface_Done(&o->send.iface, bytes);
  327. }
  328. static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes)
  329. {
  330. DebugObject_Access(&o->d_obj);
  331. DebugError_AssertNoError(&o->d_err);
  332. ASSERT(!o->recv.closed)
  333. ASSERT(!o->aborted)
  334. ASSERT(o->recv.inited)
  335. ASSERT(o->recv.busy)
  336. ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
  337. // set not busy
  338. o->recv.busy = 0;
  339. if (event == BREACTOR_IOCP_EVENT_FAILED) {
  340. BLog(BLOG_ERROR, "receiving failed");
  341. connection_report_error(o);
  342. return;
  343. }
  344. if (bytes == 0) {
  345. // set closed
  346. o->recv.closed = 1;
  347. // report recv closed
  348. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  349. return;
  350. }
  351. ASSERT(bytes > 0)
  352. ASSERT(bytes <= o->recv.busy_data_len)
  353. // done
  354. StreamRecvInterface_Done(&o->recv.iface, bytes);
  355. }
  356. int BConnection_AddressSupported (BAddr addr)
  357. {
  358. BAddr_Assert(&addr);
  359. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  360. }
  361. int BListener_InitFrom (BListener *o, struct BLisCon_from from,
  362. BReactor *reactor, void *user,
  363. BListener_handler handler)
  364. {
  365. ASSERT(from.type == BLISCON_FROM_ADDR)
  366. ASSERT(handler)
  367. BNetwork_Assert();
  368. // init arguments
  369. o->reactor = reactor;
  370. o->user = user;
  371. o->handler = handler;
  372. // check address
  373. if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
  374. BLog(BLOG_ERROR, "address not supported");
  375. goto fail0;
  376. }
  377. // convert address
  378. struct sys_addr sysaddr;
  379. addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
  380. // remember family
  381. o->sys_family = sysaddr.addr.generic.sa_family;
  382. // init socket
  383. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  384. BLog(BLOG_ERROR, "WSASocket failed");
  385. goto fail0;
  386. }
  387. // associate with IOCP
  388. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  389. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  390. goto fail1;
  391. }
  392. // bind
  393. if (bind(o->sock, &sysaddr.addr.generic, sysaddr.len) < 0) {
  394. BLog(BLOG_ERROR, "bind failed");
  395. goto fail1;
  396. }
  397. // listen
  398. if (listen(o->sock, LISTEN_BACKLOG) < 0) {
  399. BLog(BLOG_ERROR, "listen failed");
  400. goto fail1;
  401. }
  402. DWORD out_bytes;
  403. // obtain AcceptEx
  404. GUID guid1 = WSAID_ACCEPTEX;
  405. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, sizeof(guid1), &o->fnAcceptEx, sizeof(o->fnAcceptEx), &out_bytes, NULL, NULL) != 0) {
  406. BLog(BLOG_ERROR, "faild to obtain AcceptEx");
  407. goto fail1;
  408. }
  409. // obtain GetAcceptExSockaddrs
  410. GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS;
  411. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, sizeof(guid2), &o->fnGetAcceptExSockaddrs, sizeof(o->fnGetAcceptExSockaddrs), &out_bytes, NULL, NULL) != 0) {
  412. BLog(BLOG_ERROR, "faild to obtain GetAcceptExSockaddrs");
  413. goto fail1;
  414. }
  415. // init olap
  416. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)listener_olap_handler);
  417. // init next job
  418. BPending_Init(&o->next_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_next_job_handler, o);
  419. // set not busy
  420. o->busy = 0;
  421. // set not ready
  422. o->ready = 0;
  423. // set next job
  424. BPending_Set(&o->next_job);
  425. DebugObject_Init(&o->d_obj);
  426. return 1;
  427. fail1:
  428. if (closesocket(o->sock) == SOCKET_ERROR) {
  429. BLog(BLOG_ERROR, "closesocket failed");
  430. }
  431. fail0:
  432. return 0;
  433. }
  434. void BListener_Free (BListener *o)
  435. {
  436. DebugObject_Free(&o->d_obj);
  437. // cancel I/O
  438. if (o->busy) {
  439. if (!CancelIo((HANDLE)o->sock)) {
  440. BLog(BLOG_ERROR, "CancelIo failed");
  441. }
  442. }
  443. // close socket
  444. if (closesocket(o->sock) == SOCKET_ERROR) {
  445. BLog(BLOG_ERROR, "closesocket failed");
  446. }
  447. // wait for accept operation to finish
  448. if (o->busy) {
  449. BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
  450. }
  451. // close new socket
  452. if (o->busy || o->ready) {
  453. if (closesocket(o->newsock) == SOCKET_ERROR) {
  454. BLog(BLOG_ERROR, "closesocket failed");
  455. }
  456. }
  457. // free next job
  458. BPending_Free(&o->next_job);
  459. // free olap
  460. BReactorIOCPOverlapped_Free(&o->olap);
  461. }
  462. int BConnector_InitFrom (BConnector *o, struct BLisCon_from from, BReactor *reactor, void *user,
  463. BConnector_handler handler)
  464. {
  465. ASSERT(from.type == BLISCON_FROM_ADDR)
  466. ASSERT(handler)
  467. BNetwork_Assert();
  468. // init arguments
  469. o->reactor = reactor;
  470. o->user = user;
  471. o->handler = handler;
  472. // check address
  473. if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
  474. BLog(BLOG_ERROR, "address not supported");
  475. goto fail0;
  476. }
  477. // convert address
  478. struct sys_addr sysaddr;
  479. addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
  480. // create local any address
  481. struct sys_addr local_sysaddr;
  482. addr_any_to_sys(&local_sysaddr, from.u.from_addr.addr.type);
  483. // init socket
  484. if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
  485. BLog(BLOG_ERROR, "WSASocket failed");
  486. goto fail0;
  487. }
  488. // associate with IOCP
  489. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  490. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  491. goto fail1;
  492. }
  493. // bind socket
  494. if (bind(o->sock, &local_sysaddr.addr.generic, local_sysaddr.len) < 0) {
  495. BLog(BLOG_ERROR, "bind failed");
  496. goto fail1;
  497. }
  498. // obtain ConnectEx
  499. GUID guid = WSAID_CONNECTEX;
  500. DWORD out_bytes;
  501. if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnConnectEx, sizeof(o->fnConnectEx), &out_bytes, NULL, NULL) != 0) {
  502. BLog(BLOG_ERROR, "faild to get ConnectEx");
  503. goto fail1;
  504. }
  505. // init olap
  506. BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connector_olap_handler);
  507. // start connect operation
  508. BOOL res = o->fnConnectEx(o->sock, &sysaddr.addr.generic, sysaddr.len, NULL, 0, NULL, &o->olap.olap);
  509. if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
  510. BLog(BLOG_ERROR, "ConnectEx failed (%d)", WSAGetLastError());
  511. goto fail2;
  512. }
  513. // set busy
  514. o->busy = 1;
  515. // set not ready
  516. o->ready = 0;
  517. DebugObject_Init(&o->d_obj);
  518. return 1;
  519. fail2:
  520. BReactorIOCPOverlapped_Free(&o->olap);
  521. fail1:
  522. if (closesocket(o->sock) == SOCKET_ERROR) {
  523. BLog(BLOG_ERROR, "closesocket failed");
  524. }
  525. fail0:
  526. return 0;
  527. }
  528. void BConnector_Free (BConnector *o)
  529. {
  530. DebugObject_Free(&o->d_obj);
  531. if (o->sock != INVALID_SOCKET) {
  532. connector_abort(o);
  533. }
  534. }
  535. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  536. BConnection_handler handler)
  537. {
  538. switch (source.type) {
  539. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  540. BListener *listener = source.u.listener.listener;
  541. DebugObject_Access(&listener->d_obj);
  542. ASSERT(BPending_IsSet(&listener->next_job))
  543. ASSERT(!listener->busy)
  544. ASSERT(listener->ready)
  545. } break;
  546. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  547. BConnector *connector = source.u.connector.connector;
  548. DebugObject_Access(&connector->d_obj);
  549. ASSERT(connector->reactor == reactor)
  550. ASSERT(connector->sock != INVALID_SOCKET)
  551. ASSERT(!connector->busy)
  552. ASSERT(connector->ready)
  553. } break;
  554. default: ASSERT(0);
  555. }
  556. ASSERT(handler)
  557. BNetwork_Assert();
  558. // init arguments
  559. o->reactor = reactor;
  560. o->user = user;
  561. o->handler = handler;
  562. switch (source.type) {
  563. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  564. BListener *listener = source.u.listener.listener;
  565. // grab new socket from listener
  566. o->sock = listener->newsock;
  567. listener->ready = 0;
  568. // associate with IOCP
  569. if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
  570. BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
  571. goto fail1;
  572. }
  573. // return address
  574. if (source.u.listener.out_addr) {
  575. struct sockaddr *addr_local;
  576. struct sockaddr *addr_remote;
  577. int len_local;
  578. int len_remote;
  579. listener->fnGetAcceptExSockaddrs(listener->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub),
  580. &addr_local, &len_local, &addr_remote, &len_remote);
  581. struct sys_addr sysaddr;
  582. ASSERT_FORCE(len_remote >= 0)
  583. ASSERT_FORCE(len_remote <= sizeof(sysaddr.addr))
  584. memcpy((uint8_t *)&sysaddr.addr, (uint8_t *)addr_remote, len_remote);
  585. sysaddr.len = len_remote;
  586. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  587. }
  588. } break;
  589. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  590. BConnector *connector = source.u.connector.connector;
  591. // grab fd from connector
  592. o->sock = connector->sock;
  593. connector->sock = INVALID_SOCKET;
  594. // release connector resources
  595. connector_abort(connector);
  596. } break;
  597. }
  598. // set not aborted
  599. o->aborted = 0;
  600. // init send olap
  601. BReactorIOCPOverlapped_Init(&o->send.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_send_olap_handler);
  602. // set send not inited
  603. o->send.inited = 0;
  604. // init recv olap
  605. BReactorIOCPOverlapped_Init(&o->recv.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_recv_olap_handler);
  606. // set recv not closed
  607. o->recv.closed = 0;
  608. // set recv not inited
  609. o->recv.inited = 0;
  610. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  611. DebugObject_Init(&o->d_obj);
  612. return 1;
  613. fail1:
  614. if (closesocket(o->sock) == SOCKET_ERROR) {
  615. BLog(BLOG_ERROR, "closesocket failed");
  616. }
  617. return 0;
  618. }
  619. void BConnection_Free (BConnection *o)
  620. {
  621. DebugObject_Free(&o->d_obj);
  622. DebugError_Free(&o->d_err);
  623. ASSERT(!o->recv.inited)
  624. ASSERT(!o->send.inited)
  625. if (!o->aborted) {
  626. connection_abort(o);
  627. }
  628. }
  629. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  630. {
  631. DebugObject_Access(&o->d_obj);
  632. // set handlers
  633. o->user = user;
  634. o->handler = handler;
  635. }
  636. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  637. {
  638. DebugObject_Access(&o->d_obj);
  639. if (setsockopt(o->sock, SOL_SOCKET, SO_SNDBUF, (char *)&buf_size, sizeof(buf_size)) < 0) {
  640. BLog(BLOG_ERROR, "setsockopt failed");
  641. return 0;
  642. }
  643. return 1;
  644. }
  645. void BConnection_SendAsync_Init (BConnection *o)
  646. {
  647. DebugObject_Access(&o->d_obj);
  648. DebugError_AssertNoError(&o->d_err);
  649. ASSERT(!o->aborted)
  650. ASSERT(!o->send.inited)
  651. // init interface
  652. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_iface_handler_send, o, BReactor_PendingGroup(o->reactor));
  653. // set not busy
  654. o->send.busy = 0;
  655. // set inited
  656. o->send.inited = 1;
  657. }
  658. void BConnection_SendAsync_Free (BConnection *o)
  659. {
  660. DebugObject_Access(&o->d_obj);
  661. ASSERT(o->send.inited)
  662. // abort if busy
  663. if (o->send.busy && !o->aborted) {
  664. connection_abort(o);
  665. }
  666. // free interface
  667. StreamPassInterface_Free(&o->send.iface);
  668. // set not inited
  669. o->send.inited = 0;
  670. }
  671. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  672. {
  673. DebugObject_Access(&o->d_obj);
  674. ASSERT(o->send.inited)
  675. return &o->send.iface;
  676. }
  677. void BConnection_RecvAsync_Init (BConnection *o)
  678. {
  679. DebugObject_Access(&o->d_obj);
  680. DebugError_AssertNoError(&o->d_err);
  681. ASSERT(!o->recv.closed)
  682. ASSERT(!o->aborted)
  683. ASSERT(!o->recv.inited)
  684. // init interface
  685. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_iface_handler_recv, o, BReactor_PendingGroup(o->reactor));
  686. // set not busy
  687. o->recv.busy = 0;
  688. // set inited
  689. o->recv.inited = 1;
  690. }
  691. void BConnection_RecvAsync_Free (BConnection *o)
  692. {
  693. DebugObject_Access(&o->d_obj);
  694. ASSERT(o->recv.inited)
  695. // abort if busy
  696. if (o->recv.busy && !o->aborted) {
  697. connection_abort(o);
  698. }
  699. // free interface
  700. StreamRecvInterface_Free(&o->recv.iface);
  701. // set not inited
  702. o->recv.inited = 0;
  703. }
  704. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  705. {
  706. DebugObject_Access(&o->d_obj);
  707. ASSERT(o->recv.inited)
  708. return &o->recv.iface;
  709. }