BConnection_unix.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924
  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Redistributions in binary form must reproduce the above copyright
  12. * notice, this list of conditions and the following disclaimer in the
  13. * documentation and/or other materials provided with the distribution.
  14. * 3. Neither the name of the author nor the
  15. * names of its contributors may be used to endorse or promote products
  16. * derived from this software without specific prior written permission.
  17. *
  18. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  19. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  20. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  21. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
  22. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  23. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  24. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  25. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  26. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  27. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include <string.h>
  30. #include <stddef.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #include <sys/types.h>
  34. #include <sys/socket.h>
  35. #include <sys/un.h>
  36. #include <misc/nonblocking.h>
  37. #include <base/BLog.h>
  38. #include "BConnection.h"
  39. #include <generated/blog_channel_BConnection.h>
  /* Longest socket path (not counting the terminating NUL) accepted by build_unix_address(). */
  #define MAX_UNIX_SOCKET_PATH 200

  /**
   * Socket address in native form, as passed to bind()/connect() and as filled
   * in by accept() in this file.
   */
  struct sys_addr {
      socklen_t len; // length of the address stored in addr
      union {
          struct sockaddr generic; // view handed to the socket calls
          struct sockaddr_in ipv4;
          struct sockaddr_in6 ipv6;
      } addr;
  };

  /**
   * Native AF_UNIX address. The bytes member sizes the union so that it can hold
   * a path of MAX_UNIX_SOCKET_PATH characters plus its NUL terminator, whatever
   * the declared size of sun_path is.
   */
  struct unix_addr {
      socklen_t len; // address length passed to bind()
      union {
          struct sockaddr_un addr;
          uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
      } u;
  };
  56. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  57. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  58. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  59. static void listener_fd_handler (BListener *o, int events);
  60. static void listener_default_job_handler (BListener *o);
  61. static void connector_fd_handler (BConnector *o, int events);
  62. static void connector_job_handler (BConnector *o);
  63. static void connection_report_error (BConnection *o);
  64. static void connection_send (BConnection *o);
  65. static void connection_recv (BConnection *o);
  66. static void connection_fd_handler (BConnection *o, int events);
  67. static void connection_send_job_handler (BConnection *o);
  68. static void connection_recv_job_handler (BConnection *o);
  69. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  70. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  71. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  72. {
  73. ASSERT(socket_path);
  74. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  75. return 0;
  76. }
  77. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  78. out->u.addr.sun_family = AF_UNIX;
  79. strcpy(out->u.addr.sun_path, socket_path);
  80. return 1;
  81. }
  82. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  83. {
  84. switch (addr.type) {
  85. case BADDR_TYPE_IPV4: {
  86. out->len = sizeof(out->addr.ipv4);
  87. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  88. out->addr.ipv4.sin_family = AF_INET;
  89. out->addr.ipv4.sin_port = addr.ipv4.port;
  90. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  91. } break;
  92. case BADDR_TYPE_IPV6: {
  93. out->len = sizeof(out->addr.ipv6);
  94. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  95. out->addr.ipv6.sin6_family = AF_INET6;
  96. out->addr.ipv6.sin6_port = addr.ipv6.port;
  97. out->addr.ipv6.sin6_flowinfo = 0;
  98. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  99. out->addr.ipv6.sin6_scope_id = 0;
  100. } break;
  101. default: ASSERT(0);
  102. }
  103. }
  104. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  105. {
  106. switch (addr.addr.generic.sa_family) {
  107. case AF_INET: {
  108. ASSERT(addr.len == sizeof(struct sockaddr_in))
  109. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  110. } break;
  111. case AF_INET6: {
  112. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  113. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  114. } break;
  115. default: {
  116. BAddr_InitNone(out);
  117. } break;
  118. }
  119. }
  120. static void listener_fd_handler (BListener *o, int events)
  121. {
  122. DebugObject_Access(&o->d_obj);
  123. // set default job
  124. BPending_Set(&o->default_job);
  125. // call handler
  126. o->handler(o->user);
  127. return;
  128. }
  129. static void listener_default_job_handler (BListener *o)
  130. {
  131. DebugObject_Access(&o->d_obj);
  132. BLog(BLOG_ERROR, "discarding connection");
  133. // accept
  134. int newfd = accept(o->fd, NULL, NULL);
  135. if (newfd < 0) {
  136. BLog(BLOG_ERROR, "accept failed");
  137. return;
  138. }
  139. // close new fd
  140. if (close(newfd) < 0) {
  141. BLog(BLOG_ERROR, "close failed");
  142. }
  143. }
  144. static void connector_fd_handler (BConnector *o, int events)
  145. {
  146. DebugObject_Access(&o->d_obj);
  147. ASSERT(o->fd >= 0)
  148. ASSERT(!o->connected)
  149. ASSERT(o->have_bfd)
  150. // free BFileDescriptor
  151. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  152. // set have no BFileDescriptor
  153. o->have_bfd = 0;
  154. // read connection result
  155. int result;
  156. socklen_t result_len = sizeof(result);
  157. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  158. BLog(BLOG_ERROR, "getsockopt failed");
  159. goto fail0;
  160. }
  161. ASSERT_FORCE(result_len == sizeof(result))
  162. if (result != 0) {
  163. BLog(BLOG_ERROR, "connection failed");
  164. goto fail0;
  165. }
  166. // set connected
  167. o->connected = 1;
  168. fail0:
  169. // call handler
  170. o->handler(o->user, !o->connected);
  171. return;
  172. }
  173. static void connector_job_handler (BConnector *o)
  174. {
  175. DebugObject_Access(&o->d_obj);
  176. ASSERT(o->fd >= 0)
  177. ASSERT(o->connected)
  178. ASSERT(!o->have_bfd)
  179. // call handler
  180. o->handler(o->user, 0);
  181. return;
  182. }
  183. static void connection_report_error (BConnection *o)
  184. {
  185. DebugError_AssertNoError(&o->d_err);
  186. ASSERT(o->handler)
  187. // report error
  188. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  189. return;
  190. }
  191. static void connection_send (BConnection *o)
  192. {
  193. DebugError_AssertNoError(&o->d_err);
  194. ASSERT(o->send.inited)
  195. ASSERT(o->send.busy)
  196. // limit
  197. if (!BReactorLimit_Increment(&o->send.limit)) {
  198. // wait for fd
  199. o->wait_events |= BREACTOR_WRITE;
  200. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  201. return;
  202. }
  203. // send
  204. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  205. if (bytes < 0) {
  206. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  207. // wait for fd
  208. o->wait_events |= BREACTOR_WRITE;
  209. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  210. return;
  211. }
  212. BLog(BLOG_ERROR, "send failed");
  213. connection_report_error(o);
  214. return;
  215. }
  216. ASSERT(bytes > 0)
  217. ASSERT(bytes <= o->send.busy_data_len)
  218. // set not busy
  219. o->send.busy = 0;
  220. // done
  221. StreamPassInterface_Done(&o->send.iface, bytes);
  222. }
  223. static void connection_recv (BConnection *o)
  224. {
  225. DebugError_AssertNoError(&o->d_err);
  226. ASSERT(o->recv.inited)
  227. ASSERT(o->recv.busy)
  228. ASSERT(!o->recv.closed)
  229. // limit
  230. if (!BReactorLimit_Increment(&o->recv.limit)) {
  231. // wait for fd
  232. o->wait_events |= BREACTOR_READ;
  233. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  234. return;
  235. }
  236. // recv
  237. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  238. if (bytes < 0) {
  239. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  240. // wait for fd
  241. o->wait_events |= BREACTOR_READ;
  242. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  243. return;
  244. }
  245. BLog(BLOG_ERROR, "recv failed");
  246. connection_report_error(o);
  247. return;
  248. }
  249. if (bytes == 0) {
  250. // set recv closed
  251. o->recv.closed = 1;
  252. // report recv closed
  253. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  254. return;
  255. }
  256. ASSERT(bytes > 0)
  257. ASSERT(bytes <= o->recv.busy_data_avail)
  258. // set not busy
  259. o->recv.busy = 0;
  260. // done
  261. StreamRecvInterface_Done(&o->recv.iface, bytes);
  262. }
  263. static void connection_fd_handler (BConnection *o, int events)
  264. {
  265. DebugObject_Access(&o->d_obj);
  266. DebugError_AssertNoError(&o->d_err);
  267. // clear handled events
  268. o->wait_events &= ~events;
  269. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  270. int have_send = 0;
  271. int have_recv = 0;
  272. if ((events & BREACTOR_WRITE) || ((events & BREACTOR_ERROR) && o->send.inited && o->send.busy)) {
  273. ASSERT(o->send.inited)
  274. ASSERT(o->send.busy)
  275. have_send = 1;
  276. }
  277. if ((events & BREACTOR_READ) || ((events & BREACTOR_ERROR) && o->recv.inited && o->recv.busy && !o->recv.closed)) {
  278. ASSERT(o->recv.inited)
  279. ASSERT(o->recv.busy)
  280. ASSERT(!o->recv.closed)
  281. have_recv = 1;
  282. }
  283. if (have_send) {
  284. if (have_recv) {
  285. BPending_Set(&o->recv.job);
  286. }
  287. connection_send(o);
  288. return;
  289. }
  290. if (have_recv) {
  291. connection_recv(o);
  292. return;
  293. }
  294. BLog(BLOG_ERROR, "fd error event");
  295. connection_report_error(o);
  296. return;
  297. }
  298. static void connection_send_job_handler (BConnection *o)
  299. {
  300. DebugObject_Access(&o->d_obj);
  301. DebugError_AssertNoError(&o->d_err);
  302. ASSERT(o->send.inited)
  303. ASSERT(o->send.busy)
  304. connection_send(o);
  305. return;
  306. }
  307. static void connection_recv_job_handler (BConnection *o)
  308. {
  309. DebugObject_Access(&o->d_obj);
  310. DebugError_AssertNoError(&o->d_err);
  311. ASSERT(o->recv.inited)
  312. ASSERT(o->recv.busy)
  313. ASSERT(!o->recv.closed)
  314. connection_recv(o);
  315. return;
  316. }
  317. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  318. {
  319. DebugObject_Access(&o->d_obj);
  320. DebugError_AssertNoError(&o->d_err);
  321. ASSERT(o->send.inited)
  322. ASSERT(!o->send.busy)
  323. ASSERT(data_len > 0)
  324. // remember data
  325. o->send.busy_data = data;
  326. o->send.busy_data_len = data_len;
  327. // set busy
  328. o->send.busy = 1;
  329. connection_send(o);
  330. return;
  331. }
  332. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  333. {
  334. DebugObject_Access(&o->d_obj);
  335. DebugError_AssertNoError(&o->d_err);
  336. ASSERT(o->recv.inited)
  337. ASSERT(!o->recv.busy)
  338. ASSERT(!o->recv.closed)
  339. ASSERT(data_avail > 0)
  340. // remember data
  341. o->recv.busy_data = data;
  342. o->recv.busy_data_avail = data_avail;
  343. // set busy
  344. o->recv.busy = 1;
  345. connection_recv(o);
  346. return;
  347. }
  348. int BConnection_AddressSupported (BAddr addr)
  349. {
  350. BAddr_Assert(&addr);
  351. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  352. }
  353. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  354. BListener_handler handler)
  355. {
  356. ASSERT(BConnection_AddressSupported(addr))
  357. ASSERT(handler)
  358. BNetwork_Assert();
  359. // init arguments
  360. o->reactor = reactor;
  361. o->user = user;
  362. o->handler = handler;
  363. // convert address
  364. struct sys_addr sysaddr;
  365. addr_socket_to_sys(&sysaddr, addr);
  366. // init fd
  367. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  368. BLog(BLOG_ERROR, "socket failed");
  369. goto fail0;
  370. }
  371. // set non-blocking
  372. if (!badvpn_set_nonblocking(o->fd)) {
  373. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  374. goto fail1;
  375. }
  376. // set SO_REUSEADDR
  377. int optval = 1;
  378. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  379. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  380. }
  381. // bind
  382. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  383. BLog(BLOG_ERROR, "bind failed");
  384. goto fail1;
  385. }
  386. // listen
  387. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  388. BLog(BLOG_ERROR, "listen failed");
  389. goto fail1;
  390. }
  391. // init BFileDescriptor
  392. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  393. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  394. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  395. goto fail1;
  396. }
  397. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  398. // init default job
  399. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  400. DebugObject_Init(&o->d_obj);
  401. return 1;
  402. fail1:
  403. if (close(o->fd) < 0) {
  404. BLog(BLOG_ERROR, "close failed");
  405. }
  406. fail0:
  407. return 0;
  408. }
  409. int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user,
  410. BListener_handler handler)
  411. {
  412. ASSERT(socket_path)
  413. ASSERT(handler)
  414. BNetwork_Assert();
  415. // init arguments
  416. o->reactor = reactor;
  417. o->user = user;
  418. o->handler = handler;
  419. // build address
  420. struct unix_addr addr;
  421. if (!build_unix_address(&addr, socket_path)) {
  422. BLog(BLOG_ERROR, "build_unix_address failed");
  423. goto fail0;
  424. }
  425. // init fd
  426. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  427. BLog(BLOG_ERROR, "socket failed");
  428. goto fail0;
  429. }
  430. // set non-blocking
  431. if (!badvpn_set_nonblocking(o->fd)) {
  432. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  433. goto fail1;
  434. }
  435. // bind
  436. if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) {
  437. BLog(BLOG_ERROR, "bind failed");
  438. goto fail1;
  439. }
  440. // listen
  441. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  442. BLog(BLOG_ERROR, "listen failed");
  443. goto fail1;
  444. }
  445. // init BFileDescriptor
  446. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  447. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  448. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  449. goto fail1;
  450. }
  451. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  452. // init default job
  453. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  454. DebugObject_Init(&o->d_obj);
  455. return 1;
  456. fail1:
  457. if (close(o->fd) < 0) {
  458. BLog(BLOG_ERROR, "close failed");
  459. }
  460. fail0:
  461. return 0;
  462. }
  463. void BListener_Free (BListener *o)
  464. {
  465. DebugObject_Free(&o->d_obj);
  466. // free default job
  467. BPending_Free(&o->default_job);
  468. // free BFileDescriptor
  469. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  470. // free fd
  471. if (close(o->fd) < 0) {
  472. BLog(BLOG_ERROR, "close failed");
  473. }
  474. }
  475. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  476. BConnector_handler handler)
  477. {
  478. ASSERT(BConnection_AddressSupported(addr))
  479. ASSERT(handler)
  480. BNetwork_Assert();
  481. // init arguments
  482. o->reactor = reactor;
  483. o->user = user;
  484. o->handler = handler;
  485. // convert address
  486. struct sys_addr sysaddr;
  487. addr_socket_to_sys(&sysaddr, addr);
  488. // init job
  489. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  490. // init fd
  491. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  492. BLog(BLOG_ERROR, "socket failed");
  493. goto fail1;
  494. }
  495. // set fd non-blocking
  496. if (!badvpn_set_nonblocking(o->fd)) {
  497. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  498. goto fail2;
  499. }
  500. // connect fd
  501. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  502. if (res < 0 && errno != EINPROGRESS) {
  503. BLog(BLOG_ERROR, "connect failed");
  504. goto fail2;
  505. }
  506. // set not connected
  507. o->connected = 0;
  508. // set have no BFileDescriptor
  509. o->have_bfd = 0;
  510. if (res < 0) {
  511. // init BFileDescriptor
  512. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  513. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  514. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  515. goto fail2;
  516. }
  517. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  518. // set have BFileDescriptor
  519. o->have_bfd = 1;
  520. } else {
  521. // set connected
  522. o->connected = 1;
  523. // set job
  524. BPending_Set(&o->job);
  525. }
  526. DebugObject_Init(&o->d_obj);
  527. return 1;
  528. fail2:
  529. if (close(o->fd) < 0) {
  530. BLog(BLOG_ERROR, "close failed");
  531. }
  532. fail1:
  533. BPending_Free(&o->job);
  534. return 0;
  535. }
  536. void BConnector_Free (BConnector *o)
  537. {
  538. DebugObject_Free(&o->d_obj);
  539. // free BFileDescriptor
  540. if (o->have_bfd) {
  541. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  542. }
  543. // close fd
  544. if (o->fd != -1) {
  545. if (close(o->fd) < 0) {
  546. BLog(BLOG_ERROR, "close failed");
  547. }
  548. }
  549. // free job
  550. BPending_Free(&o->job);
  551. }
  552. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  553. BConnection_handler handler)
  554. {
  555. switch (source.type) {
  556. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  557. BListener *listener = source.u.listener.listener;
  558. DebugObject_Access(&listener->d_obj);
  559. ASSERT(BPending_IsSet(&listener->default_job))
  560. } break;
  561. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  562. BConnector *connector = source.u.connector.connector;
  563. DebugObject_Access(&connector->d_obj);
  564. ASSERT(connector->fd >= 0)
  565. ASSERT(connector->connected)
  566. ASSERT(!connector->have_bfd)
  567. ASSERT(!BPending_IsSet(&connector->job))
  568. } break;
  569. case BCONNECTION_SOURCE_TYPE_PIPE: {
  570. ASSERT(source.u.pipe.pipefd >= 0)
  571. } break;
  572. default: ASSERT(0);
  573. }
  574. ASSERT(handler)
  575. BNetwork_Assert();
  576. // init arguments
  577. o->reactor = reactor;
  578. o->user = user;
  579. o->handler = handler;
  580. switch (source.type) {
  581. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  582. BListener *listener = source.u.listener.listener;
  583. // unset listener's default job
  584. BPending_Unset(&listener->default_job);
  585. // accept
  586. struct sys_addr sysaddr;
  587. sysaddr.len = sizeof(sysaddr.addr);
  588. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  589. BLog(BLOG_ERROR, "accept failed");
  590. goto fail0;
  591. }
  592. o->close_fd = 1;
  593. // set non-blocking
  594. if (!badvpn_set_nonblocking(o->fd)) {
  595. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  596. goto fail1;
  597. }
  598. // return address
  599. if (source.u.listener.out_addr) {
  600. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  601. }
  602. } break;
  603. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  604. BConnector *connector = source.u.connector.connector;
  605. // grab fd from connector
  606. o->fd = connector->fd;
  607. connector->fd = -1;
  608. o->close_fd = 1;
  609. } break;
  610. case BCONNECTION_SOURCE_TYPE_PIPE: {
  611. // use user-provided fd
  612. o->fd = source.u.pipe.pipefd;
  613. o->close_fd = 0;
  614. // set non-blocking
  615. if (!badvpn_set_nonblocking(o->fd)) {
  616. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  617. goto fail1;
  618. }
  619. } break;
  620. }
  621. // init BFileDescriptor
  622. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  623. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  624. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  625. goto fail1;
  626. }
  627. // set no wait events
  628. o->wait_events = 0;
  629. // init limits
  630. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  631. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  632. // set send and recv not inited
  633. o->send.inited = 0;
  634. o->recv.inited = 0;
  635. // set recv not closed
  636. o->recv.closed = 0;
  637. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  638. DebugObject_Init(&o->d_obj);
  639. return 1;
  640. fail1:
  641. if (o->close_fd) {
  642. if (close(o->fd) < 0) {
  643. BLog(BLOG_ERROR, "close failed");
  644. }
  645. }
  646. fail0:
  647. return 0;
  648. }
  649. void BConnection_Free (BConnection *o)
  650. {
  651. DebugObject_Free(&o->d_obj);
  652. DebugError_Free(&o->d_err);
  653. ASSERT(!o->recv.inited)
  654. ASSERT(!o->send.inited)
  655. // free limits
  656. BReactorLimit_Free(&o->recv.limit);
  657. BReactorLimit_Free(&o->send.limit);
  658. // free BFileDescriptor
  659. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  660. // close fd
  661. if (o->close_fd) {
  662. if (close(o->fd) < 0) {
  663. BLog(BLOG_ERROR, "close failed");
  664. }
  665. }
  666. }
  667. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  668. {
  669. DebugObject_Access(&o->d_obj);
  670. // set handlers
  671. o->user = user;
  672. o->handler = handler;
  673. }
  674. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  675. {
  676. DebugObject_Access(&o->d_obj);
  677. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  678. BLog(BLOG_ERROR, "setsockopt failed");
  679. return 0;
  680. }
  681. return 1;
  682. }
  683. void BConnection_SendAsync_Init (BConnection *o)
  684. {
  685. DebugObject_Access(&o->d_obj);
  686. DebugError_AssertNoError(&o->d_err);
  687. ASSERT(!o->send.inited)
  688. // init interface
  689. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  690. // init job
  691. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  692. // set not busy
  693. o->send.busy = 0;
  694. // set inited
  695. o->send.inited = 1;
  696. }
  697. void BConnection_SendAsync_Free (BConnection *o)
  698. {
  699. DebugObject_Access(&o->d_obj);
  700. ASSERT(o->send.inited)
  701. // update events
  702. o->wait_events &= ~BREACTOR_WRITE;
  703. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  704. // free job
  705. BPending_Free(&o->send.job);
  706. // free interface
  707. StreamPassInterface_Free(&o->send.iface);
  708. // set not inited
  709. o->send.inited = 0;
  710. }
  711. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  712. {
  713. DebugObject_Access(&o->d_obj);
  714. ASSERT(o->send.inited)
  715. return &o->send.iface;
  716. }
  717. void BConnection_RecvAsync_Init (BConnection *o)
  718. {
  719. DebugObject_Access(&o->d_obj);
  720. DebugError_AssertNoError(&o->d_err);
  721. ASSERT(!o->recv.inited)
  722. ASSERT(!o->recv.closed)
  723. // init interface
  724. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  725. // init job
  726. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  727. // set not busy
  728. o->recv.busy = 0;
  729. // set inited
  730. o->recv.inited = 1;
  731. }
  732. void BConnection_RecvAsync_Free (BConnection *o)
  733. {
  734. DebugObject_Access(&o->d_obj);
  735. ASSERT(o->recv.inited)
  736. // update events
  737. o->wait_events &= ~BREACTOR_READ;
  738. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  739. // free job
  740. BPending_Free(&o->recv.job);
  741. // free interface
  742. StreamRecvInterface_Free(&o->recv.iface);
  743. // set not inited
  744. o->recv.inited = 0;
  745. }
  746. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  747. {
  748. DebugObject_Access(&o->d_obj);
  749. ASSERT(o->recv.inited)
  750. return &o->recv.iface;
  751. }