BConnection_unix.c 27 KB


  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <string.h>
  30. #include <stddef.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #include <sys/types.h>
  34. #include <sys/socket.h>
  35. #include <sys/un.h>
  36. #include <misc/nonblocking.h>
  37. #include <base/BLog.h>
  38. #include "BConnection.h"
  39. #include <generated/blog_channel_BConnection.h>
  40. #define MAX_UNIX_SOCKET_PATH 200
  41. struct sys_addr {
  42. socklen_t len;
  43. union {
  44. struct sockaddr generic;
  45. struct sockaddr_in ipv4;
  46. struct sockaddr_in6 ipv6;
  47. } addr;
  48. };
  49. struct unix_addr {
  50. socklen_t len;
  51. union {
  52. struct sockaddr_un addr;
  53. uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
  54. } u;
  55. };
  56. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  57. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  58. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  59. static void listener_fd_handler (BListener *o, int events);
  60. static void listener_default_job_handler (BListener *o);
  61. static void connector_fd_handler (BConnector *o, int events);
  62. static void connector_job_handler (BConnector *o);
  63. static void connection_report_error (BConnection *o);
  64. static void connection_send (BConnection *o);
  65. static void connection_recv (BConnection *o);
  66. static void connection_fd_handler (BConnection *o, int events);
  67. static void connection_send_job_handler (BConnection *o);
  68. static void connection_recv_job_handler (BConnection *o);
  69. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  70. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  71. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  72. {
  73. ASSERT(socket_path);
  74. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  75. return 0;
  76. }
  77. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  78. out->u.addr.sun_family = AF_UNIX;
  79. strcpy(out->u.addr.sun_path, socket_path);
  80. return 1;
  81. }
  82. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  83. {
  84. switch (addr.type) {
  85. case BADDR_TYPE_IPV4: {
  86. out->len = sizeof(out->addr.ipv4);
  87. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  88. out->addr.ipv4.sin_family = AF_INET;
  89. out->addr.ipv4.sin_port = addr.ipv4.port;
  90. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  91. } break;
  92. case BADDR_TYPE_IPV6: {
  93. out->len = sizeof(out->addr.ipv6);
  94. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  95. out->addr.ipv6.sin6_family = AF_INET6;
  96. out->addr.ipv6.sin6_port = addr.ipv6.port;
  97. out->addr.ipv6.sin6_flowinfo = 0;
  98. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  99. out->addr.ipv6.sin6_scope_id = 0;
  100. } break;
  101. default: ASSERT(0);
  102. }
  103. }
  104. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  105. {
  106. switch (addr.addr.generic.sa_family) {
  107. case AF_INET: {
  108. ASSERT(addr.len == sizeof(struct sockaddr_in))
  109. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  110. } break;
  111. case AF_INET6: {
  112. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  113. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  114. } break;
  115. default: {
  116. BAddr_InitNone(out);
  117. } break;
  118. }
  119. }
  120. static void listener_fd_handler (BListener *o, int events)
  121. {
  122. DebugObject_Access(&o->d_obj);
  123. // set default job
  124. BPending_Set(&o->default_job);
  125. // call handler
  126. o->handler(o->user);
  127. return;
  128. }
  129. static void listener_default_job_handler (BListener *o)
  130. {
  131. DebugObject_Access(&o->d_obj);
  132. BLog(BLOG_ERROR, "discarding connection");
  133. // accept
  134. int newfd = accept(o->fd, NULL, NULL);
  135. if (newfd < 0) {
  136. BLog(BLOG_ERROR, "accept failed");
  137. return;
  138. }
  139. // close new fd
  140. if (close(newfd) < 0) {
  141. BLog(BLOG_ERROR, "close failed");
  142. }
  143. }
  144. static void connector_fd_handler (BConnector *o, int events)
  145. {
  146. DebugObject_Access(&o->d_obj);
  147. ASSERT(o->fd >= 0)
  148. ASSERT(!o->connected)
  149. ASSERT(o->have_bfd)
  150. // free BFileDescriptor
  151. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  152. // set have no BFileDescriptor
  153. o->have_bfd = 0;
  154. // read connection result
  155. int result;
  156. socklen_t result_len = sizeof(result);
  157. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  158. BLog(BLOG_ERROR, "getsockopt failed");
  159. goto fail0;
  160. }
  161. ASSERT_FORCE(result_len == sizeof(result))
  162. if (result != 0) {
  163. BLog(BLOG_ERROR, "connection failed");
  164. goto fail0;
  165. }
  166. // set connected
  167. o->connected = 1;
  168. fail0:
  169. // call handler
  170. o->handler(o->user, !o->connected);
  171. return;
  172. }
  173. static void connector_job_handler (BConnector *o)
  174. {
  175. DebugObject_Access(&o->d_obj);
  176. ASSERT(o->fd >= 0)
  177. ASSERT(o->connected)
  178. ASSERT(!o->have_bfd)
  179. // call handler
  180. o->handler(o->user, 0);
  181. return;
  182. }
  183. static void connection_report_error (BConnection *o)
  184. {
  185. DebugError_AssertNoError(&o->d_err);
  186. ASSERT(o->handler)
  187. // report error
  188. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  189. return;
  190. }
  191. static void connection_send (BConnection *o)
  192. {
  193. DebugError_AssertNoError(&o->d_err);
  194. ASSERT(o->send.inited)
  195. ASSERT(o->send.busy)
  196. // limit
  197. if (!BReactorLimit_Increment(&o->send.limit)) {
  198. // wait for fd
  199. o->wait_events |= BREACTOR_WRITE;
  200. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  201. return;
  202. }
  203. // send
  204. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  205. if (bytes < 0) {
  206. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  207. // wait for fd
  208. o->wait_events |= BREACTOR_WRITE;
  209. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  210. return;
  211. }
  212. BLog(BLOG_ERROR, "send failed");
  213. connection_report_error(o);
  214. return;
  215. }
  216. ASSERT(bytes > 0)
  217. ASSERT(bytes <= o->send.busy_data_len)
  218. // set not busy
  219. o->send.busy = 0;
  220. // done
  221. StreamPassInterface_Done(&o->send.iface, bytes);
  222. }
  223. static void connection_recv (BConnection *o)
  224. {
  225. DebugError_AssertNoError(&o->d_err);
  226. ASSERT(o->recv.inited)
  227. ASSERT(o->recv.busy)
  228. ASSERT(!o->recv.closed)
  229. // limit
  230. if (!BReactorLimit_Increment(&o->recv.limit)) {
  231. // wait for fd
  232. o->wait_events |= BREACTOR_READ;
  233. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  234. return;
  235. }
  236. // recv
  237. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  238. if (bytes < 0) {
  239. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  240. // wait for fd
  241. o->wait_events |= BREACTOR_READ;
  242. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  243. return;
  244. }
  245. BLog(BLOG_ERROR, "recv failed");
  246. connection_report_error(o);
  247. return;
  248. }
  249. if (bytes == 0) {
  250. // set recv closed
  251. o->recv.closed = 1;
  252. // report recv closed
  253. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  254. return;
  255. }
  256. ASSERT(bytes > 0)
  257. ASSERT(bytes <= o->recv.busy_data_avail)
  258. // set not busy
  259. o->recv.busy = 0;
  260. // done
  261. StreamRecvInterface_Done(&o->recv.iface, bytes);
  262. }
  263. static void connection_fd_handler (BConnection *o, int events)
  264. {
  265. DebugObject_Access(&o->d_obj);
  266. DebugError_AssertNoError(&o->d_err);
  267. // clear handled events
  268. o->wait_events &= ~events;
  269. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  270. int have_send = 0;
  271. int have_recv = 0;
  272. if ((events & BREACTOR_WRITE) || ((events & BREACTOR_ERROR) && o->send.inited && o->send.busy)) {
  273. ASSERT(o->send.inited)
  274. ASSERT(o->send.busy)
  275. have_send = 1;
  276. }
  277. if ((events & BREACTOR_READ) || ((events & BREACTOR_ERROR) && o->recv.inited && o->recv.busy && !o->recv.closed)) {
  278. ASSERT(o->recv.inited)
  279. ASSERT(o->recv.busy)
  280. ASSERT(!o->recv.closed)
  281. have_recv = 1;
  282. }
  283. if (have_send) {
  284. if (have_recv) {
  285. BPending_Set(&o->recv.job);
  286. }
  287. connection_send(o);
  288. return;
  289. }
  290. if (have_recv) {
  291. connection_recv(o);
  292. return;
  293. }
  294. BLog(BLOG_ERROR, "fd error event");
  295. connection_report_error(o);
  296. return;
  297. }
  298. static void connection_send_job_handler (BConnection *o)
  299. {
  300. DebugObject_Access(&o->d_obj);
  301. DebugError_AssertNoError(&o->d_err);
  302. ASSERT(o->send.inited)
  303. ASSERT(o->send.busy)
  304. connection_send(o);
  305. return;
  306. }
  307. static void connection_recv_job_handler (BConnection *o)
  308. {
  309. DebugObject_Access(&o->d_obj);
  310. DebugError_AssertNoError(&o->d_err);
  311. ASSERT(o->recv.inited)
  312. ASSERT(o->recv.busy)
  313. ASSERT(!o->recv.closed)
  314. connection_recv(o);
  315. return;
  316. }
  317. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  318. {
  319. DebugObject_Access(&o->d_obj);
  320. DebugError_AssertNoError(&o->d_err);
  321. ASSERT(o->send.inited)
  322. ASSERT(!o->send.busy)
  323. ASSERT(data_len > 0)
  324. // remember data
  325. o->send.busy_data = data;
  326. o->send.busy_data_len = data_len;
  327. // set busy
  328. o->send.busy = 1;
  329. connection_send(o);
  330. return;
  331. }
  332. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  333. {
  334. DebugObject_Access(&o->d_obj);
  335. DebugError_AssertNoError(&o->d_err);
  336. ASSERT(o->recv.inited)
  337. ASSERT(!o->recv.busy)
  338. ASSERT(!o->recv.closed)
  339. ASSERT(data_avail > 0)
  340. // remember data
  341. o->recv.busy_data = data;
  342. o->recv.busy_data_avail = data_avail;
  343. // set busy
  344. o->recv.busy = 1;
  345. connection_recv(o);
  346. return;
  347. }
  348. int BConnection_AddressSupported (BAddr addr)
  349. {
  350. BAddr_Assert(&addr);
  351. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  352. }
  353. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  354. BListener_handler handler)
  355. {
  356. ASSERT(handler)
  357. BNetwork_Assert();
  358. // init arguments
  359. o->reactor = reactor;
  360. o->user = user;
  361. o->handler = handler;
  362. // check address
  363. if (!BConnection_AddressSupported(addr)) {
  364. BLog(BLOG_ERROR, "address not supported");
  365. goto fail0;
  366. }
  367. // convert address
  368. struct sys_addr sysaddr;
  369. addr_socket_to_sys(&sysaddr, addr);
  370. // init fd
  371. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  372. BLog(BLOG_ERROR, "socket failed");
  373. goto fail0;
  374. }
  375. // set non-blocking
  376. if (!badvpn_set_nonblocking(o->fd)) {
  377. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  378. goto fail1;
  379. }
  380. // set SO_REUSEADDR
  381. int optval = 1;
  382. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  383. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  384. }
  385. // bind
  386. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  387. BLog(BLOG_ERROR, "bind failed");
  388. goto fail1;
  389. }
  390. // listen
  391. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  392. BLog(BLOG_ERROR, "listen failed");
  393. goto fail1;
  394. }
  395. // init BFileDescriptor
  396. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  397. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  398. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  399. goto fail1;
  400. }
  401. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  402. // init default job
  403. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  404. DebugObject_Init(&o->d_obj);
  405. return 1;
  406. fail1:
  407. if (close(o->fd) < 0) {
  408. BLog(BLOG_ERROR, "close failed");
  409. }
  410. fail0:
  411. return 0;
  412. }
  413. int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user,
  414. BListener_handler handler)
  415. {
  416. ASSERT(socket_path)
  417. ASSERT(handler)
  418. BNetwork_Assert();
  419. // init arguments
  420. o->reactor = reactor;
  421. o->user = user;
  422. o->handler = handler;
  423. // build address
  424. struct unix_addr addr;
  425. if (!build_unix_address(&addr, socket_path)) {
  426. BLog(BLOG_ERROR, "build_unix_address failed");
  427. goto fail0;
  428. }
  429. // init fd
  430. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  431. BLog(BLOG_ERROR, "socket failed");
  432. goto fail0;
  433. }
  434. // set non-blocking
  435. if (!badvpn_set_nonblocking(o->fd)) {
  436. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  437. goto fail1;
  438. }
  439. // bind
  440. if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) {
  441. BLog(BLOG_ERROR, "bind failed");
  442. goto fail1;
  443. }
  444. // listen
  445. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  446. BLog(BLOG_ERROR, "listen failed");
  447. goto fail1;
  448. }
  449. // init BFileDescriptor
  450. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  451. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  452. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  453. goto fail1;
  454. }
  455. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  456. // init default job
  457. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  458. DebugObject_Init(&o->d_obj);
  459. return 1;
  460. fail1:
  461. if (close(o->fd) < 0) {
  462. BLog(BLOG_ERROR, "close failed");
  463. }
  464. fail0:
  465. return 0;
  466. }
  467. void BListener_Free (BListener *o)
  468. {
  469. DebugObject_Free(&o->d_obj);
  470. // free default job
  471. BPending_Free(&o->default_job);
  472. // free BFileDescriptor
  473. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  474. // free fd
  475. if (close(o->fd) < 0) {
  476. BLog(BLOG_ERROR, "close failed");
  477. }
  478. }
  479. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  480. BConnector_handler handler)
  481. {
  482. ASSERT(handler)
  483. BNetwork_Assert();
  484. // init arguments
  485. o->reactor = reactor;
  486. o->user = user;
  487. o->handler = handler;
  488. // check address
  489. if (!BConnection_AddressSupported(addr)) {
  490. BLog(BLOG_ERROR, "address not supported");
  491. goto fail0;
  492. }
  493. // convert address
  494. struct sys_addr sysaddr;
  495. addr_socket_to_sys(&sysaddr, addr);
  496. // init job
  497. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  498. // init fd
  499. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  500. BLog(BLOG_ERROR, "socket failed");
  501. goto fail1;
  502. }
  503. // set fd non-blocking
  504. if (!badvpn_set_nonblocking(o->fd)) {
  505. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  506. goto fail2;
  507. }
  508. // connect fd
  509. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  510. if (res < 0 && errno != EINPROGRESS) {
  511. BLog(BLOG_ERROR, "connect failed");
  512. goto fail2;
  513. }
  514. // set not connected
  515. o->connected = 0;
  516. // set have no BFileDescriptor
  517. o->have_bfd = 0;
  518. if (res < 0) {
  519. // init BFileDescriptor
  520. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  521. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  522. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  523. goto fail2;
  524. }
  525. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  526. // set have BFileDescriptor
  527. o->have_bfd = 1;
  528. } else {
  529. // set connected
  530. o->connected = 1;
  531. // set job
  532. BPending_Set(&o->job);
  533. }
  534. DebugObject_Init(&o->d_obj);
  535. return 1;
  536. fail2:
  537. if (close(o->fd) < 0) {
  538. BLog(BLOG_ERROR, "close failed");
  539. }
  540. fail1:
  541. BPending_Free(&o->job);
  542. fail0:
  543. return 0;
  544. }
  545. int BConnector_InitUnix (BConnector *o, const char *socket_path, BReactor *reactor, void *user,
  546. BConnector_handler handler)
  547. {
  548. ASSERT(socket_path)
  549. ASSERT(handler)
  550. BNetwork_Assert();
  551. // init arguments
  552. o->reactor = reactor;
  553. o->user = user;
  554. o->handler = handler;
  555. // build address
  556. struct unix_addr addr;
  557. if (!build_unix_address(&addr, socket_path)) {
  558. BLog(BLOG_ERROR, "build_unix_address failed");
  559. goto fail0;
  560. }
  561. // init job
  562. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  563. // init fd
  564. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  565. BLog(BLOG_ERROR, "socket failed");
  566. goto fail1;
  567. }
  568. // set fd non-blocking
  569. if (!badvpn_set_nonblocking(o->fd)) {
  570. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  571. goto fail2;
  572. }
  573. // connect fd
  574. int res = connect(o->fd, (struct sockaddr *)&addr.u.addr, addr.len);
  575. if (res < 0 && errno != EINPROGRESS) {
  576. BLog(BLOG_ERROR, "connect failed");
  577. goto fail2;
  578. }
  579. // set not connected
  580. o->connected = 0;
  581. // set have no BFileDescriptor
  582. o->have_bfd = 0;
  583. if (res < 0) {
  584. // init BFileDescriptor
  585. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  586. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  587. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  588. goto fail2;
  589. }
  590. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  591. // set have BFileDescriptor
  592. o->have_bfd = 1;
  593. } else {
  594. // set connected
  595. o->connected = 1;
  596. // set job
  597. BPending_Set(&o->job);
  598. }
  599. DebugObject_Init(&o->d_obj);
  600. return 1;
  601. fail2:
  602. if (close(o->fd) < 0) {
  603. BLog(BLOG_ERROR, "close failed");
  604. }
  605. fail1:
  606. BPending_Free(&o->job);
  607. fail0:
  608. return 0;
  609. }
  610. void BConnector_Free (BConnector *o)
  611. {
  612. DebugObject_Free(&o->d_obj);
  613. // free BFileDescriptor
  614. if (o->have_bfd) {
  615. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  616. }
  617. // close fd
  618. if (o->fd != -1) {
  619. if (close(o->fd) < 0) {
  620. BLog(BLOG_ERROR, "close failed");
  621. }
  622. }
  623. // free job
  624. BPending_Free(&o->job);
  625. }
  626. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  627. BConnection_handler handler)
  628. {
  629. switch (source.type) {
  630. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  631. BListener *listener = source.u.listener.listener;
  632. DebugObject_Access(&listener->d_obj);
  633. ASSERT(BPending_IsSet(&listener->default_job))
  634. } break;
  635. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  636. BConnector *connector = source.u.connector.connector;
  637. DebugObject_Access(&connector->d_obj);
  638. ASSERT(connector->fd >= 0)
  639. ASSERT(connector->connected)
  640. ASSERT(!connector->have_bfd)
  641. ASSERT(!BPending_IsSet(&connector->job))
  642. } break;
  643. case BCONNECTION_SOURCE_TYPE_PIPE: {
  644. ASSERT(source.u.pipe.pipefd >= 0)
  645. } break;
  646. default: ASSERT(0);
  647. }
  648. ASSERT(handler)
  649. BNetwork_Assert();
  650. // init arguments
  651. o->reactor = reactor;
  652. o->user = user;
  653. o->handler = handler;
  654. switch (source.type) {
  655. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  656. BListener *listener = source.u.listener.listener;
  657. // unset listener's default job
  658. BPending_Unset(&listener->default_job);
  659. // accept
  660. struct sys_addr sysaddr;
  661. sysaddr.len = sizeof(sysaddr.addr);
  662. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  663. BLog(BLOG_ERROR, "accept failed");
  664. goto fail0;
  665. }
  666. o->close_fd = 1;
  667. // set non-blocking
  668. if (!badvpn_set_nonblocking(o->fd)) {
  669. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  670. goto fail1;
  671. }
  672. // return address
  673. if (source.u.listener.out_addr) {
  674. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  675. }
  676. } break;
  677. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  678. BConnector *connector = source.u.connector.connector;
  679. // grab fd from connector
  680. o->fd = connector->fd;
  681. connector->fd = -1;
  682. o->close_fd = 1;
  683. } break;
  684. case BCONNECTION_SOURCE_TYPE_PIPE: {
  685. // use user-provided fd
  686. o->fd = source.u.pipe.pipefd;
  687. o->close_fd = 0;
  688. // set non-blocking
  689. if (!badvpn_set_nonblocking(o->fd)) {
  690. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  691. goto fail1;
  692. }
  693. } break;
  694. }
  695. // init BFileDescriptor
  696. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  697. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  698. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  699. goto fail1;
  700. }
  701. // set no wait events
  702. o->wait_events = 0;
  703. // init limits
  704. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  705. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  706. // set send and recv not inited
  707. o->send.inited = 0;
  708. o->recv.inited = 0;
  709. // set recv not closed
  710. o->recv.closed = 0;
  711. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  712. DebugObject_Init(&o->d_obj);
  713. return 1;
  714. fail1:
  715. if (o->close_fd) {
  716. if (close(o->fd) < 0) {
  717. BLog(BLOG_ERROR, "close failed");
  718. }
  719. }
  720. fail0:
  721. return 0;
  722. }
  723. void BConnection_Free (BConnection *o)
  724. {
  725. DebugObject_Free(&o->d_obj);
  726. DebugError_Free(&o->d_err);
  727. ASSERT(!o->recv.inited)
  728. ASSERT(!o->send.inited)
  729. // free limits
  730. BReactorLimit_Free(&o->recv.limit);
  731. BReactorLimit_Free(&o->send.limit);
  732. // free BFileDescriptor
  733. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  734. // close fd
  735. if (o->close_fd) {
  736. if (close(o->fd) < 0) {
  737. BLog(BLOG_ERROR, "close failed");
  738. }
  739. }
  740. }
  741. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  742. {
  743. DebugObject_Access(&o->d_obj);
  744. // set handlers
  745. o->user = user;
  746. o->handler = handler;
  747. }
  748. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  749. {
  750. DebugObject_Access(&o->d_obj);
  751. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  752. BLog(BLOG_ERROR, "setsockopt failed");
  753. return 0;
  754. }
  755. return 1;
  756. }
  757. void BConnection_SendAsync_Init (BConnection *o)
  758. {
  759. DebugObject_Access(&o->d_obj);
  760. DebugError_AssertNoError(&o->d_err);
  761. ASSERT(!o->send.inited)
  762. // init interface
  763. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  764. // init job
  765. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  766. // set not busy
  767. o->send.busy = 0;
  768. // set inited
  769. o->send.inited = 1;
  770. }
  771. void BConnection_SendAsync_Free (BConnection *o)
  772. {
  773. DebugObject_Access(&o->d_obj);
  774. ASSERT(o->send.inited)
  775. // update events
  776. o->wait_events &= ~BREACTOR_WRITE;
  777. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  778. // free job
  779. BPending_Free(&o->send.job);
  780. // free interface
  781. StreamPassInterface_Free(&o->send.iface);
  782. // set not inited
  783. o->send.inited = 0;
  784. }
  785. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  786. {
  787. DebugObject_Access(&o->d_obj);
  788. ASSERT(o->send.inited)
  789. return &o->send.iface;
  790. }
  791. void BConnection_RecvAsync_Init (BConnection *o)
  792. {
  793. DebugObject_Access(&o->d_obj);
  794. DebugError_AssertNoError(&o->d_err);
  795. ASSERT(!o->recv.inited)
  796. ASSERT(!o->recv.closed)
  797. // init interface
  798. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  799. // init job
  800. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  801. // set not busy
  802. o->recv.busy = 0;
  803. // set inited
  804. o->recv.inited = 1;
  805. }
  806. void BConnection_RecvAsync_Free (BConnection *o)
  807. {
  808. DebugObject_Access(&o->d_obj);
  809. ASSERT(o->recv.inited)
  810. // update events
  811. o->wait_events &= ~BREACTOR_READ;
  812. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  813. // free job
  814. BPending_Free(&o->recv.job);
  815. // free interface
  816. StreamRecvInterface_Free(&o->recv.iface);
  817. // set not inited
  818. o->recv.inited = 0;
  819. }
  820. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  821. {
  822. DebugObject_Access(&o->d_obj);
  823. ASSERT(o->recv.inited)
  824. return &o->recv.iface;
  825. }