BConnection_unix.c 28 KB


  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <string.h>
  30. #include <stddef.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #include <sys/types.h>
  34. #include <sys/socket.h>
  35. #include <sys/un.h>
  36. #include <misc/nonblocking.h>
  37. #include <misc/strdup.h>
  38. #include <base/BLog.h>
  39. #include "BConnection.h"
  40. #include <generated/blog_channel_BConnection.h>
  /** Longest unix socket path (excluding the terminating NUL) accepted by build_unix_address(). */
  #define MAX_UNIX_SOCKET_PATH 200

  /**
   * An IPv4 or IPv6 socket address in the form taken by the socket calls,
   * together with its length.
   */
  struct sys_addr {
      socklen_t len;                  // number of meaningful bytes in addr
      union {
          struct sockaddr generic;    // view passed to bind()/connect()/accept()
          struct sockaddr_in ipv4;
          struct sockaddr_in6 ipv6;
      } addr;
  };

  /**
   * An AF_UNIX socket address together with its length.
   * The byte-array member makes the union large enough for a path of
   * MAX_UNIX_SOCKET_PATH characters plus a terminating NUL, independent of
   * how large sun_path is declared.
   */
  struct unix_addr {
      socklen_t len;                  // number of meaningful bytes in u
      union {
          struct sockaddr_un addr;
          uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
      } u;
  };
  57. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  58. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  59. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  60. static void listener_fd_handler (BListener *o, int events);
  61. static void listener_default_job_handler (BListener *o);
  62. static void connector_fd_handler (BConnector *o, int events);
  63. static void connector_job_handler (BConnector *o);
  64. static void connection_report_error (BConnection *o);
  65. static void connection_send (BConnection *o);
  66. static void connection_recv (BConnection *o);
  67. static void connection_fd_handler (BConnection *o, int events);
  68. static void connection_send_job_handler (BConnection *o);
  69. static void connection_recv_job_handler (BConnection *o);
  70. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  71. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  72. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  73. {
  74. ASSERT(socket_path);
  75. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  76. return 0;
  77. }
  78. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  79. out->u.addr.sun_family = AF_UNIX;
  80. strcpy(out->u.addr.sun_path, socket_path);
  81. return 1;
  82. }
  83. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  84. {
  85. switch (addr.type) {
  86. case BADDR_TYPE_IPV4: {
  87. out->len = sizeof(out->addr.ipv4);
  88. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  89. out->addr.ipv4.sin_family = AF_INET;
  90. out->addr.ipv4.sin_port = addr.ipv4.port;
  91. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  92. } break;
  93. case BADDR_TYPE_IPV6: {
  94. out->len = sizeof(out->addr.ipv6);
  95. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  96. out->addr.ipv6.sin6_family = AF_INET6;
  97. out->addr.ipv6.sin6_port = addr.ipv6.port;
  98. out->addr.ipv6.sin6_flowinfo = 0;
  99. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  100. out->addr.ipv6.sin6_scope_id = 0;
  101. } break;
  102. default: ASSERT(0);
  103. }
  104. }
  105. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  106. {
  107. switch (addr.addr.generic.sa_family) {
  108. case AF_INET: {
  109. ASSERT(addr.len == sizeof(struct sockaddr_in))
  110. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  111. } break;
  112. case AF_INET6: {
  113. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  114. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  115. } break;
  116. default: {
  117. BAddr_InitNone(out);
  118. } break;
  119. }
  120. }
  121. static void listener_fd_handler (BListener *o, int events)
  122. {
  123. DebugObject_Access(&o->d_obj);
  124. // set default job
  125. BPending_Set(&o->default_job);
  126. // call handler
  127. o->handler(o->user);
  128. return;
  129. }
  130. static void listener_default_job_handler (BListener *o)
  131. {
  132. DebugObject_Access(&o->d_obj);
  133. BLog(BLOG_ERROR, "discarding connection");
  134. // accept
  135. int newfd = accept(o->fd, NULL, NULL);
  136. if (newfd < 0) {
  137. BLog(BLOG_ERROR, "accept failed");
  138. return;
  139. }
  140. // close new fd
  141. if (close(newfd) < 0) {
  142. BLog(BLOG_ERROR, "close failed");
  143. }
  144. }
  145. static void connector_fd_handler (BConnector *o, int events)
  146. {
  147. DebugObject_Access(&o->d_obj);
  148. ASSERT(o->fd >= 0)
  149. ASSERT(!o->connected)
  150. ASSERT(o->have_bfd)
  151. // free BFileDescriptor
  152. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  153. // set have no BFileDescriptor
  154. o->have_bfd = 0;
  155. // read connection result
  156. int result;
  157. socklen_t result_len = sizeof(result);
  158. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  159. BLog(BLOG_ERROR, "getsockopt failed");
  160. goto fail0;
  161. }
  162. ASSERT_FORCE(result_len == sizeof(result))
  163. if (result != 0) {
  164. BLog(BLOG_ERROR, "connection failed");
  165. goto fail0;
  166. }
  167. // set connected
  168. o->connected = 1;
  169. fail0:
  170. // call handler
  171. o->handler(o->user, !o->connected);
  172. return;
  173. }
  174. static void connector_job_handler (BConnector *o)
  175. {
  176. DebugObject_Access(&o->d_obj);
  177. ASSERT(o->fd >= 0)
  178. ASSERT(o->connected)
  179. ASSERT(!o->have_bfd)
  180. // call handler
  181. o->handler(o->user, 0);
  182. return;
  183. }
  184. static void connection_report_error (BConnection *o)
  185. {
  186. DebugError_AssertNoError(&o->d_err);
  187. ASSERT(o->handler)
  188. // report error
  189. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  190. return;
  191. }
  192. static void connection_send (BConnection *o)
  193. {
  194. DebugError_AssertNoError(&o->d_err);
  195. ASSERT(o->send.inited)
  196. ASSERT(o->send.busy)
  197. // limit
  198. if (!BReactorLimit_Increment(&o->send.limit)) {
  199. // wait for fd
  200. o->wait_events |= BREACTOR_WRITE;
  201. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  202. return;
  203. }
  204. // send
  205. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  206. if (bytes < 0) {
  207. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  208. // wait for fd
  209. o->wait_events |= BREACTOR_WRITE;
  210. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  211. return;
  212. }
  213. BLog(BLOG_ERROR, "send failed");
  214. connection_report_error(o);
  215. return;
  216. }
  217. ASSERT(bytes > 0)
  218. ASSERT(bytes <= o->send.busy_data_len)
  219. // set not busy
  220. o->send.busy = 0;
  221. // done
  222. StreamPassInterface_Done(&o->send.iface, bytes);
  223. }
  224. static void connection_recv (BConnection *o)
  225. {
  226. DebugError_AssertNoError(&o->d_err);
  227. ASSERT(o->recv.inited)
  228. ASSERT(o->recv.busy)
  229. ASSERT(!o->recv.closed)
  230. // limit
  231. if (!BReactorLimit_Increment(&o->recv.limit)) {
  232. // wait for fd
  233. o->wait_events |= BREACTOR_READ;
  234. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  235. return;
  236. }
  237. // recv
  238. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  239. if (bytes < 0) {
  240. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  241. // wait for fd
  242. o->wait_events |= BREACTOR_READ;
  243. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  244. return;
  245. }
  246. BLog(BLOG_ERROR, "recv failed");
  247. connection_report_error(o);
  248. return;
  249. }
  250. if (bytes == 0) {
  251. // set recv closed
  252. o->recv.closed = 1;
  253. // report recv closed
  254. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  255. return;
  256. }
  257. ASSERT(bytes > 0)
  258. ASSERT(bytes <= o->recv.busy_data_avail)
  259. // set not busy
  260. o->recv.busy = 0;
  261. // done
  262. StreamRecvInterface_Done(&o->recv.iface, bytes);
  263. }
  264. static void connection_fd_handler (BConnection *o, int events)
  265. {
  266. DebugObject_Access(&o->d_obj);
  267. DebugError_AssertNoError(&o->d_err);
  268. // clear handled events
  269. o->wait_events &= ~events;
  270. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  271. int have_send = 0;
  272. int have_recv = 0;
  273. if ((events & BREACTOR_WRITE) || ((events & BREACTOR_ERROR) && o->send.inited && o->send.busy)) {
  274. ASSERT(o->send.inited)
  275. ASSERT(o->send.busy)
  276. have_send = 1;
  277. }
  278. if ((events & BREACTOR_READ) || ((events & BREACTOR_ERROR) && o->recv.inited && o->recv.busy && !o->recv.closed)) {
  279. ASSERT(o->recv.inited)
  280. ASSERT(o->recv.busy)
  281. ASSERT(!o->recv.closed)
  282. have_recv = 1;
  283. }
  284. if (have_send) {
  285. if (have_recv) {
  286. BPending_Set(&o->recv.job);
  287. }
  288. connection_send(o);
  289. return;
  290. }
  291. if (have_recv) {
  292. connection_recv(o);
  293. return;
  294. }
  295. BLog(BLOG_ERROR, "fd error event");
  296. connection_report_error(o);
  297. return;
  298. }
  299. static void connection_send_job_handler (BConnection *o)
  300. {
  301. DebugObject_Access(&o->d_obj);
  302. DebugError_AssertNoError(&o->d_err);
  303. ASSERT(o->send.inited)
  304. ASSERT(o->send.busy)
  305. connection_send(o);
  306. return;
  307. }
  308. static void connection_recv_job_handler (BConnection *o)
  309. {
  310. DebugObject_Access(&o->d_obj);
  311. DebugError_AssertNoError(&o->d_err);
  312. ASSERT(o->recv.inited)
  313. ASSERT(o->recv.busy)
  314. ASSERT(!o->recv.closed)
  315. connection_recv(o);
  316. return;
  317. }
  318. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  319. {
  320. DebugObject_Access(&o->d_obj);
  321. DebugError_AssertNoError(&o->d_err);
  322. ASSERT(o->send.inited)
  323. ASSERT(!o->send.busy)
  324. ASSERT(data_len > 0)
  325. // remember data
  326. o->send.busy_data = data;
  327. o->send.busy_data_len = data_len;
  328. // set busy
  329. o->send.busy = 1;
  330. connection_send(o);
  331. return;
  332. }
  333. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  334. {
  335. DebugObject_Access(&o->d_obj);
  336. DebugError_AssertNoError(&o->d_err);
  337. ASSERT(o->recv.inited)
  338. ASSERT(!o->recv.busy)
  339. ASSERT(!o->recv.closed)
  340. ASSERT(data_avail > 0)
  341. // remember data
  342. o->recv.busy_data = data;
  343. o->recv.busy_data_avail = data_avail;
  344. // set busy
  345. o->recv.busy = 1;
  346. connection_recv(o);
  347. return;
  348. }
  349. int BConnection_AddressSupported (BAddr addr)
  350. {
  351. BAddr_Assert(&addr);
  352. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  353. }
  354. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  355. BListener_handler handler)
  356. {
  357. ASSERT(handler)
  358. BNetwork_Assert();
  359. // init arguments
  360. o->reactor = reactor;
  361. o->user = user;
  362. o->handler = handler;
  363. // set no unix socket path
  364. o->unix_socket_path = NULL;
  365. // check address
  366. if (!BConnection_AddressSupported(addr)) {
  367. BLog(BLOG_ERROR, "address not supported");
  368. goto fail0;
  369. }
  370. // convert address
  371. struct sys_addr sysaddr;
  372. addr_socket_to_sys(&sysaddr, addr);
  373. // init fd
  374. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  375. BLog(BLOG_ERROR, "socket failed");
  376. goto fail0;
  377. }
  378. // set non-blocking
  379. if (!badvpn_set_nonblocking(o->fd)) {
  380. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  381. goto fail1;
  382. }
  383. // set SO_REUSEADDR
  384. int optval = 1;
  385. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  386. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  387. }
  388. // bind
  389. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  390. BLog(BLOG_ERROR, "bind failed");
  391. goto fail1;
  392. }
  393. // listen
  394. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  395. BLog(BLOG_ERROR, "listen failed");
  396. goto fail1;
  397. }
  398. // init BFileDescriptor
  399. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  400. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  401. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  402. goto fail1;
  403. }
  404. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  405. // init default job
  406. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  407. DebugObject_Init(&o->d_obj);
  408. return 1;
  409. fail1:
  410. if (close(o->fd) < 0) {
  411. BLog(BLOG_ERROR, "close failed");
  412. }
  413. fail0:
  414. return 0;
  415. }
  416. int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user,
  417. BListener_handler handler)
  418. {
  419. ASSERT(socket_path)
  420. ASSERT(handler)
  421. BNetwork_Assert();
  422. // init arguments
  423. o->reactor = reactor;
  424. o->user = user;
  425. o->handler = handler;
  426. // copy socket path
  427. o->unix_socket_path = b_strdup(socket_path);
  428. if (!o->unix_socket_path) {
  429. BLog(BLOG_ERROR, "b_strdup failed");
  430. goto fail0;
  431. }
  432. // build address
  433. struct unix_addr addr;
  434. if (!build_unix_address(&addr, socket_path)) {
  435. BLog(BLOG_ERROR, "build_unix_address failed");
  436. goto fail1;
  437. }
  438. // init fd
  439. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  440. BLog(BLOG_ERROR, "socket failed");
  441. goto fail1;
  442. }
  443. // set non-blocking
  444. if (!badvpn_set_nonblocking(o->fd)) {
  445. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  446. goto fail2;
  447. }
  448. // unlink existing socket
  449. if (unlink(o->unix_socket_path) < 0 && errno != ENOENT) {
  450. BLog(BLOG_ERROR, "unlink existing socket failed");
  451. goto fail2;
  452. }
  453. // bind
  454. if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) {
  455. BLog(BLOG_ERROR, "bind failed");
  456. goto fail2;
  457. }
  458. // listen
  459. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  460. BLog(BLOG_ERROR, "listen failed");
  461. goto fail3;
  462. }
  463. // init BFileDescriptor
  464. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  465. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  466. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  467. goto fail3;
  468. }
  469. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  470. // init default job
  471. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  472. DebugObject_Init(&o->d_obj);
  473. return 1;
  474. fail3:
  475. if (unlink(o->unix_socket_path) < 0) {
  476. BLog(BLOG_ERROR, "unlink socket failed");
  477. }
  478. fail2:
  479. if (close(o->fd) < 0) {
  480. BLog(BLOG_ERROR, "close failed");
  481. }
  482. fail1:
  483. free(o->unix_socket_path);
  484. fail0:
  485. return 0;
  486. }
  487. void BListener_Free (BListener *o)
  488. {
  489. DebugObject_Free(&o->d_obj);
  490. // free default job
  491. BPending_Free(&o->default_job);
  492. // free BFileDescriptor
  493. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  494. // free fd
  495. if (close(o->fd) < 0) {
  496. BLog(BLOG_ERROR, "close failed");
  497. }
  498. // unlink unix socket
  499. if (o->unix_socket_path) {
  500. if (unlink(o->unix_socket_path) < 0) {
  501. BLog(BLOG_ERROR, "unlink socket failed");
  502. }
  503. }
  504. // free unix socket path
  505. if (o->unix_socket_path) {
  506. free(o->unix_socket_path);
  507. }
  508. }
  509. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  510. BConnector_handler handler)
  511. {
  512. ASSERT(handler)
  513. BNetwork_Assert();
  514. // init arguments
  515. o->reactor = reactor;
  516. o->user = user;
  517. o->handler = handler;
  518. // check address
  519. if (!BConnection_AddressSupported(addr)) {
  520. BLog(BLOG_ERROR, "address not supported");
  521. goto fail0;
  522. }
  523. // convert address
  524. struct sys_addr sysaddr;
  525. addr_socket_to_sys(&sysaddr, addr);
  526. // init job
  527. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  528. // init fd
  529. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  530. BLog(BLOG_ERROR, "socket failed");
  531. goto fail1;
  532. }
  533. // set fd non-blocking
  534. if (!badvpn_set_nonblocking(o->fd)) {
  535. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  536. goto fail2;
  537. }
  538. // connect fd
  539. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  540. if (res < 0 && errno != EINPROGRESS) {
  541. BLog(BLOG_ERROR, "connect failed");
  542. goto fail2;
  543. }
  544. // set not connected
  545. o->connected = 0;
  546. // set have no BFileDescriptor
  547. o->have_bfd = 0;
  548. if (res < 0) {
  549. // init BFileDescriptor
  550. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  551. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  552. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  553. goto fail2;
  554. }
  555. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  556. // set have BFileDescriptor
  557. o->have_bfd = 1;
  558. } else {
  559. // set connected
  560. o->connected = 1;
  561. // set job
  562. BPending_Set(&o->job);
  563. }
  564. DebugObject_Init(&o->d_obj);
  565. return 1;
  566. fail2:
  567. if (close(o->fd) < 0) {
  568. BLog(BLOG_ERROR, "close failed");
  569. }
  570. fail1:
  571. BPending_Free(&o->job);
  572. fail0:
  573. return 0;
  574. }
  575. int BConnector_InitUnix (BConnector *o, const char *socket_path, BReactor *reactor, void *user,
  576. BConnector_handler handler)
  577. {
  578. ASSERT(socket_path)
  579. ASSERT(handler)
  580. BNetwork_Assert();
  581. // init arguments
  582. o->reactor = reactor;
  583. o->user = user;
  584. o->handler = handler;
  585. // build address
  586. struct unix_addr addr;
  587. if (!build_unix_address(&addr, socket_path)) {
  588. BLog(BLOG_ERROR, "build_unix_address failed");
  589. goto fail0;
  590. }
  591. // init job
  592. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  593. // init fd
  594. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  595. BLog(BLOG_ERROR, "socket failed");
  596. goto fail1;
  597. }
  598. // set fd non-blocking
  599. if (!badvpn_set_nonblocking(o->fd)) {
  600. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  601. goto fail2;
  602. }
  603. // connect fd
  604. int res = connect(o->fd, (struct sockaddr *)&addr.u.addr, addr.len);
  605. if (res < 0 && errno != EINPROGRESS) {
  606. BLog(BLOG_ERROR, "connect failed");
  607. goto fail2;
  608. }
  609. // set not connected
  610. o->connected = 0;
  611. // set have no BFileDescriptor
  612. o->have_bfd = 0;
  613. if (res < 0) {
  614. // init BFileDescriptor
  615. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  616. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  617. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  618. goto fail2;
  619. }
  620. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  621. // set have BFileDescriptor
  622. o->have_bfd = 1;
  623. } else {
  624. // set connected
  625. o->connected = 1;
  626. // set job
  627. BPending_Set(&o->job);
  628. }
  629. DebugObject_Init(&o->d_obj);
  630. return 1;
  631. fail2:
  632. if (close(o->fd) < 0) {
  633. BLog(BLOG_ERROR, "close failed");
  634. }
  635. fail1:
  636. BPending_Free(&o->job);
  637. fail0:
  638. return 0;
  639. }
  640. void BConnector_Free (BConnector *o)
  641. {
  642. DebugObject_Free(&o->d_obj);
  643. // free BFileDescriptor
  644. if (o->have_bfd) {
  645. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  646. }
  647. // close fd
  648. if (o->fd != -1) {
  649. if (close(o->fd) < 0) {
  650. BLog(BLOG_ERROR, "close failed");
  651. }
  652. }
  653. // free job
  654. BPending_Free(&o->job);
  655. }
  656. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  657. BConnection_handler handler)
  658. {
  659. switch (source.type) {
  660. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  661. BListener *listener = source.u.listener.listener;
  662. DebugObject_Access(&listener->d_obj);
  663. ASSERT(BPending_IsSet(&listener->default_job))
  664. } break;
  665. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  666. BConnector *connector = source.u.connector.connector;
  667. DebugObject_Access(&connector->d_obj);
  668. ASSERT(connector->fd >= 0)
  669. ASSERT(connector->connected)
  670. ASSERT(!connector->have_bfd)
  671. ASSERT(!BPending_IsSet(&connector->job))
  672. } break;
  673. case BCONNECTION_SOURCE_TYPE_PIPE: {
  674. ASSERT(source.u.pipe.pipefd >= 0)
  675. } break;
  676. default: ASSERT(0);
  677. }
  678. ASSERT(handler)
  679. BNetwork_Assert();
  680. // init arguments
  681. o->reactor = reactor;
  682. o->user = user;
  683. o->handler = handler;
  684. switch (source.type) {
  685. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  686. BListener *listener = source.u.listener.listener;
  687. // unset listener's default job
  688. BPending_Unset(&listener->default_job);
  689. // accept
  690. struct sys_addr sysaddr;
  691. sysaddr.len = sizeof(sysaddr.addr);
  692. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  693. BLog(BLOG_ERROR, "accept failed");
  694. goto fail0;
  695. }
  696. o->close_fd = 1;
  697. // set non-blocking
  698. if (!badvpn_set_nonblocking(o->fd)) {
  699. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  700. goto fail1;
  701. }
  702. // return address
  703. if (source.u.listener.out_addr) {
  704. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  705. }
  706. } break;
  707. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  708. BConnector *connector = source.u.connector.connector;
  709. // grab fd from connector
  710. o->fd = connector->fd;
  711. connector->fd = -1;
  712. o->close_fd = 1;
  713. } break;
  714. case BCONNECTION_SOURCE_TYPE_PIPE: {
  715. // use user-provided fd
  716. o->fd = source.u.pipe.pipefd;
  717. o->close_fd = 0;
  718. // set non-blocking
  719. if (!badvpn_set_nonblocking(o->fd)) {
  720. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  721. goto fail1;
  722. }
  723. } break;
  724. }
  725. // init BFileDescriptor
  726. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  727. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  728. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  729. goto fail1;
  730. }
  731. // set no wait events
  732. o->wait_events = 0;
  733. // init limits
  734. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  735. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  736. // set send and recv not inited
  737. o->send.inited = 0;
  738. o->recv.inited = 0;
  739. // set recv not closed
  740. o->recv.closed = 0;
  741. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  742. DebugObject_Init(&o->d_obj);
  743. return 1;
  744. fail1:
  745. if (o->close_fd) {
  746. if (close(o->fd) < 0) {
  747. BLog(BLOG_ERROR, "close failed");
  748. }
  749. }
  750. fail0:
  751. return 0;
  752. }
  753. void BConnection_Free (BConnection *o)
  754. {
  755. DebugObject_Free(&o->d_obj);
  756. DebugError_Free(&o->d_err);
  757. ASSERT(!o->recv.inited)
  758. ASSERT(!o->send.inited)
  759. // free limits
  760. BReactorLimit_Free(&o->recv.limit);
  761. BReactorLimit_Free(&o->send.limit);
  762. // free BFileDescriptor
  763. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  764. // close fd
  765. if (o->close_fd) {
  766. if (close(o->fd) < 0) {
  767. BLog(BLOG_ERROR, "close failed");
  768. }
  769. }
  770. }
  771. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  772. {
  773. DebugObject_Access(&o->d_obj);
  774. // set handlers
  775. o->user = user;
  776. o->handler = handler;
  777. }
  778. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  779. {
  780. DebugObject_Access(&o->d_obj);
  781. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  782. BLog(BLOG_ERROR, "setsockopt failed");
  783. return 0;
  784. }
  785. return 1;
  786. }
  787. void BConnection_SendAsync_Init (BConnection *o)
  788. {
  789. DebugObject_Access(&o->d_obj);
  790. DebugError_AssertNoError(&o->d_err);
  791. ASSERT(!o->send.inited)
  792. // init interface
  793. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  794. // init job
  795. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  796. // set not busy
  797. o->send.busy = 0;
  798. // set inited
  799. o->send.inited = 1;
  800. }
  801. void BConnection_SendAsync_Free (BConnection *o)
  802. {
  803. DebugObject_Access(&o->d_obj);
  804. ASSERT(o->send.inited)
  805. // update events
  806. o->wait_events &= ~BREACTOR_WRITE;
  807. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  808. // free job
  809. BPending_Free(&o->send.job);
  810. // free interface
  811. StreamPassInterface_Free(&o->send.iface);
  812. // set not inited
  813. o->send.inited = 0;
  814. }
  815. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  816. {
  817. DebugObject_Access(&o->d_obj);
  818. ASSERT(o->send.inited)
  819. return &o->send.iface;
  820. }
  821. void BConnection_RecvAsync_Init (BConnection *o)
  822. {
  823. DebugObject_Access(&o->d_obj);
  824. DebugError_AssertNoError(&o->d_err);
  825. ASSERT(!o->recv.inited)
  826. ASSERT(!o->recv.closed)
  827. // init interface
  828. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  829. // init job
  830. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  831. // set not busy
  832. o->recv.busy = 0;
  833. // set inited
  834. o->recv.inited = 1;
  835. }
  836. void BConnection_RecvAsync_Free (BConnection *o)
  837. {
  838. DebugObject_Access(&o->d_obj);
  839. ASSERT(o->recv.inited)
  840. // update events
  841. o->wait_events &= ~BREACTOR_READ;
  842. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  843. // free job
  844. BPending_Free(&o->recv.job);
  845. // free interface
  846. StreamRecvInterface_Free(&o->recv.iface);
  847. // set not inited
  848. o->recv.inited = 0;
  849. }
  850. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  851. {
  852. DebugObject_Access(&o->d_obj);
  853. ASSERT(o->recv.inited)
  854. return &o->recv.iface;
  855. }