meekConn.go 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. golangtls "crypto/tls"
  25. "encoding/base64"
  26. "encoding/json"
  27. "errors"
  28. "fmt"
  29. "io"
  30. "net"
  31. "net/http"
  32. "net/url"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "time"
  37. "github.com/Psiphon-Inc/goarista/monotime"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tls"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
  43. "golang.org/x/net/http2"
  44. )
  45. // MeekConn is based on meek-client.go from Tor and Psiphon:
  46. //
  47. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  48. // CC0 1.0 Universal
  49. //
  50. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  51. const (
  52. MEEK_PROTOCOL_VERSION = 3
  53. MEEK_COOKIE_MAX_PADDING = 32
  54. MAX_SEND_PAYLOAD_LENGTH = 65536
  55. FULL_RECEIVE_BUFFER_LENGTH = 4194304
  56. READ_PAYLOAD_CHUNK_LENGTH = 65536
  57. LIMITED_FULL_RECEIVE_BUFFER_LENGTH = 131072
  58. LIMITED_READ_PAYLOAD_CHUNK_LENGTH = 4096
  59. MIN_POLL_INTERVAL = 100 * time.Millisecond
  60. MIN_POLL_INTERVAL_JITTER = 0.3
  61. MAX_POLL_INTERVAL = 5 * time.Second
  62. MAX_POLL_INTERVAL_JITTER = 0.1
  63. POLL_INTERVAL_MULTIPLIER = 1.5
  64. POLL_INTERVAL_JITTER = 0.1
  65. MEEK_ROUND_TRIP_RETRY_DEADLINE = 5 * time.Second
  66. MEEK_ROUND_TRIP_RETRY_MIN_DELAY = 50 * time.Millisecond
  67. MEEK_ROUND_TRIP_RETRY_MAX_DELAY = 1000 * time.Millisecond
  68. MEEK_ROUND_TRIP_RETRY_MULTIPLIER = 2
  69. MEEK_ROUND_TRIP_TIMEOUT = 20 * time.Second
  70. )
  71. // MeekConfig specifies the behavior of a MeekConn
  72. type MeekConfig struct {
  73. // LimitBufferSizes indicates whether to use smaller buffers to
  74. // conserve memory.
  75. LimitBufferSizes bool
  76. // DialAddress is the actual network address to dial to establish a
  77. // connection to the meek server. This may be either a fronted or
  78. // direct address. The address must be in the form "host:port",
  79. // where host may be a domain name or IP address.
  80. DialAddress string
  81. // UseHTTPS indicates whether to use HTTPS (true) or HTTP (false).
  82. UseHTTPS bool
  83. // TLSProfile specifies the TLS profile to use for all underlying
  84. // TLS connections created by this meek connection. Valid values
  85. // are the possible values for CustomTLSConfig.TLSProfile.
  86. // TLSProfile will be used only when DialConfig.UseIndistinguishableTLS
  87. // is set in the DialConfig passed in to DialMeek.
  88. TLSProfile string
  89. // UseObfuscatedSessionTickets indicates whether to use obfuscated
  90. // session tickets. Assumes UseHTTPS is true.
  91. UseObfuscatedSessionTickets bool
  92. // SNIServerName is the value to place in the TLS SNI server_name
  93. // field when HTTPS is used.
  94. SNIServerName string
  95. // HostHeader is the value to place in the HTTP request Host header.
  96. HostHeader string
  97. // TransformedHostName records whether a hostname transformation is
  98. // in effect. This value is used for stats reporting.
  99. TransformedHostName bool
  100. // ClientTunnelProtocol is the protocol the client is using. It's
  101. // included in the meek cookie for optional use by the server, in
  102. // cases where the server cannot unambiguously determine the
  103. // tunnel protocol.
  104. ClientTunnelProtocol string
  105. // The following values are used to create the obfuscated meek cookie.
  106. PsiphonServerAddress string
  107. SessionID string
  108. MeekCookieEncryptionPublicKey string
  109. MeekObfuscatedKey string
  110. }
  111. // MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
  112. // client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies.
  113. // Polling is used to achieve full duplex TCP.
  114. //
  115. // Fronting is an obfuscation technique in which the connection
  116. // to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic
  117. // "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination.
  118. // See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details.
  119. //
  120. // MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
  121. // through a CDN.
  122. type MeekConn struct {
  123. url *url.URL
  124. additionalHeaders http.Header
  125. cookie *http.Cookie
  126. cachedTLSDialer *cachedTLSDialer
  127. transport transporter
  128. mutex sync.Mutex
  129. isClosed bool
  130. runCtx context.Context
  131. stopRunning context.CancelFunc
  132. relayWaitGroup *sync.WaitGroup
  133. fullReceiveBufferLength int
  134. readPayloadChunkLength int
  135. emptyReceiveBuffer chan *bytes.Buffer
  136. partialReceiveBuffer chan *bytes.Buffer
  137. fullReceiveBuffer chan *bytes.Buffer
  138. emptySendBuffer chan *bytes.Buffer
  139. partialSendBuffer chan *bytes.Buffer
  140. fullSendBuffer chan *bytes.Buffer
  141. }
  142. // transporter is implemented by both http.Transport and upstreamproxy.ProxyAuthTransport.
  143. type transporter interface {
  144. CloseIdleConnections()
  145. RoundTrip(req *http.Request) (resp *http.Response, err error)
  146. }
  147. // DialMeek returns an initialized meek connection. A meek connection is
  148. // an HTTP session which does not depend on an underlying socket connection (although
  149. // persistent HTTP connections are used for performance). This function does not
  150. // wait for the connection to be "established" before returning. A goroutine
  151. // is spawned which will eventually start HTTP polling.
  152. // When frontingAddress is not "", fronting is used. This option assumes caller has
  153. // already checked server entry capabilities.
  154. func DialMeek(
  155. ctx context.Context,
  156. meekConfig *MeekConfig,
  157. dialConfig *DialConfig) (meek *MeekConn, err error) {
  158. runCtx, stopRunning := context.WithCancel(context.Background())
  159. cleanupStopRunning := true
  160. cleanupCachedTLSDialer := true
  161. var cachedTLSDialer *cachedTLSDialer
  162. // Cleanup in error cases
  163. defer func() {
  164. if cleanupStopRunning {
  165. stopRunning()
  166. }
  167. if cleanupCachedTLSDialer && cachedTLSDialer != nil {
  168. cachedTLSDialer.close()
  169. }
  170. }()
  171. // Configure transport: HTTP or HTTPS
  172. var scheme string
  173. var transport transporter
  174. var additionalHeaders http.Header
  175. var proxyUrl func(*http.Request) (*url.URL, error)
  176. if meekConfig.UseHTTPS {
  177. // Custom TLS dialer:
  178. //
  179. // 1. ignores the HTTP request address and uses the fronting domain
  180. // 2. optionally disables SNI -- SNI breaks fronting when used with certain CDNs.
  181. // 3. skips verifying the server cert.
  182. //
  183. // Reasoning for #3:
  184. //
  185. // With a TLS MiM attack in place, and server certs verified, we'll fail to connect because the client
  186. // will refuse to connect. That's not a successful outcome.
  187. //
  188. // With a MiM attack in place, and server certs not verified, we'll fail to connect if the MiM is actively
  189. // targeting Psiphon and classifying the HTTP traffic by Host header or payload signature.
  190. //
  191. // However, in the case of a passive MiM that's just recording traffic or an active MiM that's targeting
  192. // something other than Psiphon, the client will connect. This is a successful outcome.
  193. //
  194. // What is exposed to the MiM? The Host header does not contain a Psiphon server IP address, just an
  195. // unrelated, randomly generated domain name which cannot be used to block direct connections. The
  196. // Psiphon server IP is sent over meek, but it's in the encrypted cookie.
  197. //
  198. // The payload (user traffic) gets its confidentiality and integrity from the underlying SSH protocol.
  199. // So, nothing is leaked to the MiM apart from signatures which could be used to classify the traffic
  200. // as Psiphon to possibly block it; but note that not revealing that the client is Psiphon is outside
  201. // our threat model; we merely seek to evade mass blocking by taking steps that require progressively
  202. // more effort to block.
  203. //
  204. // There is a subtle attack remaining: an adversary that can MiM some CDNs but not others (and so can
  205. // classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server
  206. // selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't
  207. // exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after
  208. // some short period. This is mitigated by the "impaired" protocol classification mechanism.
  209. scheme = "https"
  210. tlsConfig := &CustomTLSConfig{
  211. DialAddr: meekConfig.DialAddress,
  212. Dial: NewTCPDialer(dialConfig),
  213. SNIServerName: meekConfig.SNIServerName,
  214. SkipVerify: true,
  215. UseIndistinguishableTLS: dialConfig.UseIndistinguishableTLS,
  216. TLSProfile: meekConfig.TLSProfile,
  217. TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
  218. }
  219. if meekConfig.UseObfuscatedSessionTickets {
  220. tlsConfig.ObfuscatedSessionTicketKey = meekConfig.MeekObfuscatedKey
  221. }
  222. tlsDialer := NewCustomTLSDialer(tlsConfig)
  223. // Pre-dial one TLS connection in order to inspect the negotiated
  224. // application protocol. Then we create an HTTP/2 or HTTP/1.1 transport
  225. // depending on which protocol was negotiated. The TLS dialer
  226. // is assumed to negotiate only "h2" or "http/1.1"; or not negotiate
  227. // an application protocol.
  228. //
  229. // We cannot rely on net/http's HTTP/2 support since it's only
  230. // activated when http.Transport.DialTLS returns a golang crypto/tls.Conn;
  231. // e.g., https://github.com/golang/go/blob/c8aec4095e089ff6ac50d18e97c3f46561f14f48/src/net/http/transport.go#L1040
  232. //
  233. // The pre-dialed connection is stored in a cachedTLSDialer, which will
  234. // return the cached pre-dialed connection to its first Dial caller, and
  235. // use the tlsDialer for all other Dials.
  236. //
  237. // cachedTLSDialer.close() must be called on all exits paths from this
  238. // function and in meek.Close() to ensure the cached conn is closed in
  239. // any case where no Dial call is made.
  240. //
  241. // The pre-dial must be interruptible so that DialMeek doesn't block and
  242. // hang/delay a shutdown or end of establishment. So the pre-dial uses
  243. // the Controller's PendingConns, not the MeekConn PendingConns. For this
  244. // purpose, a special preDialer is configured.
  245. //
  246. // Only one pre-dial attempt is made; there are no retries. This differs
  247. // from roundTrip, which retries and may redial for each retry. Retries
  248. // at the pre-dial phase are less useful since there's no active session
  249. // to preserve, and establishment will simply try another server. Note
  250. // that the underlying TCPDial may still try multiple IP addreses when
  251. // the destination is a domain and ir resolves to multiple IP adresses.
  252. // The pre-dial is made within the parent dial context, so that DialMeek
  253. // may be interrupted. Subsequent dials are made within the meek round trip
  254. // request context. Since http.DialTLS doesn't take a context argument
  255. // (yet; as of Go 1.9 this issue is still open: https://github.com/golang/go/issues/21526),
  256. // cachedTLSDialer is used as a conduit to send the request context.
  257. // meekConn.roundTrip sets its request context into cachedTLSDialer, and
  258. // cachedTLSDialer.dial uses that context.
  259. // As DialAddr is set in the CustomTLSConfig, no address is required here.
  260. preConn, err := tlsDialer(ctx, "tcp", "")
  261. if err != nil {
  262. return nil, common.ContextError(err)
  263. }
  264. isHTTP2 := false
  265. if tlsConn, ok := preConn.(*tls.Conn); ok {
  266. state := tlsConn.ConnectionState()
  267. if state.NegotiatedProtocolIsMutual &&
  268. state.NegotiatedProtocol == "h2" {
  269. isHTTP2 = true
  270. }
  271. }
  272. cachedTLSDialer = newCachedTLSDialer(preConn, tlsDialer)
  273. if isHTTP2 {
  274. NoticeInfo("negotiated HTTP/2 for %s", meekConfig.DialAddress)
  275. transport = &http2.Transport{
  276. DialTLS: func(network, addr string, _ *golangtls.Config) (net.Conn, error) {
  277. return cachedTLSDialer.dial(network, addr)
  278. },
  279. }
  280. } else {
  281. transport = &http.Transport{
  282. DialTLS: func(network, addr string) (net.Conn, error) {
  283. return cachedTLSDialer.dial(network, addr)
  284. },
  285. }
  286. }
  287. } else {
  288. scheme = "http"
  289. // The dialer ignores address that http.Transport will pass in (derived
  290. // from the HTTP request URL) and always dials meekConfig.DialAddress.
  291. dialer := func(ctx context.Context, network, _ string) (net.Conn, error) {
  292. return NewTCPDialer(dialConfig)(ctx, network, meekConfig.DialAddress)
  293. }
  294. // For HTTP, and when the meekConfig.DialAddress matches the
  295. // meekConfig.HostHeader, we let http.Transport handle proxying.
  296. // http.Transport will put the the HTTP server address in the HTTP
  297. // request line. In this one case, we can use an HTTP proxy that does
  298. // not offer CONNECT support.
  299. if strings.HasPrefix(dialConfig.UpstreamProxyUrl, "http://") &&
  300. (meekConfig.DialAddress == meekConfig.HostHeader ||
  301. meekConfig.DialAddress == meekConfig.HostHeader+":80") {
  302. url, err := url.Parse(dialConfig.UpstreamProxyUrl)
  303. if err != nil {
  304. return nil, common.ContextError(err)
  305. }
  306. proxyUrl = http.ProxyURL(url)
  307. // Here, the dialer must use the address that http.Transport
  308. // passes in (which will be proxy address).
  309. copyDialConfig := new(DialConfig)
  310. *copyDialConfig = *dialConfig
  311. copyDialConfig.UpstreamProxyUrl = ""
  312. dialer = NewTCPDialer(copyDialConfig)
  313. }
  314. httpTransport := &http.Transport{
  315. Proxy: proxyUrl,
  316. DialContext: dialer,
  317. }
  318. if proxyUrl != nil {
  319. // Wrap transport with a transport that can perform HTTP proxy auth negotiation
  320. transport, err = upstreamproxy.NewProxyAuthTransport(httpTransport, dialConfig.CustomHeaders)
  321. if err != nil {
  322. return nil, common.ContextError(err)
  323. }
  324. } else {
  325. transport = httpTransport
  326. }
  327. }
  328. url := &url.URL{
  329. Scheme: scheme,
  330. Host: meekConfig.HostHeader,
  331. Path: "/",
  332. }
  333. if meekConfig.UseHTTPS {
  334. host, _, err := net.SplitHostPort(meekConfig.DialAddress)
  335. if err != nil {
  336. return nil, common.ContextError(err)
  337. }
  338. additionalHeaders = map[string][]string{
  339. "X-Psiphon-Fronting-Address": {host},
  340. }
  341. } else {
  342. if proxyUrl == nil {
  343. additionalHeaders = dialConfig.CustomHeaders
  344. }
  345. }
  346. cookie, err := makeMeekCookie(meekConfig)
  347. if err != nil {
  348. return nil, common.ContextError(err)
  349. }
  350. // The main loop of a MeekConn is run in the relay() goroutine.
  351. // A MeekConn implements net.Conn concurrency semantics:
  352. // "Multiple goroutines may invoke methods on a Conn simultaneously."
  353. //
  354. // Read() calls and relay() are synchronized by exchanging control of a single
  355. // receiveBuffer (bytes.Buffer). This single buffer may be:
  356. // - in the emptyReceiveBuffer channel when it is available and empty;
  357. // - in the partialReadBuffer channel when it is available and contains data;
  358. // - in the fullReadBuffer channel when it is available and full of data;
  359. // - "checked out" by relay or Read when they are are writing to or reading from the
  360. // buffer, respectively.
  361. // relay() will obtain the buffer from either the empty or partial channel but block when
  362. // the buffer is full. Read will obtain the buffer from the partial or full channel when
  363. // there is data to read but block when the buffer is empty.
  364. // Write() calls and relay() are synchronized in a similar way, using a single
  365. // sendBuffer.
  366. meek = &MeekConn{
  367. url: url,
  368. additionalHeaders: additionalHeaders,
  369. cookie: cookie,
  370. cachedTLSDialer: cachedTLSDialer,
  371. transport: transport,
  372. isClosed: false,
  373. runCtx: runCtx,
  374. stopRunning: stopRunning,
  375. relayWaitGroup: new(sync.WaitGroup),
  376. fullReceiveBufferLength: FULL_RECEIVE_BUFFER_LENGTH,
  377. readPayloadChunkLength: READ_PAYLOAD_CHUNK_LENGTH,
  378. emptyReceiveBuffer: make(chan *bytes.Buffer, 1),
  379. partialReceiveBuffer: make(chan *bytes.Buffer, 1),
  380. fullReceiveBuffer: make(chan *bytes.Buffer, 1),
  381. emptySendBuffer: make(chan *bytes.Buffer, 1),
  382. partialSendBuffer: make(chan *bytes.Buffer, 1),
  383. fullSendBuffer: make(chan *bytes.Buffer, 1),
  384. }
  385. // stopRunning and cachedTLSDialer will now be closed in meek.Close()
  386. cleanupStopRunning = false
  387. cleanupCachedTLSDialer = false
  388. meek.emptyReceiveBuffer <- new(bytes.Buffer)
  389. meek.emptySendBuffer <- new(bytes.Buffer)
  390. meek.relayWaitGroup.Add(1)
  391. if meekConfig.LimitBufferSizes {
  392. meek.fullReceiveBufferLength = LIMITED_FULL_RECEIVE_BUFFER_LENGTH
  393. meek.readPayloadChunkLength = LIMITED_READ_PAYLOAD_CHUNK_LENGTH
  394. }
  395. go meek.relay()
  396. return meek, nil
  397. }
  398. type cachedTLSDialer struct {
  399. usedCachedConn int32
  400. cachedConn net.Conn
  401. requestContext atomic.Value
  402. dialer Dialer
  403. }
  404. func newCachedTLSDialer(cachedConn net.Conn, dialer Dialer) *cachedTLSDialer {
  405. return &cachedTLSDialer{
  406. cachedConn: cachedConn,
  407. dialer: dialer,
  408. }
  409. }
  410. func (c *cachedTLSDialer) setRequestContext(requestContext context.Context) {
  411. c.requestContext.Store(requestContext)
  412. }
  413. func (c *cachedTLSDialer) dial(network, addr string) (net.Conn, error) {
  414. if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
  415. conn := c.cachedConn
  416. c.cachedConn = nil
  417. return conn, nil
  418. }
  419. ctx := c.requestContext.Load().(context.Context)
  420. if ctx == nil {
  421. ctx = context.Background()
  422. }
  423. return c.dialer(ctx, network, addr)
  424. }
  425. func (c *cachedTLSDialer) close() {
  426. if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
  427. c.cachedConn.Close()
  428. c.cachedConn = nil
  429. }
  430. }
  431. // Close terminates the meek connection. Close waits for the relay processing goroutine
  432. // to stop and releases HTTP transport resources.
  433. // A mutex is required to support net.Conn concurrency semantics.
  434. func (meek *MeekConn) Close() (err error) {
  435. meek.mutex.Lock()
  436. isClosed := meek.isClosed
  437. meek.isClosed = true
  438. meek.mutex.Unlock()
  439. if !isClosed {
  440. meek.stopRunning()
  441. if meek.cachedTLSDialer != nil {
  442. meek.cachedTLSDialer.close()
  443. }
  444. meek.relayWaitGroup.Wait()
  445. meek.transport.CloseIdleConnections()
  446. }
  447. return nil
  448. }
  449. // IsClosed implements the Closer iterface. The return value
  450. // indicates whether the MeekConn has been closed.
  451. func (meek *MeekConn) IsClosed() bool {
  452. meek.mutex.Lock()
  453. isClosed := meek.isClosed
  454. meek.mutex.Unlock()
  455. return isClosed
  456. }
  457. // Read reads data from the connection.
  458. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
  459. func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
  460. if meek.IsClosed() {
  461. return 0, common.ContextError(errors.New("meek connection is closed"))
  462. }
  463. // Block until there is received data to consume
  464. var receiveBuffer *bytes.Buffer
  465. select {
  466. case receiveBuffer = <-meek.partialReceiveBuffer:
  467. case receiveBuffer = <-meek.fullReceiveBuffer:
  468. case <-meek.runCtx.Done():
  469. return 0, common.ContextError(errors.New("meek connection has closed"))
  470. }
  471. n, err = receiveBuffer.Read(buffer)
  472. meek.replaceReceiveBuffer(receiveBuffer)
  473. return n, err
  474. }
  475. // Write writes data to the connection.
  476. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
  477. func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
  478. if meek.IsClosed() {
  479. return 0, common.ContextError(errors.New("meek connection is closed"))
  480. }
  481. // Repeats until all n bytes are written
  482. n = len(buffer)
  483. for len(buffer) > 0 {
  484. // Block until there is capacity in the send buffer
  485. var sendBuffer *bytes.Buffer
  486. select {
  487. case sendBuffer = <-meek.emptySendBuffer:
  488. case sendBuffer = <-meek.partialSendBuffer:
  489. case <-meek.runCtx.Done():
  490. return 0, common.ContextError(errors.New("meek connection has closed"))
  491. }
  492. writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len()
  493. if writeLen > 0 {
  494. if writeLen > len(buffer) {
  495. writeLen = len(buffer)
  496. }
  497. _, err = sendBuffer.Write(buffer[:writeLen])
  498. buffer = buffer[writeLen:]
  499. }
  500. meek.replaceSendBuffer(sendBuffer)
  501. }
  502. return n, err
  503. }
  504. // LocalAddr is a stub implementation of net.Conn.LocalAddr
  505. func (meek *MeekConn) LocalAddr() net.Addr {
  506. return nil
  507. }
  508. // RemoteAddr is a stub implementation of net.Conn.RemoteAddr
  509. func (meek *MeekConn) RemoteAddr() net.Addr {
  510. return nil
  511. }
  512. // SetDeadline is a stub implementation of net.Conn.SetDeadline
  513. func (meek *MeekConn) SetDeadline(t time.Time) error {
  514. return common.ContextError(errors.New("not supported"))
  515. }
  516. // SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline
  517. func (meek *MeekConn) SetReadDeadline(t time.Time) error {
  518. return common.ContextError(errors.New("not supported"))
  519. }
  520. // SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline
  521. func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
  522. return common.ContextError(errors.New("not supported"))
  523. }
  524. func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) {
  525. switch {
  526. case receiveBuffer.Len() == 0:
  527. meek.emptyReceiveBuffer <- receiveBuffer
  528. case receiveBuffer.Len() >= meek.fullReceiveBufferLength:
  529. meek.fullReceiveBuffer <- receiveBuffer
  530. default:
  531. meek.partialReceiveBuffer <- receiveBuffer
  532. }
  533. }
  534. func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) {
  535. switch {
  536. case sendBuffer.Len() == 0:
  537. meek.emptySendBuffer <- sendBuffer
  538. case sendBuffer.Len() >= MAX_SEND_PAYLOAD_LENGTH:
  539. meek.fullSendBuffer <- sendBuffer
  540. default:
  541. meek.partialSendBuffer <- sendBuffer
  542. }
  543. }
  544. // relay sends and receives tunneled traffic (payload). An HTTP request is
  545. // triggered when data is in the write queue or at a polling interval.
  546. // There's a geometric increase, up to a maximum, in the polling interval when
  547. // no data is exchanged. Only one HTTP request is in flight at a time.
  548. func (meek *MeekConn) relay() {
  549. // Note: meek.Close() calls here in relay() are made asynchronously
  550. // (using goroutines) since Close() will wait on this WaitGroup.
  551. defer meek.relayWaitGroup.Done()
  552. interval := common.JitterDuration(
  553. MIN_POLL_INTERVAL,
  554. MIN_POLL_INTERVAL_JITTER)
  555. timeout := time.NewTimer(interval)
  556. defer timeout.Stop()
  557. for {
  558. timeout.Reset(interval)
  559. // Block until there is payload to send or it is time to poll
  560. var sendBuffer *bytes.Buffer
  561. select {
  562. case sendBuffer = <-meek.partialSendBuffer:
  563. case sendBuffer = <-meek.fullSendBuffer:
  564. case <-timeout.C:
  565. // In the polling case, send an empty payload
  566. case <-meek.runCtx.Done():
  567. // Drop through to second Done() check
  568. }
  569. // Check Done() again, to ensure it takes precedence
  570. select {
  571. case <-meek.runCtx.Done():
  572. return
  573. default:
  574. }
  575. sendPayloadSize := 0
  576. if sendBuffer != nil {
  577. sendPayloadSize = sendBuffer.Len()
  578. }
  579. // roundTrip will replace sendBuffer (by calling replaceSendBuffer). This is
  580. // a compromise to conserve memory. Using a second buffer here, we could copy
  581. // sendBuffer and immediately replace it, unblocking meekConn.Write() and
  582. // allowing more upstream payload to immediately enqueue. Instead, the request
  583. // payload is read directly from sendBuffer, including retries. Only once the
  584. // server has acknowledged the request payload is sendBuffer replaced. This
  585. // still allows meekConn.Write() to unblock before the round trip response is
  586. // read.
  587. receivedPayloadSize, err := meek.roundTrip(sendBuffer)
  588. if err != nil {
  589. select {
  590. case <-meek.runCtx.Done():
  591. // In this case, meek.roundTrip encountered Done(). Exit without logging error.
  592. return
  593. default:
  594. }
  595. NoticeAlert("%s", common.ContextError(err))
  596. go meek.Close()
  597. return
  598. }
  599. // Calculate polling interval. When data is received,
  600. // immediately request more. Otherwise, schedule next
  601. // poll with exponential back off. Jitter and coin
  602. // flips are used to avoid trivial, static traffic
  603. // timing patterns.
  604. if receivedPayloadSize > 0 || sendPayloadSize > 0 {
  605. interval = 0
  606. } else if interval == 0 {
  607. interval = common.JitterDuration(
  608. MIN_POLL_INTERVAL,
  609. MIN_POLL_INTERVAL_JITTER)
  610. } else {
  611. if common.FlipCoin() {
  612. interval = common.JitterDuration(
  613. interval,
  614. POLL_INTERVAL_JITTER)
  615. } else {
  616. interval = common.JitterDuration(
  617. time.Duration(float64(interval)*POLL_INTERVAL_MULTIPLIER),
  618. POLL_INTERVAL_JITTER)
  619. }
  620. if interval >= MAX_POLL_INTERVAL {
  621. interval = common.JitterDuration(
  622. MAX_POLL_INTERVAL,
  623. MAX_POLL_INTERVAL_JITTER)
  624. }
  625. }
  626. }
  627. }
  628. // readCloseSignaller is an io.ReadCloser wrapper for an io.Reader
  629. // that is passed, as the request body, to http.Transport.RoundTrip.
  630. // readCloseSignaller adds the AwaitClosed call, which is used
  631. // to schedule recycling the buffer underlying the reader only after
  632. // RoundTrip has called Close and will no longer use the buffer.
  633. // See: https://golang.org/pkg/net/http/#RoundTripper
  634. type readCloseSignaller struct {
  635. context context.Context
  636. reader io.Reader
  637. closed chan struct{}
  638. }
  639. func NewReadCloseSignaller(
  640. context context.Context,
  641. reader io.Reader) *readCloseSignaller {
  642. return &readCloseSignaller{
  643. context: context,
  644. reader: reader,
  645. closed: make(chan struct{}, 1),
  646. }
  647. }
  648. func (r *readCloseSignaller) Read(p []byte) (int, error) {
  649. return r.reader.Read(p)
  650. }
  651. func (r *readCloseSignaller) Close() error {
  652. select {
  653. case r.closed <- *new(struct{}):
  654. default:
  655. }
  656. return nil
  657. }
  658. func (r *readCloseSignaller) AwaitClosed() bool {
  659. select {
  660. case <-r.context.Done():
  661. case <-r.closed:
  662. return true
  663. }
  664. return false
  665. }
  666. // roundTrip configures and makes the actual HTTP POST request
  667. func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
  668. // Retries are made when the round trip fails. This adds resiliency
  669. // to connection interruption and intermittent failures.
  670. //
  671. // At least one retry is always attempted, and retries continue
  672. // while still within a brief deadline -- 5 seconds, currently the
  673. // deadline for an actively probed SSH connection to timeout. There
  674. // is a brief delay between retries, allowing for intermittent
  675. // failure states to resolve.
  676. //
  677. // Failure may occur at various stages of the HTTP request:
  678. //
  679. // 1. Before the request begins. In this case, the entire request
  680. // may be rerun.
  681. //
  682. // 2. While sending the request payload. In this case, the client
  683. // must resend its request payload. The server will not have
  684. // relayed its partially received request payload.
  685. //
  686. // 3. After sending the request payload but before receiving
  687. // a response. The client cannot distinguish between case 2 and
  688. // this case, case 3. The client resends its payload and the
  689. // server detects this and skips relaying the request payload.
  690. //
  691. // 4. While reading the response payload. The client will omit its
  692. // request payload when retrying, as the server has already
  693. // acknowledged it. The client will also indicate to the server
  694. // the amount of response payload already received, and the
  695. // server will skip resending the indicated amount of response
  696. // payload.
  697. //
  698. // Retries are indicated to the server by adding a Range header,
  699. // which includes the response payload resend position.
  700. defer func() {
  701. // Ensure sendBuffer is replaced, even in error code paths.
  702. if sendBuffer != nil {
  703. sendBuffer.Truncate(0)
  704. meek.replaceSendBuffer(sendBuffer)
  705. }
  706. }()
  707. retries := uint(0)
  708. retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
  709. retryDelay := MEEK_ROUND_TRIP_RETRY_MIN_DELAY
  710. serverAcknowledgedRequestPayload := false
  711. receivedPayloadSize := int64(0)
  712. for try := 0; ; try++ {
  713. // Omit the request payload when retrying after receiving a
  714. // partial server response.
  715. var signaller *readCloseSignaller
  716. var requestBody io.ReadCloser
  717. contentLength := 0
  718. if !serverAcknowledgedRequestPayload && sendBuffer != nil {
  719. // sendBuffer will be replaced once the data is no longer needed,
  720. // when RoundTrip calls Close on the Body; this allows meekConn.Write()
  721. // to unblock and start buffering data for the next roung trip while
  722. // still reading the current round trip response. signaller provides
  723. // the hook for awaiting RoundTrip's call to Close.
  724. signaller = NewReadCloseSignaller(meek.runCtx, bytes.NewReader(sendBuffer.Bytes()))
  725. requestBody = signaller
  726. contentLength = sendBuffer.Len()
  727. }
  728. var request *http.Request
  729. request, err := http.NewRequest("POST", meek.url.String(), requestBody)
  730. if err != nil {
  731. // Don't retry when can't initialize a Request
  732. return 0, common.ContextError(err)
  733. }
  734. // Content-Length won't be set automatically due to the underlying
  735. // type of requestBody.
  736. if contentLength > 0 {
  737. request.ContentLength = int64(contentLength)
  738. }
  739. // - meek.stopRunning() will abort a round trip in flight
  740. // - round trip will abort if it exceeds MEEK_ROUND_TRIP_TIMEOUT
  741. requestContext, cancelFunc := context.WithTimeout(
  742. meek.runCtx,
  743. MEEK_ROUND_TRIP_TIMEOUT)
  744. defer cancelFunc()
  745. // Ensure TLS dials are made within the current request context.
  746. if meek.cachedTLSDialer != nil {
  747. meek.cachedTLSDialer.setRequestContext(requestContext)
  748. }
  749. request = request.WithContext(requestContext)
  750. meek.addAdditionalHeaders(request)
  751. request.Header.Set("Content-Type", "application/octet-stream")
  752. request.AddCookie(meek.cookie)
  753. expectedStatusCode := http.StatusOK
  754. // When retrying, add a Range header to indicate how much
  755. // of the response was already received.
  756. if try > 0 {
  757. expectedStatusCode = http.StatusPartialContent
  758. request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
  759. }
  760. response, err := meek.transport.RoundTrip(request)
  761. // Wait for RoundTrip to call Close on the request body, when
  762. // there is one. This is necessary to ensure it's safe to
  763. // subsequently replace sendBuffer in both the success and
  764. // error cases.
  765. if signaller != nil {
  766. if !signaller.AwaitClosed() {
  767. // AwaitClosed encountered Done(). Abort immediately. Do not
  768. // replace sendBuffer, as we cannot be certain RoundTrip is
  769. // done with it. MeekConn.Write will exit on Done and not hang
  770. // awaiting sendBuffer.
  771. sendBuffer = nil
  772. return 0, common.ContextError(errors.New("meek connection has closed"))
  773. }
  774. }
  775. if err != nil {
  776. select {
  777. case <-meek.runCtx.Done():
  778. // Exit without retrying and without logging error.
  779. return 0, common.ContextError(err)
  780. default:
  781. }
  782. NoticeAlert("meek round trip failed: %s", err)
  783. // ...continue to retry
  784. }
  785. if err == nil {
  786. if response.StatusCode != expectedStatusCode &&
  787. // Certain http servers return 200 OK where we expect 206, so accept that.
  788. !(expectedStatusCode == http.StatusPartialContent && response.StatusCode == http.StatusOK) {
  789. // Don't retry when the status code is incorrect
  790. response.Body.Close()
  791. return 0, common.ContextError(
  792. fmt.Errorf(
  793. "unexpected status code: %d instead of %d",
  794. response.StatusCode, expectedStatusCode))
  795. }
  796. // Update meek session cookie
  797. for _, c := range response.Cookies() {
  798. if meek.cookie.Name == c.Name {
  799. meek.cookie.Value = c.Value
  800. break
  801. }
  802. }
  803. // Received the response status code, so the server
  804. // must have received the request payload.
  805. serverAcknowledgedRequestPayload = true
  806. // sendBuffer is now no longer required for retries, and the
  807. // buffer may be replaced; this allows meekConn.Write() to unblock
  808. // and start buffering data for the next round trip while still
  809. // reading the current round trip response.
  810. if sendBuffer != nil {
  811. // Assumes signaller.AwaitClosed is called above, so
  812. // sendBuffer will no longer be accessed by RoundTrip.
  813. sendBuffer.Truncate(0)
  814. meek.replaceSendBuffer(sendBuffer)
  815. sendBuffer = nil
  816. }
  817. readPayloadSize, err := meek.readPayload(response.Body)
  818. response.Body.Close()
  819. // receivedPayloadSize is the number of response
  820. // payload bytes received and relayed. A retry can
  821. // resume after this position.
  822. receivedPayloadSize += readPayloadSize
  823. if err != nil {
  824. NoticeAlert("meek read payload failed: %s", err)
  825. // ...continue to retry
  826. } else {
  827. // Round trip completed successfully
  828. break
  829. }
  830. }
  831. // Release context resources now.
  832. cancelFunc()
  833. // Either the request failed entirely, or there was a failure
  834. // streaming the response payload. Always retry once. Then
  835. // retry if time remains; when the next delay exceeds the time
  836. // remaining until the deadline, do not retry.
  837. now := monotime.Now()
  838. if retries >= 1 &&
  839. (now.After(retryDeadline) || retryDeadline.Sub(now) <= retryDelay) {
  840. return 0, common.ContextError(err)
  841. }
  842. retries += 1
  843. delayTimer := time.NewTimer(retryDelay)
  844. select {
  845. case <-delayTimer.C:
  846. case <-meek.runCtx.Done():
  847. delayTimer.Stop()
  848. return 0, common.ContextError(err)
  849. }
  850. // Increase the next delay, to back off and avoid excessive
  851. // activity in conditions such as no network connectivity.
  852. retryDelay *= MEEK_ROUND_TRIP_RETRY_MULTIPLIER
  853. if retryDelay >= MEEK_ROUND_TRIP_RETRY_MAX_DELAY {
  854. retryDelay = MEEK_ROUND_TRIP_RETRY_MAX_DELAY
  855. }
  856. }
  857. return receivedPayloadSize, nil
  858. }
  859. // Add additional headers to the HTTP request using the same method we use for adding
  860. // custom headers to HTTP proxy requests.
  861. func (meek *MeekConn) addAdditionalHeaders(request *http.Request) {
  862. for name, value := range meek.additionalHeaders {
  863. // hack around special case of "Host" header
  864. // https://golang.org/src/net/http/request.go#L474
  865. // using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915
  866. if name == "Host" {
  867. if len(value) > 0 {
  868. if request.URL.Opaque == "" {
  869. request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
  870. }
  871. request.Host = value[0]
  872. }
  873. } else {
  874. request.Header[name] = value
  875. }
  876. }
  877. }
  878. // readPayload reads the HTTP response in chunks, making the read buffer available
  879. // to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
  880. // flow back to the reader as soon as possible instead of buffering the entire payload.
  881. //
  882. // When readPayload returns an error, the totalSize output is remains valid -- it's the
  883. // number of payload bytes successfully read and relayed.
  884. func (meek *MeekConn) readPayload(
  885. receivedPayload io.ReadCloser) (totalSize int64, err error) {
  886. defer receivedPayload.Close()
  887. totalSize = 0
  888. for {
  889. reader := io.LimitReader(receivedPayload, int64(meek.readPayloadChunkLength))
  890. // Block until there is capacity in the receive buffer
  891. var receiveBuffer *bytes.Buffer
  892. select {
  893. case receiveBuffer = <-meek.emptyReceiveBuffer:
  894. case receiveBuffer = <-meek.partialReceiveBuffer:
  895. case <-meek.runCtx.Done():
  896. return 0, nil
  897. }
  898. // Note: receiveBuffer size may exceed meek.fullReceiveBufferLength by up to the size
  899. // of one received payload. The meek.fullReceiveBufferLength value is just a guideline.
  900. n, err := receiveBuffer.ReadFrom(reader)
  901. meek.replaceReceiveBuffer(receiveBuffer)
  902. totalSize += n
  903. if err != nil {
  904. return totalSize, common.ContextError(err)
  905. }
  906. if n == 0 {
  907. break
  908. }
  909. }
  910. return totalSize, nil
  911. }
  912. // makeCookie creates the cookie to be sent with initial meek HTTP request.
  913. // The purpose of the cookie is to send the following to the server:
  914. // ServerAddress -- the Psiphon Server address the meek server should relay to
  915. // SessionID -- the Psiphon session ID (used by meek server to relay geolocation
  916. // information obtained from the CDN through to the Psiphon Server)
  917. // MeekProtocolVersion -- tells the meek server that this client understands
  918. // the latest protocol.
  919. // The server will create a session using these values and send the session ID
  920. // back to the client via Set-Cookie header. Client must use that value with
  921. // all consequent HTTP requests
  922. // In unfronted meek mode, the cookie is visible over the adversary network, so the
  923. // cookie is encrypted and obfuscated.
  924. func makeMeekCookie(meekConfig *MeekConfig) (cookie *http.Cookie, err error) {
  925. // Make the JSON data
  926. serverAddress := meekConfig.PsiphonServerAddress
  927. cookieData := &protocol.MeekCookieData{
  928. ServerAddress: serverAddress,
  929. SessionID: meekConfig.SessionID,
  930. MeekProtocolVersion: MEEK_PROTOCOL_VERSION,
  931. ClientTunnelProtocol: meekConfig.ClientTunnelProtocol,
  932. }
  933. serializedCookie, err := json.Marshal(cookieData)
  934. if err != nil {
  935. return nil, common.ContextError(err)
  936. }
  937. // Encrypt the JSON data
  938. // NaCl box is used for encryption. The peer public key comes from the server entry.
  939. // Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce).
  940. // http://nacl.cace-project.eu/box.html:
  941. // "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are
  942. // different. This is true even if the sets overlap. For example, a sender can use the same nonce for two
  943. // different messages if the messages are sent to two different public keys."
  944. var nonce [24]byte
  945. var publicKey [32]byte
  946. decodedPublicKey, err := base64.StdEncoding.DecodeString(meekConfig.MeekCookieEncryptionPublicKey)
  947. if err != nil {
  948. return nil, common.ContextError(err)
  949. }
  950. copy(publicKey[:], decodedPublicKey)
  951. ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader)
  952. if err != nil {
  953. return nil, common.ContextError(err)
  954. }
  955. box := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey)
  956. encryptedCookie := make([]byte, 32+len(box))
  957. copy(encryptedCookie[0:32], ephemeralPublicKey[0:32])
  958. copy(encryptedCookie[32:], box)
  959. // Obfuscate the encrypted data
  960. obfuscator, err := common.NewClientObfuscator(
  961. &common.ObfuscatorConfig{Keyword: meekConfig.MeekObfuscatedKey, MaxPadding: MEEK_COOKIE_MAX_PADDING})
  962. if err != nil {
  963. return nil, common.ContextError(err)
  964. }
  965. obfuscatedCookie := obfuscator.SendSeedMessage()
  966. seedLen := len(obfuscatedCookie)
  967. obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...)
  968. obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:])
  969. // Format the HTTP cookie
  970. // The format is <random letter 'A'-'Z'>=<base64 data>, which is intended to match common cookie formats.
  971. A := int('A')
  972. Z := int('Z')
  973. // letterIndex is integer in range [int('A'), int('Z')]
  974. letterIndex, err := common.MakeSecureRandomInt(Z - A + 1)
  975. if err != nil {
  976. return nil, common.ContextError(err)
  977. }
  978. return &http.Cookie{
  979. Name: string(byte(A + letterIndex)),
  980. Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)},
  981. nil
  982. }