BConnection_unix.c 28 KB


  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <string.h>
  30. #include <stddef.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #include <sys/types.h>
  34. #include <sys/socket.h>
  35. #include <sys/un.h>
  36. #include <misc/nonblocking.h>
  37. #include <misc/strdup.h>
  38. #include <base/BLog.h>
  39. #include "BConnection.h"
  40. #include <generated/blog_channel_BConnection.h>
  // Longest Unix socket path accepted by build_unix_address, not counting the terminating NUL.
  #define MAX_UNIX_SOCKET_PATH 200

  // Values of BConnection's send.state.
  #define SEND_STATE_NOT_INITED 0 // send interface not initialized
  #define SEND_STATE_READY      1 // initialized, no write outstanding
  #define SEND_STATE_BUSY       2 // a write is outstanding

  // Values of BConnection's recv.state.
  #define RECV_STATE_NOT_INITED        0 // receive interface not initialized
  #define RECV_STATE_READY             1 // initialized, no read outstanding
  #define RECV_STATE_BUSY              2 // a read is outstanding
  #define RECV_STATE_INITED_CLOSED     3 // initialized; read() has returned 0
  #define RECV_STATE_NOT_INITED_CLOSED 4 // accepted by BConnection_Free alongside RECV_STATE_NOT_INITED
  /**
   * An IPv4/IPv6 socket address in the native representation, together
   * with its length, as passed to bind(), connect(), accept() and
   * getsockname() in this file.
   */
  struct sys_addr {
      // length of the address stored in addr
      socklen_t len;

      // the address; generic is the view handed to the socket calls
      union {
          struct sockaddr generic;
          struct sockaddr_in ipv4;
          struct sockaddr_in6 ipv6;
      } addr;
  };
  58. struct unix_addr {
  59. socklen_t len;
  60. union {
  61. struct sockaddr_un addr;
  62. uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
  63. } u;
  64. };
  65. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  66. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  67. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  68. static void listener_fd_handler (BListener *o, int events);
  69. static void listener_default_job_handler (BListener *o);
  70. static void connector_fd_handler (BConnector *o, int events);
  71. static void connector_job_handler (BConnector *o);
  72. static void connection_report_error (BConnection *o);
  73. static void connection_send (BConnection *o);
  74. static void connection_recv (BConnection *o);
  75. static void connection_fd_handler (BConnection *o, int events);
  76. static void connection_send_job_handler (BConnection *o);
  77. static void connection_recv_job_handler (BConnection *o);
  78. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  79. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  80. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  81. {
  82. ASSERT(socket_path);
  83. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  84. return 0;
  85. }
  86. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  87. out->u.addr.sun_family = AF_UNIX;
  88. strcpy(out->u.addr.sun_path, socket_path);
  89. return 1;
  90. }
  91. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  92. {
  93. switch (addr.type) {
  94. case BADDR_TYPE_IPV4: {
  95. out->len = sizeof(out->addr.ipv4);
  96. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  97. out->addr.ipv4.sin_family = AF_INET;
  98. out->addr.ipv4.sin_port = addr.ipv4.port;
  99. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  100. } break;
  101. case BADDR_TYPE_IPV6: {
  102. out->len = sizeof(out->addr.ipv6);
  103. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  104. out->addr.ipv6.sin6_family = AF_INET6;
  105. out->addr.ipv6.sin6_port = addr.ipv6.port;
  106. out->addr.ipv6.sin6_flowinfo = 0;
  107. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  108. out->addr.ipv6.sin6_scope_id = 0;
  109. } break;
  110. default: ASSERT(0);
  111. }
  112. }
  113. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  114. {
  115. switch (addr.addr.generic.sa_family) {
  116. case AF_INET: {
  117. ASSERT(addr.len == sizeof(struct sockaddr_in))
  118. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  119. } break;
  120. case AF_INET6: {
  121. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  122. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  123. } break;
  124. default: {
  125. BAddr_InitNone(out);
  126. } break;
  127. }
  128. }
  129. static void listener_fd_handler (BListener *o, int events)
  130. {
  131. DebugObject_Access(&o->d_obj);
  132. // set default job
  133. BPending_Set(&o->default_job);
  134. // call handler
  135. o->handler(o->user);
  136. return;
  137. }
  138. static void listener_default_job_handler (BListener *o)
  139. {
  140. DebugObject_Access(&o->d_obj);
  141. BLog(BLOG_ERROR, "discarding connection");
  142. // accept
  143. int newfd = accept(o->fd, NULL, NULL);
  144. if (newfd < 0) {
  145. BLog(BLOG_ERROR, "accept failed");
  146. return;
  147. }
  148. // close new fd
  149. if (close(newfd) < 0) {
  150. BLog(BLOG_ERROR, "close failed");
  151. }
  152. }
  153. static void connector_fd_handler (BConnector *o, int events)
  154. {
  155. DebugObject_Access(&o->d_obj);
  156. ASSERT(o->fd >= 0)
  157. ASSERT(!o->connected)
  158. ASSERT(o->have_bfd)
  159. // free BFileDescriptor
  160. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  161. // set have no BFileDescriptor
  162. o->have_bfd = 0;
  163. // read connection result
  164. int result;
  165. socklen_t result_len = sizeof(result);
  166. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  167. BLog(BLOG_ERROR, "getsockopt failed");
  168. goto fail0;
  169. }
  170. ASSERT_FORCE(result_len == sizeof(result))
  171. if (result != 0) {
  172. BLog(BLOG_ERROR, "connection failed");
  173. goto fail0;
  174. }
  175. // set connected
  176. o->connected = 1;
  177. fail0:
  178. // call handler
  179. o->handler(o->user, !o->connected);
  180. return;
  181. }
  182. static void connector_job_handler (BConnector *o)
  183. {
  184. DebugObject_Access(&o->d_obj);
  185. ASSERT(o->fd >= 0)
  186. ASSERT(o->connected)
  187. ASSERT(!o->have_bfd)
  188. // call handler
  189. o->handler(o->user, 0);
  190. return;
  191. }
  192. static void connection_report_error (BConnection *o)
  193. {
  194. DebugError_AssertNoError(&o->d_err);
  195. ASSERT(o->handler)
  196. // report error
  197. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  198. return;
  199. }
  200. static void connection_send (BConnection *o)
  201. {
  202. DebugError_AssertNoError(&o->d_err);
  203. ASSERT(o->send.state == SEND_STATE_BUSY)
  204. // limit
  205. if (!o->is_hupd) {
  206. if (!BReactorLimit_Increment(&o->send.limit)) {
  207. // wait for fd
  208. o->wait_events |= BREACTOR_WRITE;
  209. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  210. return;
  211. }
  212. }
  213. // send
  214. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  215. if (bytes < 0) {
  216. if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  217. // wait for fd
  218. o->wait_events |= BREACTOR_WRITE;
  219. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  220. return;
  221. }
  222. BLog(BLOG_ERROR, "send failed");
  223. connection_report_error(o);
  224. return;
  225. }
  226. ASSERT(bytes > 0)
  227. ASSERT(bytes <= o->send.busy_data_len)
  228. // set ready
  229. o->send.state = SEND_STATE_READY;
  230. // done
  231. StreamPassInterface_Done(&o->send.iface, bytes);
  232. }
  233. static void connection_recv (BConnection *o)
  234. {
  235. DebugError_AssertNoError(&o->d_err);
  236. ASSERT(o->recv.state == RECV_STATE_BUSY)
  237. // limit
  238. if (!o->is_hupd) {
  239. if (!BReactorLimit_Increment(&o->recv.limit)) {
  240. // wait for fd
  241. o->wait_events |= BREACTOR_READ;
  242. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  243. return;
  244. }
  245. }
  246. // recv
  247. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  248. if (bytes < 0) {
  249. if (!o->is_hupd && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  250. // wait for fd
  251. o->wait_events |= BREACTOR_READ;
  252. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  253. return;
  254. }
  255. BLog(BLOG_ERROR, "recv failed");
  256. connection_report_error(o);
  257. return;
  258. }
  259. if (bytes == 0) {
  260. // set recv inited closed
  261. o->recv.state = RECV_STATE_INITED_CLOSED;
  262. // report recv closed
  263. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  264. return;
  265. }
  266. ASSERT(bytes > 0)
  267. ASSERT(bytes <= o->recv.busy_data_avail)
  268. // set not busy
  269. o->recv.state = RECV_STATE_READY;
  270. // done
  271. StreamRecvInterface_Done(&o->recv.iface, bytes);
  272. }
  273. static void connection_fd_handler (BConnection *o, int events)
  274. {
  275. DebugObject_Access(&o->d_obj);
  276. DebugError_AssertNoError(&o->d_err);
  277. ASSERT(!o->is_hupd)
  278. // clear handled events
  279. o->wait_events &= ~events;
  280. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  281. int have_send = 0;
  282. int have_recv = 0;
  283. // if we got a HUP event, stop monitoring the file descriptor
  284. if ((events & BREACTOR_HUP)) {
  285. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  286. o->is_hupd = 1;
  287. }
  288. if ((events & BREACTOR_WRITE) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->send.state == SEND_STATE_BUSY)) {
  289. ASSERT(o->send.state == SEND_STATE_BUSY)
  290. have_send = 1;
  291. }
  292. if ((events & BREACTOR_READ) || ((events & (BREACTOR_ERROR|BREACTOR_HUP)) && o->recv.state == RECV_STATE_BUSY)) {
  293. ASSERT(o->recv.state == RECV_STATE_BUSY)
  294. have_recv = 1;
  295. }
  296. if (have_send) {
  297. if (have_recv) {
  298. BPending_Set(&o->recv.job);
  299. }
  300. connection_send(o);
  301. return;
  302. }
  303. if (have_recv) {
  304. connection_recv(o);
  305. return;
  306. }
  307. if (!o->is_hupd) {
  308. BLog(BLOG_ERROR, "fd error event");
  309. connection_report_error(o);
  310. return;
  311. }
  312. }
  313. static void connection_send_job_handler (BConnection *o)
  314. {
  315. DebugObject_Access(&o->d_obj);
  316. DebugError_AssertNoError(&o->d_err);
  317. ASSERT(o->send.state == SEND_STATE_BUSY)
  318. connection_send(o);
  319. return;
  320. }
  321. static void connection_recv_job_handler (BConnection *o)
  322. {
  323. DebugObject_Access(&o->d_obj);
  324. DebugError_AssertNoError(&o->d_err);
  325. ASSERT(o->recv.state == RECV_STATE_BUSY)
  326. connection_recv(o);
  327. return;
  328. }
  329. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  330. {
  331. DebugObject_Access(&o->d_obj);
  332. DebugError_AssertNoError(&o->d_err);
  333. ASSERT(o->send.state == SEND_STATE_READY)
  334. ASSERT(data_len > 0)
  335. // remember data
  336. o->send.busy_data = data;
  337. o->send.busy_data_len = data_len;
  338. // set busy
  339. o->send.state = SEND_STATE_BUSY;
  340. connection_send(o);
  341. return;
  342. }
  343. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  344. {
  345. DebugObject_Access(&o->d_obj);
  346. DebugError_AssertNoError(&o->d_err);
  347. ASSERT(o->recv.state == RECV_STATE_READY)
  348. ASSERT(data_avail > 0)
  349. // remember data
  350. o->recv.busy_data = data;
  351. o->recv.busy_data_avail = data_avail;
  352. // set busy
  353. o->recv.state = RECV_STATE_BUSY;
  354. connection_recv(o);
  355. return;
  356. }
  357. int BConnection_AddressSupported (BAddr addr)
  358. {
  359. BAddr_Assert(&addr);
  360. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  361. }
  362. int BListener_InitFrom (BListener *o, struct BLisCon_from from,
  363. BReactor *reactor, void *user,
  364. BListener_handler handler)
  365. {
  366. ASSERT(from.type == BLISCON_FROM_ADDR || from.type == BLISCON_FROM_UNIX)
  367. ASSERT(from.type != BLISCON_FROM_UNIX || from.u.from_unix.socket_path)
  368. ASSERT(handler)
  369. BNetwork_Assert();
  370. // init arguments
  371. o->reactor = reactor;
  372. o->user = user;
  373. o->handler = handler;
  374. // init socket path
  375. o->unix_socket_path = NULL;
  376. if (from.type == BLISCON_FROM_UNIX) {
  377. o->unix_socket_path = b_strdup(from.u.from_unix.socket_path);
  378. if (!o->unix_socket_path) {
  379. BLog(BLOG_ERROR, "b_strdup failed");
  380. goto fail0;
  381. }
  382. }
  383. struct unix_addr unixaddr;
  384. struct sys_addr sysaddr;
  385. if (from.type == BLISCON_FROM_UNIX) {
  386. // build address
  387. if (!build_unix_address(&unixaddr, o->unix_socket_path)) {
  388. BLog(BLOG_ERROR, "build_unix_address failed");
  389. goto fail1;
  390. }
  391. // init fd
  392. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  393. BLog(BLOG_ERROR, "socket failed");
  394. goto fail1;
  395. }
  396. } else {
  397. // check address
  398. if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
  399. BLog(BLOG_ERROR, "address not supported");
  400. goto fail1;
  401. }
  402. // convert address
  403. addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
  404. // init fd
  405. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  406. BLog(BLOG_ERROR, "socket failed");
  407. goto fail1;
  408. }
  409. }
  410. // set non-blocking
  411. if (!badvpn_set_nonblocking(o->fd)) {
  412. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  413. goto fail2;
  414. }
  415. if (from.type == BLISCON_FROM_UNIX) {
  416. // unlink existing socket
  417. if (unlink(o->unix_socket_path) < 0 && errno != ENOENT) {
  418. BLog(BLOG_ERROR, "unlink existing socket failed");
  419. goto fail2;
  420. }
  421. // bind
  422. if (bind(o->fd, (struct sockaddr *)&unixaddr.u.addr, unixaddr.len) < 0) {
  423. BLog(BLOG_ERROR, "bind failed");
  424. goto fail2;
  425. }
  426. } else {
  427. // set SO_REUSEADDR
  428. int optval = 1;
  429. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  430. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  431. }
  432. // bind
  433. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  434. BLog(BLOG_ERROR, "bind failed");
  435. goto fail2;
  436. }
  437. }
  438. // listen
  439. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  440. BLog(BLOG_ERROR, "listen failed");
  441. goto fail3;
  442. }
  443. // init BFileDescriptor
  444. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  445. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  446. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  447. goto fail3;
  448. }
  449. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  450. // init default job
  451. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  452. DebugObject_Init(&o->d_obj);
  453. return 1;
  454. fail3:
  455. if (from.type == BLISCON_FROM_UNIX) {
  456. if (unlink(o->unix_socket_path) < 0) {
  457. BLog(BLOG_ERROR, "unlink socket failed");
  458. }
  459. }
  460. fail2:
  461. if (close(o->fd) < 0) {
  462. BLog(BLOG_ERROR, "close failed");
  463. }
  464. fail1:
  465. free(o->unix_socket_path);
  466. fail0:
  467. return 0;
  468. }
  469. void BListener_Free (BListener *o)
  470. {
  471. DebugObject_Free(&o->d_obj);
  472. // free default job
  473. BPending_Free(&o->default_job);
  474. // free BFileDescriptor
  475. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  476. // free fd
  477. if (close(o->fd) < 0) {
  478. BLog(BLOG_ERROR, "close failed");
  479. }
  480. // unlink unix socket
  481. if (o->unix_socket_path) {
  482. if (unlink(o->unix_socket_path) < 0) {
  483. BLog(BLOG_ERROR, "unlink socket failed");
  484. }
  485. }
  486. // free unix socket path
  487. if (o->unix_socket_path) {
  488. free(o->unix_socket_path);
  489. }
  490. }
  491. int BConnector_InitFrom (BConnector *o, struct BLisCon_from from, BReactor *reactor, void *user,
  492. BConnector_handler handler)
  493. {
  494. ASSERT(from.type == BLISCON_FROM_ADDR || from.type == BLISCON_FROM_UNIX)
  495. ASSERT(from.type != BLISCON_FROM_UNIX || from.u.from_unix.socket_path)
  496. ASSERT(handler)
  497. BNetwork_Assert();
  498. // init arguments
  499. o->reactor = reactor;
  500. o->user = user;
  501. o->handler = handler;
  502. struct unix_addr unixaddr;
  503. struct sys_addr sysaddr;
  504. if (from.type == BLISCON_FROM_UNIX) {
  505. // build address
  506. if (!build_unix_address(&unixaddr, from.u.from_unix.socket_path)) {
  507. BLog(BLOG_ERROR, "build_unix_address failed");
  508. goto fail0;
  509. }
  510. } else {
  511. // check address
  512. if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
  513. BLog(BLOG_ERROR, "address not supported");
  514. goto fail0;
  515. }
  516. // convert address
  517. addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
  518. }
  519. // init job
  520. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  521. if (from.type == BLISCON_FROM_UNIX) {
  522. // init fd
  523. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  524. BLog(BLOG_ERROR, "socket failed");
  525. goto fail1;
  526. }
  527. } else {
  528. // init fd
  529. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  530. BLog(BLOG_ERROR, "socket failed");
  531. goto fail1;
  532. }
  533. }
  534. // set fd non-blocking
  535. if (!badvpn_set_nonblocking(o->fd)) {
  536. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  537. goto fail2;
  538. }
  539. // connect fd
  540. int connect_res;
  541. if (from.type == BLISCON_FROM_UNIX) {
  542. connect_res = connect(o->fd, (struct sockaddr *)&unixaddr.u.addr, unixaddr.len);
  543. } else {
  544. connect_res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  545. }
  546. if (connect_res < 0 && errno != EINPROGRESS) {
  547. BLog(BLOG_ERROR, "connect failed");
  548. goto fail2;
  549. }
  550. // set not connected
  551. o->connected = 0;
  552. // set have no BFileDescriptor
  553. o->have_bfd = 0;
  554. if (connect_res < 0) {
  555. // init BFileDescriptor
  556. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  557. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  558. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  559. goto fail2;
  560. }
  561. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  562. // set have BFileDescriptor
  563. o->have_bfd = 1;
  564. } else {
  565. // set connected
  566. o->connected = 1;
  567. // set job
  568. BPending_Set(&o->job);
  569. }
  570. DebugObject_Init(&o->d_obj);
  571. return 1;
  572. fail2:
  573. if (close(o->fd) < 0) {
  574. BLog(BLOG_ERROR, "close failed");
  575. }
  576. fail1:
  577. BPending_Free(&o->job);
  578. fail0:
  579. return 0;
  580. }
  581. void BConnector_Free (BConnector *o)
  582. {
  583. DebugObject_Free(&o->d_obj);
  584. // free BFileDescriptor
  585. if (o->have_bfd) {
  586. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  587. }
  588. // close fd
  589. if (o->fd != -1) {
  590. if (close(o->fd) < 0) {
  591. BLog(BLOG_ERROR, "close failed");
  592. }
  593. }
  594. // free job
  595. BPending_Free(&o->job);
  596. }
  597. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  598. BConnection_handler handler)
  599. {
  600. switch (source.type) {
  601. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  602. BListener *listener = source.u.listener.listener;
  603. DebugObject_Access(&listener->d_obj);
  604. ASSERT(BPending_IsSet(&listener->default_job))
  605. } break;
  606. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  607. BConnector *connector = source.u.connector.connector;
  608. DebugObject_Access(&connector->d_obj);
  609. ASSERT(connector->fd >= 0)
  610. ASSERT(connector->connected)
  611. ASSERT(!connector->have_bfd)
  612. ASSERT(!BPending_IsSet(&connector->job))
  613. } break;
  614. case BCONNECTION_SOURCE_TYPE_PIPE: {
  615. ASSERT(source.u.pipe.pipefd >= 0)
  616. } break;
  617. default: ASSERT(0);
  618. }
  619. ASSERT(handler)
  620. BNetwork_Assert();
  621. // init arguments
  622. o->reactor = reactor;
  623. o->user = user;
  624. o->handler = handler;
  625. switch (source.type) {
  626. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  627. BListener *listener = source.u.listener.listener;
  628. // unset listener's default job
  629. BPending_Unset(&listener->default_job);
  630. // accept
  631. struct sys_addr sysaddr;
  632. sysaddr.len = sizeof(sysaddr.addr);
  633. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  634. BLog(BLOG_ERROR, "accept failed");
  635. goto fail0;
  636. }
  637. o->close_fd = 1;
  638. // set non-blocking
  639. if (!badvpn_set_nonblocking(o->fd)) {
  640. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  641. goto fail1;
  642. }
  643. // return address
  644. if (source.u.listener.out_addr) {
  645. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  646. }
  647. } break;
  648. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  649. BConnector *connector = source.u.connector.connector;
  650. // grab fd from connector
  651. o->fd = connector->fd;
  652. connector->fd = -1;
  653. o->close_fd = 1;
  654. } break;
  655. case BCONNECTION_SOURCE_TYPE_PIPE: {
  656. // use user-provided fd
  657. o->fd = source.u.pipe.pipefd;
  658. o->close_fd = !!source.u.pipe.close_it;
  659. // set non-blocking
  660. if (!badvpn_set_nonblocking(o->fd)) {
  661. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  662. goto fail1;
  663. }
  664. } break;
  665. }
  666. // set not HUPd
  667. o->is_hupd = 0;
  668. // init BFileDescriptor
  669. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  670. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  671. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  672. goto fail1;
  673. }
  674. // set no wait events
  675. o->wait_events = 0;
  676. // init limits
  677. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  678. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  679. // set send and recv not inited
  680. o->send.state = SEND_STATE_NOT_INITED;
  681. o->recv.state = RECV_STATE_NOT_INITED;
  682. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  683. DebugObject_Init(&o->d_obj);
  684. return 1;
  685. fail1:
  686. if (o->close_fd) {
  687. if (close(o->fd) < 0) {
  688. BLog(BLOG_ERROR, "close failed");
  689. }
  690. }
  691. fail0:
  692. return 0;
  693. }
  694. void BConnection_Free (BConnection *o)
  695. {
  696. DebugObject_Free(&o->d_obj);
  697. DebugError_Free(&o->d_err);
  698. ASSERT(o->send.state == SEND_STATE_NOT_INITED)
  699. ASSERT(o->recv.state == RECV_STATE_NOT_INITED || o->recv.state == RECV_STATE_NOT_INITED_CLOSED)
  700. // free limits
  701. BReactorLimit_Free(&o->recv.limit);
  702. BReactorLimit_Free(&o->send.limit);
  703. // free BFileDescriptor
  704. if (!o->is_hupd) {
  705. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  706. }
  707. // close fd
  708. if (o->close_fd) {
  709. if (close(o->fd) < 0) {
  710. BLog(BLOG_ERROR, "close failed");
  711. }
  712. }
  713. }
  714. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  715. {
  716. DebugObject_Access(&o->d_obj);
  717. // set handlers
  718. o->user = user;
  719. o->handler = handler;
  720. }
  721. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  722. {
  723. DebugObject_Access(&o->d_obj);
  724. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  725. BLog(BLOG_ERROR, "setsockopt failed");
  726. return 0;
  727. }
  728. return 1;
  729. }
  730. int BConnection_GetLocalAddress (BConnection *o, BAddr *local_addr)
  731. {
  732. DebugObject_Access(&o->d_obj);
  733. struct sys_addr sysaddr;
  734. sysaddr.len = sizeof(sysaddr.addr);
  735. if (getsockname(o->fd, &sysaddr.addr.generic, &sysaddr.len) != 0) {
  736. BLog(BLOG_ERROR, "BConnection_GetLocalAddress: getsockname failed");
  737. return 0;
  738. }
  739. BAddr addr;
  740. addr_sys_to_socket(&addr, sysaddr);
  741. if (addr.type == BADDR_TYPE_NONE) {
  742. BLog(BLOG_ERROR, "BConnection_GetLocalAddress: Unsupported address family "
  743. "from getsockname: %d", (int)sysaddr.addr.generic.sa_family);
  744. return 0;
  745. }
  746. *local_addr = addr;
  747. return 1;
  748. }
  749. void BConnection_SendAsync_Init (BConnection *o)
  750. {
  751. DebugObject_Access(&o->d_obj);
  752. DebugError_AssertNoError(&o->d_err);
  753. ASSERT(o->send.state == SEND_STATE_NOT_INITED)
  754. // init interface
  755. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  756. // init job
  757. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  758. // set ready
  759. o->send.state = SEND_STATE_READY;
  760. }
  761. void BConnection_SendAsync_Free (BConnection *o)
  762. {
  763. DebugObject_Access(&o->d_obj);
  764. ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY)
  765. // update events
  766. if (!o->is_hupd) {
  767. o->wait_events &= ~BREACTOR_WRITE;
  768. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  769. }
  770. // free job
  771. BPending_Free(&o->send.job);
  772. // free interface
  773. StreamPassInterface_Free(&o->send.iface);
  774. // set not inited
  775. o->send.state = SEND_STATE_NOT_INITED;
  776. }
  777. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  778. {
  779. DebugObject_Access(&o->d_obj);
  780. ASSERT(o->send.state == SEND_STATE_READY || o->send.state == SEND_STATE_BUSY)
  781. return &o->send.iface;
  782. }
  783. void BConnection_RecvAsync_Init (BConnection *o)
  784. {
  785. DebugObject_Access(&o->d_obj);
  786. DebugError_AssertNoError(&o->d_err);
  787. ASSERT(o->recv.state == RECV_STATE_NOT_INITED)
  788. // init interface
  789. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  790. // init job
  791. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  792. // set ready
  793. o->recv.state = RECV_STATE_READY;
  794. }
  795. void BConnection_RecvAsync_Free (BConnection *o)
  796. {
  797. DebugObject_Access(&o->d_obj);
  798. ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED)
  799. // update events
  800. if (!o->is_hupd) {
  801. o->wait_events &= ~BREACTOR_READ;
  802. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  803. }
  804. // free job
  805. BPending_Free(&o->recv.job);
  806. // free interface
  807. StreamRecvInterface_Free(&o->recv.iface);
  808. // set not inited
  809. o->recv.state = RECV_STATE_NOT_INITED;
  810. }
  811. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  812. {
  813. DebugObject_Access(&o->d_obj);
  814. ASSERT(o->recv.state == RECV_STATE_READY || o->recv.state == RECV_STATE_BUSY || o->recv.state == RECV_STATE_INITED_CLOSED)
  815. return &o->recv.iface;
  816. }