BConnection_unix.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917
  1. /**
  2. * @file BConnection_unix.c
  3. * @author Ambroz Bizjak <ambrop7@gmail.com>
  4. *
  5. * @section LICENSE
  6. *
  7. * This file is part of BadVPN.
  8. *
  9. * BadVPN is free software: you can redistribute it and/or modify
  10. * it under the terms of the GNU General Public License version 2
  11. * as published by the Free Software Foundation.
  12. *
  13. * BadVPN is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License along
  19. * with this program; if not, write to the Free Software Foundation, Inc.,
  20. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. */
  22. #include <string.h>
  23. #include <stddef.h>
  24. #include <unistd.h>
  25. #include <errno.h>
  26. #include <sys/types.h>
  27. #include <sys/socket.h>
  28. #include <sys/un.h>
  29. #include <misc/nonblocking.h>
  30. #include <base/BLog.h>
  31. #include "BConnection.h"
  32. #include <generated/blog_channel_BConnection.h>
  33. #define MAX_UNIX_SOCKET_PATH 200
  34. struct sys_addr {
  35. socklen_t len;
  36. union {
  37. struct sockaddr generic;
  38. struct sockaddr_in ipv4;
  39. struct sockaddr_in6 ipv6;
  40. } addr;
  41. };
  42. struct unix_addr {
  43. socklen_t len;
  44. union {
  45. struct sockaddr_un addr;
  46. uint8_t bytes[offsetof(struct sockaddr_un, sun_path) + MAX_UNIX_SOCKET_PATH + 1];
  47. } u;
  48. };
  49. static int build_unix_address (struct unix_addr *out, const char *socket_path);
  50. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
  51. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
  52. static void listener_fd_handler (BListener *o, int events);
  53. static void listener_default_job_handler (BListener *o);
  54. static void connector_fd_handler (BConnector *o, int events);
  55. static void connector_job_handler (BConnector *o);
  56. static void connection_report_error (BConnection *o);
  57. static void connection_send (BConnection *o);
  58. static void connection_recv (BConnection *o);
  59. static void connection_fd_handler (BConnection *o, int events);
  60. static void connection_send_job_handler (BConnection *o);
  61. static void connection_recv_job_handler (BConnection *o);
  62. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len);
  63. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_len);
  64. static int build_unix_address (struct unix_addr *out, const char *socket_path)
  65. {
  66. ASSERT(socket_path);
  67. if (strlen(socket_path) > MAX_UNIX_SOCKET_PATH) {
  68. return 0;
  69. }
  70. out->len = offsetof(struct sockaddr_un, sun_path) + strlen(socket_path) + 1;
  71. out->u.addr.sun_family = AF_UNIX;
  72. strcpy(out->u.addr.sun_path, socket_path);
  73. return 1;
  74. }
  75. static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
  76. {
  77. switch (addr.type) {
  78. case BADDR_TYPE_IPV4: {
  79. out->len = sizeof(out->addr.ipv4);
  80. memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
  81. out->addr.ipv4.sin_family = AF_INET;
  82. out->addr.ipv4.sin_port = addr.ipv4.port;
  83. out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
  84. } break;
  85. case BADDR_TYPE_IPV6: {
  86. out->len = sizeof(out->addr.ipv6);
  87. memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
  88. out->addr.ipv6.sin6_family = AF_INET6;
  89. out->addr.ipv6.sin6_port = addr.ipv6.port;
  90. out->addr.ipv6.sin6_flowinfo = 0;
  91. memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
  92. out->addr.ipv6.sin6_scope_id = 0;
  93. } break;
  94. default: ASSERT(0);
  95. }
  96. }
  97. static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
  98. {
  99. switch (addr.addr.generic.sa_family) {
  100. case AF_INET: {
  101. ASSERT(addr.len == sizeof(struct sockaddr_in))
  102. BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
  103. } break;
  104. case AF_INET6: {
  105. ASSERT(addr.len == sizeof(struct sockaddr_in6))
  106. BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
  107. } break;
  108. default: {
  109. BAddr_InitNone(out);
  110. } break;
  111. }
  112. }
  113. static void listener_fd_handler (BListener *o, int events)
  114. {
  115. DebugObject_Access(&o->d_obj);
  116. // set default job
  117. BPending_Set(&o->default_job);
  118. // call handler
  119. o->handler(o->user);
  120. return;
  121. }
  122. static void listener_default_job_handler (BListener *o)
  123. {
  124. DebugObject_Access(&o->d_obj);
  125. BLog(BLOG_ERROR, "discarding connection");
  126. // accept
  127. int newfd = accept(o->fd, NULL, NULL);
  128. if (newfd < 0) {
  129. BLog(BLOG_ERROR, "accept failed");
  130. return;
  131. }
  132. // close new fd
  133. if (close(newfd) < 0) {
  134. BLog(BLOG_ERROR, "close failed");
  135. }
  136. }
  137. static void connector_fd_handler (BConnector *o, int events)
  138. {
  139. DebugObject_Access(&o->d_obj);
  140. ASSERT(o->fd >= 0)
  141. ASSERT(!o->connected)
  142. ASSERT(o->have_bfd)
  143. // free BFileDescriptor
  144. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  145. // set have no BFileDescriptor
  146. o->have_bfd = 0;
  147. // read connection result
  148. int result;
  149. socklen_t result_len = sizeof(result);
  150. if (getsockopt(o->fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
  151. BLog(BLOG_ERROR, "getsockopt failed");
  152. goto fail0;
  153. }
  154. ASSERT_FORCE(result_len == sizeof(result))
  155. if (result != 0) {
  156. BLog(BLOG_ERROR, "connection failed");
  157. goto fail0;
  158. }
  159. // set connected
  160. o->connected = 1;
  161. fail0:
  162. // call handler
  163. o->handler(o->user, !o->connected);
  164. return;
  165. }
  166. static void connector_job_handler (BConnector *o)
  167. {
  168. DebugObject_Access(&o->d_obj);
  169. ASSERT(o->fd >= 0)
  170. ASSERT(o->connected)
  171. ASSERT(!o->have_bfd)
  172. // call handler
  173. o->handler(o->user, 0);
  174. return;
  175. }
  176. static void connection_report_error (BConnection *o)
  177. {
  178. DebugError_AssertNoError(&o->d_err);
  179. ASSERT(o->handler)
  180. // report error
  181. DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
  182. return;
  183. }
  184. static void connection_send (BConnection *o)
  185. {
  186. DebugError_AssertNoError(&o->d_err);
  187. ASSERT(o->send.inited)
  188. ASSERT(o->send.busy)
  189. // limit
  190. if (!BReactorLimit_Increment(&o->send.limit)) {
  191. // wait for fd
  192. o->wait_events |= BREACTOR_WRITE;
  193. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  194. return;
  195. }
  196. // send
  197. int bytes = write(o->fd, o->send.busy_data, o->send.busy_data_len);
  198. if (bytes < 0) {
  199. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  200. // wait for fd
  201. o->wait_events |= BREACTOR_WRITE;
  202. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  203. return;
  204. }
  205. BLog(BLOG_ERROR, "send failed");
  206. connection_report_error(o);
  207. return;
  208. }
  209. ASSERT(bytes > 0)
  210. ASSERT(bytes <= o->send.busy_data_len)
  211. // set not busy
  212. o->send.busy = 0;
  213. // done
  214. StreamPassInterface_Done(&o->send.iface, bytes);
  215. }
  216. static void connection_recv (BConnection *o)
  217. {
  218. DebugError_AssertNoError(&o->d_err);
  219. ASSERT(o->recv.inited)
  220. ASSERT(o->recv.busy)
  221. ASSERT(!o->recv.closed)
  222. // limit
  223. if (!BReactorLimit_Increment(&o->recv.limit)) {
  224. // wait for fd
  225. o->wait_events |= BREACTOR_READ;
  226. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  227. return;
  228. }
  229. // recv
  230. int bytes = read(o->fd, o->recv.busy_data, o->recv.busy_data_avail);
  231. if (bytes < 0) {
  232. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  233. // wait for fd
  234. o->wait_events |= BREACTOR_READ;
  235. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  236. return;
  237. }
  238. BLog(BLOG_ERROR, "recv failed");
  239. connection_report_error(o);
  240. return;
  241. }
  242. if (bytes == 0) {
  243. // set recv closed
  244. o->recv.closed = 1;
  245. // report recv closed
  246. o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
  247. return;
  248. }
  249. ASSERT(bytes > 0)
  250. ASSERT(bytes <= o->recv.busy_data_avail)
  251. // set not busy
  252. o->recv.busy = 0;
  253. // done
  254. StreamRecvInterface_Done(&o->recv.iface, bytes);
  255. }
  256. static void connection_fd_handler (BConnection *o, int events)
  257. {
  258. DebugObject_Access(&o->d_obj);
  259. DebugError_AssertNoError(&o->d_err);
  260. // clear handled events
  261. o->wait_events &= ~events;
  262. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  263. int have_send = 0;
  264. int have_recv = 0;
  265. if ((events & BREACTOR_WRITE) || ((events & BREACTOR_ERROR) && o->send.inited && o->send.busy)) {
  266. ASSERT(o->send.inited)
  267. ASSERT(o->send.busy)
  268. have_send = 1;
  269. }
  270. if ((events & BREACTOR_READ) || ((events & BREACTOR_ERROR) && o->recv.inited && o->recv.busy && !o->recv.closed)) {
  271. ASSERT(o->recv.inited)
  272. ASSERT(o->recv.busy)
  273. ASSERT(!o->recv.closed)
  274. have_recv = 1;
  275. }
  276. if (have_send) {
  277. if (have_recv) {
  278. BPending_Set(&o->recv.job);
  279. }
  280. connection_send(o);
  281. return;
  282. }
  283. if (have_recv) {
  284. connection_recv(o);
  285. return;
  286. }
  287. BLog(BLOG_ERROR, "fd error event");
  288. connection_report_error(o);
  289. return;
  290. }
  291. static void connection_send_job_handler (BConnection *o)
  292. {
  293. DebugObject_Access(&o->d_obj);
  294. DebugError_AssertNoError(&o->d_err);
  295. ASSERT(o->send.inited)
  296. ASSERT(o->send.busy)
  297. connection_send(o);
  298. return;
  299. }
  300. static void connection_recv_job_handler (BConnection *o)
  301. {
  302. DebugObject_Access(&o->d_obj);
  303. DebugError_AssertNoError(&o->d_err);
  304. ASSERT(o->recv.inited)
  305. ASSERT(o->recv.busy)
  306. ASSERT(!o->recv.closed)
  307. connection_recv(o);
  308. return;
  309. }
  310. static void connection_send_if_handler_send (BConnection *o, uint8_t *data, int data_len)
  311. {
  312. DebugObject_Access(&o->d_obj);
  313. DebugError_AssertNoError(&o->d_err);
  314. ASSERT(o->send.inited)
  315. ASSERT(!o->send.busy)
  316. ASSERT(data_len > 0)
  317. // remember data
  318. o->send.busy_data = data;
  319. o->send.busy_data_len = data_len;
  320. // set busy
  321. o->send.busy = 1;
  322. connection_send(o);
  323. return;
  324. }
  325. static void connection_recv_if_handler_recv (BConnection *o, uint8_t *data, int data_avail)
  326. {
  327. DebugObject_Access(&o->d_obj);
  328. DebugError_AssertNoError(&o->d_err);
  329. ASSERT(o->recv.inited)
  330. ASSERT(!o->recv.busy)
  331. ASSERT(!o->recv.closed)
  332. ASSERT(data_avail > 0)
  333. // remember data
  334. o->recv.busy_data = data;
  335. o->recv.busy_data_avail = data_avail;
  336. // set busy
  337. o->recv.busy = 1;
  338. connection_recv(o);
  339. return;
  340. }
  341. int BConnection_AddressSupported (BAddr addr)
  342. {
  343. BAddr_Assert(&addr);
  344. return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
  345. }
  346. int BListener_Init (BListener *o, BAddr addr, BReactor *reactor, void *user,
  347. BListener_handler handler)
  348. {
  349. ASSERT(BConnection_AddressSupported(addr))
  350. ASSERT(handler)
  351. BNetwork_Assert();
  352. // init arguments
  353. o->reactor = reactor;
  354. o->user = user;
  355. o->handler = handler;
  356. // convert address
  357. struct sys_addr sysaddr;
  358. addr_socket_to_sys(&sysaddr, addr);
  359. // init fd
  360. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  361. BLog(BLOG_ERROR, "socket failed");
  362. goto fail0;
  363. }
  364. // set non-blocking
  365. if (!badvpn_set_nonblocking(o->fd)) {
  366. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  367. goto fail1;
  368. }
  369. // set SO_REUSEADDR
  370. int optval = 1;
  371. if (setsockopt(o->fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
  372. BLog(BLOG_ERROR, "setsockopt(SO_REUSEADDR) failed");
  373. }
  374. // bind
  375. if (bind(o->fd, &sysaddr.addr.generic, sysaddr.len) < 0) {
  376. BLog(BLOG_ERROR, "bind failed");
  377. goto fail1;
  378. }
  379. // listen
  380. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  381. BLog(BLOG_ERROR, "listen failed");
  382. goto fail1;
  383. }
  384. // init BFileDescriptor
  385. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  386. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  387. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  388. goto fail1;
  389. }
  390. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  391. // init default job
  392. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  393. DebugObject_Init(&o->d_obj);
  394. return 1;
  395. fail1:
  396. if (close(o->fd) < 0) {
  397. BLog(BLOG_ERROR, "close failed");
  398. }
  399. fail0:
  400. return 0;
  401. }
  402. int BListener_InitUnix (BListener *o, const char *socket_path, BReactor *reactor, void *user,
  403. BListener_handler handler)
  404. {
  405. ASSERT(socket_path)
  406. ASSERT(handler)
  407. BNetwork_Assert();
  408. // init arguments
  409. o->reactor = reactor;
  410. o->user = user;
  411. o->handler = handler;
  412. // build address
  413. struct unix_addr addr;
  414. if (!build_unix_address(&addr, socket_path)) {
  415. BLog(BLOG_ERROR, "build_unix_address failed");
  416. goto fail0;
  417. }
  418. // init fd
  419. if ((o->fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
  420. BLog(BLOG_ERROR, "socket failed");
  421. goto fail0;
  422. }
  423. // set non-blocking
  424. if (!badvpn_set_nonblocking(o->fd)) {
  425. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  426. goto fail1;
  427. }
  428. // bind
  429. if (bind(o->fd, (struct sockaddr *)&addr.u.addr, addr.len) < 0) {
  430. BLog(BLOG_ERROR, "bind failed");
  431. goto fail1;
  432. }
  433. // listen
  434. if (listen(o->fd, BCONNECTION_LISTEN_BACKLOG) < 0) {
  435. BLog(BLOG_ERROR, "listen failed");
  436. goto fail1;
  437. }
  438. // init BFileDescriptor
  439. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)listener_fd_handler, o);
  440. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  441. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  442. goto fail1;
  443. }
  444. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
  445. // init default job
  446. BPending_Init(&o->default_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_default_job_handler, o);
  447. DebugObject_Init(&o->d_obj);
  448. return 1;
  449. fail1:
  450. if (close(o->fd) < 0) {
  451. BLog(BLOG_ERROR, "close failed");
  452. }
  453. fail0:
  454. return 0;
  455. }
  456. void BListener_Free (BListener *o)
  457. {
  458. DebugObject_Free(&o->d_obj);
  459. // free default job
  460. BPending_Free(&o->default_job);
  461. // free BFileDescriptor
  462. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  463. // free fd
  464. if (close(o->fd) < 0) {
  465. BLog(BLOG_ERROR, "close failed");
  466. }
  467. }
  468. int BConnector_Init (BConnector *o, BAddr addr, BReactor *reactor, void *user,
  469. BConnector_handler handler)
  470. {
  471. ASSERT(BConnection_AddressSupported(addr))
  472. ASSERT(handler)
  473. BNetwork_Assert();
  474. // init arguments
  475. o->reactor = reactor;
  476. o->user = user;
  477. o->handler = handler;
  478. // convert address
  479. struct sys_addr sysaddr;
  480. addr_socket_to_sys(&sysaddr, addr);
  481. // init job
  482. BPending_Init(&o->job, BReactor_PendingGroup(o->reactor), (BPending_handler)connector_job_handler, o);
  483. // init fd
  484. if ((o->fd = socket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0)) < 0) {
  485. BLog(BLOG_ERROR, "socket failed");
  486. goto fail1;
  487. }
  488. // set fd non-blocking
  489. if (!badvpn_set_nonblocking(o->fd)) {
  490. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  491. goto fail2;
  492. }
  493. // connect fd
  494. int res = connect(o->fd, &sysaddr.addr.generic, sysaddr.len);
  495. if (res < 0 && errno != EINPROGRESS) {
  496. BLog(BLOG_ERROR, "connect failed");
  497. goto fail2;
  498. }
  499. // set not connected
  500. o->connected = 0;
  501. // set have no BFileDescriptor
  502. o->have_bfd = 0;
  503. if (res < 0) {
  504. // init BFileDescriptor
  505. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connector_fd_handler, o);
  506. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  507. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  508. goto fail2;
  509. }
  510. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_WRITE);
  511. // set have BFileDescriptor
  512. o->have_bfd = 1;
  513. } else {
  514. // set connected
  515. o->connected = 1;
  516. // set job
  517. BPending_Set(&o->job);
  518. }
  519. DebugObject_Init(&o->d_obj);
  520. return 1;
  521. fail2:
  522. if (close(o->fd) < 0) {
  523. BLog(BLOG_ERROR, "close failed");
  524. }
  525. fail1:
  526. BPending_Free(&o->job);
  527. return 0;
  528. }
  529. void BConnector_Free (BConnector *o)
  530. {
  531. DebugObject_Free(&o->d_obj);
  532. // free BFileDescriptor
  533. if (o->have_bfd) {
  534. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  535. }
  536. // close fd
  537. if (o->fd != -1) {
  538. if (close(o->fd) < 0) {
  539. BLog(BLOG_ERROR, "close failed");
  540. }
  541. }
  542. // free job
  543. BPending_Free(&o->job);
  544. }
  545. int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
  546. BConnection_handler handler)
  547. {
  548. switch (source.type) {
  549. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  550. BListener *listener = source.u.listener.listener;
  551. DebugObject_Access(&listener->d_obj);
  552. ASSERT(BPending_IsSet(&listener->default_job))
  553. } break;
  554. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  555. BConnector *connector = source.u.connector.connector;
  556. DebugObject_Access(&connector->d_obj);
  557. ASSERT(connector->fd >= 0)
  558. ASSERT(connector->connected)
  559. ASSERT(!connector->have_bfd)
  560. ASSERT(!BPending_IsSet(&connector->job))
  561. } break;
  562. case BCONNECTION_SOURCE_TYPE_PIPE: {
  563. ASSERT(source.u.pipe.pipefd >= 0)
  564. } break;
  565. default: ASSERT(0);
  566. }
  567. ASSERT(handler)
  568. BNetwork_Assert();
  569. // init arguments
  570. o->reactor = reactor;
  571. o->user = user;
  572. o->handler = handler;
  573. switch (source.type) {
  574. case BCONNECTION_SOURCE_TYPE_LISTENER: {
  575. BListener *listener = source.u.listener.listener;
  576. // unset listener's default job
  577. BPending_Unset(&listener->default_job);
  578. // accept
  579. struct sys_addr sysaddr;
  580. sysaddr.len = sizeof(sysaddr.addr);
  581. if ((o->fd = accept(listener->fd, &sysaddr.addr.generic, &sysaddr.len)) < 0) {
  582. BLog(BLOG_ERROR, "accept failed");
  583. goto fail0;
  584. }
  585. o->close_fd = 1;
  586. // set non-blocking
  587. if (!badvpn_set_nonblocking(o->fd)) {
  588. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  589. goto fail1;
  590. }
  591. // return address
  592. if (source.u.listener.out_addr) {
  593. addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
  594. }
  595. } break;
  596. case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
  597. BConnector *connector = source.u.connector.connector;
  598. // grab fd from connector
  599. o->fd = connector->fd;
  600. connector->fd = -1;
  601. o->close_fd = 1;
  602. } break;
  603. case BCONNECTION_SOURCE_TYPE_PIPE: {
  604. // use user-provided fd
  605. o->fd = source.u.pipe.pipefd;
  606. o->close_fd = 0;
  607. // set non-blocking
  608. if (!badvpn_set_nonblocking(o->fd)) {
  609. BLog(BLOG_ERROR, "badvpn_set_nonblocking failed");
  610. goto fail1;
  611. }
  612. } break;
  613. }
  614. // init BFileDescriptor
  615. BFileDescriptor_Init(&o->bfd, o->fd, (BFileDescriptor_handler)connection_fd_handler, o);
  616. if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
  617. BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
  618. goto fail1;
  619. }
  620. // set no wait events
  621. o->wait_events = 0;
  622. // init limits
  623. BReactorLimit_Init(&o->send.limit, o->reactor, BCONNECTION_SEND_LIMIT);
  624. BReactorLimit_Init(&o->recv.limit, o->reactor, BCONNECTION_RECV_LIMIT);
  625. // set send and recv not inited
  626. o->send.inited = 0;
  627. o->recv.inited = 0;
  628. // set recv not closed
  629. o->recv.closed = 0;
  630. DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
  631. DebugObject_Init(&o->d_obj);
  632. return 1;
  633. fail1:
  634. if (o->close_fd) {
  635. if (close(o->fd) < 0) {
  636. BLog(BLOG_ERROR, "close failed");
  637. }
  638. }
  639. fail0:
  640. return 0;
  641. }
  642. void BConnection_Free (BConnection *o)
  643. {
  644. DebugObject_Free(&o->d_obj);
  645. DebugError_Free(&o->d_err);
  646. ASSERT(!o->recv.inited)
  647. ASSERT(!o->send.inited)
  648. // free limits
  649. BReactorLimit_Free(&o->recv.limit);
  650. BReactorLimit_Free(&o->send.limit);
  651. // free BFileDescriptor
  652. BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
  653. // close fd
  654. if (o->close_fd) {
  655. if (close(o->fd) < 0) {
  656. BLog(BLOG_ERROR, "close failed");
  657. }
  658. }
  659. }
  660. void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
  661. {
  662. DebugObject_Access(&o->d_obj);
  663. // set handlers
  664. o->user = user;
  665. o->handler = handler;
  666. }
  667. int BConnection_SetSendBuffer (BConnection *o, int buf_size)
  668. {
  669. DebugObject_Access(&o->d_obj);
  670. if (setsockopt(o->fd, SOL_SOCKET, SO_SNDBUF, (void *)&buf_size, sizeof(buf_size)) < 0) {
  671. BLog(BLOG_ERROR, "setsockopt failed");
  672. return 0;
  673. }
  674. return 1;
  675. }
  676. void BConnection_SendAsync_Init (BConnection *o)
  677. {
  678. DebugObject_Access(&o->d_obj);
  679. DebugError_AssertNoError(&o->d_err);
  680. ASSERT(!o->send.inited)
  681. // init interface
  682. StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_if_handler_send, o, BReactor_PendingGroup(o->reactor));
  683. // init job
  684. BPending_Init(&o->send.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_send_job_handler, o);
  685. // set not busy
  686. o->send.busy = 0;
  687. // set inited
  688. o->send.inited = 1;
  689. }
  690. void BConnection_SendAsync_Free (BConnection *o)
  691. {
  692. DebugObject_Access(&o->d_obj);
  693. ASSERT(o->send.inited)
  694. // update events
  695. o->wait_events &= ~BREACTOR_WRITE;
  696. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  697. // free job
  698. BPending_Free(&o->send.job);
  699. // free interface
  700. StreamPassInterface_Free(&o->send.iface);
  701. // set not inited
  702. o->send.inited = 0;
  703. }
  704. StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
  705. {
  706. DebugObject_Access(&o->d_obj);
  707. ASSERT(o->send.inited)
  708. return &o->send.iface;
  709. }
  710. void BConnection_RecvAsync_Init (BConnection *o)
  711. {
  712. DebugObject_Access(&o->d_obj);
  713. DebugError_AssertNoError(&o->d_err);
  714. ASSERT(!o->recv.inited)
  715. ASSERT(!o->recv.closed)
  716. // init interface
  717. StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_if_handler_recv, o, BReactor_PendingGroup(o->reactor));
  718. // init job
  719. BPending_Init(&o->recv.job, BReactor_PendingGroup(o->reactor), (BPending_handler)connection_recv_job_handler, o);
  720. // set not busy
  721. o->recv.busy = 0;
  722. // set inited
  723. o->recv.inited = 1;
  724. }
  725. void BConnection_RecvAsync_Free (BConnection *o)
  726. {
  727. DebugObject_Access(&o->d_obj);
  728. ASSERT(o->recv.inited)
  729. // update events
  730. o->wait_events &= ~BREACTOR_READ;
  731. BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, o->wait_events);
  732. // free job
  733. BPending_Free(&o->recv.job);
  734. // free interface
  735. StreamRecvInterface_Free(&o->recv.iface);
  736. // set not inited
  737. o->recv.inited = 0;
  738. }
  739. StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
  740. {
  741. DebugObject_Access(&o->d_obj);
  742. ASSERT(o->recv.inited)
  743. return &o->recv.iface;
  744. }