refraction.go 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126
  1. //go:build PSIPHON_ENABLE_REFRACTION_NETWORKING
  2. // +build PSIPHON_ENABLE_REFRACTION_NETWORKING
  3. /*
  4. * Copyright (c) 2018, Psiphon Inc.
  5. * All rights reserved.
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. *
  20. */
  21. /*
  22. Package refraction wraps github.com/refraction-networking/gotapdance with
  23. net.Listener and net.Conn types that provide drop-in integration with Psiphon.
  24. */
  25. package refraction
  26. import (
  27. "context"
  28. std_errors "errors"
  29. "fmt"
  30. "io/ioutil"
  31. "net"
  32. "os"
  33. "path/filepath"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  40. "github.com/armon/go-proxyproto"
  41. lrucache "github.com/cognusion/go-cache-lru"
  42. "github.com/pion/sctp"
  43. refraction_networking_assets "github.com/refraction-networking/conjure/pkg/client/assets"
  44. refraction_networking_registration "github.com/refraction-networking/conjure/pkg/registrars/registration"
  45. refraction_networking_transports "github.com/refraction-networking/conjure/pkg/transports/client"
  46. refraction_networking_dtls "github.com/refraction-networking/conjure/pkg/transports/connecting/dtls"
  47. refraction_networking_prefix "github.com/refraction-networking/conjure/pkg/transports/wrapping/prefix"
  48. refraction_networking_proto "github.com/refraction-networking/conjure/proto"
  49. refraction_networking_client "github.com/refraction-networking/gotapdance/tapdance"
  50. )
  51. const (
  52. READ_PROXY_PROTOCOL_HEADER_TIMEOUT = 5 * time.Second
  53. REGISTRATION_CACHE_MAX_ENTRIES = 256
  54. )
  55. // Enabled indicates if Refraction Networking functionality is enabled.
  56. func Enabled() bool {
  57. return true
  58. }
  59. // Listener is a net.Listener.
  60. type Listener struct {
  61. net.Listener
  62. }
  63. // Listen creates a new Refraction Networking listener.
  64. //
  65. // The Refraction Networking station (TapDance or Conjure) will send the
  66. // original client address via the HAProxy proxy protocol v1,
  67. // https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt. The original
  68. // client address is read and returned by accepted conns' RemoteAddr.
  69. // RemoteAddr _must_ be called non-concurrently before calling Read on
  70. // accepted conns as the HAProxy proxy protocol header reading logic sets
  71. // SetReadDeadline and performs a Read.
  72. //
  73. // Psiphon server hosts should be configured to accept tunnel connections only
  74. // from Refraction Networking stations.
  75. func Listen(address string) (net.Listener, error) {
  76. tcpListener, err := net.Listen("tcp", address)
  77. if err != nil {
  78. return nil, errors.Trace(err)
  79. }
  80. // Setting a timeout ensures that reading the proxy protocol
  81. // header completes or times out and RemoteAddr will not block. See:
  82. // https://godoc.org/github.com/armon/go-proxyproto#Conn.RemoteAddr
  83. proxyListener := &proxyproto.Listener{
  84. Listener: tcpListener,
  85. ProxyHeaderTimeout: READ_PROXY_PROTOCOL_HEADER_TIMEOUT}
  86. stationListener := &stationListener{
  87. proxyListener: proxyListener,
  88. }
  89. return &Listener{Listener: stationListener}, nil
  90. }
  91. // stationListener uses the proxyproto.Listener SourceCheck callback to
  92. // capture and record the direct remote address, the station address, and
  93. // wraps accepted conns to provide station address metrics via GetMetrics.
  94. // These metrics enable identifying which station fronted a connection, which
  95. // is useful for network operations and troubleshooting.
  96. //
  97. // go-proxyproto.Conn.RemoteAddr reports the originating client IP address,
  98. // which is geolocated and recorded for metrics. The underlying conn's remote
  99. // address, the station address, is not accessible via the go-proxyproto API.
  100. //
  101. // stationListener is not safe for concurrent access.
  102. type stationListener struct {
  103. proxyListener *proxyproto.Listener
  104. }
  105. func (l *stationListener) Accept() (net.Conn, error) {
  106. var stationRemoteAddr net.Addr
  107. l.proxyListener.SourceCheck = func(addr net.Addr) (bool, error) {
  108. stationRemoteAddr = addr
  109. return true, nil
  110. }
  111. conn, err := l.proxyListener.Accept()
  112. if err != nil {
  113. return nil, err
  114. }
  115. if stationRemoteAddr == nil {
  116. return nil, errors.TraceNew("missing station address")
  117. }
  118. return &stationConn{
  119. Conn: conn,
  120. stationIPAddress: common.IPAddressFromAddr(stationRemoteAddr),
  121. }, nil
  122. }
  123. func (l *stationListener) Close() error {
  124. return l.proxyListener.Close()
  125. }
  126. func (l *stationListener) Addr() net.Addr {
  127. return l.proxyListener.Addr()
  128. }
  129. type stationConn struct {
  130. net.Conn
  131. stationIPAddress string
  132. }
  133. // IrregularTunnelError implements the common.IrregularIndicator interface.
  134. func (c *stationConn) IrregularTunnelError() error {
  135. // We expect a PROXY protocol header, but go-proxyproto does not produce an
  136. // error if the "PROXY " prefix is absent; instead the connection will
  137. // proceed. To detect this case, check if the go-proxyproto RemoteAddr IP
  138. // address matches the underlying connection IP address. When these values
  139. // match, there was no PROXY protocol header.
  140. //
  141. // Limitation: the values will match if there is a PROXY protocol header
  142. // containing the same IP address as the underlying connection. This is not
  143. // an expected case.
  144. if common.IPAddressFromAddr(c.RemoteAddr()) == c.stationIPAddress {
  145. return errors.TraceNew("unexpected station IP address")
  146. }
  147. return nil
  148. }
  149. // GetMetrics implements the common.MetricsSource interface.
  150. func (c *stationConn) GetMetrics() common.LogFields {
  151. logFields := make(common.LogFields)
  152. // Ensure we don't log a potential non-station IP address.
  153. if c.IrregularTunnelError() == nil {
  154. logFields["station_ip_address"] = c.stationIPAddress
  155. }
  156. return logFields
  157. }
  158. // DialTapDance establishes a new TapDance connection to a TapDance station
  159. // specified in the config assets and forwarding through to the Psiphon server
  160. // specified by address.
  161. //
  162. // The TapDance station config assets (which are also the Conjure station
  163. // assets) are read from dataDirectory/"refraction-networking". When no config
  164. // is found, default assets are paved.
  165. //
  166. // dialer specifies the custom dialer for underlying TCP dials.
  167. //
  168. // The input ctx is expected to have a timeout for the dial.
  169. //
  170. // Limitation: the parameters emitLogs and dataDirectory are used for one-time
  171. // initialization and are ignored after the first DialTapDance/Conjure call.
  172. func DialTapDance(
  173. ctx context.Context,
  174. emitLogs bool,
  175. dataDirectory string,
  176. dialer Dialer,
  177. address string) (net.Conn, error) {
  178. // TapDance is disabled. See comment for protocol.DisabledTunnelProtocols.
  179. // With that DisabledTunnelProtocols configuration, clients should not
  180. // reach this error.
  181. //
  182. // Note that in addition to this entry point being disabled, the TapDance
  183. // ClientConf is no longer initialized in initRefractionNetworking below.
  184. return nil, errors.TraceNew("not supported")
  185. // return dial(
  186. // ctx,
  187. // emitLogs,
  188. // dataDirectory,
  189. // dialer,
  190. // address,
  191. // nil)
  192. }
  193. // DialConjure establishes a new Conjure connection to a Conjure station.
  194. //
  195. // dialer specifies the custom dialer to use for phantom dials. Additional
  196. // Conjure-specific parameters are specified in conjureConfig.
  197. //
  198. // See DialTapdance comment.
  199. func DialConjure(
  200. ctx context.Context,
  201. emitLogs bool,
  202. dataDirectory string,
  203. dialer Dialer,
  204. address string,
  205. conjureConfig *ConjureConfig) (net.Conn, error) {
  206. return dial(
  207. ctx,
  208. emitLogs,
  209. dataDirectory,
  210. dialer,
  211. address,
  212. conjureConfig)
  213. }
  214. func dial(
  215. ctx context.Context,
  216. emitLogs bool,
  217. dataDirectory string,
  218. dialer Dialer,
  219. address string,
  220. conjureConfig *ConjureConfig) (net.Conn, error) {
  221. err := initRefractionNetworking(emitLogs, dataDirectory)
  222. if err != nil {
  223. return nil, errors.Trace(err)
  224. }
  225. if _, ok := ctx.Deadline(); !ok {
  226. return nil, errors.TraceNew("dial context has no timeout")
  227. }
  228. useConjure := conjureConfig != nil
  229. manager := newDialManager()
  230. refractionDialer := &refraction_networking_client.Dialer{
  231. DialerWithLaddr: manager.makeManagedDialer(dialer),
  232. V6Support: conjureConfig.EnableIPv6Dials,
  233. UseProxyHeader: true,
  234. }
  235. conjureMetricCached := false
  236. conjureMetricDelay := time.Duration(0)
  237. conjureMetricTransport := ""
  238. conjureMetricPrefix := ""
  239. conjureMetricSTUNServerAddress := ""
  240. conjureMetricDTLSEmptyInitialPacket := false
  241. var conjureCachedRegistration *refraction_networking_client.ConjureReg
  242. var conjureRecordRegistrar *recordRegistrar
  243. if useConjure {
  244. // Our strategy is to try one registration per dial attempt: a cached
  245. // registration, if it exists, or API or decoy registration, as configured.
  246. // This assumes Psiphon establishment will try/retry many candidates as
  247. // required, and that the desired mix of API/decoy registrations will be
  248. // configured and generated. In good network conditions, internal gotapdance
  249. // retries (via APIRegistrar.MaxRetries or APIRegistrar.SecondaryRegistrar)
  250. // are unlikely to start before the Conjure dial is canceled.
  251. // Caching registrations reduces average Conjure dial time by often
  252. // eliminating the registration phase. This is especially impactful for
  253. // short duration tunnels, such as on mobile. Caching also reduces domain
  254. // fronted traffic and load on the API registrar and decoys.
  255. //
  256. // We implement a simple in-memory registration cache with the following
  257. // behavior:
  258. //
  259. // - If a new registration succeeds, but the overall Conjure dial is
  260. // _canceled_, the registration is optimistically cached.
  261. // - If the Conjure phantom dial fails, any associated cached registration
  262. // is discarded.
  263. // - A cached registration's TTL is extended upon phantom dial success.
  264. // - If the configured TTL changes, the cache is cleared.
  265. //
  266. // Limitations:
  267. // - The cache is not persistent.
  268. // - There is no TTL extension during a long connection.
  269. // - Caching a successful registration when the phantom dial is canceled may
  270. // skip the necessary "delay" step (however, an immediate re-establishment
  271. // to the same candidate is unlikely in this case).
  272. //
  273. // TODO:
  274. // - Revisit when gotapdance adds its own caching.
  275. // - Consider "pre-registering" Conjure when already connected with a
  276. // different protocol, so a Conjure registration is available on the next
  277. // establishment; in this scenario, a tunneled API registration would not
  278. // require domain fronting.
  279. refractionDialer.DarkDecoy = true
  280. // The pop operation removes the registration from the cache. This
  281. // eliminates the possibility of concurrent candidates (with the same cache
  282. // key) using and modifying the same registration, a potential race
  283. // condition. The popped cached registration must be reinserted in the cache
  284. // after canceling or success, but not on phantom dial failure.
  285. conjureCachedRegistration = conjureRegistrationCache.pop(conjureConfig)
  286. if conjureCachedRegistration != nil {
  287. refractionDialer.DarkDecoyRegistrar = &cachedRegistrar{
  288. registration: conjureCachedRegistration,
  289. }
  290. conjureMetricCached = true
  291. conjureMetricDelay = 0 // report no delay
  292. } else if conjureConfig.APIRegistrarBidirectionalURL != "" {
  293. if conjureConfig.APIRegistrarHTTPClient == nil {
  294. // While not a guaranteed check, if the APIRegistrarHTTPClient isn't set
  295. // then the API registration would certainly be unfronted, resulting in a
  296. // fingerprintable connection leak.
  297. return nil, errors.TraceNew("missing APIRegistrarHTTPClient")
  298. }
  299. refractionDialer.DarkDecoyRegistrar, err = refraction_networking_registration.NewAPIRegistrar(
  300. &refraction_networking_registration.Config{
  301. Target: conjureConfig.APIRegistrarBidirectionalURL,
  302. Bidirectional: true,
  303. Delay: conjureConfig.APIRegistrarDelay,
  304. MaxRetries: 0,
  305. HTTPClient: conjureConfig.APIRegistrarHTTPClient,
  306. })
  307. if err != nil {
  308. return nil, errors.Trace(err)
  309. }
  310. conjureMetricDelay = conjureConfig.APIRegistrarDelay
  311. } else if conjureConfig.DoDecoyRegistration {
  312. refractionDialer.DarkDecoyRegistrar = refraction_networking_registration.NewDecoyRegistrar()
  313. refractionDialer.Width = conjureConfig.DecoyRegistrarWidth
  314. // Limitation: the decoy registration delay is not currently exposed in the
  315. // gotapdance API.
  316. conjureMetricDelay = -1 // don't report delay
  317. } else {
  318. return nil, errors.TraceNew("no conjure registrar specified")
  319. }
  320. if conjureCachedRegistration == nil && conjureConfig.RegistrationCacheTTL != 0 {
  321. // Record the registration result in order to cache it.
  322. conjureRecordRegistrar = &recordRegistrar{
  323. registrar: refractionDialer.DarkDecoyRegistrar,
  324. }
  325. refractionDialer.DarkDecoyRegistrar = conjureRecordRegistrar
  326. }
  327. // Conjure transport replay limitations:
  328. //
  329. // - For CONJURE_TRANSPORT_PREFIX_OSSH, the selected prefix is not replayed
  330. // - For all transports, randomized port selection is not replayed
  331. randomizeDstPort := conjureConfig.EnablePortRandomization
  332. disableOverrides := !conjureConfig.EnableRegistrationOverrides
  333. conjureMetricTransport = conjureConfig.Transport
  334. switch conjureConfig.Transport {
  335. case protocol.CONJURE_TRANSPORT_MIN_OSSH:
  336. transport, ok := refraction_networking_transports.GetTransportByID(
  337. refraction_networking_proto.TransportType_Min)
  338. if !ok {
  339. return nil, errors.TraceNew("missing min transport")
  340. }
  341. config, err := refraction_networking_transports.NewWithParams(
  342. transport.Name(),
  343. &refraction_networking_proto.GenericTransportParams{
  344. RandomizeDstPort: &randomizeDstPort})
  345. if err != nil {
  346. return nil, errors.Trace(err)
  347. }
  348. refractionDialer.Transport = transport.ID()
  349. refractionDialer.TransportConfig = config
  350. refractionDialer.DisableRegistrarOverrides = disableOverrides
  351. refractionDialer.DialerWithLaddr = newWriteMergeDialer(
  352. refractionDialer.DialerWithLaddr, false, 32)
  353. case protocol.CONJURE_TRANSPORT_PREFIX_OSSH:
  354. transport, ok := refraction_networking_transports.GetTransportByID(
  355. refraction_networking_proto.TransportType_Prefix)
  356. if !ok {
  357. return nil, errors.TraceNew("missing prefix transport")
  358. }
  359. prefixID := int32(refraction_networking_prefix.Rand)
  360. flushPolicy := refraction_networking_prefix.FlushAfterPrefix
  361. config, err := refraction_networking_transports.NewWithParams(
  362. transport.Name(),
  363. &refraction_networking_proto.PrefixTransportParams{
  364. RandomizeDstPort: &randomizeDstPort,
  365. PrefixId: &prefixID,
  366. CustomFlushPolicy: &flushPolicy})
  367. if err != nil {
  368. return nil, errors.Trace(err)
  369. }
  370. refractionDialer.Transport = transport.ID()
  371. refractionDialer.TransportConfig = config
  372. refractionDialer.DisableRegistrarOverrides = disableOverrides
  373. refractionDialer.DialerWithLaddr = newWriteMergeDialer(
  374. refractionDialer.DialerWithLaddr, true, 64)
  375. case protocol.CONJURE_TRANSPORT_DTLS_OSSH:
  376. transport, ok := refraction_networking_transports.GetTransportByID(
  377. refraction_networking_proto.TransportType_DTLS)
  378. if !ok {
  379. return nil, errors.TraceNew("missing DTLS transport")
  380. }
  381. config, err := refraction_networking_transports.NewWithParams(
  382. transport.Name(),
  383. &refraction_networking_proto.DTLSTransportParams{
  384. RandomizeDstPort: &randomizeDstPort})
  385. if err != nil {
  386. return nil, errors.Trace(err)
  387. }
  388. if conjureConfig.STUNServerAddress == "" {
  389. return nil, errors.TraceNew("missing STUN server address")
  390. }
  391. config.SetParams(
  392. &refraction_networking_dtls.ClientConfig{
  393. STUNServer: conjureConfig.STUNServerAddress,
  394. DisableIRWorkaround: !conjureConfig.DTLSEmptyInitialPacket,
  395. })
  396. conjureMetricSTUNServerAddress = conjureConfig.STUNServerAddress
  397. conjureMetricDTLSEmptyInitialPacket = conjureConfig.DTLSEmptyInitialPacket
  398. refractionDialer.Transport = transport.ID()
  399. refractionDialer.TransportConfig = config
  400. refractionDialer.DisableRegistrarOverrides = disableOverrides
  401. default:
  402. return nil, errors.Tracef("invalid Conjure transport: %s", conjureConfig.Transport)
  403. }
  404. }
  405. // If the dial context is cancelled, use dialManager to interrupt
  406. // refractionDialer.DialContext. See dialManager comment explaining why
  407. // refractionDialer.DialContext may block even when the input context is
  408. // cancelled.
  409. dialComplete := make(chan struct{})
  410. go func() {
  411. select {
  412. case <-ctx.Done():
  413. case <-dialComplete:
  414. }
  415. select {
  416. // Prioritize the dialComplete case.
  417. case <-dialComplete:
  418. return
  419. default:
  420. }
  421. manager.close()
  422. }()
  423. conn, err := refractionDialer.DialContext(ctx, "tcp", address)
  424. close(dialComplete)
  425. if err != nil {
  426. // Call manager.close before updating cache, to synchronously shutdown dials
  427. // and ensure there are no further concurrent reads/writes to the recorded
  428. // registration before referencing it.
  429. manager.close()
  430. }
  431. // Cache (or put back) a successful registration. Also put back in the
  432. // specific error case where the phantom dial was canceled, as the
  433. // registration may still be valid. This operation implicitly extends the TTL
  434. // of a reused cached registration; we assume the Conjure station is also
  435. // extending the TTL by the same amount.
  436. //
  437. // Limitation: the cancel case shouldn't extend the TTL.
  438. if useConjure && (conjureCachedRegistration != nil || conjureRecordRegistrar != nil) {
  439. isCanceled := (err != nil && ctx.Err() == context.Canceled)
  440. if err == nil || isCanceled {
  441. registration := conjureCachedRegistration
  442. if registration == nil {
  443. // We assume gotapdance is no longer accessing the Registrar.
  444. registration = conjureRecordRegistrar.registration
  445. }
  446. // conjureRecordRegistrar.registration will be nil if there was no cached
  447. // registration _and_ registration didn't succeed before a cancel.
  448. if registration != nil {
  449. conjureRegistrationCache.put(conjureConfig, registration, isCanceled)
  450. if conjureConfig.Transport == protocol.CONJURE_TRANSPORT_PREFIX_OSSH {
  451. // Record the selected prefix name after registration, as
  452. // the registrar may have overridden the client selection.
  453. conjureMetricPrefix = registration.Transport.Name()
  454. }
  455. }
  456. } else if conjureCachedRegistration != nil {
  457. conjureConfig.Logger.WithTraceFields(
  458. common.LogFields{
  459. "diagnosticID": conjureConfig.DiagnosticID,
  460. "reason": "phantom dial failed",
  461. }).Info(
  462. "drop cached registration")
  463. }
  464. }
  465. if err != nil {
  466. return nil, errors.Trace(err)
  467. }
  468. manager.startUsingRunCtx()
  469. refractionConn := &refractionConn{
  470. Conn: conn,
  471. manager: manager,
  472. }
  473. if useConjure {
  474. // Retain these values for logging metrics.
  475. refractionConn.isConjure = true
  476. refractionConn.conjureMetricCached = conjureMetricCached
  477. refractionConn.conjureMetricDelay = conjureMetricDelay
  478. refractionConn.conjureMetricTransport = conjureMetricTransport
  479. refractionConn.conjureMetricPrefix = conjureMetricPrefix
  480. refractionConn.conjureMetricSTUNServerAddress = conjureMetricSTUNServerAddress
  481. refractionConn.conjureMetricDTLSEmptyInitialPacket = conjureMetricDTLSEmptyInitialPacket
  482. }
  483. return refractionConn, nil
  484. }
  485. func DeleteCachedConjureRegistration(config *ConjureConfig) {
  486. conjureRegistrationCache.delete(config)
  487. }
  488. type registrationCache struct {
  489. mutex sync.Mutex
  490. TTL time.Duration
  491. cache *lrucache.Cache
  492. }
  493. func newRegistrationCache() *registrationCache {
  494. return &registrationCache{
  495. cache: lrucache.NewWithLRU(
  496. lrucache.NoExpiration,
  497. 1*time.Minute,
  498. REGISTRATION_CACHE_MAX_ENTRIES),
  499. }
  500. }
  501. func (c *registrationCache) put(
  502. config *ConjureConfig,
  503. registration *refraction_networking_client.ConjureReg,
  504. isCanceled bool) {
  505. c.mutex.Lock()
  506. defer c.mutex.Unlock()
  507. // Clear the entire cache if the configured TTL changes to avoid retaining
  508. // items for too long. This is expected to be an infrequent event. The
  509. // go-cache-lru API does not offer a mechanism to inspect and adjust the TTL
  510. // of all existing items.
  511. if c.TTL != config.RegistrationCacheTTL {
  512. c.cache.Flush()
  513. c.TTL = config.RegistrationCacheTTL
  514. }
  515. // Drop the cached registration if another entry is found under the same key.
  516. // Since the dial pops its entry out of the cache, finding an existing entry
  517. // implies that another tunnel establishment candidate with the same key has
  518. // successfully registered and connected (or canceled) in the meantime.
  519. // Prefer that newer cached registration.
  520. //
  521. // For Psiphon, one scenario resulting in this condition is that the first
  522. // dial to a given server, using a cached registration, is delayed long
  523. // enough that a new candidate for the same server has been started and
  524. // outpaced the first candidate.
  525. _, found := c.cache.Get(config.RegistrationCacheKey)
  526. if found {
  527. config.Logger.WithTraceFields(
  528. common.LogFields{
  529. "diagnosticID": config.DiagnosticID,
  530. "reason": "existing entry found",
  531. }).Info(
  532. "drop cached registration")
  533. return
  534. }
  535. reason := "connected"
  536. if isCanceled {
  537. reason = "canceled"
  538. }
  539. config.Logger.WithTraceFields(
  540. common.LogFields{
  541. "diagnosticID": config.DiagnosticID,
  542. "cacheSize": c.cache.ItemCount(),
  543. "reason": reason,
  544. }).Info(
  545. "put cached registration")
  546. c.cache.Set(
  547. config.RegistrationCacheKey,
  548. registration,
  549. c.TTL)
  550. }
  551. func (c *registrationCache) pop(
  552. config *ConjureConfig) *refraction_networking_client.ConjureReg {
  553. c.mutex.Lock()
  554. defer c.mutex.Unlock()
  555. // See TTL/Flush comment in put.
  556. if c.TTL != config.RegistrationCacheTTL {
  557. c.cache.Flush()
  558. c.TTL = config.RegistrationCacheTTL
  559. }
  560. entry, found := c.cache.Get(config.RegistrationCacheKey)
  561. config.Logger.WithTraceFields(
  562. common.LogFields{
  563. "diagnosticID": config.DiagnosticID,
  564. "cacheSize": c.cache.ItemCount(),
  565. "found": found,
  566. }).Info(
  567. "pop cached registration")
  568. if found {
  569. c.cache.Delete(config.RegistrationCacheKey)
  570. return entry.(*refraction_networking_client.ConjureReg)
  571. }
  572. return nil
  573. }
  574. func (c *registrationCache) delete(config *ConjureConfig) {
  575. c.mutex.Lock()
  576. defer c.mutex.Unlock()
  577. _, found := c.cache.Get(config.RegistrationCacheKey)
  578. config.Logger.WithTraceFields(
  579. common.LogFields{
  580. "diagnosticID": config.DiagnosticID,
  581. "found": found,
  582. }).Info(
  583. "delete cached registration")
  584. if found {
  585. c.cache.Delete(config.RegistrationCacheKey)
  586. }
  587. }
  588. var conjureRegistrationCache = newRegistrationCache()
  589. type cachedRegistrar struct {
  590. registration *refraction_networking_client.ConjureReg
  591. }
  592. func (r *cachedRegistrar) Register(
  593. _ *refraction_networking_client.ConjureSession,
  594. _ context.Context) (*refraction_networking_client.ConjureReg, error) {
  595. return r.registration, nil
  596. }
  597. func (r *cachedRegistrar) PrepareRegKeys(_ [32]byte, _ []byte) error {
  598. return nil
  599. }
  600. type recordRegistrar struct {
  601. registrar refraction_networking_client.Registrar
  602. registration *refraction_networking_client.ConjureReg
  603. }
  604. func (r *recordRegistrar) Register(
  605. session *refraction_networking_client.ConjureSession,
  606. ctx context.Context) (*refraction_networking_client.ConjureReg, error) {
  607. registration, err := r.registrar.Register(session, ctx)
  608. if err != nil {
  609. return nil, errors.Trace(err)
  610. }
  611. r.registration = registration
  612. return registration, nil
  613. }
  614. func (r *recordRegistrar) PrepareRegKeys(_ [32]byte, _ []byte) error {
  615. return nil
  616. }
  617. // writeMergeConn merges Conjure transport and subsequent OSSH writes in order
  618. // to avoid fixed-sized first or second TCP packets always containing exactly
  619. // the 32-byte or 64-byte HMAC tag.
  620. //
  621. // The Conjure Prefix transport will first write a prefix. writeMergeConn
  622. // assumes the FlushAfterPrefix policy is used, so the first write call for
  623. // that transport will be exactly the arbitrary sized prefix. The second
  624. // write call will be the HMAC tag. Pass the first write through to the
  625. // underlying conn, and then expect the HMAC tag on the second write, and
  626. // handle as follows.
  627. //
  628. // The Conjure Min transport first calls write with an HMAC tag. Buffer this
  629. // value and await the following initial OSSH write, and prepend the buffered
  630. // HMAC tag to the random OSSH data. The first write by OSSH will be a
  631. // variable length multi-packet-sized sequence of random bytes.
  632. type writeMergeConn struct {
  633. net.Conn
  634. tagSize int
  635. mutex sync.Mutex
  636. state int
  637. buffer []byte
  638. err error
  639. }
  640. const (
  641. stateWriteMergeAwaitingPrefix = iota
  642. stateWriteMergeAwaitingTag
  643. stateWriteMergeBufferedTag
  644. stateWriteMergeFinishedMerging
  645. stateWriteMergeFailed
  646. )
  647. func newWriteMergeConn(conn net.Conn, hasPrefix bool, tagSize int) *writeMergeConn {
  648. c := &writeMergeConn{
  649. Conn: conn,
  650. tagSize: tagSize,
  651. }
  652. if hasPrefix {
  653. c.state = stateWriteMergeAwaitingPrefix
  654. } else {
  655. c.state = stateWriteMergeAwaitingTag
  656. }
  657. return c
  658. }
  659. func (conn *writeMergeConn) Write(p []byte) (int, error) {
  660. conn.mutex.Lock()
  661. defer conn.mutex.Unlock()
  662. switch conn.state {
  663. case stateWriteMergeAwaitingPrefix:
  664. conn.state = stateWriteMergeAwaitingTag
  665. return conn.Conn.Write(p)
  666. case stateWriteMergeAwaitingTag:
  667. if len(p) != conn.tagSize {
  668. conn.state = stateWriteMergeFailed
  669. conn.err = errors.Tracef("unexpected tag write size: %d", len(p))
  670. return 0, conn.err
  671. }
  672. conn.buffer = make([]byte, conn.tagSize)
  673. copy(conn.buffer, p)
  674. conn.state = stateWriteMergeBufferedTag
  675. return conn.tagSize, nil
  676. case stateWriteMergeBufferedTag:
  677. conn.buffer = append(conn.buffer, p...)
  678. n, err := conn.Conn.Write(conn.buffer)
  679. if err != nil {
  680. conn.state = stateWriteMergeFailed
  681. conn.err = errors.Trace(err)
  682. } else {
  683. conn.state = stateWriteMergeFinishedMerging
  684. conn.buffer = nil
  685. }
  686. n -= conn.tagSize
  687. if n < 0 {
  688. n = 0
  689. }
  690. // Do not wrap Conn.Write errors
  691. return n, err
  692. case stateWriteMergeFinishedMerging:
  693. return conn.Conn.Write(p)
  694. case stateWriteMergeFailed:
  695. // Return the original error that caused the failure
  696. return 0, conn.err
  697. default:
  698. return 0, errors.TraceNew("unexpected state")
  699. }
  700. }
  701. func newWriteMergeDialer(dialer Dialer, hasPrefix bool, tagSize int) Dialer {
  702. return func(ctx context.Context, network, laddr, raddr string) (net.Conn, error) {
  703. conn, err := dialer(ctx, network, laddr, raddr)
  704. if err != nil {
  705. return nil, errors.Trace(err)
  706. }
  707. return newWriteMergeConn(conn, hasPrefix, tagSize), nil
  708. }
  709. }
  710. // dialManager tracks all dials performed by and dialed conns used by a
  711. // refraction_networking_client conn. dialManager.close interrupts/closes
  712. // all pending dials and established conns immediately. This ensures that
  713. // blocking calls within refraction_networking_client, such as tls.Handhake,
  714. // are interrupted:
  715. // E.g., https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L307
  716. // (...preceeding SetDeadline is insufficient for immediate cancellation.)
  717. //
  718. // This remains an issue with the Conjure Decoy Registrar:
  719. // https://github.com/refraction-networking/conjure/blob/d9d58260cc7017ab0c64b120579b123a5b2d1c96/pkg/registrars/decoy-registrar/decoy-registrar.go#L208
  720. type dialManager struct {
  721. ctxMutex sync.Mutex
  722. useRunCtx bool
  723. initialDialCtx context.Context
  724. runCtx context.Context
  725. stopRunning context.CancelFunc
  726. conns *common.Conns[net.Conn]
  727. }
  728. func newDialManager() *dialManager {
  729. runCtx, stopRunning := context.WithCancel(context.Background())
  730. return &dialManager{
  731. runCtx: runCtx,
  732. stopRunning: stopRunning,
  733. conns: common.NewConns[net.Conn](),
  734. }
  735. }
  736. func (manager *dialManager) makeManagedDialer(dialer Dialer) Dialer {
  737. return func(ctx context.Context, network, laddr, raddr string) (net.Conn, error) {
  738. return manager.dialWithDialer(dialer, ctx, network, laddr, raddr)
  739. }
  740. }
  741. func (manager *dialManager) dialWithDialer(
  742. dialer Dialer,
  743. ctx context.Context,
  744. network string,
  745. laddr string,
  746. raddr string) (net.Conn, error) {
  747. // The context for this dial is either:
  748. // - ctx, during the initial refraction_networking_client.DialContext, when
  749. // this is Psiphon tunnel establishment.
  750. // - manager.runCtx after the initial refraction_networking_client.Dial
  751. // completes, in which case this is a TapDance protocol reconnection that
  752. // occurs periodically for already established tunnels.
  753. manager.ctxMutex.Lock()
  754. if manager.useRunCtx {
  755. // Preserve the random timeout configured by the TapDance client:
  756. // https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L263
  757. deadline, ok := ctx.Deadline()
  758. if !ok {
  759. return nil, errors.Tracef("unexpected nil deadline")
  760. }
  761. var cancelFunc context.CancelFunc
  762. ctx, cancelFunc = context.WithDeadline(manager.runCtx, deadline)
  763. defer cancelFunc()
  764. }
  765. manager.ctxMutex.Unlock()
  766. conn, err := dialer(ctx, network, laddr, raddr)
  767. if err != nil {
  768. return nil, errors.Trace(err)
  769. }
  770. // Fail immediately if CloseWrite isn't available in the underlying dialed
  771. // conn. The equivalent check in managedConn.CloseWrite isn't fatal and
  772. // TapDance will run in a degraded state.
  773. // Limitation: if the underlying conn _also_ passes through CloseWrite, this
  774. // check may be insufficient.
  775. if _, ok := conn.(common.CloseWriter); network == "tcp" && !ok {
  776. return nil, errors.TraceNew("underlying conn is not a CloseWriter")
  777. }
  778. conn = &managedConn{
  779. Conn: conn,
  780. manager: manager,
  781. }
  782. if !manager.conns.Add(conn) {
  783. conn.Close()
  784. return nil, errors.TraceNew("already closed")
  785. }
  786. return conn, nil
  787. }
  788. func (manager *dialManager) startUsingRunCtx() {
  789. manager.ctxMutex.Lock()
  790. manager.initialDialCtx = nil
  791. manager.useRunCtx = true
  792. manager.ctxMutex.Unlock()
  793. }
  794. func (manager *dialManager) close() {
  795. manager.conns.CloseAll()
  796. manager.stopRunning()
  797. }
  798. type managedConn struct {
  799. net.Conn
  800. manager *dialManager
  801. }
  802. type fileConn interface {
  803. File() (*os.File, error)
  804. }
  805. // File exposes the net.UDPConn.File() functionality required by the Conjure
  806. // DTLS transport.
  807. func (conn *managedConn) File() (*os.File, error) {
  808. if f, ok := conn.Conn.(fileConn); ok {
  809. return f.File()
  810. }
  811. return nil, errors.TraceNew("underlying conn is not a fileConn")
  812. }
  813. // CloseWrite exposes the net.TCPConn.CloseWrite() functionality
  814. // required by TapDance.
  815. func (conn *managedConn) CloseWrite() error {
  816. if closeWriter, ok := conn.Conn.(common.CloseWriter); ok {
  817. return closeWriter.CloseWrite()
  818. }
  819. return errors.TraceNew("underlying conn is not a CloseWriter")
  820. }
  821. func (conn *managedConn) Close() error {
  822. // Remove must be invoked asynchronously, as this Close may be called by
  823. // conns.CloseAll, leading to a reentrant lock situation.
  824. go conn.manager.conns.Remove(conn)
  825. return conn.Conn.Close()
  826. }
  827. type refractionConn struct {
  828. net.Conn
  829. manager *dialManager
  830. isClosed int32
  831. isConjure bool
  832. conjureMetricCached bool
  833. conjureMetricDelay time.Duration
  834. conjureMetricTransport string
  835. conjureMetricPrefix string
  836. conjureMetricSTUNServerAddress string
  837. conjureMetricDTLSEmptyInitialPacket bool
  838. }
  839. func (conn *refractionConn) Write(p []byte) (int, error) {
  840. n, err := conn.Conn.Write(p)
  841. // For the DTLS transport, underlying SCTP conn writes may fail
  842. // with "stream closed" -- which indicates a permanent failure of the
  843. // transport -- without closing the conn. Explicitly close the conn on
  844. // this error, which will trigger Psiphon to reconnect faster via
  845. // IsClosed checks on port forward failures.
  846. //
  847. // The close is invoked asynchronously to avoid possible deadlocks due to
  848. // a hypothetical panic in the Close call: for a port forward, the unwind
  849. // will invoke a deferred ssh.channel.Close which reenters Write;
  850. // meanwhile, the underlying ssh.channel.writePacket acquires a
  851. // ssh.channel.writeMu lock but does not defer the unlock.
  852. if std_errors.Is(err, sctp.ErrStreamClosed) {
  853. go func() {
  854. _ = conn.Close()
  855. }()
  856. }
  857. return n, err
  858. }
  859. func (conn *refractionConn) Close() error {
  860. conn.manager.close()
  861. err := conn.Conn.Close()
  862. atomic.StoreInt32(&conn.isClosed, 1)
  863. return err
  864. }
  865. func (conn *refractionConn) IsClosed() bool {
  866. return atomic.LoadInt32(&conn.isClosed) == 1
  867. }
  868. // GetMetrics implements the common.MetricsSource interface.
  869. func (conn *refractionConn) GetMetrics() common.LogFields {
  870. logFields := make(common.LogFields)
  871. if conn.isConjure {
  872. cached := "0"
  873. if conn.conjureMetricCached {
  874. cached = "1"
  875. }
  876. logFields["conjure_cached"] = cached
  877. if conn.conjureMetricDelay != -1 {
  878. logFields["conjure_delay"] = fmt.Sprintf("%d", conn.conjureMetricDelay/time.Millisecond)
  879. }
  880. logFields["conjure_transport"] = conn.conjureMetricTransport
  881. if conn.conjureMetricPrefix != "" {
  882. logFields["conjure_prefix"] = conn.conjureMetricPrefix
  883. }
  884. if conn.conjureMetricSTUNServerAddress != "" {
  885. logFields["conjure_stun"] = conn.conjureMetricSTUNServerAddress
  886. }
  887. if conn.conjureMetricTransport == protocol.CONJURE_TRANSPORT_DTLS_OSSH {
  888. emptyPacket := "0"
  889. if conn.conjureMetricDTLSEmptyInitialPacket {
  890. emptyPacket = "1"
  891. }
  892. logFields["conjure_empty_packet"] = emptyPacket
  893. }
  894. host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
  895. if err == nil {
  896. network := "IPv4"
  897. if IP := net.ParseIP(host); IP != nil && IP.To4() == nil {
  898. network = "IPv6"
  899. }
  900. logFields["conjure_network"] = network
  901. logFields["conjure_port_number"] = port
  902. }
  903. }
  904. return logFields
  905. }
  906. var initRefractionNetworkingOnce sync.Once
  907. func initRefractionNetworking(emitLogs bool, dataDirectory string) error {
  908. var initErr error
  909. initRefractionNetworkingOnce.Do(func() {
  910. if !emitLogs {
  911. refraction_networking_client.Logger().Out = ioutil.Discard
  912. } else {
  913. refraction_networking_client.Logger().Out = os.Stdout
  914. }
  915. assetsDir := filepath.Join(dataDirectory, "refraction-networking")
  916. err := os.MkdirAll(assetsDir, 0700)
  917. if err != nil {
  918. initErr = errors.Trace(err)
  919. return
  920. }
  921. clientConfFileName := filepath.Join(assetsDir, "ClientConf")
  922. _, err = os.Stat(clientConfFileName)
  923. if err != nil && os.IsNotExist(err) {
  924. err = ioutil.WriteFile(clientConfFileName, getEmbeddedClientConf(), 0644)
  925. }
  926. if err != nil {
  927. initErr = errors.Trace(err)
  928. return
  929. }
  930. refraction_networking_assets.AssetsSetDir(assetsDir)
  931. // TapDance now uses a distinct Assets/ClientConf,
  932. // refraction_networking_client.Assets. Do not configure the TapDance
  933. // ClientConf to use the same configuration as Conjure, as the
  934. // Conjure ClientConf may contain decoys that are appropriate for
  935. // registration load but not full TapDance tunnel load.
  936. })
  937. return initErr
  938. }