BConnection_unix.c 29 KB


  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <string.h>
  30. #include <stddef.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #include <sys/types.h>
  34. #include <sys/socket.h>
  35. #include <sys/un.h>
  36. #include <misc/nonblocking.h>
  37. #include <misc/strdup.h>
  38. #include <base/BLog.h>
  39. #include "BConnection.h"
  40. #include <generated/blog_channel_BConnection.h>
  41. #define MAX_UNIX_SOCKET_PATH 200
  42. #define SEND_STATE_NOT_INITED 0
  43. #define SEND_STATE_READY 1
  44. #define SEND_STATE_BUSY 2
  45. #define RECV_STATE_NOT_INITED 0
  46. #define RECV_STATE_READY 1
  47. #define RECV_STATE_BUSY 2
  48. #define RECV_STATE_INITED_CLOSED 3
  49. #define RECV_STATE_NOT_INITED_CLOSED 4
  50. struct sys_addr {
  51. socklen_t len;
  52. union {
  53. struct sockaddr generic;
  54. struct sockaddr_in ipv4;
  55. struct sockaddr_in6 ipv6;
  56. } addr;
  57. };
  58. struct unix_addr {
  59. socklen_t len;
  60. union {
  61. struct sockaddr_un addr;
  62. uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
  63. } u;
  64. };
  65. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  66. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  67. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  68. static void listener_fd_handler (BListener *o, int events);
  69. static void listener_default_job_handler (BListener *o);
  70. static void connector_fd_handler (BConnector *o, int events);
  71. static void connector_job_handler (BConnector *o);
  72. static void connection_report_error (BConnection *o);
  73. static void connection_send (BConnection *o);
  74. static void connection_recv (BConnection *o);
  75. static void connection_fd_handler (BConnection *o, int events);
  76. static void connection_send_job_handler (BConnection *o);
  77. static void connection_recv_job_handler (BConnection *o);
  78. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  79. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  80. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  81. {
  82. ASSERT(socket_path);
  83. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  84. return 0;
  85. }
  86. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  87. out->u.addr.sun_family = AF_UNIX;
  88. strcpy(out->u.addr.sun_path, socket_path);
  89. return 1;
  90. }
  91. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  92. {
  93. switch (addr.type) {
  94. case BADDR_TYPE_IPV4: {
  95. out->len = sizeof(out->addr.ipv4);
  96. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  97. out->addr.ipv4.sin_family = AF_INET;
  98. out->addr.ipv4.sin_port = addr.ipv4.port;
  99. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  100. } break;
  101. case BADDR_TYPE_IPV6: {
  102. out->len = sizeof(out->addr.ipv6);
  103. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  104. out->addr.ipv6.sin6_family = AF_INET6;
  105. out->addr.ipv6.sin6_port = addr.ipv6.port;
  106. out->addr.ipv6.sin6_flowinfo = 0;
  107. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  108. out->addr.ipv6.sin6_scope_id = 0;
  109. } break;
  110. default: ASSERT(0);
  111. }
  112. }
  113. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  114. {
  115. switch (addr.addr.generic.sa_family) {
  116. case AF_INET: {
  117. ASSERT(addr.len == sizeof(struct sockaddr_in))
  118. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  119. } break;
  120. case AF_INET6: {
  121. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  122. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  123. } break;
  124. default: {
  125. BAddr_InitNone(out);
  126. } break;
  127. }
  128. }
  129. static void listener_fd_handler (BListener *o, int events)
  130. {
  131. DebugObject_Access(&o->d_obj);
  132. // set default job
  133. BPending_Set(&o->default_job);
  134. // call handler
  135. o->handler(o->user);
  136. return;
  137. }
  138. static void listener_default_job_handler (BListener *o)
  139. {
  140. DebugObject_Access(&o->d_obj);
  141. BLog(BLOG_ERROR, "discarding connection");
  142. // accept
  143. int newfd = accept(o->fd, NULL, NULL);
  144. if (newfd < 0) {
  145. BLog(BLOG_ERROR, "accept failed");
  146. return;
  147. }
  148. // close new fd
  149. if (close(newfd) < 0) {
  150. BLog(BLOG_ERROR, "close failed");
  151. }
  152. }
  153. static void connector_fd_handler (BConnector *o, int events)
  154. {
  155. DebugObject_Access(&o->d_obj);
  156. ASSERT(o->fd >= 0)
  157. ASSERT(!o->connected)
  158. ASSERT(o->have_bfd)
  159. // free BFileDescriptor
  160. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  161. // set have no BFileDescriptor
  162. o->have_bfd = 0;
  163. // read connection result
  164. int result;
  165. socklen_t result_len = sizeof(result);
  166. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  167. BLog(BLOG_ERROR, "getsockopt failed");
  168. goto fail0;
  169. }
  170. ASSERT_FORCE(result_len == sizeof(result))
  171. if (result != 0) {
  172. BLog(BLOG_ERROR, "connection failed");
  173. goto fail0;
  174. }
  175. // set connected
  176. o->connected = 1;
  177. fail0:
  178. // call handler
  179. o->handler(o->user, !o->connected);
  180. return;
  181. }
  182. static void connector_job_handler (BConnector *o)
  183. {
  184. DebugObject_Access(&o->d_obj);
  185. ASSERT(o->fd >= 0)
  186. ASSERT(o->connected)
  187. ASSERT(!o->have_bfd)
  188. // call handler
  189. o->handler(o->user, 0);
  190. return;
  191. }
  192. static void connection_report_error (BConnection *o)
  193. {
  194. DebugError_AssertNoError(&o->d_err);
  195. ASSERT(o->handler)
  196. // report error
  197. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  198. return;
  199. }
  200. static void connection_send (BConnection *o)
  201. {
  202. DebugError_AssertNoError(&o->d_err);
  203. ASSERT(o->send.state == SEND_STATE_BUSY)
  204. // limit
  205. if (!o->is_hupd) {
  206. if (!BReactorLimit_Increment(&o->send.limit)) {
  207. // wait for fd
  208. o->wait_events |= BREACTOR_WRITE;
  209. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  210. return;
  211. }
  212. }
  213. // send
  214. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  215. if (bytes < 0) {
  216. if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  217. // wait for fd
  218. o->wait_events |= BREACTOR_WRITE;
  219. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  220. return;
  221. }
  222. BLog(BLOG_ERROR, "send failed");
  223. connection_report_error(o);
  224. return;
  225. }
  226. ASSERT(bytes > 0)
  227. ASSERT(bytes <= o->send.busy_data_len)
  228. // set ready
  229. o->send.state = SEND_STATE_READY;
  230. // done
  231. StreamPassInterface_Done(&o->send.iface, bytes);
  232. }
  233. static void connection_recv (BConnection *o)
  234. {
  235. DebugError_AssertNoError(&o->d_err);
  236. ASSERT(o->recv.state == RECV_STATE_BUSY)
  237. // limit
  238. if (!o->is_hupd) {
  239. if (!BReactorLimit_Increment(&o->recv.limit)) {
  240. // wait for fd
  241. o->wait_events |= BREACTOR_READ;
  242. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  243. return;
  244. }
  245. }
  246. // recv
  247. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  248. if (bytes < 0) {
  249. if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  250. // wait for fd
  251. o->wait_events |= BREACTOR_READ;
  252. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  253. return;
  254. }
  255. BLog(BLOG_ERROR, "recv failed");
  256. connection_report_error(o);
  257. return;
  258. }
  259. if (bytes == 0) {
  260. // set recv inited closed
  261. o->recv.state = RECV_STATE_INITED_CLOSED;
  262. // report recv closed
  263. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  264. return;
  265. }
  266. ASSERT(bytes > 0)
  267. ASSERT(bytes <= o->recv.busy_data_avail)
  268. // set not busy
  269. o->recv.state = RECV_STATE_READY;
  270. // done
  271. StreamRecvInterface_Done(&o->recv.iface, bytes);
  272. }
  273. static void connection_fd_handler (BConnection *o, int events)
  274. {
  275. DebugObject_Access(&o->d_obj);
  276. DebugError_AssertNoError(&o->d_err);
  277. ASSERT(!o->is_hupd)
  278. // clear handled events
  279. o->wait_events &= ~events;
  280. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  281. int have_send = 0;
  282. int have_recv = 0;
  283. // if we got a HUP event, stop monitoring the file descriptor
  284. if ((events & BREACTOR_HUP)) {
  285. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  286. o->is_hupd = 1;
  287. }
  288. if ((events & BREACTOR_WRITE) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->send.state == SEND_STATE_BUSY)) {
  289. ASSERT(o->send.state == SEND_STATE_BUSY)
  290. have_send = 1;
  291. }
  292. if ((events & BREACTOR_READ) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->recv.state == RECV_STATE_BUSY)) {
  293. ASSERT(o->recv.state == RECV_STATE_BUSY)
  294. have_recv = 1;
  295. }
  296. if (have_send) {
  297. if (have_recv) {
  298. BPending_Set(&o->recv.job);
  299. }
  300. connection_send(o);
  301. return;
  302. }
  303. if (have_recv) {
  304. connection_recv(o);
  305. return;
  306. }
  307. if (!o->is_hupd) {
  308. BLog(BLOG_ERROR, "fd error event");
  309. connection_report_error(o);
  310. return;
  311. }
  312. }
  313. static void connection_send_job_handler (BConnection *o)
  314. {
  315. DebugObject_Access(&o->d_obj);
  316. DebugError_AssertNoError(&o->d_err);
  317. ASSERT(o->send.state == SEND_STATE_BUSY)
  318. connection_send(o);
  319. return;
  320. }
  321. static void connection_recv_job_handler (BConnection *o)
  322. {
  323. DebugObject_Access(&o->d_obj);
  324. DebugError_AssertNoError(&o->d_err);
  325. ASSERT(o->recv.state == RECV_STATE_BUSY)
  326. connection_recv(o);
  327. return;
  328. }
  329. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  330. {
  331. DebugObject_Access(&o->d_obj);
  332. DebugError_AssertNoError(&o->d_err);
  333. ASSERT(o->send.state == SEND_STATE_READY)
  334. ASSERT(data_len > 0)
  335. // remember data
  336. o->send.busy_data = data;
  337. o->send.busy_data_len = data_len;
  338. // set busy
  339. o->send.state = SEND_STATE_BUSY;
  340. connection_send(o);
  341. return;
  342. }
  343. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  344. {
  345. DebugObject_Access(&o->d_obj);
  346. DebugError_AssertNoError(&o->d_err);
  347. ASSERT(o->recv.state == RECV_STATE_READY)
  348. ASSERT(data_avail > 0)
  349. // remember data
  350. o->recv.busy_data = data;
  351. o->recv.busy_data_avail = data_avail;
  352. // set busy
  353. o->recv.state = RECV_STATE_BUSY;
  354. connection_recv(o);
  355. return;
  356. }
  357. int BConnection_AddressSupported (BAddr addr)
  358. {
  359. BAddr_Assert(&addr);
  360. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  361. }
  362. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  363. BListener_handler handler)
  364. {
  365. ASSERT(handler)
  366. BNetwork_Assert();
  367. // init arguments
  368. o->reactor = reactor;
  369. o->user = user;
  370. o->handler = handler;
  371. // set no unix socket path
  372. o->unix_socket_path = NULL;
  373. // check address
  374. if (!BConnection_AddressSupported(addr)) {
  375. BLog(BLOG_ERROR, "address not supported");
  376. goto fail0;
  377. }
  378. // convert address
  379. struct sys_addr sysaddr;
  380. addr_socket_to_sys(&sysaddr, addr);
  381. // init fd
  382. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  383. BLog(BLOG_ERROR, "socket failed");
  384. goto fail0;
  385. }
  386. // set non-blocking
  387. if (!badvpn_set_nonblocking(o->fd)) {
  388. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  389. goto fail1;
  390. }
  391. // set SO_REUSEADDR
  392. int optval = 1;
  393. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  394. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  395. }
  396. // bind
  397. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  398. BLog(BLOG_ERROR, "bind failed");
  399. goto fail1;
  400. }
  401. // listen
  402. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  403. BLog(BLOG_ERROR, "listen failed");
  404. goto fail1;
  405. }
  406. // init BFileDescriptor
  407. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  408. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  409. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  410. goto fail1;
  411. }
  412. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  413. // init default job
  414. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  415. DebugObject_Init(&o->d_obj);
  416. return 1;
  417. fail1:
  418. if (close(o->fd) < 0) {
  419. BLog(BLOG_ERROR, "close failed");
  420. }
  421. fail0:
  422. return 0;
  423. }
  424. int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user,
  425. BListener_handler handler)
  426. {
  427. ASSERT(socket_path)
  428. ASSERT(handler)
  429. BNetwork_Assert();
  430. // init arguments
  431. o->reactor = reactor;
  432. o->user = user;
  433. o->handler = handler;
  434. // copy socket path
  435. o->unix_socket_path = b_strdup(socket_path);
  436. if (!o->unix_socket_path) {
  437. BLog(BLOG_ERROR, "b_strdup failed");
  438. goto fail0;
  439. }
  440. // build address
  441. struct unix_addr addr;
  442. if (!build_unix_address(&addr, socket_path)) {
  443. BLog(BLOG_ERROR, "build_unix_address failed");
  444. goto fail1;
  445. }
  446. // init fd
  447. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  448. BLog(BLOG_ERROR, "socket failed");
  449. goto fail1;
  450. }
  451. // set non-blocking
  452. if (!badvpn_set_nonblocking(o->fd)) {
  453. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  454. goto fail2;
  455. }
  456. // unlink existing socket
  457. if (unlink(o->unix_socket_path) < 0 && errno != ENOENT) {
  458. BLog(BLOG_ERROR, "unlink existing socket failed");
  459. goto fail2;
  460. }
  461. // bind
  462. if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) {
  463. BLog(BLOG_ERROR, "bind failed");
  464. goto fail2;
  465. }
  466. // listen
  467. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  468. BLog(BLOG_ERROR, "listen failed");
  469. goto fail3;
  470. }
  471. // init BFileDescriptor
  472. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  473. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  474. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  475. goto fail3;
  476. }
  477. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  478. // init default job
  479. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  480. DebugObject_Init(&o->d_obj);
  481. return 1;
  482. fail3:
  483. if (unlink(o->unix_socket_path) < 0) {
  484. BLog(BLOG_ERROR, "unlink socket failed");
  485. }
  486. fail2:
  487. if (close(o->fd) < 0) {
  488. BLog(BLOG_ERROR, "close failed");
  489. }
  490. fail1:
  491. free(o->unix_socket_path);
  492. fail0:
  493. return 0;
  494. }
  495. void BListener_Free (BListener *o)
  496. {
  497. DebugObject_Free(&o->d_obj);
  498. // free default job
  499. BPending_Free(&o->default_job);
  500. // free BFileDescriptor
  501. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  502. // free fd
  503. if (close(o->fd) < 0) {
  504. BLog(BLOG_ERROR, "close failed");
  505. }
  506. // unlink unix socket
  507. if (o->unix_socket_path) {
  508. if (unlink(o->unix_socket_path) < 0) {
  509. BLog(BLOG_ERROR, "unlink socket failed");
  510. }
  511. }
  512. // free unix socket path
  513. if (o->unix_socket_path) {
  514. free(o->unix_socket_path);
  515. }
  516. }
  517. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  518. BConnector_handler handler)
  519. {
  520. ASSERT(handler)
  521. BNetwork_Assert();
  522. // init arguments
  523. o->reactor = reactor;
  524. o->user = user;
  525. o->handler = handler;
  526. // check address
  527. if (!BConnection_AddressSupported(addr)) {
  528. BLog(BLOG_ERROR, "address not supported");
  529. goto fail0;
  530. }
  531. // convert address
  532. struct sys_addr sysaddr;
  533. addr_socket_to_sys(&sysaddr, addr);
  534. // init job
  535. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  536. // init fd
  537. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  538. BLog(BLOG_ERROR, "socket failed");
  539. goto fail1;
  540. }
  541. // set fd non-blocking
  542. if (!badvpn_set_nonblocking(o->fd)) {
  543. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  544. goto fail2;
  545. }
  546. // connect fd
  547. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  548. if (res < 0 && errno != EINPROGRESS) {
  549. BLog(BLOG_ERROR, "connect failed");
  550. goto fail2;
  551. }
  552. // set not connected
  553. o->connected = 0;
  554. // set have no BFileDescriptor
  555. o->have_bfd = 0;
  556. if (res < 0) {
  557. // init BFileDescriptor
  558. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  559. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  560. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  561. goto fail2;
  562. }
  563. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  564. // set have BFileDescriptor
  565. o->have_bfd = 1;
  566. } else {
  567. // set connected
  568. o->connected = 1;
  569. // set job
  570. BPending_Set(&o->job);
  571. }
  572. DebugObject_Init(&o->d_obj);
  573. return 1;
  574. fail2:
  575. if (close(o->fd) < 0) {
  576. BLog(BLOG_ERROR, "close failed");
  577. }
  578. fail1:
  579. BPending_Free(&o->job);
  580. fail0:
  581. return 0;
  582. }
  583. int BConnector_InitUnix (BConnector *o, const char *socket_path, BReactor *reactor, void *user,
  584. BConnector_handler handler)
  585. {
  586. ASSERT(socket_path)
  587. ASSERT(handler)
  588. BNetwork_Assert();
  589. // init arguments
  590. o->reactor = reactor;
  591. o->user = user;
  592. o->handler = handler;
  593. // build address
  594. struct unix_addr addr;
  595. if (!build_unix_address(&addr, socket_path)) {
  596. BLog(BLOG_ERROR, "build_unix_address failed");
  597. goto fail0;
  598. }
  599. // init job
  600. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  601. // init fd
  602. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  603. BLog(BLOG_ERROR, "socket failed");
  604. goto fail1;
  605. }
  606. // set fd non-blocking
  607. if (!badvpn_set_nonblocking(o->fd)) {
  608. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  609. goto fail2;
  610. }
  611. // connect fd
  612. int res = connect(o->fd, (struct sockaddr *)&addr.u.addr, addr.len);
  613. if (res < 0 && errno != EINPROGRESS) {
  614. BLog(BLOG_ERROR, "connect failed");
  615. goto fail2;
  616. }
  617. // set not connected
  618. o->connected = 0;
  619. // set have no BFileDescriptor
  620. o->have_bfd = 0;
  621. if (res < 0) {
  622. // init BFileDescriptor
  623. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  624. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  625. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  626. goto fail2;
  627. }
  628. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  629. // set have BFileDescriptor
  630. o->have_bfd = 1;
  631. } else {
  632. // set connected
  633. o->connected = 1;
  634. // set job
  635. BPending_Set(&o->job);
  636. }
  637. DebugObject_Init(&o->d_obj);
  638. return 1;
  639. fail2:
  640. if (close(o->fd) < 0) {
  641. BLog(BLOG_ERROR, "close failed");
  642. }
  643. fail1:
  644. BPending_Free(&o->job);
  645. fail0:
  646. return 0;
  647. }
  648. void BConnector_Free (BConnector *o)
  649. {
  650. DebugObject_Free(&o->d_obj);
  651. // free BFileDescriptor
  652. if (o->have_bfd) {
  653. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  654. }
  655. // close fd
  656. if (o->fd != -1) {
  657. if (close(o->fd) < 0) {
  658. BLog(BLOG_ERROR, "close failed");
  659. }
  660. }
  661. // free job
  662. BPending_Free(&o->job);
  663. }
  664. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  665. BConnection_handler handler)
  666. {
  667. switch (source.type) {
  668. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  669. BListener *listener = source.u.listener.listener;
  670. DebugObject_Access(&listener->d_obj);
  671. ASSERT(BPending_IsSet(&listener->default_job))
  672. } break;
  673. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  674. BConnector *connector = source.u.connector.connector;
  675. DebugObject_Access(&connector->d_obj);
  676. ASSERT(connector->fd >= 0)
  677. ASSERT(connector->connected)
  678. ASSERT(!connector->have_bfd)
  679. ASSERT(!BPending_IsSet(&connector->job))
  680. } break;
  681. case BCONNECTION_SOURCE_TYPE_PIPE: {
  682. ASSERT(source.u.pipe.pipefd >= 0)
  683. } break;
  684. default: ASSERT(0);
  685. }
  686. ASSERT(handler)
  687. BNetwork_Assert();
  688. // init arguments
  689. o->reactor = reactor;
  690. o->user = user;
  691. o->handler = handler;
  692. switch (source.type) {
  693. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  694. BListener *listener = source.u.listener.listener;
  695. // unset listener's default job
  696. BPending_Unset(&listener->default_job);
  697. // accept
  698. struct sys_addr sysaddr;
  699. sysaddr.len = sizeof(sysaddr.addr);
  700. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  701. BLog(BLOG_ERROR, "accept failed");
  702. goto fail0;
  703. }
  704. o->close_fd = 1;
  705. // set non-blocking
  706. if (!badvpn_set_nonblocking(o->fd)) {
  707. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  708. goto fail1;
  709. }
  710. // return address
  711. if (source.u.listener.out_addr) {
  712. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  713. }
  714. } break;
  715. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  716. BConnector *connector = source.u.connector.connector;
  717. // grab fd from connector
  718. o->fd = connector->fd;
  719. connector->fd = -1;
  720. o->close_fd = 1;
  721. } break;
  722. case BCONNECTION_SOURCE_TYPE_PIPE: {
  723. // use user-provided fd
  724. o->fd = source.u.pipe.pipefd;
  725. o->close_fd = 0;
  726. // set non-blocking
  727. if (!badvpn_set_nonblocking(o->fd)) {
  728. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  729. goto fail1;
  730. }
  731. } break;
  732. }
  733. // set not HUPd
  734. o->is_hupd = 0;
  735. // init BFileDescriptor
  736. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  737. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  738. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  739. goto fail1;
  740. }
  741. // set no wait events
  742. o->wait_events = 0;
  743. // init limits
  744. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  745. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  746. // set send and recv not inited
  747. o->send.state = SEND_STATE_NOT_INITED;
  748. o->recv.state = RECV_STATE_NOT_INITED;
  749. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  750. DebugObject_Init(&o->d_obj);
  751. return 1;
  752. fail1:
  753. if (o->close_fd) {
  754. if (close(o->fd) < 0) {
  755. BLog(BLOG_ERROR, "close failed");
  756. }
  757. }
  758. fail0:
  759. return 0;
  760. }
  761. void BConnection_Free (BConnection *o)
  762. {
  763. DebugObject_Free(&o->d_obj);
  764. DebugError_Free(&o->d_err);
  765. ASSERT(o->send.state == SEND_STATE_NOT_INITED)
  766. ASSERT(o->recv.state == RECV_STATE_NOT_INITED || o->recv.state == RECV_STATE_NOT_INITED_CLOSED)
  767. // free limits
  768. BReactorLimit_Free(&o->recv.limit);
  769. BReactorLimit_Free(&o->send.limit);
  770. // free BFileDescriptor
  771. if (!o->is_hupd) {
  772. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  773. }
  774. // close fd
  775. if (o->close_fd) {
  776. if (close(o->fd) < 0) {
  777. BLog(BLOG_ERROR, "close failed");
  778. }
  779. }
  780. }
  781. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  782. {
  783. DebugObject_Access(&o->d_obj);
  784. // set handlers
  785. o->user = user;
  786. o->handler = handler;
  787. }
  788. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  789. {
  790. DebugObject_Access(&o->d_obj);
  791. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  792. BLog(BLOG_ERROR, "setsockopt failed");
  793. return 0;
  794. }
  795. return 1;
  796. }
  797. void BConnection_SendAsync_Init (BConnection *o)
  798. {
  799. DebugObject_Access(&o->d_obj);
  800. DebugError_AssertNoError(&o->d_err);
  801. ASSERT(o->send.state == SEND_STATE_NOT_INITED)
  802. // init interface
  803. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  804. // init job
  805. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  806. // set ready
  807. o->send.state = SEND_STATE_READY;
  808. }
  809. void BConnection_SendAsync_Free (BConnection *o)
  810. {
  811. DebugObject_Access(&o->d_obj);
  812. ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY)
  813. // update events
  814. if (!o->is_hupd) {
  815. o->wait_events &= ~BREACTOR_WRITE;
  816. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  817. }
  818. // free job
  819. BPending_Free(&o->send.job);
  820. // free interface
  821. StreamPassInterface_Free(&o->send.iface);
  822. // set not inited
  823. o->send.state = SEND_STATE_NOT_INITED;
  824. }
  825. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  826. {
  827. DebugObject_Access(&o->d_obj);
  828. ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY)
  829. return &o->send.iface;
  830. }
  831. void BConnection_RecvAsync_Init (BConnection *o)
  832. {
  833. DebugObject_Access(&o->d_obj);
  834. DebugError_AssertNoError(&o->d_err);
  835. ASSERT(o->recv.state == RECV_STATE_NOT_INITED)
  836. // init interface
  837. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  838. // init job
  839. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  840. // set ready
  841. o->recv.state = RECV_STATE_READY;
  842. }
  843. void BConnection_RecvAsync_Free (BConnection *o)
  844. {
  845. DebugObject_Access(&o->d_obj);
  846. ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED)
  847. // update events
  848. if (!o->is_hupd) {
  849. o->wait_events &= ~BREACTOR_READ;
  850. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  851. }
  852. // free job
  853. BPending_Free(&o->recv.job);
  854. // free interface
  855. StreamRecvInterface_Free(&o->recv.iface);
  856. // set not inited
  857. o->recv.state = RECV_STATE_NOT_INITED;
  858. }
  859. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  860. {
  861. DebugObject_Access(&o->d_obj);
  862. ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED)
  863. return &o->recv.iface;
  864. }