refraction.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136
  1. //go:build PSIPHON_ENABLE_REFRACTION_NETWORKING
  2. // +build PSIPHON_ENABLE_REFRACTION_NETWORKING
  3. /*
  4. * Copyright (c) 2018, Psiphon Inc.
  5. * All rights reserved.
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. *
  20. */
  21. /*
  22. Package refraction wraps github.com/refraction-networking/gotapdance with
  23. net.Listener and net.Conn types that provide drop-in integration with Psiphon.
  24. */
  25. package refraction
  26. import (
  27. "context"
  28. std_errors "errors"
  29. "fmt"
  30. "io/ioutil"
  31. "net"
  32. "os"
  33. "path/filepath"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  40. "github.com/armon/go-proxyproto"
  41. lrucache "github.com/cognusion/go-cache-lru"
  42. "github.com/pion/sctp"
  43. refraction_networking_assets "github.com/refraction-networking/conjure/pkg/client/assets"
  44. refraction_networking_registration "github.com/refraction-networking/conjure/pkg/registrars/registration"
  45. refraction_networking_transports "github.com/refraction-networking/conjure/pkg/transports/client"
  46. refraction_networking_dtls "github.com/refraction-networking/conjure/pkg/transports/connecting/dtls"
  47. refraction_networking_prefix "github.com/refraction-networking/conjure/pkg/transports/wrapping/prefix"
  48. refraction_networking_proto "github.com/refraction-networking/conjure/proto"
  49. refraction_networking_client "github.com/refraction-networking/gotapdance/tapdance"
  50. )
  51. const (
  52. READ_PROXY_PROTOCOL_HEADER_TIMEOUT = 5 * time.Second
  53. REGISTRATION_CACHE_MAX_ENTRIES = 256
  54. )
  55. // Enabled indicates if Refraction Networking functionality is enabled.
  56. func Enabled() bool {
  57. return true
  58. }
  59. // Listener is a net.Listener.
  60. type Listener struct {
  61. net.Listener
  62. }
  63. // Listen creates a new Refraction Networking listener.
  64. //
  65. // The Refraction Networking station (TapDance or Conjure) will send the
  66. // original client address via the HAProxy proxy protocol v1,
  67. // https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt. The original
  68. // client address is read and returned by accepted conns' RemoteAddr.
  69. // RemoteAddr _must_ be called non-concurrently before calling Read on
  70. // accepted conns as the HAProxy proxy protocol header reading logic sets
  71. // SetReadDeadline and performs a Read.
  72. //
  73. // Psiphon server hosts should be configured to accept tunnel connections only
  74. // from Refraction Networking stations.
  75. func Listen(address string) (net.Listener, error) {
  76. tcpListener, err := net.Listen("tcp", address)
  77. if err != nil {
  78. return nil, errors.Trace(err)
  79. }
  80. // Setting a timeout ensures that reading the proxy protocol
  81. // header completes or times out and RemoteAddr will not block. See:
  82. // https://godoc.org/github.com/armon/go-proxyproto#Conn.RemoteAddr
  83. proxyListener := &proxyproto.Listener{
  84. Listener: tcpListener,
  85. ProxyHeaderTimeout: READ_PROXY_PROTOCOL_HEADER_TIMEOUT}
  86. stationListener := &stationListener{
  87. proxyListener: proxyListener,
  88. }
  89. return &Listener{Listener: stationListener}, nil
  90. }
  91. // stationListener uses the proxyproto.Listener SourceCheck callback to
  92. // capture and record the direct remote address, the station address, and
  93. // wraps accepted conns to provide station address metrics via GetMetrics.
  94. // These metrics enable identifying which station fronted a connection, which
  95. // is useful for network operations and troubleshooting.
  96. //
  97. // go-proxyproto.Conn.RemoteAddr reports the originating client IP address,
  98. // which is geolocated and recorded for metrics. The underlying conn's remote
  99. // address, the station address, is not accessible via the go-proxyproto API.
  100. //
  101. // stationListener is not safe for concurrent access.
  102. type stationListener struct {
  103. proxyListener *proxyproto.Listener
  104. }
  105. func (l *stationListener) Accept() (net.Conn, error) {
  106. var stationRemoteAddr net.Addr
  107. l.proxyListener.SourceCheck = func(addr net.Addr) (bool, error) {
  108. stationRemoteAddr = addr
  109. return true, nil
  110. }
  111. conn, err := l.proxyListener.Accept()
  112. if err != nil {
  113. return nil, err
  114. }
  115. if stationRemoteAddr == nil {
  116. return nil, errors.TraceNew("missing station address")
  117. }
  118. return &stationConn{
  119. Conn: conn,
  120. stationIPAddress: common.IPAddressFromAddr(stationRemoteAddr),
  121. }, nil
  122. }
  123. func (l *stationListener) Close() error {
  124. return l.proxyListener.Close()
  125. }
  126. func (l *stationListener) Addr() net.Addr {
  127. return l.proxyListener.Addr()
  128. }
  129. type stationConn struct {
  130. net.Conn
  131. stationIPAddress string
  132. }
  133. // IrregularTunnelError implements the common.IrregularIndicator interface.
  134. func (c *stationConn) IrregularTunnelError() error {
  135. // We expect a PROXY protocol header, but go-proxyproto does not produce an
  136. // error if the "PROXY " prefix is absent; instead the connection will
  137. // proceed. To detect this case, check if the go-proxyproto RemoteAddr IP
  138. // address matches the underlying connection IP address. When these values
  139. // match, there was no PROXY protocol header.
  140. //
  141. // Limitation: the values will match if there is a PROXY protocol header
  142. // containing the same IP address as the underlying connection. This is not
  143. // an expected case.
  144. if common.IPAddressFromAddr(c.RemoteAddr()) == c.stationIPAddress {
  145. return errors.TraceNew("unexpected station IP address")
  146. }
  147. return nil
  148. }
  149. // GetMetrics implements the common.MetricsSource interface.
  150. func (c *stationConn) GetMetrics() common.LogFields {
  151. logFields := make(common.LogFields)
  152. // Ensure we don't log a potential non-station IP address.
  153. if c.IrregularTunnelError() == nil {
  154. logFields["station_ip_address"] = c.stationIPAddress
  155. }
  156. return logFields
  157. }
  158. // DialTapDance establishes a new TapDance connection to a TapDance station
  159. // specified in the config assets and forwarding through to the Psiphon server
  160. // specified by address.
  161. //
  162. // The TapDance station config assets (which are also the Conjure station
  163. // assets) are read from dataDirectory/"refraction-networking". When no config
  164. // is found, default assets are paved.
  165. //
  166. // dialer specifies the custom dialer for underlying TCP dials.
  167. //
  168. // The input ctx is expected to have a timeout for the dial.
  169. //
  170. // Limitation: the parameters emitLogs and dataDirectory are used for one-time
  171. // initialization and are ignored after the first DialTapDance/Conjure call.
  172. func DialTapDance(
  173. ctx context.Context,
  174. emitLogs bool,
  175. dataDirectory string,
  176. dialer Dialer,
  177. address string) (net.Conn, error) {
  178. // TapDance is disabled. See comment for protocol.DisabledTunnelProtocols.
  179. // With that DisabledTunnelProtocols configuration, clients should not
  180. // reach this error.
  181. //
  182. // Note that in addition to this entry point being disabled, the TapDance
  183. // ClientConf is no longer initialized in initRefractionNetworking below.
  184. return nil, errors.TraceNew("not supported")
  185. // return dial(
  186. // ctx,
  187. // emitLogs,
  188. // dataDirectory,
  189. // dialer,
  190. // address,
  191. // nil)
  192. }
  193. // DialConjure establishes a new Conjure connection to a Conjure station.
  194. //
  195. // dialer specifies the custom dialer to use for phantom dials. Additional
  196. // Conjure-specific parameters are specified in conjureConfig.
  197. //
  198. // See DialTapdance comment.
  199. func DialConjure(
  200. ctx context.Context,
  201. emitLogs bool,
  202. dataDirectory string,
  203. dialer Dialer,
  204. address string,
  205. conjureConfig *ConjureConfig) (net.Conn, error) {
  206. return dial(
  207. ctx,
  208. emitLogs,
  209. dataDirectory,
  210. dialer,
  211. address,
  212. conjureConfig)
  213. }
  214. func dial(
  215. ctx context.Context,
  216. emitLogs bool,
  217. dataDirectory string,
  218. dialer Dialer,
  219. address string,
  220. conjureConfig *ConjureConfig) (net.Conn, error) {
  221. err := initRefractionNetworking(emitLogs, dataDirectory)
  222. if err != nil {
  223. return nil, errors.Trace(err)
  224. }
  225. if _, ok := ctx.Deadline(); !ok {
  226. return nil, errors.TraceNew("dial context has no timeout")
  227. }
  228. useConjure := conjureConfig != nil
  229. manager := newDialManager()
  230. refractionDialer := &refraction_networking_client.Dialer{
  231. DialerWithLaddr: manager.makeManagedDialer(dialer),
  232. V6Support: conjureConfig.EnableIPv6Dials,
  233. UseProxyHeader: true,
  234. }
  235. conjureMetricCached := false
  236. conjureMetricDelay := time.Duration(0)
  237. conjureMetricTransport := ""
  238. conjureMetricPrefix := ""
  239. conjureMetricSTUNServerAddress := ""
  240. conjureMetricDTLSEmptyInitialPacket := false
  241. var conjureCachedRegistration *refraction_networking_client.ConjureReg
  242. var conjureRecordRegistrar *recordRegistrar
  243. if useConjure {
  244. // Our strategy is to try one registration per dial attempt: a cached
  245. // registration, if it exists, or API or decoy registration, as configured.
  246. // This assumes Psiphon establishment will try/retry many candidates as
  247. // required, and that the desired mix of API/decoy registrations will be
  248. // configured and generated. In good network conditions, internal gotapdance
  249. // retries (via APIRegistrar.MaxRetries or APIRegistrar.SecondaryRegistrar)
  250. // are unlikely to start before the Conjure dial is canceled.
  251. // Caching registrations reduces average Conjure dial time by often
  252. // eliminating the registration phase. This is especially impactful for
  253. // short duration tunnels, such as on mobile. Caching also reduces domain
  254. // fronted traffic and load on the API registrar and decoys.
  255. //
  256. // We implement a simple in-memory registration cache with the following
  257. // behavior:
  258. //
  259. // - If a new registration succeeds, but the overall Conjure dial is
  260. // _canceled_, the registration is optimistically cached.
  261. // - If the Conjure phantom dial fails, any associated cached registration
  262. // is discarded.
  263. // - A cached registration's TTL is extended upon phantom dial success.
  264. // - If the configured TTL changes, the cache is cleared.
  265. //
  266. // Limitations:
  267. // - The cache is not persistent.
  268. // - There is no TTL extension during a long connection.
  269. // - Caching a successful registration when the phantom dial is canceled may
  270. // skip the necessary "delay" step (however, an immediate re-establishment
  271. // to the same candidate is unlikely in this case).
  272. //
  273. // TODO:
  274. // - Revisit when gotapdance adds its own caching.
  275. // - Consider "pre-registering" Conjure when already connected with a
  276. // different protocol, so a Conjure registration is available on the next
  277. // establishment; in this scenario, a tunneled API registration would not
  278. // require domain fronting.
  279. refractionDialer.DarkDecoy = true
  280. // The pop operation removes the registration from the cache. This
  281. // eliminates the possibility of concurrent candidates (with the same cache
  282. // key) using and modifying the same registration, a potential race
  283. // condition. The popped cached registration must be reinserted in the cache
  284. // after canceling or success, but not on phantom dial failure.
  285. conjureCachedRegistration = conjureRegistrationCache.pop(conjureConfig)
  286. if conjureCachedRegistration != nil {
  287. refractionDialer.DarkDecoyRegistrar = &cachedRegistrar{
  288. registration: conjureCachedRegistration,
  289. }
  290. conjureMetricCached = true
  291. conjureMetricDelay = 0 // report no delay
  292. } else if conjureConfig.APIRegistrarBidirectionalURL != "" {
  293. if conjureConfig.APIRegistrarHTTPClient == nil {
  294. // While not a guaranteed check, if the APIRegistrarHTTPClient isn't set
  295. // then the API registration would certainly be unfronted, resulting in a
  296. // fingerprintable connection leak.
  297. return nil, errors.TraceNew("missing APIRegistrarHTTPClient")
  298. }
  299. refractionDialer.DarkDecoyRegistrar, err = refraction_networking_registration.NewAPIRegistrar(
  300. &refraction_networking_registration.Config{
  301. Target: conjureConfig.APIRegistrarBidirectionalURL,
  302. Bidirectional: true,
  303. Delay: conjureConfig.APIRegistrarDelay,
  304. MaxRetries: 0,
  305. HTTPClient: conjureConfig.APIRegistrarHTTPClient,
  306. })
  307. if err != nil {
  308. return nil, errors.Trace(err)
  309. }
  310. conjureMetricDelay = conjureConfig.APIRegistrarDelay
  311. } else if conjureConfig.DoDecoyRegistration {
  312. refractionDialer.DarkDecoyRegistrar = refraction_networking_registration.NewDecoyRegistrar()
  313. refractionDialer.Width = conjureConfig.DecoyRegistrarWidth
  314. // Limitation: the decoy registration delay is not currently exposed in the
  315. // gotapdance API.
  316. conjureMetricDelay = -1 // don't report delay
  317. } else {
  318. return nil, errors.TraceNew("no conjure registrar specified")
  319. }
  320. if conjureCachedRegistration == nil && conjureConfig.RegistrationCacheTTL != 0 {
  321. // Record the registration result in order to cache it.
  322. conjureRecordRegistrar = &recordRegistrar{
  323. registrar: refractionDialer.DarkDecoyRegistrar,
  324. }
  325. refractionDialer.DarkDecoyRegistrar = conjureRecordRegistrar
  326. }
  327. // Conjure transport replay limitations:
  328. //
  329. // - For CONJURE_TRANSPORT_PREFIX_OSSH, the selected prefix is not replayed
  330. // - For all transports, randomized port selection is not replayed
  331. randomizeDstPort := conjureConfig.EnablePortRandomization
  332. disableOverrides := !conjureConfig.EnableRegistrationOverrides
  333. conjureMetricTransport = conjureConfig.Transport
  334. switch conjureConfig.Transport {
  335. case protocol.CONJURE_TRANSPORT_MIN_OSSH:
  336. transport, ok := refraction_networking_transports.GetTransportByID(
  337. refraction_networking_proto.TransportType_Min)
  338. if !ok {
  339. return nil, errors.TraceNew("missing min transport")
  340. }
  341. config, err := refraction_networking_transports.NewWithParams(
  342. transport.Name(),
  343. &refraction_networking_proto.GenericTransportParams{
  344. RandomizeDstPort: &randomizeDstPort})
  345. if err != nil {
  346. return nil, errors.Trace(err)
  347. }
  348. refractionDialer.Transport = transport.ID()
  349. refractionDialer.TransportConfig = config
  350. refractionDialer.DisableRegistrarOverrides = disableOverrides
  351. if !conjureConfig.DoDecoyRegistration {
  352. // Limitation: the writeMergeConn wrapping is skipped when
  353. // using decoy registration, since the refraction package
  354. // uses DialerWithLaddr for both the decoy registration step
  355. // as well as the following phantom dial, and the
  356. // writeMergeConn is only appropriate for the phantom dial.
  357. refractionDialer.DialerWithLaddr = newWriteMergeDialer(
  358. refractionDialer.DialerWithLaddr, false, 32)
  359. }
  360. case protocol.CONJURE_TRANSPORT_PREFIX_OSSH:
  361. transport, ok := refraction_networking_transports.GetTransportByID(
  362. refraction_networking_proto.TransportType_Prefix)
  363. if !ok {
  364. return nil, errors.TraceNew("missing prefix transport")
  365. }
  366. prefixID := int32(refraction_networking_prefix.Rand)
  367. flushPolicy := refraction_networking_prefix.FlushAfterPrefix
  368. config, err := refraction_networking_transports.NewWithParams(
  369. transport.Name(),
  370. &refraction_networking_proto.PrefixTransportParams{
  371. RandomizeDstPort: &randomizeDstPort,
  372. PrefixId: &prefixID,
  373. CustomFlushPolicy: &flushPolicy})
  374. if err != nil {
  375. return nil, errors.Trace(err)
  376. }
  377. refractionDialer.Transport = transport.ID()
  378. refractionDialer.TransportConfig = config
  379. refractionDialer.DisableRegistrarOverrides = disableOverrides
  380. if !conjureConfig.DoDecoyRegistration {
  381. // See limitation comment above.
  382. refractionDialer.DialerWithLaddr = newWriteMergeDialer(
  383. refractionDialer.DialerWithLaddr, true, 64)
  384. }
  385. case protocol.CONJURE_TRANSPORT_DTLS_OSSH:
  386. transport, ok := refraction_networking_transports.GetTransportByID(
  387. refraction_networking_proto.TransportType_DTLS)
  388. if !ok {
  389. return nil, errors.TraceNew("missing DTLS transport")
  390. }
  391. config, err := refraction_networking_transports.NewWithParams(
  392. transport.Name(),
  393. &refraction_networking_proto.DTLSTransportParams{
  394. RandomizeDstPort: &randomizeDstPort})
  395. if err != nil {
  396. return nil, errors.Trace(err)
  397. }
  398. if conjureConfig.STUNServerAddress == "" {
  399. return nil, errors.TraceNew("missing STUN server address")
  400. }
  401. config.SetParams(
  402. &refraction_networking_dtls.ClientConfig{
  403. STUNServer: conjureConfig.STUNServerAddress,
  404. DisableIRWorkaround: !conjureConfig.DTLSEmptyInitialPacket,
  405. })
  406. conjureMetricSTUNServerAddress = conjureConfig.STUNServerAddress
  407. conjureMetricDTLSEmptyInitialPacket = conjureConfig.DTLSEmptyInitialPacket
  408. refractionDialer.Transport = transport.ID()
  409. refractionDialer.TransportConfig = config
  410. refractionDialer.DisableRegistrarOverrides = disableOverrides
  411. default:
  412. return nil, errors.Tracef("invalid Conjure transport: %s", conjureConfig.Transport)
  413. }
  414. }
  415. // If the dial context is cancelled, use dialManager to interrupt
  416. // refractionDialer.DialContext. See dialManager comment explaining why
  417. // refractionDialer.DialContext may block even when the input context is
  418. // cancelled.
  419. dialComplete := make(chan struct{})
  420. go func() {
  421. select {
  422. case <-ctx.Done():
  423. case <-dialComplete:
  424. }
  425. select {
  426. // Prioritize the dialComplete case.
  427. case <-dialComplete:
  428. return
  429. default:
  430. }
  431. manager.close()
  432. }()
  433. conn, err := refractionDialer.DialContext(ctx, "tcp", address)
  434. close(dialComplete)
  435. if err != nil {
  436. // Call manager.close before updating cache, to synchronously shutdown dials
  437. // and ensure there are no further concurrent reads/writes to the recorded
  438. // registration before referencing it.
  439. manager.close()
  440. }
  441. // Cache (or put back) a successful registration. Also put back in the
  442. // specific error case where the phantom dial was canceled, as the
  443. // registration may still be valid. This operation implicitly extends the TTL
  444. // of a reused cached registration; we assume the Conjure station is also
  445. // extending the TTL by the same amount.
  446. //
  447. // Limitation: the cancel case shouldn't extend the TTL.
  448. if useConjure && (conjureCachedRegistration != nil || conjureRecordRegistrar != nil) {
  449. isCanceled := (err != nil && ctx.Err() == context.Canceled)
  450. if err == nil || isCanceled {
  451. registration := conjureCachedRegistration
  452. if registration == nil {
  453. // We assume gotapdance is no longer accessing the Registrar.
  454. registration = conjureRecordRegistrar.registration
  455. }
  456. // conjureRecordRegistrar.registration will be nil if there was no cached
  457. // registration _and_ registration didn't succeed before a cancel.
  458. if registration != nil {
  459. conjureRegistrationCache.put(conjureConfig, registration, isCanceled)
  460. if conjureConfig.Transport == protocol.CONJURE_TRANSPORT_PREFIX_OSSH {
  461. // Record the selected prefix name after registration, as
  462. // the registrar may have overridden the client selection.
  463. conjureMetricPrefix = registration.Transport.Name()
  464. }
  465. }
  466. } else if conjureCachedRegistration != nil {
  467. conjureConfig.Logger.WithTraceFields(
  468. common.LogFields{
  469. "diagnosticID": conjureConfig.DiagnosticID,
  470. "reason": "phantom dial failed",
  471. }).Info(
  472. "drop cached registration")
  473. }
  474. }
  475. if err != nil {
  476. return nil, errors.Trace(err)
  477. }
  478. manager.startUsingRunCtx()
  479. refractionConn := &refractionConn{
  480. Conn: conn,
  481. manager: manager,
  482. }
  483. if useConjure {
  484. // Retain these values for logging metrics.
  485. refractionConn.isConjure = true
  486. refractionConn.conjureMetricCached = conjureMetricCached
  487. refractionConn.conjureMetricDelay = conjureMetricDelay
  488. refractionConn.conjureMetricTransport = conjureMetricTransport
  489. refractionConn.conjureMetricPrefix = conjureMetricPrefix
  490. refractionConn.conjureMetricSTUNServerAddress = conjureMetricSTUNServerAddress
  491. refractionConn.conjureMetricDTLSEmptyInitialPacket = conjureMetricDTLSEmptyInitialPacket
  492. }
  493. return refractionConn, nil
  494. }
  495. func DeleteCachedConjureRegistration(config *ConjureConfig) {
  496. conjureRegistrationCache.delete(config)
  497. }
  498. type registrationCache struct {
  499. mutex sync.Mutex
  500. TTL time.Duration
  501. cache *lrucache.Cache
  502. }
  503. func newRegistrationCache() *registrationCache {
  504. return &registrationCache{
  505. cache: lrucache.NewWithLRU(
  506. lrucache.NoExpiration,
  507. 1*time.Minute,
  508. REGISTRATION_CACHE_MAX_ENTRIES),
  509. }
  510. }
  511. func (c *registrationCache) put(
  512. config *ConjureConfig,
  513. registration *refraction_networking_client.ConjureReg,
  514. isCanceled bool) {
  515. c.mutex.Lock()
  516. defer c.mutex.Unlock()
  517. // Clear the entire cache if the configured TTL changes to avoid retaining
  518. // items for too long. This is expected to be an infrequent event. The
  519. // go-cache-lru API does not offer a mechanism to inspect and adjust the TTL
  520. // of all existing items.
  521. if c.TTL != config.RegistrationCacheTTL {
  522. c.cache.Flush()
  523. c.TTL = config.RegistrationCacheTTL
  524. }
  525. // Drop the cached registration if another entry is found under the same key.
  526. // Since the dial pops its entry out of the cache, finding an existing entry
  527. // implies that another tunnel establishment candidate with the same key has
  528. // successfully registered and connected (or canceled) in the meantime.
  529. // Prefer that newer cached registration.
  530. //
  531. // For Psiphon, one scenario resulting in this condition is that the first
  532. // dial to a given server, using a cached registration, is delayed long
  533. // enough that a new candidate for the same server has been started and
  534. // outpaced the first candidate.
  535. _, found := c.cache.Get(config.RegistrationCacheKey)
  536. if found {
  537. config.Logger.WithTraceFields(
  538. common.LogFields{
  539. "diagnosticID": config.DiagnosticID,
  540. "reason": "existing entry found",
  541. }).Info(
  542. "drop cached registration")
  543. return
  544. }
  545. reason := "connected"
  546. if isCanceled {
  547. reason = "canceled"
  548. }
  549. config.Logger.WithTraceFields(
  550. common.LogFields{
  551. "diagnosticID": config.DiagnosticID,
  552. "cacheSize": c.cache.ItemCount(),
  553. "reason": reason,
  554. }).Info(
  555. "put cached registration")
  556. c.cache.Set(
  557. config.RegistrationCacheKey,
  558. registration,
  559. c.TTL)
  560. }
  561. func (c *registrationCache) pop(
  562. config *ConjureConfig) *refraction_networking_client.ConjureReg {
  563. c.mutex.Lock()
  564. defer c.mutex.Unlock()
  565. // See TTL/Flush comment in put.
  566. if c.TTL != config.RegistrationCacheTTL {
  567. c.cache.Flush()
  568. c.TTL = config.RegistrationCacheTTL
  569. }
  570. entry, found := c.cache.Get(config.RegistrationCacheKey)
  571. config.Logger.WithTraceFields(
  572. common.LogFields{
  573. "diagnosticID": config.DiagnosticID,
  574. "cacheSize": c.cache.ItemCount(),
  575. "found": found,
  576. }).Info(
  577. "pop cached registration")
  578. if found {
  579. c.cache.Delete(config.RegistrationCacheKey)
  580. return entry.(*refraction_networking_client.ConjureReg)
  581. }
  582. return nil
  583. }
  584. func (c *registrationCache) delete(config *ConjureConfig) {
  585. c.mutex.Lock()
  586. defer c.mutex.Unlock()
  587. _, found := c.cache.Get(config.RegistrationCacheKey)
  588. config.Logger.WithTraceFields(
  589. common.LogFields{
  590. "diagnosticID": config.DiagnosticID,
  591. "found": found,
  592. }).Info(
  593. "delete cached registration")
  594. if found {
  595. c.cache.Delete(config.RegistrationCacheKey)
  596. }
  597. }
  598. var conjureRegistrationCache = newRegistrationCache()
  599. type cachedRegistrar struct {
  600. registration *refraction_networking_client.ConjureReg
  601. }
  602. func (r *cachedRegistrar) Register(
  603. _ *refraction_networking_client.ConjureSession,
  604. _ context.Context) (*refraction_networking_client.ConjureReg, error) {
  605. return r.registration, nil
  606. }
  607. func (r *cachedRegistrar) PrepareRegKeys(_ [32]byte, _ []byte) error {
  608. return nil
  609. }
  610. type recordRegistrar struct {
  611. registrar refraction_networking_client.Registrar
  612. registration *refraction_networking_client.ConjureReg
  613. }
  614. func (r *recordRegistrar) Register(
  615. session *refraction_networking_client.ConjureSession,
  616. ctx context.Context) (*refraction_networking_client.ConjureReg, error) {
  617. registration, err := r.registrar.Register(session, ctx)
  618. if err != nil {
  619. return nil, errors.Trace(err)
  620. }
  621. r.registration = registration
  622. return registration, nil
  623. }
  624. func (r *recordRegistrar) PrepareRegKeys(_ [32]byte, _ []byte) error {
  625. return nil
  626. }
  627. // writeMergeConn merges Conjure transport and subsequent OSSH writes in order
  628. // to avoid fixed-sized first or second TCP packets always containing exactly
  629. // the 32-byte or 64-byte HMAC tag.
  630. //
  631. // The Conjure Prefix transport will first write a prefix. writeMergeConn
  632. // assumes the FlushAfterPrefix policy is used, so the first write call for
  633. // that transport will be exactly the arbitrary sized prefix. The second
  634. // write call will be the HMAC tag. Pass the first write through to the
  635. // underlying conn, and then expect the HMAC tag on the second write, and
  636. // handle as follows.
  637. //
  638. // The Conjure Min transport first calls write with an HMAC tag. Buffer this
  639. // value and await the following initial OSSH write, and prepend the buffered
  640. // HMAC tag to the random OSSH data. The first write by OSSH will be a
  641. // variable length multi-packet-sized sequence of random bytes.
  642. type writeMergeConn struct {
  643. net.Conn
  644. tagSize int
  645. mutex sync.Mutex
  646. state int
  647. buffer []byte
  648. err error
  649. }
  650. const (
  651. stateWriteMergeAwaitingPrefix = iota
  652. stateWriteMergeAwaitingTag
  653. stateWriteMergeBufferedTag
  654. stateWriteMergeFinishedMerging
  655. stateWriteMergeFailed
  656. )
  657. func newWriteMergeConn(conn net.Conn, hasPrefix bool, tagSize int) *writeMergeConn {
  658. c := &writeMergeConn{
  659. Conn: conn,
  660. tagSize: tagSize,
  661. }
  662. if hasPrefix {
  663. c.state = stateWriteMergeAwaitingPrefix
  664. } else {
  665. c.state = stateWriteMergeAwaitingTag
  666. }
  667. return c
  668. }
  669. func (conn *writeMergeConn) Write(p []byte) (int, error) {
  670. conn.mutex.Lock()
  671. defer conn.mutex.Unlock()
  672. switch conn.state {
  673. case stateWriteMergeAwaitingPrefix:
  674. conn.state = stateWriteMergeAwaitingTag
  675. return conn.Conn.Write(p)
  676. case stateWriteMergeAwaitingTag:
  677. if len(p) != conn.tagSize {
  678. conn.state = stateWriteMergeFailed
  679. conn.err = errors.Tracef("unexpected tag write size: %d", len(p))
  680. return 0, conn.err
  681. }
  682. conn.buffer = make([]byte, conn.tagSize)
  683. copy(conn.buffer, p)
  684. conn.state = stateWriteMergeBufferedTag
  685. return conn.tagSize, nil
  686. case stateWriteMergeBufferedTag:
  687. conn.buffer = append(conn.buffer, p...)
  688. n, err := conn.Conn.Write(conn.buffer)
  689. if err != nil {
  690. conn.state = stateWriteMergeFailed
  691. conn.err = errors.Trace(err)
  692. } else {
  693. conn.state = stateWriteMergeFinishedMerging
  694. conn.buffer = nil
  695. }
  696. n -= conn.tagSize
  697. if n < 0 {
  698. n = 0
  699. }
  700. // Do not wrap Conn.Write errors
  701. return n, err
  702. case stateWriteMergeFinishedMerging:
  703. return conn.Conn.Write(p)
  704. case stateWriteMergeFailed:
  705. // Return the original error that caused the failure
  706. return 0, conn.err
  707. default:
  708. return 0, errors.TraceNew("unexpected state")
  709. }
  710. }
  711. func newWriteMergeDialer(dialer Dialer, hasPrefix bool, tagSize int) Dialer {
  712. return func(ctx context.Context, network, laddr, raddr string) (net.Conn, error) {
  713. conn, err := dialer(ctx, network, laddr, raddr)
  714. if err != nil {
  715. return nil, errors.Trace(err)
  716. }
  717. return newWriteMergeConn(conn, hasPrefix, tagSize), nil
  718. }
  719. }
// dialManager tracks all dials performed by and dialed conns used by a
// refraction_networking_client conn. dialManager.close interrupts/closes
// all pending dials and established conns immediately. This ensures that
// blocking calls within refraction_networking_client, such as tls.Handshake,
// are interrupted:
// E.g., https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L307
// (...the preceding SetDeadline is insufficient for immediate cancellation.)
//
// This remains an issue with the Conjure Decoy Registrar:
// https://github.com/refraction-networking/conjure/blob/d9d58260cc7017ab0c64b120579b123a5b2d1c96/pkg/registrars/decoy-registrar/decoy-registrar.go#L208
type dialManager struct {
	// ctxMutex guards useRunCtx and initialDialCtx.
	ctxMutex sync.Mutex
	// useRunCtx is set by startUsingRunCtx. Once it is set, dials derive their
	// context from runCtx instead of the caller-supplied ctx.
	useRunCtx bool
	// initialDialCtx is cleared by startUsingRunCtx.
	// NOTE(review): nothing in this section reads it; confirm it is still used.
	initialDialCtx context.Context
	// runCtx is the parent context for dials after startUsingRunCtx; it is
	// cancelled by close via stopRunning.
	runCtx      context.Context
	stopRunning context.CancelFunc
	// conns tracks every managedConn dialed through this manager so that
	// close can close them all.
	conns *common.Conns[net.Conn]
}
  738. func newDialManager() *dialManager {
  739. runCtx, stopRunning := context.WithCancel(context.Background())
  740. return &dialManager{
  741. runCtx: runCtx,
  742. stopRunning: stopRunning,
  743. conns: common.NewConns[net.Conn](),
  744. }
  745. }
  746. func (manager *dialManager) makeManagedDialer(dialer Dialer) Dialer {
  747. return func(ctx context.Context, network, laddr, raddr string) (net.Conn, error) {
  748. return manager.dialWithDialer(dialer, ctx, network, laddr, raddr)
  749. }
  750. }
  751. func (manager *dialManager) dialWithDialer(
  752. dialer Dialer,
  753. ctx context.Context,
  754. network string,
  755. laddr string,
  756. raddr string) (net.Conn, error) {
  757. // The context for this dial is either:
  758. // - ctx, during the initial refraction_networking_client.DialContext, when
  759. // this is Psiphon tunnel establishment.
  760. // - manager.runCtx after the initial refraction_networking_client.Dial
  761. // completes, in which case this is a TapDance protocol reconnection that
  762. // occurs periodically for already established tunnels.
  763. manager.ctxMutex.Lock()
  764. if manager.useRunCtx {
  765. // Preserve the random timeout configured by the TapDance client:
  766. // https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L263
  767. deadline, ok := ctx.Deadline()
  768. if !ok {
  769. return nil, errors.Tracef("unexpected nil deadline")
  770. }
  771. var cancelFunc context.CancelFunc
  772. ctx, cancelFunc = context.WithDeadline(manager.runCtx, deadline)
  773. defer cancelFunc()
  774. }
  775. manager.ctxMutex.Unlock()
  776. conn, err := dialer(ctx, network, laddr, raddr)
  777. if err != nil {
  778. return nil, errors.Trace(err)
  779. }
  780. // Fail immediately if CloseWrite isn't available in the underlying dialed
  781. // conn. The equivalent check in managedConn.CloseWrite isn't fatal and
  782. // TapDance will run in a degraded state.
  783. // Limitation: if the underlying conn _also_ passes through CloseWrite, this
  784. // check may be insufficient.
  785. if _, ok := conn.(common.CloseWriter); network == "tcp" && !ok {
  786. return nil, errors.TraceNew("underlying conn is not a CloseWriter")
  787. }
  788. conn = &managedConn{
  789. Conn: conn,
  790. manager: manager,
  791. }
  792. if !manager.conns.Add(conn) {
  793. conn.Close()
  794. return nil, errors.TraceNew("already closed")
  795. }
  796. return conn, nil
  797. }
  798. func (manager *dialManager) startUsingRunCtx() {
  799. manager.ctxMutex.Lock()
  800. manager.initialDialCtx = nil
  801. manager.useRunCtx = true
  802. manager.ctxMutex.Unlock()
  803. }
  804. func (manager *dialManager) close() {
  805. manager.conns.CloseAll()
  806. manager.stopRunning()
  807. }
// managedConn wraps a conn dialed by dialManager.dialWithDialer and tracked
// in the manager's conns set, so that dialManager.close can close it.
type managedConn struct {
	net.Conn
	// manager is the dialManager whose conns set tracks this conn; Close
	// removes the conn from that set.
	manager *dialManager
}
// fileConn is implemented by conns that can expose an *os.File (for example
// *net.UDPConn). managedConn.File uses it to pass File through to the
// wrapped conn.
type fileConn interface {
	File() (*os.File, error)
}
  815. // File exposes the net.UDPConn.File() functionality required by the Conjure
  816. // DTLS transport.
  817. func (conn *managedConn) File() (*os.File, error) {
  818. if f, ok := conn.Conn.(fileConn); ok {
  819. return f.File()
  820. }
  821. return nil, errors.TraceNew("underlying conn is not a fileConn")
  822. }
  823. // CloseWrite exposes the net.TCPConn.CloseWrite() functionality
  824. // required by TapDance.
  825. func (conn *managedConn) CloseWrite() error {
  826. if closeWriter, ok := conn.Conn.(common.CloseWriter); ok {
  827. return closeWriter.CloseWrite()
  828. }
  829. return errors.TraceNew("underlying conn is not a CloseWriter")
  830. }
  831. func (conn *managedConn) Close() error {
  832. // Remove must be invoked asynchronously, as this Close may be called by
  833. // conns.CloseAll, leading to a reentrant lock situation.
  834. go conn.manager.conns.Remove(conn)
  835. return conn.Conn.Close()
  836. }
// refractionConn wraps the conn established through the refraction networking
// client. It closes its dialManager on Close, records closure for IsClosed,
// and reports Conjure metrics via GetMetrics.
type refractionConn struct {
	net.Conn
	// manager is closed (interrupting its dials and conns) when this conn is
	// closed.
	manager *dialManager
	// isClosed is set to 1 atomically by Close and read by IsClosed.
	isClosed int32
	// isConjure gates whether GetMetrics reports any conjure_* fields.
	isConjure bool
	// The conjureMetric* fields below are reported by GetMetrics when
	// isConjure is set.
	conjureMetricCached bool
	// conjureMetricDelay is reported in milliseconds; -1 suppresses it.
	conjureMetricDelay     time.Duration
	conjureMetricTransport string
	// conjureMetricPrefix and conjureMetricSTUNServerAddress are omitted from
	// metrics when empty.
	conjureMetricPrefix            string
	conjureMetricSTUNServerAddress string
	// conjureMetricDTLSEmptyInitialPacket is reported only when the transport
	// is protocol.CONJURE_TRANSPORT_DTLS_OSSH.
	conjureMetricDTLSEmptyInitialPacket bool
}
  849. func (conn *refractionConn) Write(p []byte) (int, error) {
  850. n, err := conn.Conn.Write(p)
  851. // For the DTLS transport, underlying SCTP conn writes may fail
  852. // with "stream closed" -- which indicates a permanent failure of the
  853. // transport -- without closing the conn. Explicitly close the conn on
  854. // this error, which will trigger Psiphon to reconnect faster via
  855. // IsClosed checks on port forward failures.
  856. //
  857. // The close is invoked asynchronously to avoid possible deadlocks due to
  858. // a hypothetical panic in the Close call: for a port forward, the unwind
  859. // will invoke a deferred ssh.channel.Close which reenters Write;
  860. // meanwhile, the underlying ssh.channel.writePacket acquires a
  861. // ssh.channel.writeMu lock but does not defer the unlock.
  862. if std_errors.Is(err, sctp.ErrStreamClosed) {
  863. go func() {
  864. _ = conn.Close()
  865. }()
  866. }
  867. return n, err
  868. }
  869. func (conn *refractionConn) Close() error {
  870. conn.manager.close()
  871. err := conn.Conn.Close()
  872. atomic.StoreInt32(&conn.isClosed, 1)
  873. return err
  874. }
  875. func (conn *refractionConn) IsClosed() bool {
  876. return atomic.LoadInt32(&conn.isClosed) == 1
  877. }
  878. // GetMetrics implements the common.MetricsSource interface.
  879. func (conn *refractionConn) GetMetrics() common.LogFields {
  880. logFields := make(common.LogFields)
  881. if conn.isConjure {
  882. cached := "0"
  883. if conn.conjureMetricCached {
  884. cached = "1"
  885. }
  886. logFields["conjure_cached"] = cached
  887. if conn.conjureMetricDelay != -1 {
  888. logFields["conjure_delay"] = fmt.Sprintf("%d", conn.conjureMetricDelay/time.Millisecond)
  889. }
  890. logFields["conjure_transport"] = conn.conjureMetricTransport
  891. if conn.conjureMetricPrefix != "" {
  892. logFields["conjure_prefix"] = conn.conjureMetricPrefix
  893. }
  894. if conn.conjureMetricSTUNServerAddress != "" {
  895. logFields["conjure_stun"] = conn.conjureMetricSTUNServerAddress
  896. }
  897. if conn.conjureMetricTransport == protocol.CONJURE_TRANSPORT_DTLS_OSSH {
  898. emptyPacket := "0"
  899. if conn.conjureMetricDTLSEmptyInitialPacket {
  900. emptyPacket = "1"
  901. }
  902. logFields["conjure_empty_packet"] = emptyPacket
  903. }
  904. host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
  905. if err == nil {
  906. network := "IPv4"
  907. if IP := net.ParseIP(host); IP != nil && IP.To4() == nil {
  908. network = "IPv6"
  909. }
  910. logFields["conjure_network"] = network
  911. logFields["conjure_port_number"] = port
  912. }
  913. }
  914. return logFields
  915. }
  916. var initRefractionNetworkingOnce sync.Once
  917. func initRefractionNetworking(emitLogs bool, dataDirectory string) error {
  918. var initErr error
  919. initRefractionNetworkingOnce.Do(func() {
  920. if !emitLogs {
  921. refraction_networking_client.Logger().Out = ioutil.Discard
  922. } else {
  923. refraction_networking_client.Logger().Out = os.Stdout
  924. }
  925. assetsDir := filepath.Join(dataDirectory, "refraction-networking")
  926. err := os.MkdirAll(assetsDir, 0700)
  927. if err != nil {
  928. initErr = errors.Trace(err)
  929. return
  930. }
  931. clientConfFileName := filepath.Join(assetsDir, "ClientConf")
  932. _, err = os.Stat(clientConfFileName)
  933. if err != nil && os.IsNotExist(err) {
  934. err = ioutil.WriteFile(clientConfFileName, getEmbeddedClientConf(), 0644)
  935. }
  936. if err != nil {
  937. initErr = errors.Trace(err)
  938. return
  939. }
  940. refraction_networking_assets.AssetsSetDir(assetsDir)
  941. // TapDance now uses a distinct Assets/ClientConf,
  942. // refraction_networking_client.Assets. Do not configure the TapDance
  943. // ClientConf to use the same configuration as Conjure, as the
  944. // Conjure ClientConf may contain decoys that are appropriate for
  945. // registration load but not full TapDance tunnel load.
  946. })
  947. return initErr
  948. }