meek.go 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "encoding/hex"
  27. "encoding/json"
  28. std_errors "errors"
  29. "hash/crc64"
  30. "io"
  31. "net"
  32. "net/http"
  33. "runtime"
  34. "strconv"
  35. "strings"
  36. "sync"
  37. "sync/atomic"
  38. "time"
  39. tls "github.com/Psiphon-Labs/psiphon-tls"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
  49. lrucache "github.com/cognusion/go-cache-lru"
  50. "github.com/juju/ratelimit"
  51. "golang.org/x/crypto/nacl/box"
  52. )
  53. // MeekServer is based on meek-server.go from Tor and Psiphon:
  54. //
  55. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  56. // CC0 1.0 Universal
  57. //
  58. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  59. const (
  60. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  61. // report no version number and expect at most 64K response bodies.
  62. MEEK_PROTOCOL_VERSION_1 = 1
  63. // Protocol version 2 clients initiate a session by sending an encrypted and obfuscated meek
  64. // cookie with their initial HTTP request. Connection information is contained within the
  65. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  66. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  67. // session ID on all subsequent requests for the remainder of the session.
  68. MEEK_PROTOCOL_VERSION_2 = 2
  69. // Protocol version 3 clients include resiliency enhancements and will add a Range header
  70. // when retrying a request for a partially downloaded response payload.
  71. MEEK_PROTOCOL_VERSION_3 = 3
  72. MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
  73. MEEK_MIN_SESSION_ID_LENGTH = 8
  74. MEEK_MAX_SESSION_ID_LENGTH = 20
  75. MEEK_DEFAULT_TURN_AROUND_TIMEOUT = 10 * time.Millisecond
  76. MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  77. MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD = 8192
  78. MEEK_DEFAULT_MAX_SESSION_STALENESS = 45 * time.Second
  79. MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
  80. MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT = 360 * time.Second
  81. MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
  82. MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536
  83. MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048
  84. )
  85. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  86. // Obfuscated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  87. // HTTP or HTTPS.
  88. //
  89. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  90. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  91. // set as a HTTP cookie for the client to submit with each request.
  92. //
  93. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  94. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  95. // the meekConn struct.
  96. type MeekServer struct {
  97. support *SupportServices
  98. listener net.Listener
  99. listenerTunnelProtocol string
  100. listenerPort int
  101. isFronted bool
  102. passthroughAddress string
  103. turnAroundTimeout time.Duration
  104. extendedTurnAroundTimeout time.Duration
  105. skipExtendedTurnAroundThreshold int
  106. maxSessionStaleness time.Duration
  107. httpClientIOTimeout time.Duration
  108. tlsConfig *tls.Config
  109. obfuscatorSeedHistory *obfuscator.SeedHistory
  110. clientHandler func(clientConn net.Conn, data *additionalTransportData)
  111. openConns *common.Conns
  112. stopBroadcast <-chan struct{}
  113. sessionsLock sync.RWMutex
  114. sessions map[string]*meekSession
  115. checksumTable *crc64.Table
  116. bufferPool *CachedResponseBufferPool
  117. rateLimitLock sync.Mutex
  118. rateLimitHistory *lrucache.Cache
  119. rateLimitCount int
  120. rateLimitSignalGC chan struct{}
  121. normalizer *transforms.HTTPNormalizerListener
  122. }
  123. // NewMeekServer initializes a new meek server.
  124. func NewMeekServer(
  125. support *SupportServices,
  126. listener net.Listener,
  127. listenerTunnelProtocol string,
  128. listenerPort int,
  129. useTLS, isFronted, useObfuscatedSessionTickets, useHTTPNormalizer bool,
  130. clientHandler func(clientConn net.Conn, data *additionalTransportData),
  131. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  132. passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
  133. turnAroundTimeout := MEEK_DEFAULT_TURN_AROUND_TIMEOUT
  134. if support.Config.MeekTurnAroundTimeoutMilliseconds != nil {
  135. turnAroundTimeout = time.Duration(
  136. *support.Config.MeekTurnAroundTimeoutMilliseconds) * time.Millisecond
  137. }
  138. extendedTurnAroundTimeout := MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT
  139. if support.Config.MeekExtendedTurnAroundTimeoutMilliseconds != nil {
  140. extendedTurnAroundTimeout = time.Duration(
  141. *support.Config.MeekExtendedTurnAroundTimeoutMilliseconds) * time.Millisecond
  142. }
  143. skipExtendedTurnAroundThreshold := MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD
  144. if support.Config.MeekSkipExtendedTurnAroundThresholdBytes != nil {
  145. skipExtendedTurnAroundThreshold = *support.Config.MeekSkipExtendedTurnAroundThresholdBytes
  146. }
  147. maxSessionStaleness := MEEK_DEFAULT_MAX_SESSION_STALENESS
  148. if support.Config.MeekMaxSessionStalenessMilliseconds != nil {
  149. maxSessionStaleness = time.Duration(
  150. *support.Config.MeekMaxSessionStalenessMilliseconds) * time.Millisecond
  151. }
  152. var httpClientIOTimeout time.Duration
  153. if isFronted {
  154. // Fronted has a distinct timeout, and the default is higher since new
  155. // clients may connect to a CDN edge and start using an existing
  156. // persistent connection.
  157. httpClientIOTimeout = MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT
  158. if support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds != nil {
  159. httpClientIOTimeout = time.Duration(
  160. *support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  161. }
  162. } else {
  163. httpClientIOTimeout = MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
  164. if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
  165. httpClientIOTimeout = time.Duration(
  166. *support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  167. }
  168. }
  169. checksumTable := crc64.MakeTable(crc64.ECMA)
  170. bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
  171. if support.Config.MeekCachedResponsePoolBufferSize != 0 {
  172. bufferLength = support.Config.MeekCachedResponsePoolBufferSize
  173. }
  174. bufferCount := MEEK_DEFAULT_POOL_BUFFER_COUNT
  175. if support.Config.MeekCachedResponsePoolBufferCount != 0 {
  176. bufferCount = support.Config.MeekCachedResponsePoolBufferCount
  177. }
  178. _, thresholdSeconds, _, _, _, _, _, _, reapFrequencySeconds, maxEntries :=
  179. support.TrafficRulesSet.GetMeekRateLimiterConfig()
  180. rateLimitHistory := lrucache.NewWithLRU(
  181. time.Duration(thresholdSeconds)*time.Second,
  182. time.Duration(reapFrequencySeconds)*time.Second,
  183. maxEntries)
  184. bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
  185. meekServer := &MeekServer{
  186. support: support,
  187. listener: listener,
  188. listenerTunnelProtocol: listenerTunnelProtocol,
  189. listenerPort: listenerPort,
  190. isFronted: isFronted,
  191. passthroughAddress: passthroughAddress,
  192. turnAroundTimeout: turnAroundTimeout,
  193. extendedTurnAroundTimeout: extendedTurnAroundTimeout,
  194. skipExtendedTurnAroundThreshold: skipExtendedTurnAroundThreshold,
  195. maxSessionStaleness: maxSessionStaleness,
  196. httpClientIOTimeout: httpClientIOTimeout,
  197. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  198. clientHandler: clientHandler,
  199. openConns: common.NewConns(),
  200. stopBroadcast: stopBroadcast,
  201. sessions: make(map[string]*meekSession),
  202. checksumTable: checksumTable,
  203. bufferPool: bufferPool,
  204. rateLimitHistory: rateLimitHistory,
  205. rateLimitSignalGC: make(chan struct{}, 1),
  206. }
  207. if useTLS {
  208. tlsConfig, err := meekServer.makeMeekTLSConfig(
  209. isFronted, useObfuscatedSessionTickets)
  210. if err != nil {
  211. return nil, errors.Trace(err)
  212. }
  213. meekServer.tlsConfig = tlsConfig
  214. }
  215. if useHTTPNormalizer && protocol.TunnelProtocolUsesMeekHTTPNormalizer(listenerTunnelProtocol) {
  216. normalizer := meekServer.makeMeekHTTPNormalizerListener()
  217. meekServer.normalizer = normalizer
  218. meekServer.listener = normalizer
  219. }
  220. return meekServer, nil
  221. }
  222. type meekContextKey struct {
  223. key string
  224. }
  225. var meekNetConnContextKey = &meekContextKey{"net.Conn"}
  226. // Run runs the meek server; this function blocks while serving HTTP or
  227. // HTTPS connections on the specified listener. This function also runs
  228. // a goroutine which cleans up expired meek client sessions.
  229. //
  230. // To stop the meek server, both Close() the listener and set the stopBroadcast
  231. // signal specified in NewMeekServer.
  232. func (server *MeekServer) Run() error {
  233. waitGroup := new(sync.WaitGroup)
  234. waitGroup.Add(1)
  235. go func() {
  236. defer waitGroup.Done()
  237. ticker := time.NewTicker(server.maxSessionStaleness / 2)
  238. defer ticker.Stop()
  239. for {
  240. select {
  241. case <-ticker.C:
  242. server.deleteExpiredSessions()
  243. case <-server.stopBroadcast:
  244. return
  245. }
  246. }
  247. }()
  248. waitGroup.Add(1)
  249. go func() {
  250. defer waitGroup.Done()
  251. server.rateLimitWorker()
  252. }()
  253. // Serve HTTP or HTTPS
  254. //
  255. // - WriteTimeout may include time awaiting request, as per:
  256. // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
  257. // - Legacy meek-server wrapped each client HTTP connection with an explicit idle
  258. // timeout net.Conn and didn't use http.Server timeouts. We could do the same
  259. // here (use ActivityMonitoredConn) but the stock http.Server timeouts should
  260. // now be sufficient.
  261. httpServer := &http.Server{
  262. ReadTimeout: server.httpClientIOTimeout,
  263. WriteTimeout: server.httpClientIOTimeout,
  264. Handler: server,
  265. ConnState: server.httpConnStateCallback,
  266. ConnContext: func(ctx context.Context, conn net.Conn) context.Context {
  267. return context.WithValue(ctx, meekNetConnContextKey, conn)
  268. },
  269. // Disable auto HTTP/2 (https://golang.org/doc/go1.6)
  270. TLSNextProto: make(map[string]func(*http.Server, *std_tls.Conn, http.Handler)),
  271. }
  272. // Note: Serve() will be interrupted by listener.Close() call
  273. var err error
  274. if server.tlsConfig != nil {
  275. httpsServer := HTTPSServer{Server: httpServer}
  276. err = httpsServer.ServeTLS(server.listener, server.tlsConfig)
  277. } else {
  278. err = httpServer.Serve(server.listener)
  279. }
  280. // Can't check for the exact error that Close() will cause in Accept(),
  281. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  282. // explicit stop signal to stop gracefully.
  283. select {
  284. case <-server.stopBroadcast:
  285. err = nil
  286. default:
  287. }
  288. // deleteExpiredSessions calls deleteSession which may block waiting
  289. // for active request handlers to complete; timely shutdown requires
  290. // stopping the listener and closing all existing connections before
  291. // awaiting the reaperWaitGroup.
  292. server.listener.Close()
  293. server.openConns.CloseAll()
  294. waitGroup.Wait()
  295. return err
  296. }
  297. // ServeHTTP handles meek client HTTP requests, where the request body
  298. // contains upstream traffic and the response will contain downstream
  299. // traffic.
  300. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  301. // Note: no longer requiring that the request method is POST
  302. // Check for required headers and values. For fronting, required headers
  303. // may be used to identify a CDN edge. When this check fails,
  304. // TerminateHTTPConnection is called instead of handleError, so any
  305. // persistent connection is always closed.
  306. if len(server.support.Config.MeekRequiredHeaders) > 0 {
  307. for header, value := range server.support.Config.MeekRequiredHeaders {
  308. requestValue := request.Header.Get(header)
  309. if requestValue != value {
  310. log.WithTraceFields(LogFields{
  311. "header": header,
  312. "value": requestValue,
  313. }).Warning("invalid required meek header")
  314. common.TerminateHTTPConnection(responseWriter, request)
  315. return
  316. }
  317. }
  318. }
  319. // Check for the expected meek/session ID cookie.
  320. // Also check for prohibited HTTP headers.
  321. var meekCookie *http.Cookie
  322. for _, c := range request.Cookies() {
  323. meekCookie = c
  324. break
  325. }
  326. if meekCookie == nil || len(meekCookie.Value) == 0 {
  327. log.WithTrace().Warning("missing meek cookie")
  328. common.TerminateHTTPConnection(responseWriter, request)
  329. return
  330. }
  331. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  332. for _, header := range server.support.Config.MeekProhibitedHeaders {
  333. value := request.Header.Get(header)
  334. if header != "" {
  335. log.WithTraceFields(LogFields{
  336. "header": header,
  337. "value": value,
  338. }).Warning("prohibited meek header")
  339. server.handleError(responseWriter, request)
  340. return
  341. }
  342. }
  343. }
  344. // A valid meek cookie indicates which class of request this is:
  345. //
  346. // 1. A new meek session. Create a new session ID and proceed with
  347. // relaying tunnel traffic.
  348. //
  349. // 2. An existing meek session. Resume relaying tunnel traffic.
  350. //
  351. // 3. A request to an endpoint. This meek connection is not for relaying
  352. // tunnel traffic. Instead, the request is handed off to a custom handler.
  353. sessionID,
  354. session,
  355. underlyingConn,
  356. endPoint,
  357. endPointGeoIPData,
  358. err := server.getSessionOrEndpoint(request, meekCookie)
  359. if err != nil {
  360. // Debug since session cookie errors commonly occur during
  361. // normal operation.
  362. log.WithTraceFields(LogFields{"error": err}).Debug("session lookup failed")
  363. server.handleError(responseWriter, request)
  364. return
  365. }
  366. if endPoint != "" {
  367. // Endpoint mode. Currently, this means it's handled by the tactics
  368. // request handler.
  369. handled := server.support.TacticsServer.HandleEndPoint(
  370. endPoint, common.GeoIPData(*endPointGeoIPData), responseWriter, request)
  371. if !handled {
  372. log.WithTraceFields(LogFields{"endPoint": endPoint}).Info("unhandled endpoint")
  373. server.handleError(responseWriter, request)
  374. }
  375. return
  376. }
  377. // Tunnel relay mode.
  378. // Ensure that there's only one concurrent request handler per client
  379. // session. Depending on the nature of a network disruption, it can
  380. // happen that a client detects a failure and retries while the server
  381. // is still streaming response in the handler for the _previous_ client
  382. // request.
  383. //
  384. // Even if the session.cachedResponse were safe for concurrent
  385. // use (it is not), concurrent handling could lead to loss of session
  386. // since upstream data read by the first request may not reach the
  387. // cached response before the second request reads the cached data.
  388. //
  389. // The existing handler will stream response data, holding the lock,
  390. // for no more than MEEK_EXTENDED_TURN_AROUND_TIMEOUT.
  391. //
  392. // TODO: interrupt an existing handler? The existing handler will be
  393. // sending data to the cached response, but if that buffer fills, the
  394. // session will be lost.
  395. requestNumber := atomic.AddInt64(&session.requestCount, 1)
  396. // Wait for the existing request to complete.
  397. session.lock.Lock()
  398. defer session.lock.Unlock()
  399. // Count this metric once the lock is acquired, to avoid concurrent and
  400. // potentially incorrect session.underlyingConn updates.
  401. //
  402. // It should never be the case that a new underlyingConn has the same
  403. // value as the previous session.underlyingConn, as each is a net.Conn
  404. // interface which includes a pointer, and the previous value cannot
  405. // be garbage collected until session.underlyingConn is updated.
  406. if session.underlyingConn != underlyingConn {
  407. atomic.AddInt64(&session.metricUnderlyingConnCount, 1)
  408. session.underlyingConn = underlyingConn
  409. }
  410. // If a newer request has arrived while waiting, discard this one.
  411. // Do not delay processing the newest request.
  412. //
  413. // If the session expired and was deleted while this request was waiting,
  414. // discard this request. The session is no longer valid, and the final call
  415. // to session.cachedResponse.Reset may have already occured, so any further
  416. // session.cachedResponse access may deplete resources (fail to refill the pool).
  417. if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
  418. common.TerminateHTTPConnection(responseWriter, request)
  419. return
  420. }
  421. // pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  422. // read the request body as upstream traffic.
  423. // TODO: run pumpReads and pumpWrites concurrently?
  424. // pumpReads checksums the request payload and skips relaying it when
  425. // it matches the immediately previous request payload. This allows
  426. // clients to resend request payloads, when retrying due to connection
  427. // interruption, without knowing whether the server has received or
  428. // relayed the data.
  429. requestSize, err := session.clientConn.pumpReads(request.Body)
  430. if err != nil {
  431. if err != io.EOF {
  432. // Debug since errors such as "i/o timeout" occur during normal operation;
  433. // also, golang network error messages may contain client IP.
  434. log.WithTraceFields(LogFields{"error": err}).Debug("read request failed")
  435. }
  436. common.TerminateHTTPConnection(responseWriter, request)
  437. // Note: keep session open to allow client to retry
  438. return
  439. }
  440. // The extended turn around mechanism optimizes for downstream flows by
  441. // sending more data in the response as long as it's available. As a
  442. // heuristic, when the request size meets a threshold, optimize instead
  443. // of upstream flows by skipping the extended turn around.
  444. skipExtendedTurnAround := requestSize >= int64(server.skipExtendedTurnAroundThreshold)
  445. // Set cookie before writing the response.
  446. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && !session.sessionIDSent {
  447. // Replace the meek cookie with the session ID.
  448. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  449. // session ID value replaces the original meek cookie value.
  450. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  451. session.sessionIDSent = true
  452. }
  453. // When streaming data into the response body, a copy is
  454. // retained in the cachedResponse buffer. This allows the
  455. // client to retry and request that the response be resent
  456. // when the HTTP connection is interrupted.
  457. //
  458. // If a Range header is present, the client is retrying,
  459. // possibly after having received a partial response. In
  460. // this case, use any cached response to attempt to resend
  461. // the response, starting from the resend position the client
  462. // indicates.
  463. //
  464. // When the resend position is not available -- because the
  465. // cachedResponse buffer could not hold it -- the client session
  466. // is closed, as there's no way to resume streaming the payload
  467. // uninterrupted.
  468. //
  469. // The client may retry before a cached response is prepared,
  470. // so a cached response is not always used when a Range header
  471. // is present.
  472. //
  473. // TODO: invalid Range header is ignored; should it be otherwise?
  474. position, isRetry := checkRangeHeader(request)
  475. if isRetry {
  476. atomic.AddInt64(&session.metricClientRetries, 1)
  477. }
  478. hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
  479. // The client is not expected to send position > 0 when there is
  480. // no cached response; let that case fall through to the next
  481. // HasPosition check which will fail and close the session.
  482. var responseSize int
  483. var responseError error
  484. if isRetry && (hasCompleteCachedResponse || position > 0) {
  485. if !session.cachedResponse.HasPosition(position) {
  486. greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
  487. server.handleError(responseWriter, request)
  488. session.delete(true)
  489. return
  490. }
  491. responseWriter.WriteHeader(http.StatusPartialContent)
  492. // TODO:
  493. // - enforce a max extended buffer count per client, for
  494. // fairness? Throttling may make this unnecessary.
  495. // - cachedResponse can now start releasing extended buffers,
  496. // as response bytes before "position" will never be requested
  497. // again?
  498. responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
  499. greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
  500. // The client may again fail to receive the payload and may again
  501. // retry, so not yet releasing cachedResponse buffers.
  502. } else {
  503. // _Now_ we release buffers holding data from the previous
  504. // response. And then immediately stream the new response into
  505. // newly acquired buffers.
  506. session.cachedResponse.Reset()
  507. // Note: this code depends on an implementation detail of
  508. // io.MultiWriter: a Write() to the MultiWriter writes first
  509. // to the cache, and then to the response writer. So if the
  510. // write to the response writer fails, the payload is cached.
  511. multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
  512. // The client expects 206, not 200, whenever it sets a Range header,
  513. // which it may do even when no cached response is prepared.
  514. if isRetry {
  515. responseWriter.WriteHeader(http.StatusPartialContent)
  516. }
  517. // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  518. // write its downstream traffic through to the response body.
  519. responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround)
  520. greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
  521. greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
  522. }
  523. // responseError is the result of writing the body either from CopyFromPosition or pumpWrites
  524. if responseError != nil {
  525. if responseError != io.EOF {
  526. // Debug since errors such as "i/o timeout" occur during normal operation;
  527. // also, golang network error messages may contain client IP.
  528. log.WithTraceFields(LogFields{"error": responseError}).Debug("write response failed")
  529. }
  530. server.handleError(responseWriter, request)
  531. // Note: keep session open to allow client to retry
  532. return
  533. }
  534. }
  535. func (server *MeekServer) handleError(responseWriter http.ResponseWriter, request *http.Request) {
  536. // When fronted, keep the persistent connection open since it may be used
  537. // by many clients coming through the same edge. For performance reasons,
  538. // an error, including invalid input, from one client shouldn't close the
  539. // persistent connection used by other clients.
  540. if server.isFronted {
  541. http.NotFound(responseWriter, request)
  542. return
  543. }
  544. common.TerminateHTTPConnection(responseWriter, request)
  545. }
  546. func checkRangeHeader(request *http.Request) (int, bool) {
  547. rangeHeader := request.Header.Get("Range")
  548. if rangeHeader == "" {
  549. return 0, false
  550. }
  551. prefix := "bytes="
  552. suffix := "-"
  553. if !strings.HasPrefix(rangeHeader, prefix) ||
  554. !strings.HasSuffix(rangeHeader, suffix) {
  555. return 0, false
  556. }
  557. rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
  558. rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
  559. position, err := strconv.Atoi(rangeHeader)
  560. if err != nil {
  561. return 0, false
  562. }
  563. return position, true
  564. }
  565. // getSessionOrEndpoint checks if the cookie corresponds to an existing tunnel
  566. // relay session ID. If no session is found, the cookie must be an obfuscated
  567. // meek cookie. A new session is created when the meek cookie indicates relay
  568. // mode; or the endpoint is returned when the meek cookie indicates endpoint
  569. // mode.
  570. func (server *MeekServer) getSessionOrEndpoint(
  571. request *http.Request, meekCookie *http.Cookie) (string, *meekSession, net.Conn, string, *GeoIPData, error) {
  572. underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
  573. // Check for an existing session.
  574. server.sessionsLock.RLock()
  575. existingSessionID := meekCookie.Value
  576. session, ok := server.sessions[existingSessionID]
  577. server.sessionsLock.RUnlock()
  578. if ok {
  579. // TODO: can multiple http client connections using same session cookie
  580. // cause race conditions on session struct?
  581. session.touch()
  582. return existingSessionID, session, underlyingConn, "", nil, nil
  583. }
  584. // Determine the client remote address, which is used for geolocation
  585. // stats, rate limiting, anti-probing, discovery, and tactics selection
  586. // logic.
  587. //
  588. // When an intermediate proxy or CDN is in use, we may be
  589. // able to determine the original client address by inspecting HTTP
  590. // headers such as X-Forwarded-For.
  591. //
  592. // We trust only headers provided by CDNs. Fronted Psiphon server hosts
  593. // should be configured to accept tunnel connections only from CDN edges.
  594. // When the CDN passes along a chain of IPs, as in X-Forwarded-For, we
  595. // trust only the right-most IP, which is provided by the CDN.
  596. clientIP, _, err := net.SplitHostPort(request.RemoteAddr)
  597. if err != nil {
  598. return "", nil, nil, "", nil, errors.Trace(err)
  599. }
  600. if net.ParseIP(clientIP) == nil {
  601. return "", nil, nil, "", nil, errors.TraceNew("invalid IP address")
  602. }
  603. if server.isFronted && len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  604. // When there are multiple header names in MeekProxyForwardedForHeaders,
  605. // the first valid match is preferred. MeekProxyForwardedForHeaders should be
  606. // configured to use header names that are always provided by the CDN(s) and
  607. // not header names that may be passed through from clients.
  608. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  609. // In the case where there are multiple headers,
  610. // request.Header.Get returns the first header, but we want the
  611. // last header; so use request.Header.Values and select the last
  612. // value. As per RFC 2616 section 4.2, a proxy must not change
  613. // the order of field values, which implies that it should append
  614. // values to the last header.
  615. values := request.Header.Values(header)
  616. if len(values) > 0 {
  617. value := values[len(values)-1]
  618. // Some headers, such as X-Forwarded-For, are a comma-separated
  619. // list of IPs (each proxy in a chain). Select the last IP.
  620. IPs := strings.Split(value, ",")
  621. IP := IPs[len(IPs)-1]
  622. // Remove optional whitespace surrounding the commas.
  623. IP = strings.TrimSpace(IP)
  624. if net.ParseIP(IP) != nil {
  625. clientIP = IP
  626. break
  627. }
  628. }
  629. }
  630. }
  631. geoIPData := server.support.GeoIPService.Lookup(clientIP)
  632. // Check for a steering IP header, which contains an alternate dial IP to
  633. // be returned to the client via the secure API handshake response.
  634. // Steering may be used to load balance CDN traffic.
  635. //
  636. // The steering IP header is added by a CDN or CDN service process. To
  637. // prevent steering IP spoofing, the service process must filter out any
  638. // steering IP headers injected into ingress requests.
  639. //
  640. // Steering IP headers must appear in the first request of a meek session
  641. // in order to be recorded here and relayed to the client.
  642. var steeringIP string
  643. if server.isFronted && server.support.Config.EnableSteeringIPs {
  644. steeringIP = request.Header.Get("X-Psiphon-Steering-Ip")
  645. if steeringIP != "" {
  646. IP := net.ParseIP(steeringIP)
  647. if IP == nil || common.IsBogon(IP) {
  648. steeringIP = ""
  649. log.WithTraceFields(LogFields{"steeringIP": steeringIP}).Warning("invalid steering IP")
  650. }
  651. }
  652. }
  653. // The session is new (or expired). Treat the cookie value as a new meek
  654. // cookie, extract the payload, and create a new session.
  655. // Limitation: when the cookie is a session ID for an expired session, we
  656. // still attempt to treat it as a meek cookie. As it stands, that yields
  657. // either base64 decoding errors (RawStdEncoding vs. StdEncoding) or
  658. // length errors. We could log cleaner errors ("session is expired") by
  659. // checking that the cookie is a well-formed (base64.RawStdEncoding) value
  660. // between MEEK_MIN_SESSION_ID_LENGTH and MEEK_MAX_SESSION_ID_LENGTH
  661. // bytes -- assuming that MEEK_MAX_SESSION_ID_LENGTH is too short to be a
  662. // valid meek cookie.
  663. var payloadJSON []byte
  664. if server.normalizer != nil {
  665. // NOTE: operates on the assumption that the normalizer is not wrapped
  666. // with a further conn.
  667. underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
  668. normalizedConn := underlyingConn.(*transforms.HTTPNormalizer)
  669. payloadJSON = normalizedConn.ValidateMeekCookieResult
  670. } else {
  671. payloadJSON, err = server.getMeekCookiePayload(clientIP, meekCookie.Value)
  672. if err != nil {
  673. return "", nil, nil, "", nil, errors.Trace(err)
  674. }
  675. }
  676. // Note: this meek server ignores legacy values PsiphonClientSessionId
  677. // and PsiphonServerAddress.
  678. var clientSessionData protocol.MeekCookieData
  679. err = json.Unmarshal(payloadJSON, &clientSessionData)
  680. if err != nil {
  681. return "", nil, nil, "", nil, errors.Trace(err)
  682. }
  683. // Any rate limit is enforced after the meek cookie is validated, so a prober
  684. // without the obfuscation secret will be unable to fingerprint the server
  685. // based on response time combined with the rate limit configuration. The
  686. // rate limit is primarily intended to limit memory resource consumption and
  687. // not the overhead incurred by cookie validation.
  688. if server.rateLimit(clientIP, geoIPData, server.listenerTunnelProtocol) {
  689. return "", nil, nil, "", nil, errors.TraceNew("rate limit exceeded")
  690. }
  691. // Handle endpoints before enforcing CheckEstablishTunnels.
  692. // Currently, endpoints are tactics requests, and we allow these to be
  693. // handled by servers which would otherwise reject new tunnels.
  694. if clientSessionData.EndPoint != "" {
  695. return "", nil, nil, clientSessionData.EndPoint, &geoIPData, nil
  696. }
  697. // Don't create new sessions when not establishing. A subsequent SSH handshake
  698. // will not succeed, so creating a meek session just wastes resources.
  699. if server.support.TunnelServer != nil &&
  700. !server.support.TunnelServer.CheckEstablishTunnels() {
  701. return "", nil, nil, "", nil, errors.TraceNew("not establishing tunnels")
  702. }
  703. // Disconnect immediately if the tactics for the client restricts usage of
  704. // the fronting provider ID. The probability may be used to influence
  705. // usage of a given fronting provider; but when only that provider works
  706. // for a given client, and the probability is less than 1.0, the client
  707. // can retry until it gets a successful coin flip.
  708. //
  709. // Clients will also skip candidates with restricted fronting provider IDs.
  710. // The client-side probability, RestrictFrontingProviderIDsClientProbability,
  711. // is applied independently of the server-side coin flip here.
  712. //
  713. // At this stage, GeoIP tactics filters are active, but handshake API
  714. // parameters are not.
  715. //
  716. // See the comment in server.LoadConfig regarding fronting provider ID
  717. // limitations.
  718. if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
  719. server.support.ServerTacticsParametersCache != nil {
  720. p, err := server.support.ServerTacticsParametersCache.Get(geoIPData)
  721. if err != nil {
  722. return "", nil, nil, "", nil, errors.Trace(err)
  723. }
  724. if !p.IsNil() &&
  725. common.Contains(
  726. p.Strings(parameters.RestrictFrontingProviderIDs),
  727. server.support.Config.GetFrontingProviderID()) {
  728. if p.WeightedCoinFlip(
  729. parameters.RestrictFrontingProviderIDsServerProbability) {
  730. return "", nil, nil, "", nil, errors.TraceNew("restricted fronting provider")
  731. }
  732. }
  733. }
  734. // The tunnel protocol name is used for stats and traffic rules. In many
  735. // cases, its value is unambiguously determined by the listener port. In
  736. // certain cases, such as multiple fronted protocols with a single
  737. // backend listener, the client's reported tunnel protocol value is used.
  738. // The caller must validate clientTunnelProtocol with
  739. // protocol.IsValidClientTunnelProtocol.
  740. var clientTunnelProtocol string
  741. if clientSessionData.ClientTunnelProtocol != "" {
  742. if !protocol.IsValidClientTunnelProtocol(
  743. clientSessionData.ClientTunnelProtocol,
  744. server.listenerTunnelProtocol,
  745. server.support.Config.GetRunningProtocols()) {
  746. return "", nil, nil, "", nil, errors.Tracef(
  747. "invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
  748. }
  749. clientTunnelProtocol = clientSessionData.ClientTunnelProtocol
  750. }
  751. // Create a new session
  752. bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
  753. if server.support.Config.MeekCachedResponseBufferSize != 0 {
  754. bufferLength = server.support.Config.MeekCachedResponseBufferSize
  755. }
  756. cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
  757. session = &meekSession{
  758. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  759. sessionIDSent: false,
  760. cachedResponse: cachedResponse,
  761. cookieName: meekCookie.Name,
  762. contentType: request.Header.Get("Content-Type"),
  763. }
  764. session.touch()
  765. // Create a new meek conn that will relay the payload
  766. // between meek request/responses and the tunnel server client
  767. // handler. The client IP is also used to initialize the
  768. // meek conn with a useful value to return when the tunnel
  769. // server calls conn.RemoteAddr() to get the client's IP address.
  770. // Assumes clientIP is a valid IP address; the port value is a stub
  771. // and is expected to be ignored.
  772. clientConn := newMeekConn(
  773. server,
  774. session,
  775. underlyingConn,
  776. &net.TCPAddr{
  777. IP: net.ParseIP(clientIP),
  778. Port: 0,
  779. },
  780. clientSessionData.MeekProtocolVersion)
  781. session.clientConn = clientConn
  782. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  783. // meek cookie to a session ID; v1 clients always send the
  784. // original meek cookie value with each request. The issue with
  785. // v1 is that clients which wake after a device sleep will attempt
  786. // to resume a meek session and the server can't differentiate
  787. // between resuming a session and creating a new session. This
  788. // causes the v1 client connection to hang/timeout.
  789. sessionID := meekCookie.Value
  790. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  791. sessionID, err = makeMeekSessionID()
  792. if err != nil {
  793. return "", nil, nil, "", nil, errors.Trace(err)
  794. }
  795. }
  796. server.sessionsLock.Lock()
  797. server.sessions[sessionID] = session
  798. server.sessionsLock.Unlock()
  799. var additionalData *additionalTransportData
  800. if clientTunnelProtocol != "" || steeringIP != "" {
  801. additionalData = &additionalTransportData{
  802. overrideTunnelProtocol: clientTunnelProtocol,
  803. steeringIP: steeringIP,
  804. }
  805. }
  806. // Note: from the tunnel server's perspective, this client connection
  807. // will close when session.delete calls Close() on the meekConn.
  808. server.clientHandler(session.clientConn, additionalData)
  809. return sessionID, session, underlyingConn, "", nil, nil
  810. }
  811. func (server *MeekServer) rateLimit(
  812. clientIP string, geoIPData GeoIPData, tunnelProtocol string) bool {
  813. historySize,
  814. thresholdSeconds,
  815. tunnelProtocols,
  816. regions,
  817. ISPs,
  818. ASNs,
  819. cities,
  820. GCTriggerCount, _, _ :=
  821. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  822. if historySize == 0 {
  823. return false
  824. }
  825. if len(tunnelProtocols) > 0 {
  826. if !common.Contains(tunnelProtocols, tunnelProtocol) {
  827. return false
  828. }
  829. }
  830. if len(regions) > 0 || len(ISPs) > 0 || len(ASNs) > 0 || len(cities) > 0 {
  831. if len(regions) > 0 {
  832. if !common.Contains(regions, geoIPData.Country) {
  833. return false
  834. }
  835. }
  836. if len(ISPs) > 0 {
  837. if !common.Contains(ISPs, geoIPData.ISP) {
  838. return false
  839. }
  840. }
  841. if len(ASNs) > 0 {
  842. if !common.Contains(ASNs, geoIPData.ASN) {
  843. return false
  844. }
  845. }
  846. if len(cities) > 0 {
  847. if !common.Contains(cities, geoIPData.City) {
  848. return false
  849. }
  850. }
  851. }
  852. // With IPv6, individual users or sites are users commonly allocated a /64
  853. // or /56, so rate limit by /56.
  854. rateLimitIP := clientIP
  855. IP := net.ParseIP(clientIP)
  856. if IP != nil && IP.To4() == nil {
  857. rateLimitIP = IP.Mask(net.CIDRMask(56, 128)).String()
  858. }
  859. // go-cache-lru is safe for concurrent access, but lacks an atomic
  860. // compare-and-set type operations to check if an entry exists before
  861. // adding a new one. This mutex ensures the Get and Add are atomic
  862. // (as well as synchronizing access to rateLimitCount).
  863. server.rateLimitLock.Lock()
  864. var rateLimiter *ratelimit.Bucket
  865. entry, ok := server.rateLimitHistory.Get(rateLimitIP)
  866. if ok {
  867. rateLimiter = entry.(*ratelimit.Bucket)
  868. } else {
  869. rateLimiter = ratelimit.NewBucketWithQuantum(
  870. time.Duration(thresholdSeconds)*time.Second,
  871. int64(historySize),
  872. int64(historySize))
  873. server.rateLimitHistory.Set(
  874. rateLimitIP,
  875. rateLimiter,
  876. time.Duration(thresholdSeconds)*time.Second)
  877. }
  878. limit := rateLimiter.TakeAvailable(1) < 1
  879. triggerGC := false
  880. if limit {
  881. server.rateLimitCount += 1
  882. if server.rateLimitCount >= GCTriggerCount {
  883. triggerGC = true
  884. server.rateLimitCount = 0
  885. }
  886. }
  887. server.rateLimitLock.Unlock()
  888. if triggerGC {
  889. select {
  890. case server.rateLimitSignalGC <- struct{}{}:
  891. default:
  892. }
  893. }
  894. return limit
  895. }
  896. func (server *MeekServer) rateLimitWorker() {
  897. for {
  898. select {
  899. case <-server.rateLimitSignalGC:
  900. runtime.GC()
  901. case <-server.stopBroadcast:
  902. return
  903. }
  904. }
  905. }
  906. func (server *MeekServer) deleteSession(sessionID string) {
  907. // Don't obtain the server.sessionsLock write lock until modifying
  908. // server.sessions, as the session.delete can block for up to
  909. // MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
  910. // concurrently.
  911. //
  912. // Since a lock isn't held for the duration, concurrent calls to
  913. // deleteSession with the same sessionID could happen; this is
  914. // not expected since only the reaper goroutine calls deleteExpiredSessions
  915. // (and in any case concurrent execution of the ok block is not an issue).
  916. server.sessionsLock.RLock()
  917. session, ok := server.sessions[sessionID]
  918. server.sessionsLock.RUnlock()
  919. if ok {
  920. session.delete(false)
  921. server.sessionsLock.Lock()
  922. delete(server.sessions, sessionID)
  923. server.sessionsLock.Unlock()
  924. }
  925. }
  926. func (server *MeekServer) deleteExpiredSessions() {
  927. // A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  928. // so grab a snapshot list of expired sessions and do not hold a lock for
  929. // the duration of deleteExpiredSessions. This allows new sessions to be
  930. // added concurrently.
  931. //
  932. // New sessions added after the snapshot is taken will be checked for
  933. // expiry on subsequent periodic calls to deleteExpiredSessions.
  934. //
  935. // To avoid long delays in releasing resources, individual deletes are
  936. // performed concurrently.
  937. server.sessionsLock.Lock()
  938. expiredSessionIDs := make([]string, 0)
  939. for sessionID, session := range server.sessions {
  940. if session.expired() {
  941. expiredSessionIDs = append(expiredSessionIDs, sessionID)
  942. }
  943. }
  944. server.sessionsLock.Unlock()
  945. start := time.Now()
  946. deleteWaitGroup := new(sync.WaitGroup)
  947. for _, sessionID := range expiredSessionIDs {
  948. deleteWaitGroup.Add(1)
  949. go func(sessionID string) {
  950. defer deleteWaitGroup.Done()
  951. server.deleteSession(sessionID)
  952. }(sessionID)
  953. }
  954. deleteWaitGroup.Wait()
  955. log.WithTraceFields(
  956. LogFields{"elapsed time": time.Since(start)}).Debug("deleted expired sessions")
  957. }
  958. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  959. // meek server.
  960. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  961. switch connState {
  962. case http.StateNew:
  963. server.openConns.Add(conn)
  964. case http.StateHijacked, http.StateClosed:
  965. server.openConns.Remove(conn)
  966. }
  967. }
  968. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  969. // payload is base64 encoded, obfuscated, and NaCl encrypted.
  970. func (server *MeekServer) getMeekCookiePayload(
  971. clientIP string, cookieValue string) ([]byte, error) {
  972. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  973. if err != nil {
  974. return nil, errors.Trace(err)
  975. }
  976. // The data consists of an obfuscated seed message prepended
  977. // to the obfuscated, encrypted payload. The server obfuscator
  978. // will read the seed message, leaving the remaining encrypted
  979. // data in the reader.
  980. reader := bytes.NewReader(decodedValue[:])
  981. obfuscator, err := obfuscator.NewServerObfuscator(
  982. &obfuscator.ObfuscatorConfig{
  983. Keyword: server.support.Config.MeekObfuscatedKey,
  984. SeedHistory: server.obfuscatorSeedHistory,
  985. IrregularLogger: func(clientIP string, err error, logFields common.LogFields) {
  986. logIrregularTunnel(
  987. server.support,
  988. server.listenerTunnelProtocol,
  989. server.listenerPort,
  990. clientIP,
  991. errors.Trace(err),
  992. LogFields(logFields))
  993. },
  994. },
  995. clientIP,
  996. reader)
  997. if err != nil {
  998. return nil, errors.Trace(err)
  999. }
  1000. offset, err := reader.Seek(0, 1)
  1001. if err != nil {
  1002. return nil, errors.Trace(err)
  1003. }
  1004. encryptedPayload := decodedValue[offset:]
  1005. obfuscator.ObfuscateClientToServer(encryptedPayload)
  1006. var nonce [24]byte
  1007. var privateKey, ephemeralPublicKey [32]byte
  1008. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  1009. server.support.Config.MeekCookieEncryptionPrivateKey)
  1010. if err != nil {
  1011. return nil, errors.Trace(err)
  1012. }
  1013. copy(privateKey[:], decodedPrivateKey)
  1014. if len(encryptedPayload) < 32 {
  1015. return nil, errors.TraceNew("unexpected encrypted payload size")
  1016. }
  1017. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  1018. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  1019. if !ok {
  1020. return nil, errors.TraceNew("open box failed")
  1021. }
  1022. return payload, nil
  1023. }
  1024. // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
  1025. // Currently, this config is optimized for fronted meek where the nature
  1026. // of the connection is non-circumvention; it's optimized for performance
  1027. // assuming the peer is an uncensored CDN.
  1028. func (server *MeekServer) makeMeekTLSConfig(
  1029. isFronted bool, useObfuscatedSessionTickets bool) (*tls.Config, error) {
  1030. certificate, privateKey, err := common.GenerateWebServerCertificate(values.GetHostName())
  1031. if err != nil {
  1032. return nil, errors.Trace(err)
  1033. }
  1034. tlsCertificate, err := tls.X509KeyPair(
  1035. []byte(certificate), []byte(privateKey))
  1036. if err != nil {
  1037. return nil, errors.Trace(err)
  1038. }
  1039. // Vary the minimum version to frustrate scanning/fingerprinting of unfronted servers.
  1040. // Limitation: like the certificate, this value changes on restart.
  1041. minVersionCandidates := []uint16{tls.VersionTLS10, tls.VersionTLS11, tls.VersionTLS12}
  1042. minVersion := minVersionCandidates[prng.Intn(len(minVersionCandidates))]
  1043. config := &tls.Config{
  1044. Certificates: []tls.Certificate{tlsCertificate},
  1045. NextProtos: []string{"http/1.1"},
  1046. MinVersion: minVersion,
  1047. }
  1048. if isFronted {
  1049. // This is a reordering of the supported CipherSuites in golang 1.6[*]. Non-ephemeral key
  1050. // CipherSuites greatly reduce server load, and we try to select these since the meek
  1051. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  1052. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  1053. // by ephemeral key CipherSuites.
  1054. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  1055. //
  1056. // This optimization is applied only when there's a CDN in front of the meek server; in
  1057. // unfronted cases we prefer a more natural TLS handshake.
  1058. //
  1059. // [*] the list has since been updated, removing CipherSuites using RC4 and 3DES.
  1060. config.CipherSuites = []uint16{
  1061. tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
  1062. tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
  1063. tls.TLS_RSA_WITH_AES_128_CBC_SHA,
  1064. tls.TLS_RSA_WITH_AES_256_CBC_SHA,
  1065. tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  1066. tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  1067. tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  1068. tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  1069. tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  1070. tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  1071. tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  1072. tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  1073. }
  1074. }
  1075. if useObfuscatedSessionTickets {
  1076. // See obfuscated session ticket overview
  1077. // in NewObfuscatedClientSessionState.
  1078. config.UseObfuscatedSessionTickets = true
  1079. var obfuscatedSessionTicketKey [32]byte
  1080. key, err := hex.DecodeString(server.support.Config.MeekObfuscatedKey)
  1081. if err == nil && len(key) != 32 {
  1082. err = std_errors.New("invalid obfuscated session key length")
  1083. }
  1084. if err != nil {
  1085. return nil, errors.Trace(err)
  1086. }
  1087. copy(obfuscatedSessionTicketKey[:], key)
  1088. var standardSessionTicketKey [32]byte
  1089. _, err = rand.Read(standardSessionTicketKey[:])
  1090. if err != nil {
  1091. return nil, errors.Trace(err)
  1092. }
  1093. // Note: SessionTicketKey needs to be set, or else, it appears,
  1094. // tris.Config.serverInit() will clobber the value set by
  1095. // SetSessionTicketKeys.
  1096. config.SessionTicketKey = obfuscatedSessionTicketKey
  1097. config.SetSessionTicketKeys([][32]byte{
  1098. standardSessionTicketKey,
  1099. obfuscatedSessionTicketKey})
  1100. }
  1101. // When configured, initialize passthrough mode, an anti-probing defense.
  1102. // Clients must prove knowledge of the obfuscated key via a message sent in
  1103. // the TLS ClientHello random field.
  1104. //
  1105. // When clients fail to provide a valid message, the client connection is
  1106. // relayed to the designated passthrough address, typically another web site.
  1107. // The entire flow is relayed, including the original ClientHello, so the
  1108. // client will perform a TLS handshake with the passthrough target.
  1109. //
  1110. // Irregular events are logged for invalid client activity.
  1111. if server.passthroughAddress != "" {
  1112. config.PassthroughAddress = server.passthroughAddress
  1113. config.PassthroughVerifyMessage = func(
  1114. message []byte) bool {
  1115. return obfuscator.VerifyTLSPassthroughMessage(
  1116. !server.support.Config.LegacyPassthrough,
  1117. server.support.Config.MeekObfuscatedKey,
  1118. message)
  1119. }
  1120. config.PassthroughLogInvalidMessage = func(
  1121. clientIP string) {
  1122. logIrregularTunnel(
  1123. server.support,
  1124. server.listenerTunnelProtocol,
  1125. server.listenerPort,
  1126. clientIP,
  1127. errors.TraceNew("invalid passthrough message"),
  1128. nil)
  1129. }
  1130. config.PassthroughHistoryAddNew = func(
  1131. clientIP string,
  1132. clientRandom []byte) bool {
  1133. // Use a custom, shorter TTL based on the validity period of the
  1134. // passthrough message.
  1135. TTL := obfuscator.TLS_PASSTHROUGH_HISTORY_TTL
  1136. if server.support.Config.LegacyPassthrough {
  1137. TTL = obfuscator.HISTORY_SEED_TTL
  1138. }
  1139. // strictMode is true as, unlike with meek cookies, legitimate meek clients
  1140. // never retry TLS connections using a previous random value.
  1141. ok, logFields := server.obfuscatorSeedHistory.AddNewWithTTL(
  1142. true,
  1143. clientIP,
  1144. "client-random",
  1145. clientRandom,
  1146. TTL)
  1147. if logFields != nil {
  1148. logIrregularTunnel(
  1149. server.support,
  1150. server.listenerTunnelProtocol,
  1151. server.listenerPort,
  1152. clientIP,
  1153. errors.TraceNew("duplicate passthrough message"),
  1154. LogFields(*logFields))
  1155. }
  1156. return ok
  1157. }
  1158. }
  1159. return config, nil
  1160. }
  1161. // makeMeekHTTPNormalizerListener returns the meek server listener wrapped in
  1162. // an HTTP normalizer.
  1163. func (server *MeekServer) makeMeekHTTPNormalizerListener() *transforms.HTTPNormalizerListener {
  1164. normalizer := transforms.WrapListenerWithHTTPNormalizer(server.listener)
  1165. normalizer.ProhibitedHeaders = server.support.Config.MeekProhibitedHeaders
  1166. normalizer.MaxReqLineAndHeadersSize = 8192 // max number of header bytes common web servers will read before returning an error
  1167. if server.passthroughAddress != "" {
  1168. normalizer.PassthroughAddress = server.passthroughAddress
  1169. normalizer.PassthroughDialer = net.Dial
  1170. }
  1171. normalizer.PassthroughLogPassthrough = func(
  1172. clientIP string, tunnelError error, logFields map[string]interface{}) {
  1173. logIrregularTunnel(
  1174. server.support,
  1175. server.listenerTunnelProtocol,
  1176. server.listenerPort,
  1177. clientIP,
  1178. errors.Trace(tunnelError),
  1179. logFields)
  1180. }
  1181. // ValidateMeekCookie is invoked by the normalizer with the value of the
  1182. // cookie header (if present), before ServeHTTP gets the request and calls
  1183. // getSessionOrEndpoint; and then any valid meek cookie payload, or meek
  1184. // session ID, extracted in this callback is stored to be fetched by
  1185. // getSessionOrEndpoint.
  1186. // Note: if there are multiple cookie headers, even though prohibited by
  1187. // rfc6265, then ValidateMeekCookie will only be invoked once with the
  1188. // first one received.
  1189. normalizer.ValidateMeekCookie = func(clientIP string, rawCookies []byte) ([]byte, error) {
  1190. // Parse cookie.
  1191. if len(rawCookies) == 0 {
  1192. return nil, errors.TraceNew("no cookies")
  1193. }
  1194. // TODO/perf: readCookies in net/http is not exported, use a local
  1195. // implementation which does not require allocating an http.header
  1196. // each time.
  1197. request := http.Request{
  1198. Header: http.Header{
  1199. "Cookie": []string{string(rawCookies)},
  1200. },
  1201. }
  1202. cookies := request.Cookies()
  1203. if len(rawCookies) == 0 {
  1204. return nil, errors.Tracef("invalid cookies: %s", string(rawCookies))
  1205. }
  1206. // Use value of the first cookie.
  1207. meekCookieValue := cookies[0].Value
  1208. // Check for an existing session.
  1209. server.sessionsLock.RLock()
  1210. existingSessionID := meekCookieValue
  1211. _, ok := server.sessions[existingSessionID]
  1212. server.sessionsLock.RUnlock()
  1213. if ok {
  1214. // The cookie is a session ID for an active (not expired) session.
  1215. // Return it and then it will be stored and later fetched by
  1216. // getSessionOrEndpoint where it will be mapped to the existing
  1217. // session.
  1218. // Note: it's possible for the session to expire between this check
  1219. // and when getSessionOrEndpoint looks up the session.
  1220. return rawCookies, nil
  1221. }
  1222. // The session is new (or expired). Treat the cookie value as a new
  1223. // meek cookie, extract the payload, and return it; and then it will be
  1224. // stored and later fetched by getSessionOrEndpoint.
  1225. payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookieValue)
  1226. if err != nil {
  1227. return nil, errors.Trace(err)
  1228. }
  1229. return payloadJSON, nil
  1230. }
  1231. return normalizer
  1232. }
// meekSession holds the server-side state of a single meek session: its
// activity timestamp, metrics counters, associated connections, and the
// cached response.
type meekSession struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)

	// lastActivity is a monotime timestamp, stored atomically by touch and
	// loaded atomically by expired.
	lastActivity int64
	requestCount int64
	// The metric* counters are loaded atomically and reported by GetMetrics.
	metricClientRetries              int64
	metricPeakResponseSize           int64
	metricPeakCachedResponseSize     int64
	metricPeakCachedResponseHitSize  int64
	metricCachedResponseMissPosition int64
	metricUnderlyingConnCount        int64
	// lock is acquired by delete (unless the caller already holds it) before
	// cachedResponse is reset and deleted is set.
	lock sync.Mutex
	// deleted is set to true by delete while lock is held.
	deleted        bool
	underlyingConn net.Conn
	// clientConn is closed by delete; expired reads the staleness limit
	// through it and treats a nil clientConn as "not fully initialized".
	clientConn          *meekConn
	meekProtocolVersion int
	sessionIDSent       bool
	// cachedResponse is reset by delete to release its buffers.
	cachedResponse *CachedResponse
	// cookieName and contentType are reported by GetMetrics as
	// "meek_cookie_name" and "meek_content_type".
	cookieName  string
	contentType string
}
  1255. func (session *meekSession) touch() {
  1256. atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
  1257. }
  1258. func (session *meekSession) expired() bool {
  1259. if session.clientConn == nil {
  1260. // Not fully initialized. meekSession.clientConn will be set before adding
  1261. // the session to MeekServer.sessions.
  1262. return false
  1263. }
  1264. lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
  1265. return monotime.Since(lastActivity) >
  1266. session.clientConn.meekServer.maxSessionStaleness
  1267. }
  1268. // delete releases all resources allocated by a session.
  1269. func (session *meekSession) delete(haveLock bool) {
  1270. // TODO: close the persistent HTTP client connection, if one exists?
  1271. // This final call session.cachedResponse.Reset releases shared resources.
  1272. //
  1273. // This call requires exclusive access. session.lock is be obtained before
  1274. // calling session.cachedResponse.Reset. Once the lock is obtained, no
  1275. // request for this session is being processed concurrently, and pending
  1276. // requests will block at session.lock.
  1277. //
  1278. // This logic assumes that no further session.cachedResponse access occurs,
  1279. // or else resources may deplete (buffers won't be returned to the pool).
  1280. // These requirements are achieved by obtaining the lock, setting
  1281. // session.deleted, and any subsequent request handlers checking
  1282. // session.deleted immediately after obtaining the lock.
  1283. //
  1284. // session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  1285. // the timeout for any active request handler processing a session
  1286. // request.
  1287. //
  1288. // When the lock must be acquired, clientConn.Close is called first, to
  1289. // interrupt any existing request handler blocking on pumpReads or pumpWrites.
  1290. session.clientConn.Close()
  1291. if !haveLock {
  1292. session.lock.Lock()
  1293. }
  1294. // Release all extended buffers back to the pool.
  1295. // session.cachedResponse.Reset is not safe for concurrent calls.
  1296. session.cachedResponse.Reset()
  1297. session.deleted = true
  1298. if !haveLock {
  1299. session.lock.Unlock()
  1300. }
  1301. }
  1302. // GetMetrics implements the common.MetricsSource interface.
  1303. func (session *meekSession) GetMetrics() common.LogFields {
  1304. logFields := make(common.LogFields)
  1305. logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
  1306. logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
  1307. logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
  1308. logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
  1309. logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
  1310. logFields["meek_underlying_connection_count"] = atomic.LoadInt64(&session.metricUnderlyingConnCount)
  1311. logFields["meek_cookie_name"] = session.cookieName
  1312. logFields["meek_content_type"] = session.contentType
  1313. return logFields
  1314. }
  1315. // makeMeekSessionID creates a new session ID. The variable size is intended to
  1316. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  1317. func makeMeekSessionID() (string, error) {
  1318. size := MEEK_MIN_SESSION_ID_LENGTH +
  1319. prng.Intn(MEEK_MAX_SESSION_ID_LENGTH-MEEK_MIN_SESSION_ID_LENGTH)
  1320. sessionID, err := common.MakeSecureRandomBytes(size)
  1321. if err != nil {
  1322. return "", errors.Trace(err)
  1323. }
  1324. // Omit padding to maximize variable size space. To the client, the session
  1325. // ID is an opaque string cookie value.
  1326. return base64.RawStdEncoding.EncodeToString(sessionID), nil
  1327. }
// meekConn implements the net.Conn interface and is to be used as a client
// connection by the tunnel server (being passed to sshServer.handleClient).
// meekConn bridges net/http request/response payload readers and writers
// and goroutines calling Read()s and Write()s.
type meekConn struct {
	meekServer  *MeekServer
	meekSession *meekSession
	// firstUnderlyingConn is the first underlying conn passed to
	// newMeekConn; it is the source for metrics, TCP addresses, and
	// fragmentor access.
	firstUnderlyingConn net.Conn
	// remoteAddr is the value returned by RemoteAddr.
	remoteAddr      net.Addr
	protocolVersion int
	// closeBroadcast is closed by Close to interrupt blocked Read, Write,
	// pumpReads, and pumpWrites calls.
	closeBroadcast chan struct{}
	// closed is flipped from 0 to 1 atomically by the first Close call.
	closed int32
	// lastReadChecksum is the CRC-64 of the payload most recently accepted by
	// pumpReads; nil until the first payload is read.
	lastReadChecksum *uint64
	// readLock serializes Read calls.
	readLock sync.Mutex
	// A single read buffer is exchanged between Read and pumpReads through
	// these channels; replaceReadBuffer picks the channel based on how much
	// unread data the buffer holds.
	emptyReadBuffer   chan *bytes.Buffer
	partialReadBuffer chan *bytes.Buffer
	fullReadBuffer    chan *bytes.Buffer
	// writeLock serializes Write calls.
	writeLock sync.Mutex
	// nextWriteBuffer carries chunks from Write to pumpWrites; writeResult
	// carries the result of writing each chunk back to Write.
	nextWriteBuffer chan []byte
	writeResult     chan error
}
  1349. func newMeekConn(
  1350. meekServer *MeekServer,
  1351. meekSession *meekSession,
  1352. underlyingConn net.Conn,
  1353. remoteAddr net.Addr,
  1354. protocolVersion int) *meekConn {
  1355. // In order to inspect its properties, meekConn will hold a reference to
  1356. // firstUnderlyingConn, the _first_ underlying TCP conn, for the full
  1357. // lifetime of meekConn, which may exceed the lifetime of firstUnderlyingConn
  1358. // and include subsequent underlying TCP conns. In this case, it is expected
  1359. // that firstUnderlyingConn will be closed by "net/http", so no OS resources
  1360. // (e.g., a socket) are retained longer than necessary.
  1361. conn := &meekConn{
  1362. meekServer: meekServer,
  1363. meekSession: meekSession,
  1364. firstUnderlyingConn: underlyingConn,
  1365. remoteAddr: remoteAddr,
  1366. protocolVersion: protocolVersion,
  1367. closeBroadcast: make(chan struct{}),
  1368. closed: 0,
  1369. emptyReadBuffer: make(chan *bytes.Buffer, 1),
  1370. partialReadBuffer: make(chan *bytes.Buffer, 1),
  1371. fullReadBuffer: make(chan *bytes.Buffer, 1),
  1372. nextWriteBuffer: make(chan []byte, 1),
  1373. writeResult: make(chan error, 1),
  1374. }
  1375. // Read() calls and pumpReads() are synchronized by exchanging control
  1376. // of a single readBuffer. This is the same scheme used in and described
  1377. // in psiphon.MeekConn.
  1378. conn.emptyReadBuffer <- new(bytes.Buffer)
  1379. return conn
  1380. }
  1381. // GetMetrics implements the common.MetricsSource interface. The metrics are
  1382. // maintained in the meek session type; but logTunnel, which calls
  1383. // MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
  1384. // through to the session.
  1385. func (conn *meekConn) GetMetrics() common.LogFields {
  1386. logFields := conn.meekSession.GetMetrics()
  1387. if conn.meekServer.passthroughAddress != "" {
  1388. logFields["passthrough_address"] = conn.meekServer.passthroughAddress
  1389. }
  1390. // Include metrics, such as fragmentor metrics, from the _first_ underlying
  1391. // TCP conn. Properties of subsequent underlying TCP conns are not reflected
  1392. // in these metrics; we assume that the first TCP conn, which most likely
  1393. // transits the various protocol handshakes, is most significant.
  1394. underlyingMetrics, ok := conn.firstUnderlyingConn.(common.MetricsSource)
  1395. if ok {
  1396. logFields.Add(underlyingMetrics.GetMetrics())
  1397. }
  1398. return logFields
  1399. }
  1400. // GetUnderlyingTCPAddrs implements the common.UnderlyingTCPAddrSource
  1401. // interface, returning the TCP addresses for the _first_ underlying TCP
  1402. // connection in the meek tunnel.
  1403. func (conn *meekConn) GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool) {
  1404. localAddr, ok := conn.firstUnderlyingConn.LocalAddr().(*net.TCPAddr)
  1405. if !ok {
  1406. return nil, nil, false
  1407. }
  1408. remoteAddr, ok := conn.firstUnderlyingConn.RemoteAddr().(*net.TCPAddr)
  1409. if !ok {
  1410. return nil, nil, false
  1411. }
  1412. return localAddr, remoteAddr, true
  1413. }
  1414. // SetReplay implements the common.FragmentorReplayAccessor interface, applying
  1415. // the inputs to the _first_ underlying TCP connection in the meek tunnel. If
  1416. // the underlying connection is closed, then SetSeed call will have no effect.
  1417. func (conn *meekConn) SetReplay(PRNG *prng.PRNG) {
  1418. underlyingConn := conn.firstUnderlyingConn
  1419. if conn.meekServer.normalizer != nil {
  1420. // The underlying conn is wrapped with a normalizer.
  1421. normalizer, ok := underlyingConn.(*transforms.HTTPNormalizer)
  1422. if ok {
  1423. underlyingConn = normalizer.Conn
  1424. }
  1425. }
  1426. fragmentor, ok := underlyingConn.(common.FragmentorAccessor)
  1427. if ok {
  1428. fragmentor.SetReplay(PRNG)
  1429. }
  1430. }
  1431. // GetReplay implements the FragmentorReplayAccessor interface, getting the
  1432. // outputs from the _first_ underlying TCP connection in the meek tunnel.
  1433. //
  1434. // We assume that the first TCP conn is most significant: the initial TCP
  1435. // connection most likely fragments protocol handshakes; and, in the case the
  1436. // packet manipulation, any selected packet manipulation spec would have been
  1437. // successful.
  1438. func (conn *meekConn) GetReplay() (*prng.Seed, bool) {
  1439. underlyingConn := conn.firstUnderlyingConn
  1440. if conn.meekServer.normalizer != nil {
  1441. // The underlying conn is wrapped with a normalizer.
  1442. normalizer, ok := underlyingConn.(*transforms.HTTPNormalizer)
  1443. if ok {
  1444. underlyingConn = normalizer.Conn
  1445. }
  1446. }
  1447. fragmentor, ok := underlyingConn.(common.FragmentorAccessor)
  1448. if ok {
  1449. return fragmentor.GetReplay()
  1450. }
  1451. return nil, false
  1452. }
  1453. func (conn *meekConn) StopFragmenting() {
  1454. fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorAccessor)
  1455. if ok {
  1456. fragmentor.StopFragmenting()
  1457. }
  1458. }
  1459. // pumpReads causes goroutines blocking on meekConn.Read() to read
  1460. // from the specified reader. This function blocks until the reader
  1461. // is fully consumed or the meekConn is closed. A read buffer allows
  1462. // up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
  1463. // without a Read() immediately consuming the bytes, but there's still
  1464. // a possibility of a stall if no Read() calls are made after this
  1465. // read buffer is full.
  1466. // Returns the number of request bytes read.
  1467. // Note: assumes only one concurrent call to pumpReads
  1468. func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
  1469. // Use either an empty or partial buffer. By using a partial
  1470. // buffer, pumpReads will not block if the Read() caller has
  1471. // not fully drained the read buffer.
  1472. var readBuffer *bytes.Buffer
  1473. select {
  1474. case readBuffer = <-conn.emptyReadBuffer:
  1475. case readBuffer = <-conn.partialReadBuffer:
  1476. case <-conn.closeBroadcast:
  1477. return 0, io.EOF
  1478. }
  1479. newDataOffset := readBuffer.Len()
  1480. // Since we need to read the full request payload in order to
  1481. // take its checksum before relaying it, the read buffer can
  1482. // grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
  1483. // +1 allows for an explicit check for request payloads that
  1484. // exceed the maximum permitted length.
  1485. limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
  1486. n, err := readBuffer.ReadFrom(limitReader)
  1487. if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
  1488. err = std_errors.New("invalid request payload length")
  1489. }
  1490. // If the request read fails, don't relay the new data. This allows
  1491. // the client to retry and resend its request payload without
  1492. // interrupting/duplicating the payload flow.
  1493. if err != nil {
  1494. readBuffer.Truncate(newDataOffset)
  1495. conn.replaceReadBuffer(readBuffer)
  1496. return 0, errors.Trace(err)
  1497. }
  1498. // Check if request payload checksum matches immediately
  1499. // previous payload. On match, assume this is a client retry
  1500. // sending payload that was already relayed and skip this
  1501. // payload. Payload is OSSH ciphertext and almost surely
  1502. // will not repeat. In the highly unlikely case that it does,
  1503. // the underlying SSH connection will fail and the client
  1504. // must reconnect.
  1505. checksum := crc64.Checksum(
  1506. readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
  1507. if conn.lastReadChecksum == nil {
  1508. conn.lastReadChecksum = new(uint64)
  1509. } else if *conn.lastReadChecksum == checksum {
  1510. readBuffer.Truncate(newDataOffset)
  1511. }
  1512. *conn.lastReadChecksum = checksum
  1513. conn.replaceReadBuffer(readBuffer)
  1514. return n, nil
  1515. }
// errMeekConnectionHasClosed is the error returned, wrapped by errors.Trace,
// from Read, Write, and pumpWrites when they are interrupted by the closing
// of the meekConn.
var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")
  1517. // Read reads from the meekConn into buffer. Read blocks until
  1518. // some data is read or the meekConn closes. Under the hood, it
  1519. // waits for pumpReads to submit a reader to read from.
  1520. // Note: lock is to conform with net.Conn concurrency semantics
  1521. func (conn *meekConn) Read(buffer []byte) (int, error) {
  1522. conn.readLock.Lock()
  1523. defer conn.readLock.Unlock()
  1524. var readBuffer *bytes.Buffer
  1525. select {
  1526. case readBuffer = <-conn.partialReadBuffer:
  1527. case readBuffer = <-conn.fullReadBuffer:
  1528. case <-conn.closeBroadcast:
  1529. return 0, errors.Trace(errMeekConnectionHasClosed)
  1530. }
  1531. n, err := readBuffer.Read(buffer)
  1532. conn.replaceReadBuffer(readBuffer)
  1533. return n, err
  1534. }
  1535. func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
  1536. length := readBuffer.Len()
  1537. if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
  1538. conn.fullReadBuffer <- readBuffer
  1539. } else if length == 0 {
  1540. conn.emptyReadBuffer <- readBuffer
  1541. } else {
  1542. conn.partialReadBuffer <- readBuffer
  1543. }
  1544. }
  1545. // pumpWrites causes goroutines blocking on meekConn.Write() to write
  1546. // to the specified writer. This function blocks until the meek response
  1547. // body limits (size for protocol v1, turn around time for protocol v2+)
  1548. // are met, or the meekConn is closed.
  1549. //
  1550. // Note: channel scheme assumes only one concurrent call to pumpWrites
  1551. func (conn *meekConn) pumpWrites(
  1552. writer io.Writer, skipExtendedTurnAround bool) (int, error) {
  1553. startTime := time.Now()
  1554. timeout := time.NewTimer(conn.meekServer.turnAroundTimeout)
  1555. defer timeout.Stop()
  1556. n := 0
  1557. for {
  1558. select {
  1559. case buffer := <-conn.nextWriteBuffer:
  1560. written, err := writer.Write(buffer)
  1561. n += written
  1562. // Assumes that writeResult won't block.
  1563. // Note: always send the err to writeResult,
  1564. // as the Write() caller is blocking on this.
  1565. conn.writeResult <- err
  1566. if err != nil {
  1567. return n, err
  1568. }
  1569. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
  1570. // Pre-protocol version 1 clients expect at most
  1571. // MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
  1572. return n, nil
  1573. }
  1574. if skipExtendedTurnAround {
  1575. // When fast turn around is indicated, skip the extended turn
  1576. // around timeout. This optimizes for upstream flows.
  1577. return n, nil
  1578. }
  1579. totalElapsedTime := time.Since(startTime) / time.Millisecond
  1580. if totalElapsedTime >= conn.meekServer.extendedTurnAroundTimeout {
  1581. return n, nil
  1582. }
  1583. timeout.Reset(conn.meekServer.turnAroundTimeout)
  1584. case <-timeout.C:
  1585. return n, nil
  1586. case <-conn.closeBroadcast:
  1587. return n, errors.Trace(errMeekConnectionHasClosed)
  1588. }
  1589. }
  1590. }
  1591. // Write writes the buffer to the meekConn. It blocks until the
  1592. // entire buffer is written to or the meekConn closes. Under the
  1593. // hood, it waits for sufficient pumpWrites calls to consume the
  1594. // write buffer.
  1595. // Note: lock is to conform with net.Conn concurrency semantics
  1596. func (conn *meekConn) Write(buffer []byte) (int, error) {
  1597. conn.writeLock.Lock()
  1598. defer conn.writeLock.Unlock()
  1599. // TODO: may be more efficient to send whole buffer
  1600. // and have pumpWrites stash partial buffer when can't
  1601. // send it all.
  1602. n := 0
  1603. for n < len(buffer) {
  1604. end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
  1605. if end > len(buffer) {
  1606. end = len(buffer)
  1607. }
  1608. // Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
  1609. // to ensure compatibility with v1 protocol.
  1610. chunk := buffer[n:end]
  1611. select {
  1612. case conn.nextWriteBuffer <- chunk:
  1613. case <-conn.closeBroadcast:
  1614. return n, errors.Trace(errMeekConnectionHasClosed)
  1615. }
  1616. // Wait for the buffer to be processed.
  1617. select {
  1618. case <-conn.writeResult:
  1619. // The err from conn.writeResult comes from the
  1620. // io.MultiWriter used in pumpWrites, which writes
  1621. // to both the cached response and the HTTP response.
  1622. //
  1623. // Don't stop on error here, since only writing
  1624. // to the HTTP response will fail, and the client
  1625. // may retry and use the cached response.
  1626. //
  1627. // It's possible that the cached response buffer
  1628. // is too small for the client to successfully
  1629. // retry, but that cannot be determined. In this
  1630. // case, the meek connection will eventually fail.
  1631. //
  1632. // err is already logged in ServeHTTP.
  1633. case <-conn.closeBroadcast:
  1634. return n, errors.Trace(errMeekConnectionHasClosed)
  1635. }
  1636. n += len(chunk)
  1637. }
  1638. return n, nil
  1639. }
  1640. // Close closes the meekConn. This will interrupt any blocked
  1641. // Read, Write, pumpReads, and pumpWrites.
  1642. func (conn *meekConn) Close() error {
  1643. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  1644. close(conn.closeBroadcast)
  1645. // In general, we rely on "net/http" to close underlying TCP conns. In
  1646. // this case, we can directly close the first once, if it's still
  1647. // open. Don't close a persistent connection when fronted, as it may
  1648. // be still be used by other clients.
  1649. if !conn.meekServer.isFronted {
  1650. conn.firstUnderlyingConn.Close()
  1651. }
  1652. }
  1653. return nil
  1654. }
// LocalAddr is a stub implementation of net.Conn.LocalAddr. It always
// returns nil.
func (conn *meekConn) LocalAddr() net.Addr {
	return nil
}
// RemoteAddr returns the remoteAddr specified in newMeekConn. This
// acts as a proxy for the actual remote address, which is either a
// direct HTTP/HTTPS connection remote address, or in the case of
// downstream proxy of CDN fronts, some other value determined via
// HTTP headers.
func (conn *meekConn) RemoteAddr() net.Addr {
	return conn.remoteAddr
}
  1667. // SetDeadline is not a true implementation of net.Conn.SetDeadline. It
  1668. // merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
  1669. // period. When it does, and the session is idle, the meekConn Read/Write will
  1670. // be interrupted and return an error (not a timeout error) before the deadline.
  1671. // In other words, this conn will approximate the desired functionality of
  1672. // timing out on idle on or before the requested deadline.
  1673. func (conn *meekConn) SetDeadline(t time.Time) error {
  1674. // Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
  1675. if time.Now().Add(conn.meekServer.maxSessionStaleness).Before(t) {
  1676. return nil
  1677. }
  1678. return errors.TraceNew("not supported")
  1679. }
// SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline. Read
// deadlines are not supported; it always returns a "not supported" error.
func (conn *meekConn) SetReadDeadline(t time.Time) error {
	return errors.TraceNew("not supported")
}
// SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline.
// Write deadlines are not supported; it always returns a "not supported"
// error.
func (conn *meekConn) SetWriteDeadline(t time.Time) error {
	return errors.TraceNew("not supported")
}