meek.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "crypto/tls"
  23. "encoding/base64"
  24. "encoding/json"
  25. "errors"
  26. "io"
  27. "net"
  28. "net/http"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  35. "golang.org/x/crypto/nacl/box"
  36. )
  37. // MeekServer is based on meek-server.go from Tor and Psiphon:
  38. //
  39. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  40. // CC0 1.0 Universal
  41. //
  42. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  43. const (
  44. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  45. // report no version number and expect at most 64K response bodies.
  46. MEEK_PROTOCOL_VERSION_1 = 1
  47. // Protocol version 2 clients initiate a session by sending a encrypted and obfuscated meek
  48. // cookie with their initial HTTP request. Connection information is contained within the
  49. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  50. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  51. // session ID on all subsequent requests for the remainder of the session.
  52. MEEK_PROTOCOL_VERSION_2 = 2
  53. MEEK_MAX_PAYLOAD_LENGTH = 0x10000
  54. MEEK_TURN_AROUND_TIMEOUT = 20 * time.Millisecond
  55. MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  56. MEEK_MAX_SESSION_STALENESS = 45 * time.Second
  57. MEEK_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
  58. MEEK_MIN_SESSION_ID_LENGTH = 8
  59. MEEK_MAX_SESSION_ID_LENGTH = 20
  60. )
  61. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  62. // Obfusated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  63. // HTTP or HTTPS.
  64. //
  65. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  66. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  67. // set as a HTTP cookie for the client to submit with each request.
  68. //
  69. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  70. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  71. // the meekConn struct.
  72. type MeekServer struct {
  73. support *SupportServices
  74. listener net.Listener
  75. tlsConfig *tls.Config
  76. clientHandler func(clientConn net.Conn)
  77. openConns *common.Conns
  78. stopBroadcast <-chan struct{}
  79. sessionsLock sync.RWMutex
  80. sessions map[string]*meekSession
  81. }
  82. // NewMeekServer initializes a new meek server.
  83. func NewMeekServer(
  84. support *SupportServices,
  85. listener net.Listener,
  86. useTLS bool,
  87. clientHandler func(clientConn net.Conn),
  88. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  89. meekServer := &MeekServer{
  90. support: support,
  91. listener: listener,
  92. clientHandler: clientHandler,
  93. openConns: new(common.Conns),
  94. stopBroadcast: stopBroadcast,
  95. sessions: make(map[string]*meekSession),
  96. }
  97. if useTLS {
  98. tlsConfig, err := makeMeekTLSConfig(support)
  99. if err != nil {
  100. return nil, common.ContextError(err)
  101. }
  102. meekServer.tlsConfig = tlsConfig
  103. }
  104. return meekServer, nil
  105. }
  106. // Run runs the meek server; this function blocks while serving HTTP or
  107. // HTTPS connections on the specified listener. This function also runs
  108. // a goroutine which cleans up expired meek client sessions.
  109. //
  110. // To stop the meek server, both Close() the listener and set the stopBroadcast
  111. // signal specified in NewMeekServer.
  112. func (server *MeekServer) Run() error {
  113. defer server.listener.Close()
  114. defer server.openConns.CloseAll()
  115. // Expire sessions
  116. reaperWaitGroup := new(sync.WaitGroup)
  117. reaperWaitGroup.Add(1)
  118. go func() {
  119. defer reaperWaitGroup.Done()
  120. ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
  121. defer ticker.Stop()
  122. for {
  123. select {
  124. case <-ticker.C:
  125. server.closeExpireSessions()
  126. case <-server.stopBroadcast:
  127. return
  128. }
  129. }
  130. }()
  131. // Serve HTTP or HTTPS
  132. // Notes:
  133. // - WriteTimeout may include time awaiting request, as per:
  134. // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
  135. // - Legacy meek-server wrapped each client HTTP connection with an explict idle
  136. // timeout net.Conn and didn't use http.Server timeouts. We could do the same
  137. // here (use ActivityMonitoredConn) but the stock http.Server timeouts should
  138. // now be sufficient.
  139. httpServer := &http.Server{
  140. ReadTimeout: MEEK_HTTP_CLIENT_IO_TIMEOUT,
  141. WriteTimeout: MEEK_HTTP_CLIENT_IO_TIMEOUT,
  142. Handler: server,
  143. ConnState: server.httpConnStateCallback,
  144. // Disable auto HTTP/2 (https://golang.org/doc/go1.6)
  145. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  146. }
  147. // Note: Serve() will be interrupted by listener.Close() call
  148. var err error
  149. if server.tlsConfig != nil {
  150. httpServer.TLSConfig = server.tlsConfig
  151. httpsServer := HTTPSServer{Server: *httpServer}
  152. err = httpsServer.ServeTLS(server.listener)
  153. } else {
  154. err = httpServer.Serve(server.listener)
  155. }
  156. // Can't check for the exact error that Close() will cause in Accept(),
  157. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  158. // explicit stop signal to stop gracefully.
  159. select {
  160. case <-server.stopBroadcast:
  161. err = nil
  162. default:
  163. }
  164. reaperWaitGroup.Wait()
  165. return err
  166. }
  167. // ServeHTTP handles meek client HTTP requests, where the request body
  168. // contains upstream traffic and the response will contain downstream
  169. // traffic.
  170. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  171. // Note: no longer requiring that the request method is POST
  172. // Check for the expected meek/session ID cookie.
  173. // Also check for prohibited HTTP headers.
  174. var meekCookie *http.Cookie
  175. for _, c := range request.Cookies() {
  176. meekCookie = c
  177. break
  178. }
  179. if meekCookie == nil || len(meekCookie.Value) == 0 {
  180. log.WithContext().Warning("missing meek cookie")
  181. server.terminateConnection(responseWriter, request)
  182. return
  183. }
  184. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  185. for _, header := range server.support.Config.MeekProhibitedHeaders {
  186. value := request.Header.Get(header)
  187. if header != "" {
  188. log.WithContextFields(LogFields{
  189. "header": header,
  190. "value": value,
  191. }).Warning("prohibited meek header")
  192. server.terminateConnection(responseWriter, request)
  193. return
  194. }
  195. }
  196. }
  197. // Lookup or create a new session for given meek cookie/session ID.
  198. sessionID, session, err := server.getSession(request, meekCookie)
  199. if err != nil {
  200. log.WithContextFields(LogFields{"error": err}).Warning("session lookup failed")
  201. server.terminateConnection(responseWriter, request)
  202. return
  203. }
  204. // PumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  205. // read the request body as upstream traffic.
  206. // TODO: run PumpReads and PumpWrites concurrently?
  207. err = session.clientConn.PumpReads(request.Body)
  208. if err != nil {
  209. if err != io.EOF {
  210. log.WithContextFields(LogFields{"error": err}).Warning("pump reads failed")
  211. }
  212. server.terminateConnection(responseWriter, request)
  213. server.closeSession(sessionID)
  214. return
  215. }
  216. // Set cookie before writing the response.
  217. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && session.sessionIDSent == false {
  218. // Replace the meek cookie with the session ID.
  219. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  220. // session ID value replaces the original meek cookie value.
  221. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  222. session.sessionIDSent = true
  223. }
  224. // PumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  225. // write its downstream traffic through to the response body.
  226. err = session.clientConn.PumpWrites(responseWriter)
  227. if err != nil {
  228. if err != io.EOF {
  229. log.WithContextFields(LogFields{"error": err}).Warning("pump writes failed")
  230. }
  231. server.terminateConnection(responseWriter, request)
  232. server.closeSession(sessionID)
  233. return
  234. }
  235. }
  236. // getSession returns the meek client session corresponding the
  237. // meek cookie/session ID. If no session is found, the cookie is
  238. // treated as a meek cookie for a new session and its payload is
  239. // extracted and used to establish a new session.
  240. func (server *MeekServer) getSession(
  241. request *http.Request, meekCookie *http.Cookie) (string, *meekSession, error) {
  242. // Check for an existing session
  243. server.sessionsLock.RLock()
  244. existingSessionID := meekCookie.Value
  245. session, ok := server.sessions[existingSessionID]
  246. server.sessionsLock.RUnlock()
  247. if ok {
  248. session.touch()
  249. return existingSessionID, session, nil
  250. }
  251. // TODO: can multiple http client connections using same session cookie
  252. // cause race conditions on session struct?
  253. // The session is new (or expired). Treat the cookie value as a new meek
  254. // cookie, extract the payload, and create a new session.
  255. payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
  256. if err != nil {
  257. return "", nil, common.ContextError(err)
  258. }
  259. // Note: this meek server ignores all but Version MeekProtocolVersion;
  260. // the other values are legacy or currently unused.
  261. var clientSessionData struct {
  262. MeekProtocolVersion int `json:"v"`
  263. PsiphonClientSessionId string `json:"s"`
  264. PsiphonServerAddress string `json:"p"`
  265. }
  266. err = json.Unmarshal(payloadJSON, &clientSessionData)
  267. if err != nil {
  268. return "", nil, common.ContextError(err)
  269. }
  270. // Determine the client remote address, which is used for geolocation
  271. // and stats. When an intermediate proxy or CDN is in use, we may be
  272. // able to determine the original client address by inspecting HTTP
  273. // headers such as X-Forwarded-For.
  274. clientIP := strings.Split(request.RemoteAddr, ":")[0]
  275. if len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  276. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  277. value := request.Header.Get(header)
  278. if len(value) > 0 {
  279. // Some headers, such as X-Forwarded-For, are a comma-separated
  280. // list of IPs (each proxy in a chain). The first IP should be
  281. // the client IP.
  282. proxyClientIP := strings.Split(header, ",")[0]
  283. if net.ParseIP(proxyClientIP) != nil {
  284. clientIP = proxyClientIP
  285. break
  286. }
  287. }
  288. }
  289. }
  290. // Create a new meek conn that will relay the payload
  291. // between meek request/responses and the tunnel server client
  292. // handler. The client IP is also used to initialize the
  293. // meek conn with a useful value to return when the tunnel
  294. // server calls conn.RemoteAddr() to get the client's IP address.
  295. // Assumes clientIP is a valid IP address; the port value is a stub
  296. // and is expected to be ignored.
  297. clientConn := newMeekConn(
  298. &net.TCPAddr{
  299. IP: net.ParseIP(clientIP),
  300. Port: 0,
  301. },
  302. clientSessionData.MeekProtocolVersion)
  303. session = &meekSession{
  304. clientConn: clientConn,
  305. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  306. sessionIDSent: false,
  307. }
  308. session.touch()
  309. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  310. // meek cookie to a session ID; v1 clients always send the
  311. // original meek cookie value with each request. The issue with
  312. // v1 is that clients which wake after a device sleep will attempt
  313. // to resume a meek session and the server can't differentiate
  314. // between resuming a session and creating a new session. This
  315. // causes the v1 client connection to hang/timeout.
  316. sessionID := meekCookie.Value
  317. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  318. sessionID, err = makeMeekSessionID()
  319. if err != nil {
  320. return "", nil, common.ContextError(err)
  321. }
  322. }
  323. server.sessionsLock.Lock()
  324. server.sessions[sessionID] = session
  325. server.sessionsLock.Unlock()
  326. // Note: from the tunnel server's perspective, this client connection
  327. // will close when closeSessionHelper calls Close() on the meekConn.
  328. server.clientHandler(session.clientConn)
  329. return sessionID, session, nil
  330. }
  331. func (server *MeekServer) closeSessionHelper(
  332. sessionID string, session *meekSession) {
  333. // TODO: close the persistent HTTP client connection, if one exists
  334. session.clientConn.Close()
  335. // Note: assumes caller holds lock on sessionsLock
  336. delete(server.sessions, sessionID)
  337. }
  338. func (server *MeekServer) closeSession(sessionID string) {
  339. server.sessionsLock.Lock()
  340. session, ok := server.sessions[sessionID]
  341. if ok {
  342. server.closeSessionHelper(sessionID, session)
  343. }
  344. server.sessionsLock.Unlock()
  345. }
  346. func (server *MeekServer) closeExpireSessions() {
  347. server.sessionsLock.Lock()
  348. for sessionID, session := range server.sessions {
  349. if session.expired() {
  350. server.closeSessionHelper(sessionID, session)
  351. }
  352. }
  353. server.sessionsLock.Unlock()
  354. }
  355. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  356. // meek server.
  357. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  358. switch connState {
  359. case http.StateNew:
  360. server.openConns.Add(conn)
  361. case http.StateHijacked, http.StateClosed:
  362. server.openConns.Remove(conn)
  363. }
  364. }
  365. // terminateConnection sends a 404 response to a client and also closes
  366. // a persisitent connection.
  367. func (server *MeekServer) terminateConnection(
  368. responseWriter http.ResponseWriter, request *http.Request) {
  369. http.NotFound(responseWriter, request)
  370. hijack, ok := responseWriter.(http.Hijacker)
  371. if !ok {
  372. return
  373. }
  374. conn, buffer, err := hijack.Hijack()
  375. if err != nil {
  376. return
  377. }
  378. buffer.Flush()
  379. conn.Close()
  380. }
  381. type meekSession struct {
  382. // Note: 64-bit ints used with atomic operations are at placed
  383. // at the start of struct to ensure 64-bit alignment.
  384. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  385. lastActivity int64
  386. clientConn *meekConn
  387. meekProtocolVersion int
  388. sessionIDSent bool
  389. }
  390. func (session *meekSession) touch() {
  391. atomic.StoreInt64(&session.lastActivity, time.Now().UnixNano())
  392. }
  393. func (session *meekSession) expired() bool {
  394. lastActivity := atomic.LoadInt64(&session.lastActivity)
  395. return time.Since(time.Unix(0, lastActivity)) > MEEK_MAX_SESSION_STALENESS
  396. }
  397. // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
  398. // Currently, this config is optimized for fronted meek where the nature
  399. // of the connection is non-circumvention; it's optimized for performance
  400. // assuming the peer is an uncensored CDN.
  401. func makeMeekTLSConfig(support *SupportServices) (*tls.Config, error) {
  402. certificate, privateKey, err := GenerateWebServerCertificate(
  403. support.Config.MeekCertificateCommonName)
  404. if err != nil {
  405. return nil, common.ContextError(err)
  406. }
  407. tlsCertificate, err := tls.X509KeyPair(
  408. []byte(certificate), []byte(privateKey))
  409. if err != nil {
  410. return nil, common.ContextError(err)
  411. }
  412. return &tls.Config{
  413. Certificates: []tls.Certificate{tlsCertificate},
  414. NextProtos: []string{"http/1.1"},
  415. MinVersion: tls.VersionTLS10,
  416. // This is a reordering of the supported CipherSuites in golang 1.6. Non-ephemeral key
  417. // CipherSuites greatly reduce server load, and we try to select these since the meek
  418. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  419. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  420. // by ephemeral key CipherSuites.
  421. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  422. CipherSuites: []uint16{
  423. tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
  424. tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
  425. tls.TLS_RSA_WITH_RC4_128_SHA,
  426. tls.TLS_RSA_WITH_AES_128_CBC_SHA,
  427. tls.TLS_RSA_WITH_AES_256_CBC_SHA,
  428. tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
  429. tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  430. tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  431. tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  432. tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  433. tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
  434. tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
  435. tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  436. tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  437. tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  438. tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  439. tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
  440. },
  441. PreferServerCipherSuites: true,
  442. }, nil
  443. }
  444. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  445. // paylod is base64 encoded, obfuscated, and NaCl encrypted.
  446. func getMeekCookiePayload(support *SupportServices, cookieValue string) ([]byte, error) {
  447. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  448. if err != nil {
  449. return nil, common.ContextError(err)
  450. }
  451. // The data consists of an obfuscated seed message prepended
  452. // to the obfuscated, encrypted payload. The server obfuscator
  453. // will read the seed message, leaving the remaining encrypted
  454. // data in the reader.
  455. reader := bytes.NewReader(decodedValue[:])
  456. obfuscator, err := psiphon.NewServerObfuscator(
  457. reader,
  458. &psiphon.ObfuscatorConfig{Keyword: support.Config.MeekObfuscatedKey})
  459. if err != nil {
  460. return nil, common.ContextError(err)
  461. }
  462. offset, err := reader.Seek(0, 1)
  463. if err != nil {
  464. return nil, common.ContextError(err)
  465. }
  466. encryptedPayload := decodedValue[offset:]
  467. obfuscator.ObfuscateClientToServer(encryptedPayload)
  468. var nonce [24]byte
  469. var privateKey, ephemeralPublicKey [32]byte
  470. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  471. support.Config.MeekCookieEncryptionPrivateKey)
  472. if err != nil {
  473. return nil, common.ContextError(err)
  474. }
  475. copy(privateKey[:], decodedPrivateKey)
  476. if len(encryptedPayload) < 32 {
  477. return nil, common.ContextError(errors.New("unexpected encrypted payload size"))
  478. }
  479. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  480. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  481. if !ok {
  482. return nil, common.ContextError(errors.New("open box failed"))
  483. }
  484. return payload, nil
  485. }
  486. // makeMeekSessionID creates a new session ID. The variable size is intended to
  487. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  488. func makeMeekSessionID() (string, error) {
  489. size := MEEK_MIN_SESSION_ID_LENGTH
  490. n, err := common.MakeSecureRandomInt(MEEK_MAX_SESSION_ID_LENGTH - MEEK_MIN_SESSION_ID_LENGTH)
  491. if err != nil {
  492. return "", common.ContextError(err)
  493. }
  494. size += n
  495. sessionID, err := common.MakeRandomStringBase64(size)
  496. if err != nil {
  497. return "", common.ContextError(err)
  498. }
  499. return sessionID, nil
  500. }
  501. // meekConn implements the net.Conn interface and is to be used as a client
  502. // connection by the tunnel server (being passed to sshServer.handleClient).
  503. // meekConn doesn't perform any real I/O, but instead shuttles io.Readers and
  504. // io.Writers between goroutines blocking on Read()s and Write()s.
  505. type meekConn struct {
  506. remoteAddr net.Addr
  507. protocolVersion int
  508. closeBroadcast chan struct{}
  509. closed int32
  510. readLock sync.Mutex
  511. readyReader chan io.Reader
  512. readResult chan error
  513. writeLock sync.Mutex
  514. nextWriteBuffer chan []byte
  515. writeResult chan error
  516. }
  517. func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
  518. return &meekConn{
  519. remoteAddr: remoteAddr,
  520. protocolVersion: protocolVersion,
  521. closeBroadcast: make(chan struct{}),
  522. closed: 0,
  523. readyReader: make(chan io.Reader, 1),
  524. readResult: make(chan error, 1),
  525. nextWriteBuffer: make(chan []byte, 1),
  526. writeResult: make(chan error, 1),
  527. }
  528. }
  529. // PumpReads causes goroutines blocking on meekConn.Read() to read
  530. // from the specified reader. This function blocks until the reader
  531. // is fully consumed or the meekConn is closed.
  532. // Note: channel scheme assumes only one concurrent call to PumpReads
  533. func (conn *meekConn) PumpReads(reader io.Reader) error {
  534. // Assumes that readyReader won't block.
  535. conn.readyReader <- reader
  536. // Receiving readResult means Read(s) have consumed the
  537. // reader sent to readyReader. readyReader is now empty and
  538. // no reference is kept to the reader.
  539. select {
  540. case err := <-conn.readResult:
  541. return err
  542. case <-conn.closeBroadcast:
  543. return io.EOF
  544. }
  545. }
  546. // Read reads from the meekConn into buffer. Read blocks until
  547. // some data is read or the meekConn closes. Under the hood, it
  548. // waits for PumpReads to submit a reader to read from.
  549. // Note: lock is to conform with net.Conn concurrency semantics
  550. func (conn *meekConn) Read(buffer []byte) (int, error) {
  551. conn.readLock.Lock()
  552. defer conn.readLock.Unlock()
  553. var reader io.Reader
  554. select {
  555. case reader = <-conn.readyReader:
  556. case <-conn.closeBroadcast:
  557. return 0, io.EOF
  558. }
  559. n, err := reader.Read(buffer)
  560. if err != nil {
  561. if err == io.EOF {
  562. err = nil
  563. }
  564. // Assumes readerResult won't block.
  565. conn.readResult <- err
  566. } else {
  567. // There may be more data in the reader, but the caller's
  568. // buffer is full, so put the reader back into the ready
  569. // channel. PumpReads remains blocked waiting for another
  570. // Read call.
  571. // Note that the reader could be at EOF, while another call is
  572. // required to get that result (https://golang.org/pkg/io/#Reader).
  573. conn.readyReader <- reader
  574. }
  575. return n, err
  576. }
  577. // PumpReads causes goroutines blocking on meekConn.Write() to write
  578. // to the specified writer. This function blocks until the meek response
  579. // body limits (size for protocol v1, turn around time for protocol v2+)
  580. // are met, or the meekConn is closed.
  581. // Note: channel scheme assumes only one concurrent call to PumpWrites
  582. func (conn *meekConn) PumpWrites(writer io.Writer) error {
  583. startTime := time.Now()
  584. timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
  585. defer timeout.Stop()
  586. for {
  587. select {
  588. case buffer := <-conn.nextWriteBuffer:
  589. _, err := writer.Write(buffer)
  590. // Assumes that writeResult won't block.
  591. // Note: always send the err to writeResult,
  592. // as the Write() caller is blocking on this.
  593. conn.writeResult <- err
  594. if err != nil {
  595. return err
  596. }
  597. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_2 {
  598. // Protocol v1 clients expect at most
  599. // MEEK_MAX_PAYLOAD_LENGTH response bodies
  600. return nil
  601. }
  602. totalElapsedTime := time.Now().Sub(startTime) / time.Millisecond
  603. if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
  604. return nil
  605. }
  606. timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
  607. case <-timeout.C:
  608. return nil
  609. case <-conn.closeBroadcast:
  610. return io.EOF
  611. }
  612. }
  613. }
  614. // Write writes the buffer to the meekConn. It blocks until the
  615. // entire buffer is written to or the meekConn closes. Under the
  616. // hood, it waits for sufficient PumpWrites calls to consume the
  617. // write buffer.
  618. // Note: lock is to conform with net.Conn concurrency semantics
  619. func (conn *meekConn) Write(buffer []byte) (int, error) {
  620. conn.writeLock.Lock()
  621. defer conn.writeLock.Unlock()
  622. // TODO: may be more efficient to send whole buffer
  623. // and have PumpWrites stash partial buffer when can't
  624. // send it all.
  625. n := 0
  626. for n < len(buffer) {
  627. end := n + MEEK_MAX_PAYLOAD_LENGTH
  628. if end > len(buffer) {
  629. end = len(buffer)
  630. }
  631. // Only write MEEK_MAX_PAYLOAD_LENGTH at a time,
  632. // to ensure compatibility with v1 protocol.
  633. chunk := buffer[n:end]
  634. select {
  635. case conn.nextWriteBuffer <- chunk:
  636. case <-conn.closeBroadcast:
  637. return n, io.EOF
  638. }
  639. // Wait for the buffer to be processed.
  640. select {
  641. case err := <-conn.writeResult:
  642. if err != nil {
  643. return n, err
  644. }
  645. case <-conn.closeBroadcast:
  646. return n, io.EOF
  647. }
  648. n += len(chunk)
  649. }
  650. return n, nil
  651. }
  652. // Close closes the meekConn. This will interrupt any blocked
  653. // Read, Write, PumpReads, and PumpWrites.
  654. func (conn *meekConn) Close() error {
  655. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  656. close(conn.closeBroadcast)
  657. }
  658. return nil
  659. }
  660. // Stub implementation of net.Conn.LocalAddr
  661. func (conn *meekConn) LocalAddr() net.Addr {
  662. return nil
  663. }
  664. // RemoteAddr returns the remoteAddr specified in newMeekConn. This
  665. // acts as a proxy for the actual remote address, which is either a
  666. // direct HTTP/HTTPS connection remote address, or in the case of
  667. // downstream proxy of CDN fronts, some other value determined via
  668. // HTTP headers.
  669. func (conn *meekConn) RemoteAddr() net.Addr {
  670. return conn.remoteAddr
  671. }
  672. // SetDeadline is not a true implementation of net.Conn.SetDeadline. It
  673. // merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
  674. // period. When it does, and the session is idle, the meekConn Read/Write will
  675. // be interrupted and return io.EOF (not a timeout error) before the deadline.
  676. // In other words, this conn will approximate the desired functionality of
  677. // timing out on idle on or before the requested deadline.
  678. func (conn *meekConn) SetDeadline(t time.Time) error {
  679. // Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
  680. if time.Now().Add(MEEK_MAX_SESSION_STALENESS).Before(t) {
  681. return nil
  682. }
  683. return common.ContextError(errors.New("not supported"))
  684. }
  685. // Stub implementation of net.Conn.SetReadDeadline
  686. func (conn *meekConn) SetReadDeadline(t time.Time) error {
  687. return common.ContextError(errors.New("not supported"))
  688. }
  689. // Stub implementation of net.Conn.SetWriteDeadline
  690. func (conn *meekConn) SetWriteDeadline(t time.Time) error {
  691. return common.ContextError(errors.New("not supported"))
  692. }