meek.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "crypto/tls"
  23. "encoding/base64"
  24. "encoding/json"
  25. "errors"
  26. "io"
  27. "net"
  28. "net/http"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  34. "golang.org/x/crypto/nacl/box"
  35. )
  36. // MeekServer is based on meek-server.go from Tor and Psiphon:
  37. //
  38. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  39. // CC0 1.0 Universal
  40. //
  41. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  42. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  43. // report no version number and expect at most 64K response bodies.
  44. const MEEK_PROTOCOL_VERSION_1 = 1
  45. // Protocol version 2 clients initiate a session by sending a encrypted and obfuscated meek
  46. // cookie with their initial HTTP request. Connection information is contained within the
  47. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  48. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  49. // session ID on all subsequent requests for the remainder of the session.
  50. const MEEK_PROTOCOL_VERSION_2 = 2
  51. const MEEK_MAX_PAYLOAD_LENGTH = 0x10000
  52. const MEEK_TURN_AROUND_TIMEOUT = 20 * time.Millisecond
  53. const MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  54. const MEEK_MAX_SESSION_STALENESS = 45 * time.Second
  55. const MEEK_HTTP_CLIENT_READ_TIMEOUT = 45 * time.Second
  56. const MEEK_HTTP_CLIENT_WRITE_TIMEOUT = 10 * time.Second
  57. const MEEK_MIN_SESSION_ID_LENGTH = 8
  58. const MEEK_MAX_SESSION_ID_LENGTH = 20
  59. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  60. // Obfusated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  61. // HTTP or HTTPS.
  62. //
  63. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  64. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  65. // set as a HTTP cookie for the client to submit with each request.
  66. //
  67. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  68. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  69. // the meekConn struct.
  70. type MeekServer struct {
  71. support *SupportServices
  72. listener net.Listener
  73. tlsConfig *tls.Config
  74. clientHandler func(clientConn net.Conn)
  75. openConns *psiphon.Conns
  76. stopBroadcast <-chan struct{}
  77. sessionsLock sync.RWMutex
  78. sessions map[string]*meekSession
  79. }
  80. // NewMeekServer initializes a new meek server.
  81. func NewMeekServer(
  82. support *SupportServices,
  83. listener net.Listener,
  84. useTLS bool,
  85. clientHandler func(clientConn net.Conn),
  86. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  87. meekServer := &MeekServer{
  88. support: support,
  89. listener: listener,
  90. clientHandler: clientHandler,
  91. openConns: new(psiphon.Conns),
  92. stopBroadcast: stopBroadcast,
  93. sessions: make(map[string]*meekSession),
  94. }
  95. if useTLS {
  96. tlsConfig, err := makeMeekTLSConfig(support)
  97. if err != nil {
  98. return nil, psiphon.ContextError(err)
  99. }
  100. meekServer.tlsConfig = tlsConfig
  101. }
  102. return meekServer, nil
  103. }
  104. // Run runs the meek server; this function blocks while serving HTTP or
  105. // HTTPS connections on the specified listener. This function also runs
  106. // a goroutine which cleans up expired meek client sessions.
  107. //
  108. // To stop the meek server, both Close() the listener and set the stopBroadcast
  109. // signal specified in NewMeekServer.
  110. func (server *MeekServer) Run() error {
  111. defer server.listener.Close()
  112. defer server.openConns.CloseAll()
  113. // Expire sessions
  114. reaperWaitGroup := new(sync.WaitGroup)
  115. reaperWaitGroup.Add(1)
  116. go func() {
  117. defer reaperWaitGroup.Done()
  118. ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
  119. defer ticker.Stop()
  120. for {
  121. select {
  122. case <-ticker.C:
  123. server.closeExpireSessions()
  124. case <-server.stopBroadcast:
  125. return
  126. }
  127. }
  128. }()
  129. // Serve HTTP or HTTPS
  130. httpServer := &http.Server{
  131. ReadTimeout: MEEK_HTTP_CLIENT_READ_TIMEOUT,
  132. WriteTimeout: MEEK_HTTP_CLIENT_WRITE_TIMEOUT,
  133. Handler: server,
  134. ConnState: server.httpConnStateCallback,
  135. // Disable auto HTTP/2 (https://golang.org/doc/go1.6)
  136. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  137. }
  138. // Note: Serve() will be interrupted by listener.Close() call
  139. var err error
  140. if server.tlsConfig != nil {
  141. httpServer.TLSConfig = server.tlsConfig
  142. httpsServer := psiphon.HTTPSServer{Server: *httpServer}
  143. err = httpsServer.ServeTLS(server.listener)
  144. } else {
  145. err = httpServer.Serve(server.listener)
  146. }
  147. // Can't check for the exact error that Close() will cause in Accept(),
  148. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  149. // explicit stop signal to stop gracefully.
  150. select {
  151. case <-server.stopBroadcast:
  152. err = nil
  153. default:
  154. }
  155. reaperWaitGroup.Wait()
  156. return err
  157. }
  158. // ServeHTTP handles meek client HTTP requests, where the request body
  159. // contains upstream traffic and the response will contain downstream
  160. // traffic.
  161. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  162. // Note: no longer requiring that the request method is POST
  163. // Check for the expected meek/session ID cookie.
  164. // Also check for prohibited HTTP headers.
  165. var meekCookie *http.Cookie
  166. for _, c := range request.Cookies() {
  167. meekCookie = c
  168. break
  169. }
  170. if meekCookie == nil || len(meekCookie.Value) == 0 {
  171. log.WithContext().Warning("missing meek cookie")
  172. server.terminateConnection(responseWriter, request)
  173. return
  174. }
  175. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  176. for _, header := range server.support.Config.MeekProhibitedHeaders {
  177. value := request.Header.Get(header)
  178. if header != "" {
  179. log.WithContextFields(LogFields{
  180. "header": header,
  181. "value": value,
  182. }).Warning("prohibited meek header")
  183. server.terminateConnection(responseWriter, request)
  184. return
  185. }
  186. }
  187. }
  188. // Lookup or create a new session for given meek cookie/session ID.
  189. sessionID, session, err := server.getSession(request, meekCookie)
  190. if err != nil {
  191. log.WithContextFields(LogFields{"error": err}).Warning("session lookup failed")
  192. server.terminateConnection(responseWriter, request)
  193. return
  194. }
  195. // PumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  196. // read the request body as upstream traffic.
  197. // TODO: run PumpReads and PumpWrites concurrently?
  198. err = session.clientConn.PumpReads(request.Body)
  199. if err != nil {
  200. if err != io.EOF {
  201. log.WithContextFields(LogFields{"error": err}).Warning("pump reads failed")
  202. }
  203. server.terminateConnection(responseWriter, request)
  204. server.closeSession(sessionID)
  205. return
  206. }
  207. // Set cookie before writing the response.
  208. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && session.sessionIDSent == false {
  209. // Replace the meek cookie with the session ID.
  210. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  211. // session ID value replaces the original meek cookie value.
  212. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  213. session.sessionIDSent = true
  214. }
  215. // PumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  216. // write its downstream traffic through to the response body.
  217. err = session.clientConn.PumpWrites(responseWriter)
  218. if err != nil {
  219. if err != io.EOF {
  220. log.WithContextFields(LogFields{"error": err}).Warning("pump writes failed")
  221. }
  222. server.terminateConnection(responseWriter, request)
  223. server.closeSession(sessionID)
  224. return
  225. }
  226. }
  227. // getSession returns the meek client session corresponding the
  228. // meek cookie/session ID. If no session is found, the cookie is
  229. // treated as a meek cookie for a new session and its payload is
  230. // extracted and used to establish a new session.
  231. func (server *MeekServer) getSession(
  232. request *http.Request, meekCookie *http.Cookie) (string, *meekSession, error) {
  233. // Check for an existing session
  234. server.sessionsLock.RLock()
  235. existingSessionID := meekCookie.Value
  236. session, ok := server.sessions[existingSessionID]
  237. server.sessionsLock.RUnlock()
  238. if ok {
  239. session.touch()
  240. return existingSessionID, session, nil
  241. }
  242. // TODO: can multiple http client connections using same session cookie
  243. // cause race conditions on session struct?
  244. // The session is new (or expired). Treat the cookie value as a new meek
  245. // cookie, extract the payload, and create a new session.
  246. payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
  247. if err != nil {
  248. return "", nil, psiphon.ContextError(err)
  249. }
  250. // Note: this meek server ignores all but Version MeekProtocolVersion;
  251. // the other values are legacy or currently unused.
  252. var clientSessionData struct {
  253. MeekProtocolVersion int `json:"v"`
  254. PsiphonClientSessionId string `json:"s"`
  255. PsiphonServerAddress string `json:"p"`
  256. }
  257. err = json.Unmarshal(payloadJSON, &clientSessionData)
  258. if err != nil {
  259. return "", nil, psiphon.ContextError(err)
  260. }
  261. // Determine the client remote address, which is used for geolocation
  262. // and stats. When an intermediate proxy of CDN is in use, we may be
  263. // able to determine the original client address by inspecting HTTP
  264. // headers such as X-Forwarded-For.
  265. clientIP := strings.Split(request.RemoteAddr, ":")[0]
  266. if len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  267. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  268. value := request.Header.Get(header)
  269. if len(value) > 0 {
  270. // Some headers, such as X-Forwarded-For, are a comma-separated
  271. // list of IPs (each proxy in a chain). The first IP should be
  272. // the client IP.
  273. proxyClientIP := strings.Split(header, ",")[0]
  274. if net.ParseIP(clientIP) != nil {
  275. clientIP = proxyClientIP
  276. break
  277. }
  278. }
  279. }
  280. }
  281. // Create a new meek conn that will relay the payload
  282. // between meek request/responses and the tunnel server client
  283. // handler. The client IP is also used to initialize the
  284. // meek conn with a useful value to return when the tunnel
  285. // server calls conn.RemoteAddr() to get the client's IP address.
  286. // Assumes clientIP is a value IP address; the port value is a stub
  287. // and is expected to be ignored.
  288. clientConn := newMeekConn(
  289. &net.TCPAddr{
  290. IP: net.ParseIP(clientIP),
  291. Port: 0,
  292. },
  293. clientSessionData.MeekProtocolVersion)
  294. session = &meekSession{
  295. clientConn: clientConn,
  296. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  297. sessionIDSent: false,
  298. }
  299. session.touch()
  300. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  301. // meek cookie to a session ID; v1 clients always send the
  302. // original meek cookie value with each request. The issue with
  303. // v1 is that clients which wake after a device sleep will attempt
  304. // to resume a meek session and the server can't differentiate
  305. // between resuming a session and creating a new session. This
  306. // causes the v1 client connection to hang/timeout.
  307. sessionID := meekCookie.Value
  308. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  309. sessionID, err = makeMeekSessionID()
  310. if err != nil {
  311. return "", nil, psiphon.ContextError(err)
  312. }
  313. }
  314. server.sessionsLock.Lock()
  315. server.sessions[sessionID] = session
  316. server.sessionsLock.Unlock()
  317. // Note: from the tunnel server's perspective, this client connection
  318. // will close when closeSessionHelper calls Close() on the meekConn.
  319. server.clientHandler(session.clientConn)
  320. return sessionID, session, nil
  321. }
  322. func (server *MeekServer) closeSessionHelper(
  323. sessionID string, session *meekSession) {
  324. // TODO: close the persistent HTTP client connection, if one exists
  325. session.clientConn.Close()
  326. // Note: assumes caller holds lock on sessionsLock
  327. delete(server.sessions, sessionID)
  328. }
  329. func (server *MeekServer) closeSession(sessionID string) {
  330. server.sessionsLock.Lock()
  331. session, ok := server.sessions[sessionID]
  332. if ok {
  333. server.closeSessionHelper(sessionID, session)
  334. }
  335. server.sessionsLock.Unlock()
  336. }
  337. func (server *MeekServer) closeExpireSessions() {
  338. server.sessionsLock.Lock()
  339. for sessionID, session := range server.sessions {
  340. if session.expired() {
  341. server.closeSessionHelper(sessionID, session)
  342. }
  343. }
  344. server.sessionsLock.Unlock()
  345. }
  346. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  347. // meek server.
  348. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  349. switch connState {
  350. case http.StateNew:
  351. server.openConns.Add(conn)
  352. case http.StateHijacked, http.StateClosed:
  353. server.openConns.Remove(conn)
  354. }
  355. }
  356. // terminateConnection sends a 404 response to a client and also closes
  357. // a persisitent connection.
  358. func (server *MeekServer) terminateConnection(
  359. responseWriter http.ResponseWriter, request *http.Request) {
  360. http.NotFound(responseWriter, request)
  361. hijack, ok := responseWriter.(http.Hijacker)
  362. if !ok {
  363. return
  364. }
  365. conn, buffer, err := hijack.Hijack()
  366. if err != nil {
  367. return
  368. }
  369. buffer.Flush()
  370. conn.Close()
  371. }
  372. type meekSession struct {
  373. clientConn *meekConn
  374. meekProtocolVersion int
  375. sessionIDSent bool
  376. lastActivity int64
  377. }
  378. func (session *meekSession) touch() {
  379. atomic.StoreInt64(&session.lastActivity, time.Now().UnixNano())
  380. }
  381. func (session *meekSession) expired() bool {
  382. lastActivity := atomic.LoadInt64(&session.lastActivity)
  383. return time.Since(time.Unix(0, lastActivity)) > MEEK_MAX_SESSION_STALENESS
  384. }
  385. // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
  386. // Currently, this config is optimized for fronted meek where the nature
  387. // of the connection is non-circumvention; it's optimized for performance
  388. // assuming the peer is an uncensored CDN.
  389. func makeMeekTLSConfig(support *SupportServices) (*tls.Config, error) {
  390. certificate, privateKey, err := GenerateWebServerCertificate(
  391. support.Config.MeekCertificateCommonName)
  392. if err != nil {
  393. return nil, psiphon.ContextError(err)
  394. }
  395. tlsCertificate, err := tls.X509KeyPair(
  396. []byte(certificate), []byte(privateKey))
  397. if err != nil {
  398. return nil, psiphon.ContextError(err)
  399. }
  400. return &tls.Config{
  401. Certificates: []tls.Certificate{tlsCertificate},
  402. NextProtos: []string{"http/1.1"},
  403. MinVersion: tls.VersionTLS10,
  404. // This is a reordering of the supported CipherSuites in golang 1.6. Non-ephemeral key
  405. // CipherSuites greatly reduce server load, and we try to select these since the meek
  406. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  407. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  408. // by ephemeral key CipherSuites.
  409. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  410. CipherSuites: []uint16{
  411. tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
  412. tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
  413. tls.TLS_RSA_WITH_RC4_128_SHA,
  414. tls.TLS_RSA_WITH_AES_128_CBC_SHA,
  415. tls.TLS_RSA_WITH_AES_256_CBC_SHA,
  416. tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
  417. tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  418. tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  419. tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  420. tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  421. tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
  422. tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
  423. tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  424. tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  425. tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  426. tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  427. tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
  428. },
  429. PreferServerCipherSuites: true,
  430. }, nil
  431. }
  432. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  433. // paylod is base64 encoded, obfuscated, and NaCl encrypted.
  434. func getMeekCookiePayload(support *SupportServices, cookieValue string) ([]byte, error) {
  435. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  436. if err != nil {
  437. return nil, psiphon.ContextError(err)
  438. }
  439. // The data consists of an obfuscated seed message prepended
  440. // to the obfuscated, encrypted payload. The server obfuscator
  441. // will read the seed message, leaving the remaining encrypted
  442. // data in the reader.
  443. reader := bytes.NewReader(decodedValue[:])
  444. obfuscator, err := psiphon.NewServerObfuscator(
  445. reader,
  446. &psiphon.ObfuscatorConfig{Keyword: support.Config.MeekObfuscatedKey})
  447. if err != nil {
  448. return nil, psiphon.ContextError(err)
  449. }
  450. offset, err := reader.Seek(0, 1)
  451. if err != nil {
  452. return nil, psiphon.ContextError(err)
  453. }
  454. encryptedPayload := decodedValue[offset:]
  455. obfuscator.ObfuscateClientToServer(encryptedPayload)
  456. var nonce [24]byte
  457. var privateKey, ephemeralPublicKey [32]byte
  458. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  459. support.Config.MeekCookieEncryptionPrivateKey)
  460. if err != nil {
  461. return nil, psiphon.ContextError(err)
  462. }
  463. copy(privateKey[:], decodedPrivateKey)
  464. if len(encryptedPayload) < 32 {
  465. return nil, psiphon.ContextError(errors.New("unexpected encrypted payload size"))
  466. }
  467. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  468. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  469. if !ok {
  470. return nil, psiphon.ContextError(errors.New("open box failed"))
  471. }
  472. return payload, nil
  473. }
  474. // makeMeekSessionID creates a new session ID. The variable size is intended to
  475. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  476. func makeMeekSessionID() (string, error) {
  477. size := MEEK_MIN_SESSION_ID_LENGTH
  478. n, err := psiphon.MakeSecureRandomInt(MEEK_MAX_SESSION_ID_LENGTH - MEEK_MIN_SESSION_ID_LENGTH)
  479. if err != nil {
  480. return "", psiphon.ContextError(err)
  481. }
  482. size += n
  483. sessionID, err := psiphon.MakeRandomStringBase64(size)
  484. if err != nil {
  485. return "", psiphon.ContextError(err)
  486. }
  487. return sessionID, nil
  488. }
  489. // meekConn implements the net.Conn interface and is to be used as a client
  490. // connection by the tunnel server (being passed to sshServer.handleClient).
  491. // meekConn doesn't perform any real I/O, but instead shuttles io.Readers and
  492. // io.Writers between goroutines blocking on Read()s and Write()s.
  493. type meekConn struct {
  494. remoteAddr net.Addr
  495. protocolVersion int
  496. closeBroadcast chan struct{}
  497. closed int32
  498. readLock sync.Mutex
  499. readyReader chan io.Reader
  500. readResult chan error
  501. writeLock sync.Mutex
  502. nextWriteBuffer chan []byte
  503. writeResult chan error
  504. }
  505. func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
  506. return &meekConn{
  507. remoteAddr: remoteAddr,
  508. protocolVersion: protocolVersion,
  509. closeBroadcast: make(chan struct{}),
  510. closed: 0,
  511. readyReader: make(chan io.Reader, 1),
  512. readResult: make(chan error, 1),
  513. nextWriteBuffer: make(chan []byte, 1),
  514. writeResult: make(chan error, 1),
  515. }
  516. }
  517. // PumpReads causes goroutines blocking on meekConn.Read() to read
  518. // from the specified reader. This function blocks until the reader
  519. // is fully consumed or the meekConn is closed.
  520. // Note: channel scheme assumes only one concurrent call to PumpReads
  521. func (conn *meekConn) PumpReads(reader io.Reader) error {
  522. // Assumes that readyReader won't block.
  523. conn.readyReader <- reader
  524. // Receiving readResult means Read(s) have consumed the
  525. // reader sent to readyReader. readyReader is now empty and
  526. // no reference is kept to the reader.
  527. select {
  528. case err := <-conn.readResult:
  529. return err
  530. case <-conn.closeBroadcast:
  531. return io.EOF
  532. }
  533. }
  534. // Read reads from the meekConn into buffer. Read blocks until
  535. // some data is read or the meekConn closes. Under the hood, it
  536. // waits for PumpReads to submit a reader to read from.
  537. // Note: lock is to conform with net.Conn concurrency semantics
  538. func (conn *meekConn) Read(buffer []byte) (int, error) {
  539. conn.readLock.Lock()
  540. defer conn.readLock.Unlock()
  541. var reader io.Reader
  542. select {
  543. case reader = <-conn.readyReader:
  544. case <-conn.closeBroadcast:
  545. return 0, io.EOF
  546. }
  547. n, err := reader.Read(buffer)
  548. if err != nil {
  549. if err == io.EOF {
  550. err = nil
  551. }
  552. // Assumes readerResult won't block.
  553. conn.readResult <- err
  554. } else {
  555. // There may be more data in the reader, but the caller's
  556. // buffer is full, so put the reader back into the ready
  557. // channel. PumpReads remains blocked waiting for another
  558. // Read call.
  559. // Note that the reader could be at EOF, while another call is
  560. // required to get that result (https://golang.org/pkg/io/#Reader).
  561. conn.readyReader <- reader
  562. }
  563. return n, err
  564. }
  565. // PumpReads causes goroutines blocking on meekConn.Write() to write
  566. // to the specified writer. This function blocks until the meek response
  567. // body limits (size for protocol v1, turn around time for protocol v2+)
  568. // are met, or the meekConn is closed.
  569. // Note: channel scheme assumes only one concurrent call to PumpWrites
  570. func (conn *meekConn) PumpWrites(writer io.Writer) error {
  571. startTime := time.Now()
  572. timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
  573. defer timeout.Stop()
  574. for {
  575. select {
  576. case buffer := <-conn.nextWriteBuffer:
  577. _, err := writer.Write(buffer)
  578. // Assumes that writeResult won't block.
  579. // Note: always send the err to writeResult,
  580. // as the Write() caller is blocking on this.
  581. conn.writeResult <- err
  582. if err != nil {
  583. return err
  584. }
  585. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_2 {
  586. // Protocol v1 clients expect at most
  587. // MEEK_MAX_PAYLOAD_LENGTH response bodies
  588. return nil
  589. }
  590. totalElapsedTime := time.Now().Sub(startTime) / time.Millisecond
  591. if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
  592. return nil
  593. }
  594. timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
  595. case <-timeout.C:
  596. return nil
  597. case <-conn.closeBroadcast:
  598. return io.EOF
  599. }
  600. }
  601. }
  602. // Write writes the buffer to the meekConn. It blocks until the
  603. // entire buffer is written to or the meekConn closes. Under the
  604. // hood, it waits for sufficient PumpWrites calls to consume the
  605. // write buffer.
  606. // Note: lock is to conform with net.Conn concurrency semantics
  607. func (conn *meekConn) Write(buffer []byte) (int, error) {
  608. conn.writeLock.Lock()
  609. defer conn.writeLock.Unlock()
  610. // TODO: may be more efficient to send whole buffer
  611. // and have PumpWrites stash partial buffer when can't
  612. // send it all.
  613. n := 0
  614. for n < len(buffer) {
  615. end := n + MEEK_MAX_PAYLOAD_LENGTH
  616. if end > len(buffer) {
  617. end = len(buffer)
  618. }
  619. // Only write MEEK_MAX_PAYLOAD_LENGTH at a time,
  620. // to ensure compatibility with v1 protocol.
  621. chunk := buffer[n:end]
  622. select {
  623. case conn.nextWriteBuffer <- chunk:
  624. case <-conn.closeBroadcast:
  625. return n, io.EOF
  626. }
  627. // Wait for the buffer to be processed.
  628. select {
  629. case err := <-conn.writeResult:
  630. if err != nil {
  631. return n, err
  632. }
  633. case <-conn.closeBroadcast:
  634. return n, io.EOF
  635. }
  636. n += len(chunk)
  637. }
  638. return n, nil
  639. }
  640. // Close closes the meekConn. This will interrupt any blocked
  641. // Read, Write, PumpReads, and PumpWrites.
  642. func (conn *meekConn) Close() error {
  643. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  644. close(conn.closeBroadcast)
  645. }
  646. return nil
  647. }
  648. // Stub implementation of net.Conn.LocalAddr
  649. func (conn *meekConn) LocalAddr() net.Addr {
  650. return nil
  651. }
  652. // RemoteAddr returns the remoteAddr specified in newMeekConn. This
  653. // acts as a proxy for the actual remote address, which is either a
  654. // direct HTTP/HTTPS connection remote address, or in the case of
  655. // downstream proxy of CDN fronts, some other value determined via
  656. // HTTP headers.
  657. func (conn *meekConn) RemoteAddr() net.Addr {
  658. return conn.remoteAddr
  659. }
  660. // Stub implementation of net.Conn.SetDeadline
  661. func (conn *meekConn) SetDeadline(t time.Time) error {
  662. return psiphon.ContextError(errors.New("not supported"))
  663. }
  664. // Stub implementation of net.Conn.SetReadDeadline
  665. func (conn *meekConn) SetReadDeadline(t time.Time) error {
  666. return psiphon.ContextError(errors.New("not supported"))
  667. }
  668. // Stub implementation of net.Conn.SetWriteDeadline
  669. func (conn *meekConn) SetWriteDeadline(t time.Time) error {
  670. return psiphon.ContextError(errors.New("not supported"))
  671. }