meek.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "crypto/rand"
  23. "crypto/tls"
  24. "encoding/base64"
  25. "encoding/hex"
  26. "encoding/json"
  27. std_errors "errors"
  28. "hash/crc64"
  29. "io"
  30. "net"
  31. "net/http"
  32. "runtime"
  33. "strconv"
  34. "strings"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/Psiphon-Labs/goarista/monotime"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
  46. tris "github.com/Psiphon-Labs/tls-tris"
  47. )
  48. // MeekServer is based on meek-server.go from Tor and Psiphon:
  49. //
  50. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  51. // CC0 1.0 Universal
  52. //
  53. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  54. const (
  55. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  56. // report no version number and expect at most 64K response bodies.
  57. MEEK_PROTOCOL_VERSION_1 = 1
  58. // Protocol version 2 clients initiate a session by sending an encrypted and obfuscated meek
  59. // cookie with their initial HTTP request. Connection information is contained within the
  60. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  61. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  62. // session ID on all subsequent requests for the remainder of the session.
  63. MEEK_PROTOCOL_VERSION_2 = 2
  64. // Protocol version 3 clients include resiliency enhancements and will add a Range header
  65. // when retrying a request for a partially downloaded response payload.
  66. MEEK_PROTOCOL_VERSION_3 = 3
  67. MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
  68. MEEK_TURN_AROUND_TIMEOUT = 20 * time.Millisecond
  69. MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  70. MEEK_MAX_SESSION_STALENESS = 45 * time.Second
  71. MEEK_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
  72. MEEK_MIN_SESSION_ID_LENGTH = 8
  73. MEEK_MAX_SESSION_ID_LENGTH = 20
  74. MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
  75. MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536
  76. MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048
  77. )
  78. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  79. // Obfuscated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  80. // HTTP or HTTPS.
  81. //
  82. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  83. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  84. // set as a HTTP cookie for the client to submit with each request.
  85. //
  86. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  87. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  88. // the meekConn struct.
  89. type MeekServer struct {
  90. support *SupportServices
  91. listener net.Listener
  92. listenerTunnelProtocol string
  93. listenerPort int
  94. tlsConfig *tris.Config
  95. obfuscatorSeedHistory *obfuscator.SeedHistory
  96. clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
  97. openConns *common.Conns
  98. stopBroadcast <-chan struct{}
  99. sessionsLock sync.RWMutex
  100. sessions map[string]*meekSession
  101. checksumTable *crc64.Table
  102. bufferPool *CachedResponseBufferPool
  103. rateLimitLock sync.Mutex
  104. rateLimitHistory map[string][]time.Time
  105. rateLimitCount int
  106. rateLimitSignalGC chan struct{}
  107. }
  108. // NewMeekServer initializes a new meek server.
  109. func NewMeekServer(
  110. support *SupportServices,
  111. listener net.Listener,
  112. listenerTunnelProtocol string,
  113. listenerPort int,
  114. useTLS, isFronted, useObfuscatedSessionTickets bool,
  115. clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
  116. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  117. checksumTable := crc64.MakeTable(crc64.ECMA)
  118. bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
  119. if support.Config.MeekCachedResponsePoolBufferSize != 0 {
  120. bufferLength = support.Config.MeekCachedResponsePoolBufferSize
  121. }
  122. bufferCount := MEEK_DEFAULT_POOL_BUFFER_COUNT
  123. if support.Config.MeekCachedResponsePoolBufferCount != 0 {
  124. bufferCount = support.Config.MeekCachedResponsePoolBufferCount
  125. }
  126. bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
  127. meekServer := &MeekServer{
  128. support: support,
  129. listener: listener,
  130. listenerTunnelProtocol: listenerTunnelProtocol,
  131. listenerPort: listenerPort,
  132. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  133. clientHandler: clientHandler,
  134. openConns: common.NewConns(),
  135. stopBroadcast: stopBroadcast,
  136. sessions: make(map[string]*meekSession),
  137. checksumTable: checksumTable,
  138. bufferPool: bufferPool,
  139. rateLimitHistory: make(map[string][]time.Time),
  140. rateLimitSignalGC: make(chan struct{}, 1),
  141. }
  142. if useTLS {
  143. tlsConfig, err := makeMeekTLSConfig(
  144. support, isFronted, useObfuscatedSessionTickets)
  145. if err != nil {
  146. return nil, errors.Trace(err)
  147. }
  148. meekServer.tlsConfig = tlsConfig
  149. }
  150. return meekServer, nil
  151. }
  152. // Run runs the meek server; this function blocks while serving HTTP or
  153. // HTTPS connections on the specified listener. This function also runs
  154. // a goroutine which cleans up expired meek client sessions.
  155. //
  156. // To stop the meek server, both Close() the listener and set the stopBroadcast
  157. // signal specified in NewMeekServer.
  158. func (server *MeekServer) Run() error {
  159. waitGroup := new(sync.WaitGroup)
  160. waitGroup.Add(1)
  161. go func() {
  162. defer waitGroup.Done()
  163. ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
  164. defer ticker.Stop()
  165. for {
  166. select {
  167. case <-ticker.C:
  168. server.deleteExpiredSessions()
  169. case <-server.stopBroadcast:
  170. return
  171. }
  172. }
  173. }()
  174. waitGroup.Add(1)
  175. go func() {
  176. defer waitGroup.Done()
  177. server.rateLimitWorker()
  178. }()
  179. // Serve HTTP or HTTPS
  180. //
  181. // - WriteTimeout may include time awaiting request, as per:
  182. // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
  183. // - Legacy meek-server wrapped each client HTTP connection with an explicit idle
  184. // timeout net.Conn and didn't use http.Server timeouts. We could do the same
  185. // here (use ActivityMonitoredConn) but the stock http.Server timeouts should
  186. // now be sufficient.
  187. httpServer := &http.Server{
  188. ReadTimeout: MEEK_HTTP_CLIENT_IO_TIMEOUT,
  189. WriteTimeout: MEEK_HTTP_CLIENT_IO_TIMEOUT,
  190. Handler: server,
  191. ConnState: server.httpConnStateCallback,
  192. // Disable auto HTTP/2 (https://golang.org/doc/go1.6)
  193. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  194. }
  195. // Note: Serve() will be interrupted by listener.Close() call
  196. var err error
  197. if server.tlsConfig != nil {
  198. httpsServer := HTTPSServer{Server: httpServer}
  199. err = httpsServer.ServeTLS(server.listener, server.tlsConfig)
  200. } else {
  201. err = httpServer.Serve(server.listener)
  202. }
  203. // Can't check for the exact error that Close() will cause in Accept(),
  204. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  205. // explicit stop signal to stop gracefully.
  206. select {
  207. case <-server.stopBroadcast:
  208. err = nil
  209. default:
  210. }
  211. // deleteExpiredSessions calls deleteSession which may block waiting
  212. // for active request handlers to complete; timely shutdown requires
  213. // stopping the listener and closing all existing connections before
  214. // awaiting the reaperWaitGroup.
  215. server.listener.Close()
  216. server.openConns.CloseAll()
  217. waitGroup.Wait()
  218. return err
  219. }
  220. // ServeHTTP handles meek client HTTP requests, where the request body
  221. // contains upstream traffic and the response will contain downstream
  222. // traffic.
  223. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  224. // Note: no longer requiring that the request method is POST
  225. // Check for the expected meek/session ID cookie.
  226. // Also check for prohibited HTTP headers.
  227. var meekCookie *http.Cookie
  228. for _, c := range request.Cookies() {
  229. meekCookie = c
  230. break
  231. }
  232. if meekCookie == nil || len(meekCookie.Value) == 0 {
  233. log.WithTrace().Warning("missing meek cookie")
  234. common.TerminateHTTPConnection(responseWriter, request)
  235. return
  236. }
  237. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  238. for _, header := range server.support.Config.MeekProhibitedHeaders {
  239. value := request.Header.Get(header)
  240. if header != "" {
  241. log.WithTraceFields(LogFields{
  242. "header": header,
  243. "value": value,
  244. }).Warning("prohibited meek header")
  245. common.TerminateHTTPConnection(responseWriter, request)
  246. return
  247. }
  248. }
  249. }
  250. // A valid meek cookie indicates which class of request this is:
  251. //
  252. // 1. A new meek session. Create a new session ID and proceed with
  253. // relaying tunnel traffic.
  254. //
  255. // 2. An existing meek session. Resume relaying tunnel traffic.
  256. //
  257. // 3. A request to an endpoint. This meek connection is not for relaying
  258. // tunnel traffic. Instead, the request is handed off to a custom handler.
  259. sessionID, session, endPoint, clientIP, err := server.getSessionOrEndpoint(request, meekCookie)
  260. if err != nil {
  261. // Debug since session cookie errors commonly occur during
  262. // normal operation.
  263. log.WithTraceFields(LogFields{"error": err}).Debug("session lookup failed")
  264. common.TerminateHTTPConnection(responseWriter, request)
  265. return
  266. }
  267. if endPoint != "" {
  268. // Endpoint mode. Currently, this means it's handled by the tactics
  269. // request handler.
  270. geoIPData := server.support.GeoIPService.Lookup(clientIP)
  271. handled := server.support.TacticsServer.HandleEndPoint(
  272. endPoint, common.GeoIPData(geoIPData), responseWriter, request)
  273. if !handled {
  274. log.WithTraceFields(LogFields{"endPoint": endPoint}).Info("unhandled endpoint")
  275. common.TerminateHTTPConnection(responseWriter, request)
  276. }
  277. return
  278. }
  279. // Tunnel relay mode.
  280. // Ensure that there's only one concurrent request handler per client
  281. // session. Depending on the nature of a network disruption, it can
  282. // happen that a client detects a failure and retries while the server
  283. // is still streaming response in the handler for the _previous_ client
  284. // request.
  285. //
  286. // Even if the session.cachedResponse were safe for concurrent
  287. // use (it is not), concurrent handling could lead to loss of session
  288. // since upstream data read by the first request may not reach the
  289. // cached response before the second request reads the cached data.
  290. //
  291. // The existing handler will stream response data, holding the lock,
  292. // for no more than MEEK_EXTENDED_TURN_AROUND_TIMEOUT.
  293. //
  294. // TODO: interrupt an existing handler? The existing handler will be
  295. // sending data to the cached response, but if that buffer fills, the
  296. // session will be lost.
  297. requestNumber := atomic.AddInt64(&session.requestCount, 1)
  298. // Wait for the existing request to complete.
  299. session.lock.Lock()
  300. defer session.lock.Unlock()
  301. // If a newer request has arrived while waiting, discard this one.
  302. // Do not delay processing the newest request.
  303. //
  304. // If the session expired and was deleted while this request was waiting,
  305. // discard this request. The session is no longer valid, and the final call
  306. // to session.cachedResponse.Reset may have already occured, so any further
  307. // session.cachedResponse access may deplete resources (fail to refill the pool).
  308. if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
  309. common.TerminateHTTPConnection(responseWriter, request)
  310. return
  311. }
  312. // pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  313. // read the request body as upstream traffic.
  314. // TODO: run pumpReads and pumpWrites concurrently?
  315. // pumpReads checksums the request payload and skips relaying it when
  316. // it matches the immediately previous request payload. This allows
  317. // clients to resend request payloads, when retrying due to connection
  318. // interruption, without knowing whether the server has received or
  319. // relayed the data.
  320. err = session.clientConn.pumpReads(request.Body)
  321. if err != nil {
  322. if err != io.EOF {
  323. // Debug since errors such as "i/o timeout" occur during normal operation;
  324. // also, golang network error messages may contain client IP.
  325. log.WithTraceFields(LogFields{"error": err}).Debug("read request failed")
  326. }
  327. common.TerminateHTTPConnection(responseWriter, request)
  328. // Note: keep session open to allow client to retry
  329. return
  330. }
  331. // Set cookie before writing the response.
  332. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && !session.sessionIDSent {
  333. // Replace the meek cookie with the session ID.
  334. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  335. // session ID value replaces the original meek cookie value.
  336. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  337. session.sessionIDSent = true
  338. }
  339. // When streaming data into the response body, a copy is
  340. // retained in the cachedResponse buffer. This allows the
  341. // client to retry and request that the response be resent
  342. // when the HTTP connection is interrupted.
  343. //
  344. // If a Range header is present, the client is retrying,
  345. // possibly after having received a partial response. In
  346. // this case, use any cached response to attempt to resend
  347. // the response, starting from the resend position the client
  348. // indicates.
  349. //
  350. // When the resend position is not available -- because the
  351. // cachedResponse buffer could not hold it -- the client session
  352. // is closed, as there's no way to resume streaming the payload
  353. // uninterrupted.
  354. //
  355. // The client may retry before a cached response is prepared,
  356. // so a cached response is not always used when a Range header
  357. // is present.
  358. //
  359. // TODO: invalid Range header is ignored; should it be otherwise?
  360. position, isRetry := checkRangeHeader(request)
  361. if isRetry {
  362. atomic.AddInt64(&session.metricClientRetries, 1)
  363. }
  364. hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
  365. // The client is not expected to send position > 0 when there is
  366. // no cached response; let that case fall through to the next
  367. // HasPosition check which will fail and close the session.
  368. var responseSize int
  369. var responseError error
  370. if isRetry && (hasCompleteCachedResponse || position > 0) {
  371. if !session.cachedResponse.HasPosition(position) {
  372. greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
  373. common.TerminateHTTPConnection(responseWriter, request)
  374. session.delete(true)
  375. return
  376. }
  377. responseWriter.WriteHeader(http.StatusPartialContent)
  378. // TODO:
  379. // - enforce a max extended buffer count per client, for
  380. // fairness? Throttling may make this unnecessary.
  381. // - cachedResponse can now start releasing extended buffers,
  382. // as response bytes before "position" will never be requested
  383. // again?
  384. responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
  385. greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
  386. // The client may again fail to receive the payload and may again
  387. // retry, so not yet releasing cachedResponse buffers.
  388. } else {
  389. // _Now_ we release buffers holding data from the previous
  390. // response. And then immediately stream the new response into
  391. // newly acquired buffers.
  392. session.cachedResponse.Reset()
  393. // Note: this code depends on an implementation detail of
  394. // io.MultiWriter: a Write() to the MultiWriter writes first
  395. // to the cache, and then to the response writer. So if the
  396. // write to the response writer fails, the payload is cached.
  397. multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
  398. // The client expects 206, not 200, whenever it sets a Range header,
  399. // which it may do even when no cached response is prepared.
  400. if isRetry {
  401. responseWriter.WriteHeader(http.StatusPartialContent)
  402. }
  403. // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  404. // write its downstream traffic through to the response body.
  405. responseSize, responseError = session.clientConn.pumpWrites(multiWriter)
  406. greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
  407. greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
  408. }
  409. // responseError is the result of writing the body either from CopyFromPosition or pumpWrites
  410. if responseError != nil {
  411. if responseError != io.EOF {
  412. // Debug since errors such as "i/o timeout" occur during normal operation;
  413. // also, golang network error messages may contain client IP.
  414. log.WithTraceFields(LogFields{"error": responseError}).Debug("write response failed")
  415. }
  416. common.TerminateHTTPConnection(responseWriter, request)
  417. // Note: keep session open to allow client to retry
  418. return
  419. }
  420. }
  421. func checkRangeHeader(request *http.Request) (int, bool) {
  422. rangeHeader := request.Header.Get("Range")
  423. if rangeHeader == "" {
  424. return 0, false
  425. }
  426. prefix := "bytes="
  427. suffix := "-"
  428. if !strings.HasPrefix(rangeHeader, prefix) ||
  429. !strings.HasSuffix(rangeHeader, suffix) {
  430. return 0, false
  431. }
  432. rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
  433. rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
  434. position, err := strconv.Atoi(rangeHeader)
  435. if err != nil {
  436. return 0, false
  437. }
  438. return position, true
  439. }
  440. // getSessionOrEndpoint checks if the cookie corresponds to an existing tunnel
  441. // relay session ID. If no session is found, the cookie must be an obfuscated
  442. // meek cookie. A new session is created when the meek cookie indicates relay
  443. // mode; or the endpoint is returned when the meek cookie indicates endpoint
  444. // mode.
  445. func (server *MeekServer) getSessionOrEndpoint(
  446. request *http.Request, meekCookie *http.Cookie) (string, *meekSession, string, string, error) {
  447. // Check for an existing session.
  448. server.sessionsLock.RLock()
  449. existingSessionID := meekCookie.Value
  450. session, ok := server.sessions[existingSessionID]
  451. server.sessionsLock.RUnlock()
  452. if ok {
  453. // TODO: can multiple http client connections using same session cookie
  454. // cause race conditions on session struct?
  455. session.touch()
  456. return existingSessionID, session, "", "", nil
  457. }
  458. // Determine the client remote address, which is used for geolocation
  459. // and stats. When an intermediate proxy or CDN is in use, we may be
  460. // able to determine the original client address by inspecting HTTP
  461. // headers such as X-Forwarded-For.
  462. clientIP := strings.Split(request.RemoteAddr, ":")[0]
  463. if len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  464. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  465. value := request.Header.Get(header)
  466. if len(value) > 0 {
  467. // Some headers, such as X-Forwarded-For, are a comma-separated
  468. // list of IPs (each proxy in a chain). The first IP should be
  469. // the client IP.
  470. proxyClientIP := strings.Split(value, ",")[0]
  471. if net.ParseIP(proxyClientIP) != nil &&
  472. server.support.GeoIPService.Lookup(proxyClientIP).Country != GEOIP_UNKNOWN_VALUE {
  473. clientIP = proxyClientIP
  474. break
  475. }
  476. }
  477. }
  478. }
  479. if server.rateLimit(clientIP) {
  480. return "", nil, "", "", errors.TraceNew("rate limit exceeded")
  481. }
  482. // The session is new (or expired). Treat the cookie value as a new meek
  483. // cookie, extract the payload, and create a new session.
  484. payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookie.Value)
  485. if err != nil {
  486. return "", nil, "", "", errors.Trace(err)
  487. }
  488. // Note: this meek server ignores legacy values PsiphonClientSessionId
  489. // and PsiphonServerAddress.
  490. var clientSessionData protocol.MeekCookieData
  491. err = json.Unmarshal(payloadJSON, &clientSessionData)
  492. if err != nil {
  493. return "", nil, "", "", errors.Trace(err)
  494. }
  495. // Handle endpoints before enforcing the GetEstablishTunnels check.
  496. // Currently, endpoints are tactics requests, and we allow these to be
  497. // handled by servers which would otherwise reject new tunnels.
  498. if clientSessionData.EndPoint != "" {
  499. return "", nil, clientSessionData.EndPoint, clientIP, nil
  500. }
  501. // Don't create new sessions when not establishing. A subsequent SSH handshake
  502. // will not succeed, so creating a meek session just wastes resources.
  503. if server.support.TunnelServer != nil &&
  504. !server.support.TunnelServer.GetEstablishTunnels() {
  505. return "", nil, "", "", errors.TraceNew("not establishing tunnels")
  506. }
  507. // Create a new session
  508. bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
  509. if server.support.Config.MeekCachedResponseBufferSize != 0 {
  510. bufferLength = server.support.Config.MeekCachedResponseBufferSize
  511. }
  512. cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
  513. session = &meekSession{
  514. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  515. sessionIDSent: false,
  516. cachedResponse: cachedResponse,
  517. }
  518. session.touch()
  519. // Create a new meek conn that will relay the payload
  520. // between meek request/responses and the tunnel server client
  521. // handler. The client IP is also used to initialize the
  522. // meek conn with a useful value to return when the tunnel
  523. // server calls conn.RemoteAddr() to get the client's IP address.
  524. // Assumes clientIP is a valid IP address; the port value is a stub
  525. // and is expected to be ignored.
  526. clientConn := newMeekConn(
  527. server,
  528. session,
  529. &net.TCPAddr{
  530. IP: net.ParseIP(clientIP),
  531. Port: 0,
  532. },
  533. clientSessionData.MeekProtocolVersion)
  534. session.clientConn = clientConn
  535. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  536. // meek cookie to a session ID; v1 clients always send the
  537. // original meek cookie value with each request. The issue with
  538. // v1 is that clients which wake after a device sleep will attempt
  539. // to resume a meek session and the server can't differentiate
  540. // between resuming a session and creating a new session. This
  541. // causes the v1 client connection to hang/timeout.
  542. sessionID := meekCookie.Value
  543. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  544. sessionID, err = makeMeekSessionID()
  545. if err != nil {
  546. return "", nil, "", "", errors.Trace(err)
  547. }
  548. }
  549. server.sessionsLock.Lock()
  550. server.sessions[sessionID] = session
  551. server.sessionsLock.Unlock()
  552. // Note: from the tunnel server's perspective, this client connection
  553. // will close when session.delete calls Close() on the meekConn.
  554. server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
  555. return sessionID, session, "", "", nil
  556. }
  557. func (server *MeekServer) rateLimit(clientIP string) bool {
  558. historySize, thresholdSeconds, regions, ISPs, GCTriggerCount, _ :=
  559. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  560. if historySize == 0 {
  561. return false
  562. }
  563. if len(regions) > 0 || len(ISPs) > 0 {
  564. // TODO: avoid redundant GeoIP lookups?
  565. geoIPData := server.support.GeoIPService.Lookup(clientIP)
  566. if len(regions) > 0 {
  567. if !common.Contains(regions, geoIPData.Country) {
  568. return false
  569. }
  570. }
  571. if len(ISPs) > 0 {
  572. if !common.Contains(ISPs, geoIPData.ISP) {
  573. return false
  574. }
  575. }
  576. }
  577. limit := true
  578. triggerGC := false
  579. now := time.Now()
  580. threshold := now.Add(-time.Duration(thresholdSeconds) * time.Second)
  581. server.rateLimitLock.Lock()
  582. history, ok := server.rateLimitHistory[clientIP]
  583. if !ok || len(history) != historySize {
  584. history = make([]time.Time, historySize)
  585. server.rateLimitHistory[clientIP] = history
  586. }
  587. for i := 0; i < len(history); i++ {
  588. if history[i].IsZero() || history[i].Before(threshold) {
  589. limit = false
  590. }
  591. if i == len(history)-1 {
  592. history[i] = now
  593. } else {
  594. history[i] = history[i+1]
  595. }
  596. }
  597. if limit {
  598. server.rateLimitCount += 1
  599. if server.rateLimitCount >= GCTriggerCount {
  600. triggerGC = true
  601. server.rateLimitCount = 0
  602. }
  603. }
  604. server.rateLimitLock.Unlock()
  605. if triggerGC {
  606. select {
  607. case server.rateLimitSignalGC <- *new(struct{}):
  608. default:
  609. }
  610. }
  611. return limit
  612. }
  613. func (server *MeekServer) rateLimitWorker() {
  614. _, _, _, _, _, reapFrequencySeconds :=
  615. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  616. timer := time.NewTimer(time.Duration(reapFrequencySeconds) * time.Second)
  617. defer timer.Stop()
  618. for {
  619. select {
  620. case <-timer.C:
  621. _, thresholdSeconds, _, _, _, reapFrequencySeconds :=
  622. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  623. server.rateLimitLock.Lock()
  624. threshold := time.Now().Add(-time.Duration(thresholdSeconds) * time.Second)
  625. for key, history := range server.rateLimitHistory {
  626. reap := true
  627. for i := 0; i < len(history); i++ {
  628. if !history[i].IsZero() && !history[i].Before(threshold) {
  629. reap = false
  630. }
  631. }
  632. if reap {
  633. delete(server.rateLimitHistory, key)
  634. }
  635. }
  636. // Enable rate limit history map to be garbage collected when possible.
  637. if len(server.rateLimitHistory) == 0 {
  638. server.rateLimitHistory = make(map[string][]time.Time)
  639. }
  640. server.rateLimitLock.Unlock()
  641. timer.Reset(time.Duration(reapFrequencySeconds) * time.Second)
  642. case <-server.rateLimitSignalGC:
  643. runtime.GC()
  644. case <-server.stopBroadcast:
  645. return
  646. }
  647. }
  648. }
  649. func (server *MeekServer) deleteSession(sessionID string) {
  650. // Don't obtain the server.sessionsLock write lock until modifying
  651. // server.sessions, as the session.delete can block for up to
  652. // MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
  653. // concurrently.
  654. //
  655. // Since a lock isn't held for the duration, concurrent calls to
  656. // deleteSession with the same sessionID could happen; this is
  657. // not expected since only the reaper goroutine calls deleteExpiredSessions
  658. // (and in any case concurrent execution of the ok block is not an issue).
  659. server.sessionsLock.RLock()
  660. session, ok := server.sessions[sessionID]
  661. server.sessionsLock.RUnlock()
  662. if ok {
  663. session.delete(false)
  664. server.sessionsLock.Lock()
  665. delete(server.sessions, sessionID)
  666. server.sessionsLock.Unlock()
  667. }
  668. }
  669. func (server *MeekServer) deleteExpiredSessions() {
  670. // A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  671. // so grab a snapshot list of expired sessions and do not hold a lock for
  672. // the duration of deleteExpiredSessions. This allows new sessions to be
  673. // added concurrently.
  674. //
  675. // New sessions added after the snapshot is taken will be checked for
  676. // expiry on subsequent periodic calls to deleteExpiredSessions.
  677. //
  678. // To avoid long delays in releasing resources, individual deletes are
  679. // performed concurrently.
  680. server.sessionsLock.Lock()
  681. expiredSessionIDs := make([]string, 0)
  682. for sessionID, session := range server.sessions {
  683. if session.expired() {
  684. expiredSessionIDs = append(expiredSessionIDs, sessionID)
  685. }
  686. }
  687. server.sessionsLock.Unlock()
  688. start := time.Now()
  689. deleteWaitGroup := new(sync.WaitGroup)
  690. for _, sessionID := range expiredSessionIDs {
  691. deleteWaitGroup.Add(1)
  692. go func(sessionID string) {
  693. defer deleteWaitGroup.Done()
  694. server.deleteSession(sessionID)
  695. }(sessionID)
  696. }
  697. deleteWaitGroup.Wait()
  698. log.WithTraceFields(
  699. LogFields{"elapsed time": time.Since(start)}).Debug("deleted expired sessions")
  700. }
  701. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  702. // meek server.
  703. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  704. switch connState {
  705. case http.StateNew:
  706. server.openConns.Add(conn)
  707. case http.StateHijacked, http.StateClosed:
  708. server.openConns.Remove(conn)
  709. }
  710. }
  711. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  712. // payload is base64 encoded, obfuscated, and NaCl encrypted.
  713. func (server *MeekServer) getMeekCookiePayload(
  714. clientIP string, cookieValue string) ([]byte, error) {
  715. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  716. if err != nil {
  717. return nil, errors.Trace(err)
  718. }
  719. // The data consists of an obfuscated seed message prepended
  720. // to the obfuscated, encrypted payload. The server obfuscator
  721. // will read the seed message, leaving the remaining encrypted
  722. // data in the reader.
  723. reader := bytes.NewReader(decodedValue[:])
  724. obfuscator, err := obfuscator.NewServerObfuscator(
  725. &obfuscator.ObfuscatorConfig{
  726. Keyword: server.support.Config.MeekObfuscatedKey,
  727. SeedHistory: server.obfuscatorSeedHistory,
  728. IrregularLogger: func(clientIP string, logFields common.LogFields) {
  729. logIrregularTunnel(
  730. server.support,
  731. server.listenerTunnelProtocol,
  732. server.listenerPort,
  733. clientIP,
  734. LogFields(logFields))
  735. },
  736. },
  737. clientIP,
  738. reader)
  739. if err != nil {
  740. return nil, errors.Trace(err)
  741. }
  742. offset, err := reader.Seek(0, 1)
  743. if err != nil {
  744. return nil, errors.Trace(err)
  745. }
  746. encryptedPayload := decodedValue[offset:]
  747. obfuscator.ObfuscateClientToServer(encryptedPayload)
  748. var nonce [24]byte
  749. var privateKey, ephemeralPublicKey [32]byte
  750. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  751. server.support.Config.MeekCookieEncryptionPrivateKey)
  752. if err != nil {
  753. return nil, errors.Trace(err)
  754. }
  755. copy(privateKey[:], decodedPrivateKey)
  756. if len(encryptedPayload) < 32 {
  757. return nil, errors.TraceNew("unexpected encrypted payload size")
  758. }
  759. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  760. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  761. if !ok {
  762. return nil, errors.TraceNew("open box failed")
  763. }
  764. return payload, nil
  765. }
// meekSession holds the server-side state of a single meek session.
type meekSession struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)

	// lastActivity is a monotime timestamp, stored atomically by touch and
	// loaded atomically by expired.
	lastActivity int64
	requestCount int64

	// The metric* values are loaded atomically and reported by GetMetrics.
	metricClientRetries              int64
	metricPeakResponseSize           int64
	metricPeakCachedResponseSize     int64
	metricPeakCachedResponseHitSize  int64
	metricCachedResponseMissPosition int64

	// lock is acquired by delete (unless its caller already holds it)
	// before cachedResponse is reset and deleted is set.
	lock sync.Mutex

	// deleted is set to true by delete.
	deleted bool

	// clientConn is the session's meekConn; delete closes it.
	clientConn          *meekConn
	meekProtocolVersion int
	sessionIDSent       bool

	// cachedResponse is reset by delete, which releases its extended
	// buffers back to the pool.
	cachedResponse *CachedResponse
}
  784. func (session *meekSession) touch() {
  785. atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
  786. }
  787. func (session *meekSession) expired() bool {
  788. lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
  789. return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
  790. }
  791. // delete releases all resources allocated by a session.
  792. func (session *meekSession) delete(haveLock bool) {
  793. // TODO: close the persistent HTTP client connection, if one exists?
  794. // This final call session.cachedResponse.Reset releases shared resources.
  795. //
  796. // This call requires exclusive access. session.lock is be obtained before
  797. // calling session.cachedResponse.Reset. Once the lock is obtained, no
  798. // request for this session is being processed concurrently, and pending
  799. // requests will block at session.lock.
  800. //
  801. // This logic assumes that no further session.cachedResponse access occurs,
  802. // or else resources may deplete (buffers won't be returned to the pool).
  803. // These requirements are achieved by obtaining the lock, setting
  804. // session.deleted, and any subsequent request handlers checking
  805. // session.deleted immediately after obtaining the lock.
  806. //
  807. // session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  808. // the timeout for any active request handler processing a session
  809. // request.
  810. //
  811. // When the lock must be acquired, clientConn.Close is called first, to
  812. // interrupt any existing request handler blocking on pumpReads or pumpWrites.
  813. session.clientConn.Close()
  814. if !haveLock {
  815. session.lock.Lock()
  816. }
  817. // Release all extended buffers back to the pool.
  818. // session.cachedResponse.Reset is not safe for concurrent calls.
  819. session.cachedResponse.Reset()
  820. session.deleted = true
  821. if !haveLock {
  822. session.lock.Unlock()
  823. }
  824. }
  825. // GetMetrics implements the common.MetricsSource interface.
  826. func (session *meekSession) GetMetrics() common.LogFields {
  827. logFields := make(common.LogFields)
  828. logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
  829. logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
  830. logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
  831. logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
  832. logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
  833. return logFields
  834. }
  835. // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
  836. // Currently, this config is optimized for fronted meek where the nature
  837. // of the connection is non-circumvention; it's optimized for performance
  838. // assuming the peer is an uncensored CDN.
  839. func makeMeekTLSConfig(
  840. support *SupportServices,
  841. isFronted, useObfuscatedSessionTickets bool) (*tris.Config, error) {
  842. certificate, privateKey, err := common.GenerateWebServerCertificate(values.GetHostName())
  843. if err != nil {
  844. return nil, errors.Trace(err)
  845. }
  846. tlsCertificate, err := tris.X509KeyPair(
  847. []byte(certificate), []byte(privateKey))
  848. if err != nil {
  849. return nil, errors.Trace(err)
  850. }
  851. // Vary the minimum version to frustrate scanning/fingerprinting of unfronted servers.
  852. // Limitation: like the certificate, this value changes on restart.
  853. minVersionCandidates := []uint16{tris.VersionTLS10, tris.VersionTLS11, tris.VersionTLS12}
  854. minVersion := minVersionCandidates[prng.Intn(len(minVersionCandidates))]
  855. config := &tris.Config{
  856. Certificates: []tris.Certificate{tlsCertificate},
  857. NextProtos: []string{"http/1.1"},
  858. MinVersion: minVersion,
  859. UseExtendedMasterSecret: true,
  860. }
  861. if isFronted {
  862. // This is a reordering of the supported CipherSuites in golang 1.6[*]. Non-ephemeral key
  863. // CipherSuites greatly reduce server load, and we try to select these since the meek
  864. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  865. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  866. // by ephemeral key CipherSuites.
  867. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  868. //
  869. // This optimization is applied only when there's a CDN in front of the meek server; in
  870. // unfronted cases we prefer a more natural TLS handshake.
  871. //
  872. // [*] the list has since been updated, removing CipherSuites using RC4 and 3DES.
  873. config.CipherSuites = []uint16{
  874. tris.TLS_RSA_WITH_AES_128_GCM_SHA256,
  875. tris.TLS_RSA_WITH_AES_256_GCM_SHA384,
  876. tris.TLS_RSA_WITH_AES_128_CBC_SHA,
  877. tris.TLS_RSA_WITH_AES_256_CBC_SHA,
  878. tris.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  879. tris.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  880. tris.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  881. tris.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  882. tris.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  883. tris.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  884. tris.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  885. tris.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  886. }
  887. config.PreferServerCipherSuites = true
  888. }
  889. if useObfuscatedSessionTickets {
  890. // See obfuscated session ticket overview
  891. // in NewObfuscatedClientSessionCache.
  892. var obfuscatedSessionTicketKey [32]byte
  893. key, err := hex.DecodeString(support.Config.MeekObfuscatedKey)
  894. if err == nil && len(key) != 32 {
  895. err = std_errors.New("invalid obfuscated session key length")
  896. }
  897. if err != nil {
  898. return nil, errors.Trace(err)
  899. }
  900. copy(obfuscatedSessionTicketKey[:], key)
  901. var standardSessionTicketKey [32]byte
  902. _, err = rand.Read(standardSessionTicketKey[:])
  903. if err != nil {
  904. return nil, errors.Trace(err)
  905. }
  906. // Note: SessionTicketKey needs to be set, or else, it appears,
  907. // tris.Config.serverInit() will clobber the value set by
  908. // SetSessionTicketKeys.
  909. config.SessionTicketKey = obfuscatedSessionTicketKey
  910. config.SetSessionTicketKeys([][32]byte{
  911. standardSessionTicketKey,
  912. obfuscatedSessionTicketKey})
  913. }
  914. return config, nil
  915. }
  916. // makeMeekSessionID creates a new session ID. The variable size is intended to
  917. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  918. func makeMeekSessionID() (string, error) {
  919. size := MEEK_MIN_SESSION_ID_LENGTH +
  920. prng.Intn(MEEK_MAX_SESSION_ID_LENGTH-MEEK_MIN_SESSION_ID_LENGTH)
  921. sessionID, err := common.MakeSecureRandomBytes(size)
  922. if err != nil {
  923. return "", errors.Trace(err)
  924. }
  925. // Omit padding to maximize variable size space. To the client, the session
  926. // ID is an opaque string cookie value.
  927. return base64.RawStdEncoding.EncodeToString(sessionID), nil
  928. }
// meekConn implements the net.Conn interface and is to be used as a client
// connection by the tunnel server (being passed to sshServer.handleClient).
// meekConn bridges net/http request/response payload readers and writers
// and goroutines calling Read()s and Write()s.
type meekConn struct {
	// meekServer supplies the CRC-64 table pumpReads uses for payload
	// checksums.
	meekServer *MeekServer

	// meekSession is the session that GetMetrics delegates to.
	meekSession *meekSession

	// remoteAddr is the value returned by RemoteAddr.
	remoteAddr net.Addr

	// protocolVersion is compared against MEEK_PROTOCOL_VERSION_1 in
	// pumpWrites to decide how much to write per response.
	protocolVersion int

	// closeBroadcast is closed by Close; Read, Write, pumpReads, and
	// pumpWrites all select on it.
	closeBroadcast chan struct{}

	// closed is flipped from 0 to 1 with a compare-and-swap in Close so
	// that closeBroadcast is closed only once.
	closed int32

	// lastReadChecksum is the CRC-64 of the most recent request payload
	// accepted by pumpReads; it is nil until the first payload.
	lastReadChecksum *uint64

	// readLock serializes Read calls.
	readLock sync.Mutex

	// A single read buffer is exchanged between pumpReads and Read through
	// these three channels; replaceReadBuffer picks the channel based on
	// how much unread data the buffer holds.
	emptyReadBuffer   chan *bytes.Buffer
	partialReadBuffer chan *bytes.Buffer
	fullReadBuffer    chan *bytes.Buffer

	// writeLock serializes Write calls.
	writeLock sync.Mutex

	// nextWriteBuffer carries chunks from Write to pumpWrites.
	nextWriteBuffer chan []byte

	// writeResult carries the result of each chunk write from pumpWrites
	// back to Write.
	writeResult chan error
}
  949. func newMeekConn(
  950. meekServer *MeekServer,
  951. meekSession *meekSession,
  952. remoteAddr net.Addr,
  953. protocolVersion int) *meekConn {
  954. conn := &meekConn{
  955. meekServer: meekServer,
  956. meekSession: meekSession,
  957. remoteAddr: remoteAddr,
  958. protocolVersion: protocolVersion,
  959. closeBroadcast: make(chan struct{}),
  960. closed: 0,
  961. emptyReadBuffer: make(chan *bytes.Buffer, 1),
  962. partialReadBuffer: make(chan *bytes.Buffer, 1),
  963. fullReadBuffer: make(chan *bytes.Buffer, 1),
  964. nextWriteBuffer: make(chan []byte, 1),
  965. writeResult: make(chan error, 1),
  966. }
  967. // Read() calls and pumpReads() are synchronized by exchanging control
  968. // of a single readBuffer. This is the same scheme used in and described
  969. // in psiphon.MeekConn.
  970. conn.emptyReadBuffer <- new(bytes.Buffer)
  971. return conn
  972. }
  973. // pumpReads causes goroutines blocking on meekConn.Read() to read
  974. // from the specified reader. This function blocks until the reader
  975. // is fully consumed or the meekConn is closed. A read buffer allows
  976. // up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
  977. // without a Read() immediately consuming the bytes, but there's still
  978. // a possibility of a stall if no Read() calls are made after this
  979. // read buffer is full.
  980. // Note: assumes only one concurrent call to pumpReads
  981. func (conn *meekConn) pumpReads(reader io.Reader) error {
  982. // Use either an empty or partial buffer. By using a partial
  983. // buffer, pumpReads will not block if the Read() caller has
  984. // not fully drained the read buffer.
  985. var readBuffer *bytes.Buffer
  986. select {
  987. case readBuffer = <-conn.emptyReadBuffer:
  988. case readBuffer = <-conn.partialReadBuffer:
  989. case <-conn.closeBroadcast:
  990. return io.EOF
  991. }
  992. newDataOffset := readBuffer.Len()
  993. // Since we need to read the full request payload in order to
  994. // take its checksum before relaying it, the read buffer can
  995. // grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
  996. // +1 allows for an explicit check for request payloads that
  997. // exceed the maximum permitted length.
  998. limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
  999. n, err := readBuffer.ReadFrom(limitReader)
  1000. if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
  1001. err = std_errors.New("invalid request payload length")
  1002. }
  1003. // If the request read fails, don't relay the new data. This allows
  1004. // the client to retry and resend its request payload without
  1005. // interrupting/duplicating the payload flow.
  1006. if err != nil {
  1007. readBuffer.Truncate(newDataOffset)
  1008. conn.replaceReadBuffer(readBuffer)
  1009. return errors.Trace(err)
  1010. }
  1011. // Check if request payload checksum matches immediately
  1012. // previous payload. On match, assume this is a client retry
  1013. // sending payload that was already relayed and skip this
  1014. // payload. Payload is OSSH ciphertext and almost surely
  1015. // will not repeat. In the highly unlikely case that it does,
  1016. // the underlying SSH connection will fail and the client
  1017. // must reconnect.
  1018. checksum := crc64.Checksum(
  1019. readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
  1020. if conn.lastReadChecksum == nil {
  1021. conn.lastReadChecksum = new(uint64)
  1022. } else if *conn.lastReadChecksum == checksum {
  1023. readBuffer.Truncate(newDataOffset)
  1024. }
  1025. *conn.lastReadChecksum = checksum
  1026. conn.replaceReadBuffer(readBuffer)
  1027. return nil
  1028. }
// errMeekConnectionHasClosed is the error that Read, Write, and pumpWrites
// return, wrapped by errors.Trace, when they are interrupted by the meekConn
// closing.
var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")
  1030. // Read reads from the meekConn into buffer. Read blocks until
  1031. // some data is read or the meekConn closes. Under the hood, it
  1032. // waits for pumpReads to submit a reader to read from.
  1033. // Note: lock is to conform with net.Conn concurrency semantics
  1034. func (conn *meekConn) Read(buffer []byte) (int, error) {
  1035. conn.readLock.Lock()
  1036. defer conn.readLock.Unlock()
  1037. var readBuffer *bytes.Buffer
  1038. select {
  1039. case readBuffer = <-conn.partialReadBuffer:
  1040. case readBuffer = <-conn.fullReadBuffer:
  1041. case <-conn.closeBroadcast:
  1042. return 0, errors.Trace(errMeekConnectionHasClosed)
  1043. }
  1044. n, err := readBuffer.Read(buffer)
  1045. conn.replaceReadBuffer(readBuffer)
  1046. return n, err
  1047. }
  1048. func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
  1049. length := readBuffer.Len()
  1050. if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
  1051. conn.fullReadBuffer <- readBuffer
  1052. } else if length == 0 {
  1053. conn.emptyReadBuffer <- readBuffer
  1054. } else {
  1055. conn.partialReadBuffer <- readBuffer
  1056. }
  1057. }
  1058. // pumpWrites causes goroutines blocking on meekConn.Write() to write
  1059. // to the specified writer. This function blocks until the meek response
  1060. // body limits (size for protocol v1, turn around time for protocol v2+)
  1061. // are met, or the meekConn is closed.
  1062. // Note: channel scheme assumes only one concurrent call to pumpWrites
  1063. func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
  1064. startTime := time.Now()
  1065. timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
  1066. defer timeout.Stop()
  1067. n := 0
  1068. for {
  1069. select {
  1070. case buffer := <-conn.nextWriteBuffer:
  1071. written, err := writer.Write(buffer)
  1072. n += written
  1073. // Assumes that writeResult won't block.
  1074. // Note: always send the err to writeResult,
  1075. // as the Write() caller is blocking on this.
  1076. conn.writeResult <- err
  1077. if err != nil {
  1078. return n, err
  1079. }
  1080. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
  1081. // Pre-protocol version 1 clients expect at most
  1082. // MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
  1083. return n, nil
  1084. }
  1085. totalElapsedTime := time.Since(startTime) / time.Millisecond
  1086. if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
  1087. return n, nil
  1088. }
  1089. timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
  1090. case <-timeout.C:
  1091. return n, nil
  1092. case <-conn.closeBroadcast:
  1093. return n, errors.Trace(errMeekConnectionHasClosed)
  1094. }
  1095. }
  1096. }
  1097. // Write writes the buffer to the meekConn. It blocks until the
  1098. // entire buffer is written to or the meekConn closes. Under the
  1099. // hood, it waits for sufficient pumpWrites calls to consume the
  1100. // write buffer.
  1101. // Note: lock is to conform with net.Conn concurrency semantics
  1102. func (conn *meekConn) Write(buffer []byte) (int, error) {
  1103. conn.writeLock.Lock()
  1104. defer conn.writeLock.Unlock()
  1105. // TODO: may be more efficient to send whole buffer
  1106. // and have pumpWrites stash partial buffer when can't
  1107. // send it all.
  1108. n := 0
  1109. for n < len(buffer) {
  1110. end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
  1111. if end > len(buffer) {
  1112. end = len(buffer)
  1113. }
  1114. // Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
  1115. // to ensure compatibility with v1 protocol.
  1116. chunk := buffer[n:end]
  1117. select {
  1118. case conn.nextWriteBuffer <- chunk:
  1119. case <-conn.closeBroadcast:
  1120. return n, errors.Trace(errMeekConnectionHasClosed)
  1121. }
  1122. // Wait for the buffer to be processed.
  1123. select {
  1124. case <-conn.writeResult:
  1125. // The err from conn.writeResult comes from the
  1126. // io.MultiWriter used in pumpWrites, which writes
  1127. // to both the cached response and the HTTP response.
  1128. //
  1129. // Don't stop on error here, since only writing
  1130. // to the HTTP response will fail, and the client
  1131. // may retry and use the cached response.
  1132. //
  1133. // It's possible that the cached response buffer
  1134. // is too small for the client to successfully
  1135. // retry, but that cannot be determined. In this
  1136. // case, the meek connection will eventually fail.
  1137. //
  1138. // err is already logged in ServeHTTP.
  1139. case <-conn.closeBroadcast:
  1140. return n, errors.Trace(errMeekConnectionHasClosed)
  1141. }
  1142. n += len(chunk)
  1143. }
  1144. return n, nil
  1145. }
  1146. // Close closes the meekConn. This will interrupt any blocked
  1147. // Read, Write, pumpReads, and pumpWrites.
  1148. func (conn *meekConn) Close() error {
  1149. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  1150. close(conn.closeBroadcast)
  1151. }
  1152. return nil
  1153. }
// LocalAddr is a stub implementation of net.Conn.LocalAddr. It always
// returns nil.
func (conn *meekConn) LocalAddr() net.Addr {
	return nil
}
// RemoteAddr returns the remoteAddr specified in newMeekConn. This
// acts as a proxy for the actual remote address, which is either a
// direct HTTP/HTTPS connection remote address, or, in the case of
// downstream proxies or CDN fronts, some other value determined via
// HTTP headers.
func (conn *meekConn) RemoteAddr() net.Addr {
	return conn.remoteAddr
}
  1166. // SetDeadline is not a true implementation of net.Conn.SetDeadline. It
  1167. // merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
  1168. // period. When it does, and the session is idle, the meekConn Read/Write will
  1169. // be interrupted and return an error (not a timeout error) before the deadline.
  1170. // In other words, this conn will approximate the desired functionality of
  1171. // timing out on idle on or before the requested deadline.
  1172. func (conn *meekConn) SetDeadline(t time.Time) error {
  1173. // Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
  1174. if time.Now().Add(MEEK_MAX_SESSION_STALENESS).Before(t) {
  1175. return nil
  1176. }
  1177. return errors.TraceNew("not supported")
  1178. }
// SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline. The
// argument is ignored and a "not supported" error is always returned.
func (conn *meekConn) SetReadDeadline(t time.Time) error {
	return errors.TraceNew("not supported")
}
// SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline. The
// argument is ignored and a "not supported" error is always returned.
func (conn *meekConn) SetWriteDeadline(t time.Time) error {
	return errors.TraceNew("not supported")
}
// GetMetrics implements the common.MetricsSource interface. The metrics are
// maintained in the meek session type; but logTunnel, which calls
// MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
// through to the session and returns the session's metrics unchanged.
func (conn *meekConn) GetMetrics() common.LogFields {
	return conn.meekSession.GetMetrics()
}