meekConn.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. "errors"
  27. "fmt"
  28. "io"
  29. "net"
  30. "net/http"
  31. "net/url"
  32. "strings"
  33. "sync"
  34. "time"
  35. "github.com/Psiphon-Inc/crypto/nacl/box"
  36. "github.com/Psiphon-Inc/goarista/monotime"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
  40. )
  41. // MeekConn is based on meek-client.go from Tor and Psiphon:
  42. //
  43. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  44. // CC0 1.0 Universal
  45. //
  46. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  47. const (
  48. MEEK_PROTOCOL_VERSION = 3
  49. MEEK_COOKIE_MAX_PADDING = 32
  50. MAX_SEND_PAYLOAD_LENGTH = 65536
  51. FULL_RECEIVE_BUFFER_LENGTH = 4194304
  52. READ_PAYLOAD_CHUNK_LENGTH = 65536
  53. MIN_POLL_INTERVAL = 100 * time.Millisecond
  54. MIN_POLL_INTERVAL_JITTER = 0.3
  55. MAX_POLL_INTERVAL = 5 * time.Second
  56. MAX_POLL_INTERVAL_JITTER = 0.1
  57. POLL_INTERVAL_MULTIPLIER = 1.5
  58. POLL_INTERVAL_JITTER = 0.1
  59. MEEK_ROUND_TRIP_RETRY_DEADLINE = 5 * time.Second
  60. MEEK_ROUND_TRIP_RETRY_DELAY = 50 * time.Millisecond
  61. MEEK_ROUND_TRIP_TIMEOUT = 20 * time.Second
  62. )
  63. // MeekConfig specifies the behavior of a MeekConn
  64. type MeekConfig struct {
  65. // DialAddress is the actual network address to dial to establish a
  66. // connection to the meek server. This may be either a fronted or
  67. // direct address. The address must be in the form "host:port",
  68. // where host may be a domain name or IP address.
  69. DialAddress string
  70. // UseHTTPS indicates whether to use HTTPS (true) or HTTP (false).
  71. UseHTTPS bool
  72. // TLSProfile specifies the TLS profile to use for all underlying
  73. // TLS connections created by this meek connection. Valid values
  74. // are the possible values for CustomTLSConfig.TLSProfile.
  75. // TLSProfile will be used only when DialConfig.UseIndistinguishableTLS
  76. // is set in the DialConfig passed in to DialMeek.
  77. TLSProfile string
  78. // UseObfuscatedSessionTickets indicates whether to use obfuscated
  79. // session tickets. Assumes UseHTTPS is true.
  80. UseObfuscatedSessionTickets bool
  81. // SNIServerName is the value to place in the TLS SNI server_name
  82. // field when HTTPS is used.
  83. SNIServerName string
  84. // HostHeader is the value to place in the HTTP request Host header.
  85. HostHeader string
  86. // TransformedHostName records whether a hostname transformation is
  87. // in effect. This value is used for stats reporting.
  88. TransformedHostName bool
  89. // ClientTunnelProtocol is the protocol the client is using. It's
  90. // included in the meek cookie for optional use by the server, in
  91. // cases where the server cannot unambiguously determine the
  92. // tunnel protocol.
  93. ClientTunnelProtocol string
  94. // The following values are used to create the obfuscated meek cookie.
  95. PsiphonServerAddress string
  96. SessionID string
  97. MeekCookieEncryptionPublicKey string
  98. MeekObfuscatedKey string
  99. }
  100. // MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
  101. // client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies.
  102. // Polling is used to achieve full duplex TCP.
  103. //
  104. // Fronting is an obfuscation technique in which the connection
  105. // to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic
  106. // "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination.
  107. // See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details.
  108. //
  109. // MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
  110. // through a CDN.
  111. type MeekConn struct {
  112. url *url.URL
  113. additionalHeaders http.Header
  114. cookie *http.Cookie
  115. pendingConns *common.Conns
  116. transport transporter
  117. mutex sync.Mutex
  118. isClosed bool
  119. runContext context.Context
  120. stopRunning context.CancelFunc
  121. relayWaitGroup *sync.WaitGroup
  122. emptyReceiveBuffer chan *bytes.Buffer
  123. partialReceiveBuffer chan *bytes.Buffer
  124. fullReceiveBuffer chan *bytes.Buffer
  125. emptySendBuffer chan *bytes.Buffer
  126. partialSendBuffer chan *bytes.Buffer
  127. fullSendBuffer chan *bytes.Buffer
  128. }
  129. // transporter is implemented by both http.Transport and upstreamproxy.ProxyAuthTransport.
  130. type transporter interface {
  131. CancelRequest(req *http.Request)
  132. CloseIdleConnections()
  133. RegisterProtocol(scheme string, rt http.RoundTripper)
  134. RoundTrip(req *http.Request) (resp *http.Response, err error)
  135. }
  136. // DialMeek returns an initialized meek connection. A meek connection is
  137. // an HTTP session which does not depend on an underlying socket connection (although
  138. // persistent HTTP connections are used for performance). This function does not
  139. // wait for the connection to be "established" before returning. A goroutine
  140. // is spawned which will eventually start HTTP polling.
  141. // When frontingAddress is not "", fronting is used. This option assumes caller has
  142. // already checked server entry capabilities.
  143. func DialMeek(
  144. meekConfig *MeekConfig,
  145. dialConfig *DialConfig) (meek *MeekConn, err error) {
  146. // Configure transport
  147. // Note: MeekConn has its own PendingConns to manage the underlying HTTP transport connections,
  148. // which may be interrupted on MeekConn.Close(). This code previously used the establishTunnel
  149. // pendingConns here, but that was a lifecycle mismatch: we don't want to abort HTTP transport
  150. // connections while MeekConn is still in use
  151. pendingConns := new(common.Conns)
  152. // Use a copy of DialConfig with the meek pendingConns
  153. meekDialConfig := new(DialConfig)
  154. *meekDialConfig = *dialConfig
  155. meekDialConfig.PendingConns = pendingConns
  156. var transport transporter
  157. var additionalHeaders http.Header
  158. var proxyUrl func(*http.Request) (*url.URL, error)
  159. if meekConfig.UseHTTPS {
  160. // Custom TLS dialer:
  161. //
  162. // 1. ignores the HTTP request address and uses the fronting domain
  163. // 2. optionally disables SNI -- SNI breaks fronting when used with certain CDNs.
  164. // 3. skips verifying the server cert.
  165. //
  166. // Reasoning for #3:
  167. //
  168. // With a TLS MiM attack in place, and server certs verified, we'll fail to connect because the client
  169. // will refuse to connect. That's not a successful outcome.
  170. //
  171. // With a MiM attack in place, and server certs not verified, we'll fail to connect if the MiM is actively
  172. // targeting Psiphon and classifying the HTTP traffic by Host header or payload signature.
  173. //
  174. // However, in the case of a passive MiM that's just recording traffic or an active MiM that's targeting
  175. // something other than Psiphon, the client will connect. This is a successful outcome.
  176. //
  177. // What is exposed to the MiM? The Host header does not contain a Psiphon server IP address, just an
  178. // unrelated, randomly generated domain name which cannot be used to block direct connections. The
  179. // Psiphon server IP is sent over meek, but it's in the encrypted cookie.
  180. //
  181. // The payload (user traffic) gets its confidentiality and integrity from the underlying SSH protocol.
  182. // So, nothing is leaked to the MiM apart from signatures which could be used to classify the traffic
  183. // as Psiphon to possibly block it; but note that not revealing that the client is Psiphon is outside
  184. // our threat model; we merely seek to evade mass blocking by taking steps that require progressively
  185. // more effort to block.
  186. //
  187. // There is a subtle attack remaining: an adversary that can MiM some CDNs but not others (and so can
  188. // classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server
  189. // selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't
  190. // exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after
  191. // some short period. This is mitigated by the "impaired" protocol classification mechanism.
  192. tlsConfig := &CustomTLSConfig{
  193. DialAddr: meekConfig.DialAddress,
  194. Dial: NewTCPDialer(meekDialConfig),
  195. Timeout: meekDialConfig.ConnectTimeout,
  196. SNIServerName: meekConfig.SNIServerName,
  197. SkipVerify: true,
  198. UseIndistinguishableTLS: meekDialConfig.UseIndistinguishableTLS,
  199. TLSProfile: meekConfig.TLSProfile,
  200. TrustedCACertificatesFilename: meekDialConfig.TrustedCACertificatesFilename,
  201. }
  202. if meekConfig.UseObfuscatedSessionTickets {
  203. tlsConfig.ObfuscatedSessionTicketKey = meekConfig.MeekObfuscatedKey
  204. }
  205. dialer := NewCustomTLSDialer(tlsConfig)
  206. // TODO: wrap in an http.Client and use http.Client.Timeout which actually covers round trip
  207. transport = &http.Transport{
  208. Dial: dialer,
  209. ResponseHeaderTimeout: MEEK_ROUND_TRIP_TIMEOUT,
  210. }
  211. } else {
  212. // The dialer ignores address that http.Transport will pass in (derived
  213. // from the HTTP request URL) and always dials meekConfig.DialAddress.
  214. dialer := func(string, string) (net.Conn, error) {
  215. return NewTCPDialer(meekDialConfig)("tcp", meekConfig.DialAddress)
  216. }
  217. // For HTTP, and when the meekConfig.DialAddress matches the
  218. // meekConfig.HostHeader, we let http.Transport handle proxying.
  219. // http.Transport will put the the HTTP server address in the HTTP
  220. // request line. In this one case, we can use an HTTP proxy that does
  221. // not offer CONNECT support.
  222. if strings.HasPrefix(meekDialConfig.UpstreamProxyUrl, "http://") &&
  223. (meekConfig.DialAddress == meekConfig.HostHeader ||
  224. meekConfig.DialAddress == meekConfig.HostHeader+":80") {
  225. url, err := url.Parse(meekDialConfig.UpstreamProxyUrl)
  226. if err != nil {
  227. return nil, common.ContextError(err)
  228. }
  229. proxyUrl = http.ProxyURL(url)
  230. meekDialConfig.UpstreamProxyUrl = ""
  231. // Here, the dialer must use the address that http.Transport
  232. // passes in (which will be proxy address).
  233. dialer = NewTCPDialer(meekDialConfig)
  234. }
  235. // TODO: wrap in an http.Client and use http.Client.Timeout which actually covers round trip
  236. httpTransport := &http.Transport{
  237. Proxy: proxyUrl,
  238. Dial: dialer,
  239. ResponseHeaderTimeout: MEEK_ROUND_TRIP_TIMEOUT,
  240. }
  241. if proxyUrl != nil {
  242. // Wrap transport with a transport that can perform HTTP proxy auth negotiation
  243. transport, err = upstreamproxy.NewProxyAuthTransport(httpTransport, meekDialConfig.CustomHeaders)
  244. if err != nil {
  245. return nil, common.ContextError(err)
  246. }
  247. } else {
  248. transport = httpTransport
  249. }
  250. }
  251. // Scheme is always "http". Otherwise http.Transport will try to do another TLS
  252. // handshake inside the explicit TLS session (in fronting mode).
  253. url := &url.URL{
  254. Scheme: "http",
  255. Host: meekConfig.HostHeader,
  256. Path: "/",
  257. }
  258. if meekConfig.UseHTTPS {
  259. host, _, err := net.SplitHostPort(meekConfig.DialAddress)
  260. if err != nil {
  261. return nil, common.ContextError(err)
  262. }
  263. additionalHeaders = map[string][]string{
  264. "X-Psiphon-Fronting-Address": {host},
  265. }
  266. } else {
  267. if proxyUrl == nil {
  268. additionalHeaders = meekDialConfig.CustomHeaders
  269. }
  270. }
  271. cookie, err := makeMeekCookie(meekConfig)
  272. if err != nil {
  273. return nil, common.ContextError(err)
  274. }
  275. runContext, stopRunning := context.WithCancel(context.Background())
  276. // The main loop of a MeekConn is run in the relay() goroutine.
  277. // A MeekConn implements net.Conn concurrency semantics:
  278. // "Multiple goroutines may invoke methods on a Conn simultaneously."
  279. //
  280. // Read() calls and relay() are synchronized by exchanging control of a single
  281. // receiveBuffer (bytes.Buffer). This single buffer may be:
  282. // - in the emptyReceiveBuffer channel when it is available and empty;
  283. // - in the partialReadBuffer channel when it is available and contains data;
  284. // - in the fullReadBuffer channel when it is available and full of data;
  285. // - "checked out" by relay or Read when they are are writing to or reading from the
  286. // buffer, respectively.
  287. // relay() will obtain the buffer from either the empty or partial channel but block when
  288. // the buffer is full. Read will obtain the buffer from the partial or full channel when
  289. // there is data to read but block when the buffer is empty.
  290. // Write() calls and relay() are synchronized in a similar way, using a single
  291. // sendBuffer.
  292. meek = &MeekConn{
  293. url: url,
  294. additionalHeaders: additionalHeaders,
  295. cookie: cookie,
  296. pendingConns: pendingConns,
  297. transport: transport,
  298. isClosed: false,
  299. runContext: runContext,
  300. stopRunning: stopRunning,
  301. relayWaitGroup: new(sync.WaitGroup),
  302. emptyReceiveBuffer: make(chan *bytes.Buffer, 1),
  303. partialReceiveBuffer: make(chan *bytes.Buffer, 1),
  304. fullReceiveBuffer: make(chan *bytes.Buffer, 1),
  305. emptySendBuffer: make(chan *bytes.Buffer, 1),
  306. partialSendBuffer: make(chan *bytes.Buffer, 1),
  307. fullSendBuffer: make(chan *bytes.Buffer, 1),
  308. }
  309. // TODO: benchmark bytes.Buffer vs. built-in append with slices?
  310. meek.emptyReceiveBuffer <- new(bytes.Buffer)
  311. meek.emptySendBuffer <- new(bytes.Buffer)
  312. meek.relayWaitGroup.Add(1)
  313. go meek.relay()
  314. // Enable interruption
  315. if !dialConfig.PendingConns.Add(meek) {
  316. meek.Close()
  317. return nil, common.ContextError(errors.New("pending connections already closed"))
  318. }
  319. return meek, nil
  320. }
  321. // Close terminates the meek connection. Close waits for the relay processing goroutine
  322. // to stop and releases HTTP transport resources.
  323. // A mutex is required to support net.Conn concurrency semantics.
  324. func (meek *MeekConn) Close() (err error) {
  325. meek.mutex.Lock()
  326. isClosed := meek.isClosed
  327. meek.isClosed = true
  328. meek.mutex.Unlock()
  329. if !isClosed {
  330. meek.stopRunning()
  331. meek.pendingConns.CloseAll()
  332. meek.relayWaitGroup.Wait()
  333. meek.transport.CloseIdleConnections()
  334. }
  335. return nil
  336. }
  337. // IsClosed implements the Closer iterface. The return value
  338. // indicates whether the MeekConn has been closed.
  339. func (meek *MeekConn) IsClosed() bool {
  340. meek.mutex.Lock()
  341. isClosed := meek.isClosed
  342. meek.mutex.Unlock()
  343. return isClosed
  344. }
  345. // Read reads data from the connection.
  346. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
  347. func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
  348. if meek.IsClosed() {
  349. return 0, common.ContextError(errors.New("meek connection is closed"))
  350. }
  351. // Block until there is received data to consume
  352. var receiveBuffer *bytes.Buffer
  353. select {
  354. case receiveBuffer = <-meek.partialReceiveBuffer:
  355. case receiveBuffer = <-meek.fullReceiveBuffer:
  356. case <-meek.runContext.Done():
  357. return 0, common.ContextError(errors.New("meek connection has closed"))
  358. }
  359. n, err = receiveBuffer.Read(buffer)
  360. meek.replaceReceiveBuffer(receiveBuffer)
  361. return n, err
  362. }
  363. // Write writes data to the connection.
  364. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
  365. func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
  366. if meek.IsClosed() {
  367. return 0, common.ContextError(errors.New("meek connection is closed"))
  368. }
  369. // Repeats until all n bytes are written
  370. n = len(buffer)
  371. for len(buffer) > 0 {
  372. // Block until there is capacity in the send buffer
  373. var sendBuffer *bytes.Buffer
  374. select {
  375. case sendBuffer = <-meek.emptySendBuffer:
  376. case sendBuffer = <-meek.partialSendBuffer:
  377. case <-meek.runContext.Done():
  378. return 0, common.ContextError(errors.New("meek connection has closed"))
  379. }
  380. writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len()
  381. if writeLen > 0 {
  382. if writeLen > len(buffer) {
  383. writeLen = len(buffer)
  384. }
  385. _, err = sendBuffer.Write(buffer[:writeLen])
  386. buffer = buffer[writeLen:]
  387. }
  388. meek.replaceSendBuffer(sendBuffer)
  389. }
  390. return n, err
  391. }
  392. // Stub implementation of net.Conn.LocalAddr
  393. func (meek *MeekConn) LocalAddr() net.Addr {
  394. return nil
  395. }
  396. // Stub implementation of net.Conn.RemoteAddr
  397. func (meek *MeekConn) RemoteAddr() net.Addr {
  398. return nil
  399. }
  400. // Stub implementation of net.Conn.SetDeadline
  401. func (meek *MeekConn) SetDeadline(t time.Time) error {
  402. return common.ContextError(errors.New("not supported"))
  403. }
  404. // Stub implementation of net.Conn.SetReadDeadline
  405. func (meek *MeekConn) SetReadDeadline(t time.Time) error {
  406. return common.ContextError(errors.New("not supported"))
  407. }
  408. // Stub implementation of net.Conn.SetWriteDeadline
  409. func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
  410. return common.ContextError(errors.New("not supported"))
  411. }
  412. func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) {
  413. switch {
  414. case receiveBuffer.Len() == 0:
  415. meek.emptyReceiveBuffer <- receiveBuffer
  416. case receiveBuffer.Len() >= FULL_RECEIVE_BUFFER_LENGTH:
  417. meek.fullReceiveBuffer <- receiveBuffer
  418. default:
  419. meek.partialReceiveBuffer <- receiveBuffer
  420. }
  421. }
  422. func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) {
  423. switch {
  424. case sendBuffer.Len() == 0:
  425. meek.emptySendBuffer <- sendBuffer
  426. case sendBuffer.Len() >= MAX_SEND_PAYLOAD_LENGTH:
  427. meek.fullSendBuffer <- sendBuffer
  428. default:
  429. meek.partialSendBuffer <- sendBuffer
  430. }
  431. }
  432. // relay sends and receives tunneled traffic (payload). An HTTP request is
  433. // triggered when data is in the write queue or at a polling interval.
  434. // There's a geometric increase, up to a maximum, in the polling interval when
  435. // no data is exchanged. Only one HTTP request is in flight at a time.
  436. func (meek *MeekConn) relay() {
  437. // Note: meek.Close() calls here in relay() are made asynchronously
  438. // (using goroutines) since Close() will wait on this WaitGroup.
  439. defer meek.relayWaitGroup.Done()
  440. interval := common.JitterDuration(
  441. MIN_POLL_INTERVAL,
  442. MIN_POLL_INTERVAL_JITTER)
  443. timeout := time.NewTimer(interval)
  444. sendPayload := make([]byte, MAX_SEND_PAYLOAD_LENGTH)
  445. for {
  446. timeout.Reset(interval)
  447. // Block until there is payload to send or it is time to poll
  448. var sendBuffer *bytes.Buffer
  449. select {
  450. case sendBuffer = <-meek.partialSendBuffer:
  451. case sendBuffer = <-meek.fullSendBuffer:
  452. case <-timeout.C:
  453. // In the polling case, send an empty payload
  454. case <-meek.runContext.Done():
  455. // Drop through to second Done() check
  456. }
  457. // Check Done() again, to ensure it takes precedence
  458. select {
  459. case <-meek.runContext.Done():
  460. return
  461. default:
  462. }
  463. sendPayloadSize := 0
  464. if sendBuffer != nil {
  465. var err error
  466. sendPayloadSize, err = sendBuffer.Read(sendPayload)
  467. meek.replaceSendBuffer(sendBuffer)
  468. if err != nil {
  469. NoticeAlert("%s", common.ContextError(err))
  470. go meek.Close()
  471. return
  472. }
  473. }
  474. receivedPayloadSize, err := meek.roundTrip(sendPayload[:sendPayloadSize])
  475. if err != nil {
  476. select {
  477. case <-meek.runContext.Done():
  478. // In this case, meek.roundTrip encountered Done(). Exit without logging error.
  479. return
  480. default:
  481. }
  482. NoticeAlert("%s", common.ContextError(err))
  483. go meek.Close()
  484. return
  485. }
  486. // Calculate polling interval. When data is received,
  487. // immediately request more. Otherwise, schedule next
  488. // poll with exponential back off. Jitter and coin
  489. // flips are used to avoid trivial, static traffic
  490. // timing patterns.
  491. if receivedPayloadSize > 0 || sendPayloadSize > 0 {
  492. interval = 0
  493. } else if interval == 0 {
  494. interval = common.JitterDuration(
  495. MIN_POLL_INTERVAL,
  496. MIN_POLL_INTERVAL_JITTER)
  497. } else {
  498. if common.FlipCoin() {
  499. interval = common.JitterDuration(
  500. interval,
  501. POLL_INTERVAL_JITTER)
  502. } else {
  503. interval = common.JitterDuration(
  504. time.Duration(float64(interval)*POLL_INTERVAL_MULTIPLIER),
  505. POLL_INTERVAL_JITTER)
  506. }
  507. if interval >= MAX_POLL_INTERVAL {
  508. interval = common.JitterDuration(
  509. MAX_POLL_INTERVAL,
  510. MAX_POLL_INTERVAL_JITTER)
  511. }
  512. }
  513. }
  514. }
  515. // roundTrip configures and makes the actual HTTP POST request
  516. func (meek *MeekConn) roundTrip(sendPayload []byte) (int64, error) {
  517. // Retries are made when the round trip fails. This adds resiliency
  518. // to connection interruption and intermittent failures.
  519. //
  520. // At least one retry is always attempted, and retries continue
  521. // while still within a brief deadline -- 5 seconds, currently the
  522. // deadline for an actively probed SSH connection to timeout. There
  523. // is a brief delay between retries, allowing for intermittent
  524. // failure states to resolve.
  525. //
  526. // Failure may occur at various stages of the HTTP request:
  527. //
  528. // 1. Before the request begins. In this case, the entire request
  529. // may be rerun.
  530. //
  531. // 2. While sending the request payload. In this case, the client
  532. // must resend its request payload. The server will not have
  533. // relayed its partially received request payload.
  534. //
  535. // 3. After sending the request payload but before receiving
  536. // a response. The client cannot distinguish between case 2 and
  537. // this case, case 3. The client resends its payload and the
  538. // server detects this and skips relaying the request payload.
  539. //
  540. // 4. While reading the response payload. The client will omit its
  541. // request payload when retrying, as the server has already
  542. // acknowleged it. The client will also indicate to the server
  543. // the amount of response payload already received, and the
  544. // server will skip resending the indicated amount of response
  545. // payload.
  546. //
  547. // Retries are indicated to the server by adding a Range header,
  548. // which includes the response payload resend position.
  549. retries := uint(0)
  550. retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
  551. serverAcknowlegedRequestPayload := false
  552. receivedPayloadSize := int64(0)
  553. for try := 0; ; try++ {
  554. // Omit the request payload when retrying after receiving a
  555. // partial server response.
  556. var sendPayloadReader io.Reader
  557. if !serverAcknowlegedRequestPayload {
  558. sendPayloadReader = bytes.NewReader(sendPayload)
  559. }
  560. var request *http.Request
  561. request, err := http.NewRequest("POST", meek.url.String(), sendPayloadReader)
  562. if err != nil {
  563. // Don't retry when can't initialize a Request
  564. return 0, common.ContextError(err)
  565. }
  566. // Note: meek.stopRunning() will abort a round trip in flight
  567. request = request.WithContext(meek.runContext)
  568. meek.addAdditionalHeaders(request)
  569. request.Header.Set("Content-Type", "application/octet-stream")
  570. request.AddCookie(meek.cookie)
  571. expectedStatusCode := http.StatusOK
  572. // When retrying, add a Range header to indicate how much
  573. // of the response was already received.
  574. if try > 0 {
  575. expectedStatusCode = http.StatusPartialContent
  576. request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
  577. }
  578. response, err := meek.transport.RoundTrip(request)
  579. if err != nil {
  580. select {
  581. case <-meek.runContext.Done():
  582. // Exit without retrying and without logging error.
  583. return 0, common.ContextError(err)
  584. default:
  585. }
  586. NoticeAlert("meek round trip failed: %s", err)
  587. // ...continue to retry
  588. }
  589. if err == nil {
  590. if response.StatusCode != expectedStatusCode {
  591. // Don't retry when the status code is incorrect
  592. response.Body.Close()
  593. return 0, common.ContextError(
  594. fmt.Errorf(
  595. "unexpected status code: %d instead of %d",
  596. response.StatusCode, expectedStatusCode))
  597. }
  598. // Update meek session cookie
  599. for _, c := range response.Cookies() {
  600. if meek.cookie.Name == c.Name {
  601. meek.cookie.Value = c.Value
  602. break
  603. }
  604. }
  605. // Received the response status code, so the server
  606. // must have received the request payload.
  607. serverAcknowlegedRequestPayload = true
  608. readPayloadSize, err := meek.readPayload(response.Body)
  609. response.Body.Close()
  610. // receivedPayloadSize is the number of response
  611. // payload bytes received and relayed. A retry can
  612. // resume after this position.
  613. receivedPayloadSize += readPayloadSize
  614. if err != nil {
  615. NoticeAlert("meek read payload failed: %s", err)
  616. // ...continue to retry
  617. } else {
  618. // Round trip completed successfully
  619. break
  620. }
  621. }
  622. // Either the request failed entirely, or there was a failure
  623. // streaming the response payload. Retry, if time remains.
  624. if retries >= 1 && monotime.Now().After(retryDeadline) {
  625. return 0, common.ContextError(err)
  626. }
  627. retries += 1
  628. time.Sleep(MEEK_ROUND_TRIP_RETRY_DELAY)
  629. }
  630. return receivedPayloadSize, nil
  631. }
  632. // Add additional headers to the HTTP request using the same method we use for adding
  633. // custom headers to HTTP proxy requests.
  634. func (meek *MeekConn) addAdditionalHeaders(request *http.Request) {
  635. for name, value := range meek.additionalHeaders {
  636. // hack around special case of "Host" header
  637. // https://golang.org/src/net/http/request.go#L474
  638. // using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915
  639. if name == "Host" {
  640. if len(value) > 0 {
  641. if request.URL.Opaque == "" {
  642. request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
  643. }
  644. request.Host = value[0]
  645. }
  646. } else {
  647. request.Header[name] = value
  648. }
  649. }
  650. }
  651. // readPayload reads the HTTP response in chunks, making the read buffer available
  652. // to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
  653. // flow back to the reader as soon as possible instead of buffering the entire payload.
  654. //
  655. // When readPayload returns an error, the totalSize output is remains valid -- it's the
  656. // number of payload bytes successfully read and relayed.
  657. func (meek *MeekConn) readPayload(
  658. receivedPayload io.ReadCloser) (totalSize int64, err error) {
  659. defer receivedPayload.Close()
  660. totalSize = 0
  661. for {
  662. reader := io.LimitReader(receivedPayload, READ_PAYLOAD_CHUNK_LENGTH)
  663. // Block until there is capacity in the receive buffer
  664. var receiveBuffer *bytes.Buffer
  665. select {
  666. case receiveBuffer = <-meek.emptyReceiveBuffer:
  667. case receiveBuffer = <-meek.partialReceiveBuffer:
  668. case <-meek.runContext.Done():
  669. return 0, nil
  670. }
  671. // Note: receiveBuffer size may exceed FULL_RECEIVE_BUFFER_LENGTH by up to the size
  672. // of one received payload. The FULL_RECEIVE_BUFFER_LENGTH value is just a guideline.
  673. n, err := receiveBuffer.ReadFrom(reader)
  674. meek.replaceReceiveBuffer(receiveBuffer)
  675. totalSize += n
  676. if err != nil {
  677. return totalSize, common.ContextError(err)
  678. }
  679. if n == 0 {
  680. break
  681. }
  682. }
  683. return totalSize, nil
  684. }
  685. // makeCookie creates the cookie to be sent with initial meek HTTP request.
  686. // The purpose of the cookie is to send the following to the server:
  687. // ServerAddress -- the Psiphon Server address the meek server should relay to
  688. // SessionID -- the Psiphon session ID (used by meek server to relay geolocation
  689. // information obtained from the CDN through to the Psiphon Server)
  690. // MeekProtocolVersion -- tells the meek server that this client understands
  691. // the latest protocol.
  692. // The server will create a session using these values and send the session ID
  693. // back to the client via Set-Cookie header. Client must use that value with
  694. // all consequent HTTP requests
  695. // In unfronted meek mode, the cookie is visible over the adversary network, so the
  696. // cookie is encrypted and obfuscated.
  697. func makeMeekCookie(meekConfig *MeekConfig) (cookie *http.Cookie, err error) {
  698. // Make the JSON data
  699. serverAddress := meekConfig.PsiphonServerAddress
  700. cookieData := &protocol.MeekCookieData{
  701. ServerAddress: serverAddress,
  702. SessionID: meekConfig.SessionID,
  703. MeekProtocolVersion: MEEK_PROTOCOL_VERSION,
  704. ClientTunnelProtocol: meekConfig.ClientTunnelProtocol,
  705. }
  706. serializedCookie, err := json.Marshal(cookieData)
  707. if err != nil {
  708. return nil, common.ContextError(err)
  709. }
  710. // Encrypt the JSON data
  711. // NaCl box is used for encryption. The peer public key comes from the server entry.
  712. // Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce).
  713. // http://nacl.cace-project.eu/box.html:
  714. // "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are
  715. // different. This is true even if the sets overlap. For example, a sender can use the same nonce for two
  716. // different messages if the messages are sent to two different public keys."
  717. var nonce [24]byte
  718. var publicKey [32]byte
  719. decodedPublicKey, err := base64.StdEncoding.DecodeString(meekConfig.MeekCookieEncryptionPublicKey)
  720. if err != nil {
  721. return nil, common.ContextError(err)
  722. }
  723. copy(publicKey[:], decodedPublicKey)
  724. ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader)
  725. if err != nil {
  726. return nil, common.ContextError(err)
  727. }
  728. box := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey)
  729. encryptedCookie := make([]byte, 32+len(box))
  730. copy(encryptedCookie[0:32], ephemeralPublicKey[0:32])
  731. copy(encryptedCookie[32:], box)
  732. // Obfuscate the encrypted data
  733. obfuscator, err := common.NewClientObfuscator(
  734. &common.ObfuscatorConfig{Keyword: meekConfig.MeekObfuscatedKey, MaxPadding: MEEK_COOKIE_MAX_PADDING})
  735. if err != nil {
  736. return nil, common.ContextError(err)
  737. }
  738. obfuscatedCookie := obfuscator.SendSeedMessage()
  739. seedLen := len(obfuscatedCookie)
  740. obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...)
  741. obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:])
  742. // Format the HTTP cookie
  743. // The format is <random letter 'A'-'Z'>=<base64 data>, which is intended to match common cookie formats.
  744. A := int('A')
  745. Z := int('Z')
  746. // letterIndex is integer in range [int('A'), int('Z')]
  747. letterIndex, err := common.MakeSecureRandomInt(Z - A + 1)
  748. if err != nil {
  749. return nil, common.ContextError(err)
  750. }
  751. return &http.Cookie{
  752. Name: string(byte(A + letterIndex)),
  753. Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)},
  754. nil
  755. }