BConnection_unix.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824
  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <string.h>
  23. #include <unistd.h>
  24. #include <errno.h>
  25. #include <sys/types.h>
  26. #include <sys/socket.h>
  27. #include <misc/nonblocking.h>
  28. #include <base/BLog.h>
  29. #include "BConnection.h"
  30. #include <generated/blog_channel_BConnection.h>
  31. struct sys_addr {
  32. socklen_t len;
  33. union {
  34. struct sockaddr generic;
  35. struct sockaddr_in ipv4;
  36. struct sockaddr_in6 ipv6;
  37. } addr;
  38. };
  39. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  40. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  41. static void listener_fd_handler (BListener *o, int events);
  42. static void listener_default_job_handler (BListener *o);
  43. static void connector_fd_handler (BConnector *o, int events);
  44. static void connector_job_handler (BConnector *o);
  45. static void connection_report_error (BConnection *o);
  46. static void connection_send (BConnection *o);
  47. static void connection_recv (BConnection *o);
  48. static void connection_fd_handler (BConnection *o, int events);
  49. static void connection_send_job_handler (BConnection *o);
  50. static void connection_recv_job_handler (BConnection *o);
  51. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  52. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  53. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  54. {
  55. switch (addr.type) {
  56. case BADDR_TYPE_IPV4: {
  57. out->len = sizeof(out->addr.ipv4);
  58. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  59. out->addr.ipv4.sin_family = AF_INET;
  60. out->addr.ipv4.sin_port = addr.ipv4.port;
  61. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  62. } break;
  63. case BADDR_TYPE_IPV6: {
  64. out->len = sizeof(out->addr.ipv6);
  65. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  66. out->addr.ipv6.sin6_family = AF_INET6;
  67. out->addr.ipv6.sin6_port = addr.ipv6.port;
  68. out->addr.ipv6.sin6_flowinfo = 0;
  69. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  70. out->addr.ipv6.sin6_scope_id = 0;
  71. } break;
  72. default: ASSERT(0);
  73. }
  74. }
  75. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  76. {
  77. switch (addr.addr.generic.sa_family) {
  78. case AF_INET: {
  79. ASSERT(addr.len == sizeof(struct sockaddr_in))
  80. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  81. } break;
  82. case AF_INET6: {
  83. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  84. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  85. } break;
  86. default: {
  87. BAddr_InitNone(out);
  88. } break;
  89. }
  90. }
  91. static void listener_fd_handler (BListener *o, int events)
  92. {
  93. DebugObject_Access(&o->d_obj);
  94. // set default job
  95. BPending_Set(&o->default_job);
  96. // call handler
  97. o->handler(o->user);
  98. return;
  99. }
  100. static void listener_default_job_handler (BListener *o)
  101. {
  102. DebugObject_Access(&o->d_obj);
  103. BLog(BLOG_ERROR, "discarding connection");
  104. // accept
  105. int newfd = accept(o->fd, NULL, NULL);
  106. if (newfd < 0) {
  107. BLog(BLOG_ERROR, "accept failed");
  108. return;
  109. }
  110. // close new fd
  111. if (close(newfd) < 0) {
  112. BLog(BLOG_ERROR, "close failed");
  113. }
  114. }
  115. static void connector_fd_handler (BConnector *o, int events)
  116. {
  117. DebugObject_Access(&o->d_obj);
  118. ASSERT(o->fd >= 0)
  119. ASSERT(!o->connected)
  120. ASSERT(o->have_bfd)
  121. // free BFileDescriptor
  122. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  123. // set have no BFileDescriptor
  124. o->have_bfd = 0;
  125. // read connection result
  126. int result;
  127. socklen_t result_len = sizeof(result);
  128. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  129. BLog(BLOG_ERROR, "getsockopt failed");
  130. goto fail0;
  131. }
  132. ASSERT_FORCE(result_len == sizeof(result))
  133. if (result != 0) {
  134. BLog(BLOG_ERROR, "connection failed");
  135. goto fail0;
  136. }
  137. // set connected
  138. o->connected = 1;
  139. fail0:
  140. // call handler
  141. o->handler(o->user, !o->connected);
  142. return;
  143. }
  144. static void connector_job_handler (BConnector *o)
  145. {
  146. DebugObject_Access(&o->d_obj);
  147. ASSERT(o->fd >= 0)
  148. ASSERT(o->connected)
  149. ASSERT(!o->have_bfd)
  150. // call handler
  151. o->handler(o->user, 0);
  152. return;
  153. }
  154. static void connection_report_error (BConnection *o)
  155. {
  156. DebugError_AssertNoError(&o->d_err);
  157. ASSERT(o->handler)
  158. // report error
  159. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  160. return;
  161. }
  162. static void connection_send (BConnection *o)
  163. {
  164. DebugError_AssertNoError(&o->d_err);
  165. ASSERT(o->send.inited)
  166. ASSERT(o->send.busy)
  167. // limit
  168. if (!BReactorLimit_Increment(&o->send.limit)) {
  169. // wait for fd
  170. o->wait_events |= BREACTOR_WRITE;
  171. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  172. return;
  173. }
  174. // send
  175. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  176. if (bytes < 0) {
  177. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  178. // wait for fd
  179. o->wait_events |= BREACTOR_WRITE;
  180. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  181. return;
  182. }
  183. BLog(BLOG_ERROR, "send failed");
  184. connection_report_error(o);
  185. return;
  186. }
  187. ASSERT(bytes > 0)
  188. ASSERT(bytes <= o->send.busy_data_len)
  189. // set not busy
  190. o->send.busy = 0;
  191. // done
  192. StreamPassInterface_Done(&o->send.iface, bytes);
  193. }
  194. static void connection_recv (BConnection *o)
  195. {
  196. DebugError_AssertNoError(&o->d_err);
  197. ASSERT(o->recv.inited)
  198. ASSERT(o->recv.busy)
  199. ASSERT(!o->recv.closed)
  200. // limit
  201. if (!BReactorLimit_Increment(&o->recv.limit)) {
  202. // wait for fd
  203. o->wait_events |= BREACTOR_READ;
  204. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  205. return;
  206. }
  207. // recv
  208. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  209. if (bytes < 0) {
  210. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  211. // wait for fd
  212. o->wait_events |= BREACTOR_READ;
  213. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  214. return;
  215. }
  216. BLog(BLOG_ERROR, "recv failed");
  217. connection_report_error(o);
  218. return;
  219. }
  220. if (bytes == 0) {
  221. // set recv closed
  222. o->recv.closed = 1;
  223. // report recv closed
  224. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  225. return;
  226. }
  227. ASSERT(bytes > 0)
  228. ASSERT(bytes <= o->recv.busy_data_avail)
  229. // set not busy
  230. o->recv.busy = 0;
  231. // done
  232. StreamRecvInterface_Done(&o->recv.iface, bytes);
  233. }
  234. static void connection_fd_handler (BConnection *o, int events)
  235. {
  236. DebugObject_Access(&o->d_obj);
  237. DebugError_AssertNoError(&o->d_err);
  238. // clear handled events
  239. o->wait_events &= ~events;
  240. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  241. int have_send = 0;
  242. int have_recv = 0;
  243. if ((events & BREACTOR_WRITE) || ((events & BREACTOR_ERROR) && o->send.inited && o->send.busy)) {
  244. ASSERT(o->send.inited)
  245. ASSERT(o->send.busy)
  246. have_send = 1;
  247. }
  248. if ((events & BREACTOR_READ) || ((events & BREACTOR_ERROR) && o->recv.inited && o->recv.busy && !o->recv.closed)) {
  249. ASSERT(o->recv.inited)
  250. ASSERT(o->recv.busy)
  251. ASSERT(!o->recv.closed)
  252. have_recv = 1;
  253. }
  254. if (have_send) {
  255. if (have_recv) {
  256. BPending_Set(&o->recv.job);
  257. }
  258. connection_send(o);
  259. return;
  260. }
  261. if (have_recv) {
  262. connection_recv(o);
  263. return;
  264. }
  265. BLog(BLOG_ERROR, "fd error event");
  266. connection_report_error(o);
  267. return;
  268. }
  269. static void connection_send_job_handler (BConnection *o)
  270. {
  271. DebugObject_Access(&o->d_obj);
  272. DebugError_AssertNoError(&o->d_err);
  273. ASSERT(o->send.inited)
  274. ASSERT(o->send.busy)
  275. connection_send(o);
  276. return;
  277. }
  278. static void connection_recv_job_handler (BConnection *o)
  279. {
  280. DebugObject_Access(&o->d_obj);
  281. DebugError_AssertNoError(&o->d_err);
  282. ASSERT(o->recv.inited)
  283. ASSERT(o->recv.busy)
  284. ASSERT(!o->recv.closed)
  285. connection_recv(o);
  286. return;
  287. }
  288. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  289. {
  290. DebugObject_Access(&o->d_obj);
  291. DebugError_AssertNoError(&o->d_err);
  292. ASSERT(o->send.inited)
  293. ASSERT(!o->send.busy)
  294. ASSERT(data_len > 0)
  295. // remember data
  296. o->send.busy_data = data;
  297. o->send.busy_data_len = data_len;
  298. // set busy
  299. o->send.busy = 1;
  300. connection_send(o);
  301. return;
  302. }
  303. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  304. {
  305. DebugObject_Access(&o->d_obj);
  306. DebugError_AssertNoError(&o->d_err);
  307. ASSERT(o->recv.inited)
  308. ASSERT(!o->recv.busy)
  309. ASSERT(!o->recv.closed)
  310. ASSERT(data_avail > 0)
  311. // remember data
  312. o->recv.busy_data = data;
  313. o->recv.busy_data_avail = data_avail;
  314. // set busy
  315. o->recv.busy = 1;
  316. connection_recv(o);
  317. return;
  318. }
  319. int BConnection_AddressSupported (BAddr addr)
  320. {
  321. BAddr_Assert(&addr);
  322. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  323. }
  324. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  325. BListener_handler handler)
  326. {
  327. ASSERT(BConnection_AddressSupported(addr))
  328. ASSERT(handler)
  329. BNetwork_Assert();
  330. // init arguments
  331. o->reactor = reactor;
  332. o->user = user;
  333. o->handler = handler;
  334. // convert address
  335. struct sys_addr sysaddr;
  336. addr_socket_to_sys(&sysaddr, addr);
  337. // init fd
  338. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  339. BLog(BLOG_ERROR, "socket failed");
  340. goto fail0;
  341. }
  342. // set non-blocking
  343. if (!badvpn_set_nonblocking(o->fd)) {
  344. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  345. goto fail1;
  346. }
  347. // set SO_REUSEADDR
  348. int optval = 1;
  349. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  350. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  351. }
  352. // bind
  353. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  354. BLog(BLOG_ERROR, "bind failed");
  355. goto fail1;
  356. }
  357. // listen
  358. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  359. BLog(BLOG_ERROR, "listen failed");
  360. goto fail1;
  361. }
  362. // init BFileDescriptor
  363. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  364. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  365. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  366. goto fail1;
  367. }
  368. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  369. // init default job
  370. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  371. DebugObject_Init(&o->d_obj);
  372. return 1;
  373. fail1:
  374. if (close(o->fd) < 0) {
  375. BLog(BLOG_ERROR, "close failed");
  376. }
  377. fail0:
  378. return 0;
  379. }
  380. void BListener_Free (BListener *o)
  381. {
  382. DebugObject_Free(&o->d_obj);
  383. // free default job
  384. BPending_Free(&o->default_job);
  385. // free BFileDescriptor
  386. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  387. // free fd
  388. if (close(o->fd) < 0) {
  389. BLog(BLOG_ERROR, "close failed");
  390. }
  391. }
  392. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  393. BConnector_handler handler)
  394. {
  395. ASSERT(BConnection_AddressSupported(addr))
  396. ASSERT(handler)
  397. BNetwork_Assert();
  398. // init arguments
  399. o->reactor = reactor;
  400. o->user = user;
  401. o->handler = handler;
  402. // convert address
  403. struct sys_addr sysaddr;
  404. addr_socket_to_sys(&sysaddr, addr);
  405. // init job
  406. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  407. // init fd
  408. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  409. BLog(BLOG_ERROR, "socket failed");
  410. goto fail1;
  411. }
  412. // set fd non-blocking
  413. if (!badvpn_set_nonblocking(o->fd)) {
  414. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  415. goto fail2;
  416. }
  417. // connect fd
  418. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  419. if (res < 0 && errno != EINPROGRESS) {
  420. BLog(BLOG_ERROR, "connect failed");
  421. goto fail2;
  422. }
  423. // set not connected
  424. o->connected = 0;
  425. // set have no BFileDescriptor
  426. o->have_bfd = 0;
  427. if (res < 0) {
  428. // init BFileDescriptor
  429. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  430. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  431. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  432. goto fail2;
  433. }
  434. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  435. // set have BFileDescriptor
  436. o->have_bfd = 1;
  437. } else {
  438. // set connected
  439. o->connected = 1;
  440. // set job
  441. BPending_Set(&o->job);
  442. }
  443. DebugObject_Init(&o->d_obj);
  444. return 1;
  445. fail2:
  446. if (close(o->fd) < 0) {
  447. BLog(BLOG_ERROR, "close failed");
  448. }
  449. fail1:
  450. BPending_Free(&o->job);
  451. return 0;
  452. }
  453. void BConnector_Free (BConnector *o)
  454. {
  455. DebugObject_Free(&o->d_obj);
  456. // free BFileDescriptor
  457. if (o->have_bfd) {
  458. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  459. }
  460. // close fd
  461. if (o->fd != -1) {
  462. if (close(o->fd) < 0) {
  463. BLog(BLOG_ERROR, "close failed");
  464. }
  465. }
  466. // free job
  467. BPending_Free(&o->job);
  468. }
  469. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  470. BConnection_handler handler)
  471. {
  472. switch (source.type) {
  473. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  474. BListener *listener = source.u.listener.listener;
  475. DebugObject_Access(&listener->d_obj);
  476. ASSERT(BPending_IsSet(&listener->default_job))
  477. } break;
  478. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  479. BConnector *connector = source.u.connector.connector;
  480. DebugObject_Access(&connector->d_obj);
  481. ASSERT(connector->fd >= 0)
  482. ASSERT(connector->connected)
  483. ASSERT(!connector->have_bfd)
  484. ASSERT(!BPending_IsSet(&connector->job))
  485. } break;
  486. case BCONNECTION_SOURCE_TYPE_PIPE: {
  487. ASSERT(source.u.pipe.pipefd >= 0)
  488. } break;
  489. default: ASSERT(0);
  490. }
  491. ASSERT(handler)
  492. BNetwork_Assert();
  493. // init arguments
  494. o->reactor = reactor;
  495. o->user = user;
  496. o->handler = handler;
  497. switch (source.type) {
  498. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  499. BListener *listener = source.u.listener.listener;
  500. // unset listener's default job
  501. BPending_Unset(&listener->default_job);
  502. // accept
  503. struct sys_addr sysaddr;
  504. sysaddr.len = sizeof(sysaddr.addr);
  505. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  506. BLog(BLOG_ERROR, "accept failed");
  507. goto fail0;
  508. }
  509. o->close_fd = 1;
  510. // set non-blocking
  511. if (!badvpn_set_nonblocking(o->fd)) {
  512. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  513. goto fail1;
  514. }
  515. // return address
  516. if (source.u.listener.out_addr) {
  517. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  518. }
  519. } break;
  520. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  521. BConnector *connector = source.u.connector.connector;
  522. // grab fd from connector
  523. o->fd = connector->fd;
  524. connector->fd = -1;
  525. o->close_fd = 1;
  526. } break;
  527. case BCONNECTION_SOURCE_TYPE_PIPE: {
  528. // use user-provided fd
  529. o->fd = source.u.pipe.pipefd;
  530. o->close_fd = 0;
  531. // set non-blocking
  532. if (!badvpn_set_nonblocking(o->fd)) {
  533. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  534. goto fail1;
  535. }
  536. } break;
  537. }
  538. // init BFileDescriptor
  539. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  540. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  541. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  542. goto fail1;
  543. }
  544. // set no wait events
  545. o->wait_events = 0;
  546. // init limits
  547. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  548. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  549. // set send and recv not inited
  550. o->send.inited = 0;
  551. o->recv.inited = 0;
  552. // set recv not closed
  553. o->recv.closed = 0;
  554. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  555. DebugObject_Init(&o->d_obj);
  556. return 1;
  557. fail1:
  558. if (o->close_fd) {
  559. if (close(o->fd) < 0) {
  560. BLog(BLOG_ERROR, "close failed");
  561. }
  562. }
  563. fail0:
  564. return 0;
  565. }
  566. void BConnection_Free (BConnection *o)
  567. {
  568. DebugObject_Free(&o->d_obj);
  569. DebugError_Free(&o->d_err);
  570. ASSERT(!o->recv.inited)
  571. ASSERT(!o->send.inited)
  572. // free limits
  573. BReactorLimit_Free(&o->recv.limit);
  574. BReactorLimit_Free(&o->send.limit);
  575. // free BFileDescriptor
  576. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  577. // close fd
  578. if (o->close_fd) {
  579. if (close(o->fd) < 0) {
  580. BLog(BLOG_ERROR, "close failed");
  581. }
  582. }
  583. }
  584. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  585. {
  586. DebugObject_Access(&o->d_obj);
  587. // set handlers
  588. o->user = user;
  589. o->handler = handler;
  590. }
  591. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  592. {
  593. DebugObject_Access(&o->d_obj);
  594. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  595. BLog(BLOG_ERROR, "setsockopt failed");
  596. return 0;
  597. }
  598. return 1;
  599. }
  600. void BConnection_SendAsync_Init (BConnection *o)
  601. {
  602. DebugObject_Access(&o->d_obj);
  603. DebugError_AssertNoError(&o->d_err);
  604. ASSERT(!o->send.inited)
  605. // init interface
  606. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  607. // init job
  608. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  609. // set not busy
  610. o->send.busy = 0;
  611. // set inited
  612. o->send.inited = 1;
  613. }
  614. void BConnection_SendAsync_Free (BConnection *o)
  615. {
  616. DebugObject_Access(&o->d_obj);
  617. ASSERT(o->send.inited)
  618. // update events
  619. o->wait_events &= ~BREACTOR_WRITE;
  620. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  621. // free job
  622. BPending_Free(&o->send.job);
  623. // free interface
  624. StreamPassInterface_Free(&o->send.iface);
  625. // set not inited
  626. o->send.inited = 0;
  627. }
  628. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  629. {
  630. DebugObject_Access(&o->d_obj);
  631. ASSERT(o->send.inited)
  632. return &o->send.iface;
  633. }
  634. void BConnection_RecvAsync_Init (BConnection *o)
  635. {
  636. DebugObject_Access(&o->d_obj);
  637. DebugError_AssertNoError(&o->d_err);
  638. ASSERT(!o->recv.inited)
  639. ASSERT(!o->recv.closed)
  640. // init interface
  641. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  642. // init job
  643. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  644. // set not busy
  645. o->recv.busy = 0;
  646. // set inited
  647. o->recv.inited = 1;
  648. }
  649. void BConnection_RecvAsync_Free (BConnection *o)
  650. {
  651. DebugObject_Access(&o->d_obj);
  652. ASSERT(o->recv.inited)
  653. // update events
  654. o->wait_events &= ~BREACTOR_READ;
  655. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  656. // free job
  657. BPending_Free(&o->recv.job);
  658. // free interface
  659. StreamRecvInterface_Free(&o->recv.iface);
  660. // set not inited
  661. o->recv.inited = 0;
  662. }
  663. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  664. {
  665. DebugObject_Access(&o->d_obj);
  666. ASSERT(o->recv.inited)
  667. return &o->recv.iface;
  668. }