meek.go 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/tls"
  25. "encoding/base64"
  26. "encoding/hex"
  27. "encoding/json"
  28. std_errors "errors"
  29. "hash/crc64"
  30. "io"
  31. "net"
  32. "net/http"
  33. "runtime"
  34. "strconv"
  35. "strings"
  36. "sync"
  37. "sync/atomic"
  38. "time"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
  47. tris "github.com/Psiphon-Labs/tls-tris"
  48. lrucache "github.com/cognusion/go-cache-lru"
  49. "github.com/juju/ratelimit"
  50. "golang.org/x/crypto/nacl/box"
  51. )
  52. // MeekServer is based on meek-server.go from Tor and Psiphon:
  53. //
  54. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  55. // CC0 1.0 Universal
  56. //
  57. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  58. const (
  59. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  60. // report no version number and expect at most 64K response bodies.
  61. MEEK_PROTOCOL_VERSION_1 = 1
  62. // Protocol version 2 clients initiate a session by sending an encrypted and obfuscated meek
  63. // cookie with their initial HTTP request. Connection information is contained within the
  64. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  65. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  66. // session ID on all subsequent requests for the remainder of the session.
  67. MEEK_PROTOCOL_VERSION_2 = 2
  68. // Protocol version 3 clients include resiliency enhancements and will add a Range header
  69. // when retrying a request for a partially downloaded response payload.
  70. MEEK_PROTOCOL_VERSION_3 = 3
  71. MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
  72. MEEK_MIN_SESSION_ID_LENGTH = 8
  73. MEEK_MAX_SESSION_ID_LENGTH = 20
  74. MEEK_DEFAULT_TURN_AROUND_TIMEOUT = 10 * time.Millisecond
  75. MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  76. MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD = 8192
  77. MEEK_DEFAULT_MAX_SESSION_STALENESS = 45 * time.Second
  78. MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
  79. MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT = 360 * time.Second
  80. MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
  81. MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536
  82. MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048
  83. )
  84. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  85. // Obfuscated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  86. // HTTP or HTTPS.
  87. //
  88. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  89. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  90. // set as a HTTP cookie for the client to submit with each request.
  91. //
  92. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  93. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  94. // the meekConn struct.
  95. type MeekServer struct {
  96. support *SupportServices
  97. listener net.Listener
  98. listenerTunnelProtocol string
  99. listenerPort int
  100. isFronted bool
  101. passthroughAddress string
  102. turnAroundTimeout time.Duration
  103. extendedTurnAroundTimeout time.Duration
  104. skipExtendedTurnAroundThreshold int
  105. maxSessionStaleness time.Duration
  106. httpClientIOTimeout time.Duration
  107. tlsConfig *tris.Config
  108. obfuscatorSeedHistory *obfuscator.SeedHistory
  109. clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
  110. openConns *common.Conns
  111. stopBroadcast <-chan struct{}
  112. sessionsLock sync.RWMutex
  113. sessions map[string]*meekSession
  114. checksumTable *crc64.Table
  115. bufferPool *CachedResponseBufferPool
  116. rateLimitLock sync.Mutex
  117. rateLimitHistory *lrucache.Cache
  118. rateLimitCount int
  119. rateLimitSignalGC chan struct{}
  120. }
  121. // NewMeekServer initializes a new meek server.
  122. func NewMeekServer(
  123. support *SupportServices,
  124. listener net.Listener,
  125. listenerTunnelProtocol string,
  126. listenerPort int,
  127. useTLS, isFronted, useObfuscatedSessionTickets bool,
  128. clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
  129. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  130. passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
  131. turnAroundTimeout := MEEK_DEFAULT_TURN_AROUND_TIMEOUT
  132. if support.Config.MeekTurnAroundTimeoutMilliseconds != nil {
  133. turnAroundTimeout = time.Duration(
  134. *support.Config.MeekTurnAroundTimeoutMilliseconds) * time.Millisecond
  135. }
  136. extendedTurnAroundTimeout := MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT
  137. if support.Config.MeekExtendedTurnAroundTimeoutMilliseconds != nil {
  138. extendedTurnAroundTimeout = time.Duration(
  139. *support.Config.MeekExtendedTurnAroundTimeoutMilliseconds) * time.Millisecond
  140. }
  141. skipExtendedTurnAroundThreshold := MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD
  142. if support.Config.MeekSkipExtendedTurnAroundThresholdBytes != nil {
  143. skipExtendedTurnAroundThreshold = *support.Config.MeekSkipExtendedTurnAroundThresholdBytes
  144. }
  145. maxSessionStaleness := MEEK_DEFAULT_MAX_SESSION_STALENESS
  146. if support.Config.MeekMaxSessionStalenessMilliseconds != nil {
  147. maxSessionStaleness = time.Duration(
  148. *support.Config.MeekMaxSessionStalenessMilliseconds) * time.Millisecond
  149. }
  150. var httpClientIOTimeout time.Duration
  151. if isFronted {
  152. // Fronted has a distinct timeout, and the default is higher since new
  153. // clients may connect to a CDN edge and start using an existing
  154. // persistent connection.
  155. httpClientIOTimeout = MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT
  156. if support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds != nil {
  157. httpClientIOTimeout = time.Duration(
  158. *support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  159. }
  160. } else {
  161. httpClientIOTimeout = MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
  162. if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
  163. httpClientIOTimeout = time.Duration(
  164. *support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  165. }
  166. }
  167. checksumTable := crc64.MakeTable(crc64.ECMA)
  168. bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
  169. if support.Config.MeekCachedResponsePoolBufferSize != 0 {
  170. bufferLength = support.Config.MeekCachedResponsePoolBufferSize
  171. }
  172. bufferCount := MEEK_DEFAULT_POOL_BUFFER_COUNT
  173. if support.Config.MeekCachedResponsePoolBufferCount != 0 {
  174. bufferCount = support.Config.MeekCachedResponsePoolBufferCount
  175. }
  176. _, thresholdSeconds, _, _, _, _, _, _, reapFrequencySeconds, maxEntries :=
  177. support.TrafficRulesSet.GetMeekRateLimiterConfig()
  178. rateLimitHistory := lrucache.NewWithLRU(
  179. time.Duration(thresholdSeconds)*time.Second,
  180. time.Duration(reapFrequencySeconds)*time.Second,
  181. maxEntries)
  182. bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
  183. meekServer := &MeekServer{
  184. support: support,
  185. listener: listener,
  186. listenerTunnelProtocol: listenerTunnelProtocol,
  187. listenerPort: listenerPort,
  188. isFronted: isFronted,
  189. passthroughAddress: passthroughAddress,
  190. turnAroundTimeout: turnAroundTimeout,
  191. extendedTurnAroundTimeout: extendedTurnAroundTimeout,
  192. skipExtendedTurnAroundThreshold: skipExtendedTurnAroundThreshold,
  193. maxSessionStaleness: maxSessionStaleness,
  194. httpClientIOTimeout: httpClientIOTimeout,
  195. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  196. clientHandler: clientHandler,
  197. openConns: common.NewConns(),
  198. stopBroadcast: stopBroadcast,
  199. sessions: make(map[string]*meekSession),
  200. checksumTable: checksumTable,
  201. bufferPool: bufferPool,
  202. rateLimitHistory: rateLimitHistory,
  203. rateLimitSignalGC: make(chan struct{}, 1),
  204. }
  205. if useTLS {
  206. tlsConfig, err := meekServer.makeMeekTLSConfig(
  207. isFronted, useObfuscatedSessionTickets)
  208. if err != nil {
  209. return nil, errors.Trace(err)
  210. }
  211. meekServer.tlsConfig = tlsConfig
  212. }
  213. return meekServer, nil
  214. }
  215. type meekContextKey struct {
  216. key string
  217. }
  218. var meekNetConnContextKey = &meekContextKey{"net.Conn"}
  219. // Run runs the meek server; this function blocks while serving HTTP or
  220. // HTTPS connections on the specified listener. This function also runs
  221. // a goroutine which cleans up expired meek client sessions.
  222. //
  223. // To stop the meek server, both Close() the listener and set the stopBroadcast
  224. // signal specified in NewMeekServer.
  225. func (server *MeekServer) Run() error {
  226. waitGroup := new(sync.WaitGroup)
  227. waitGroup.Add(1)
  228. go func() {
  229. defer waitGroup.Done()
  230. ticker := time.NewTicker(server.maxSessionStaleness / 2)
  231. defer ticker.Stop()
  232. for {
  233. select {
  234. case <-ticker.C:
  235. server.deleteExpiredSessions()
  236. case <-server.stopBroadcast:
  237. return
  238. }
  239. }
  240. }()
  241. waitGroup.Add(1)
  242. go func() {
  243. defer waitGroup.Done()
  244. server.rateLimitWorker()
  245. }()
  246. // Serve HTTP or HTTPS
  247. //
  248. // - WriteTimeout may include time awaiting request, as per:
  249. // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
  250. // - Legacy meek-server wrapped each client HTTP connection with an explicit idle
  251. // timeout net.Conn and didn't use http.Server timeouts. We could do the same
  252. // here (use ActivityMonitoredConn) but the stock http.Server timeouts should
  253. // now be sufficient.
  254. httpServer := &http.Server{
  255. ReadTimeout: server.httpClientIOTimeout,
  256. WriteTimeout: server.httpClientIOTimeout,
  257. Handler: server,
  258. ConnState: server.httpConnStateCallback,
  259. ConnContext: func(ctx context.Context, conn net.Conn) context.Context {
  260. return context.WithValue(ctx, meekNetConnContextKey, conn)
  261. },
  262. // Disable auto HTTP/2 (https://golang.org/doc/go1.6)
  263. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  264. }
  265. // Note: Serve() will be interrupted by listener.Close() call
  266. var err error
  267. if server.tlsConfig != nil {
  268. httpsServer := HTTPSServer{Server: httpServer}
  269. err = httpsServer.ServeTLS(server.listener, server.tlsConfig)
  270. } else {
  271. err = httpServer.Serve(server.listener)
  272. }
  273. // Can't check for the exact error that Close() will cause in Accept(),
  274. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  275. // explicit stop signal to stop gracefully.
  276. select {
  277. case <-server.stopBroadcast:
  278. err = nil
  279. default:
  280. }
  281. // deleteExpiredSessions calls deleteSession which may block waiting
  282. // for active request handlers to complete; timely shutdown requires
  283. // stopping the listener and closing all existing connections before
  284. // awaiting the reaperWaitGroup.
  285. server.listener.Close()
  286. server.openConns.CloseAll()
  287. waitGroup.Wait()
  288. return err
  289. }
  290. // ServeHTTP handles meek client HTTP requests, where the request body
  291. // contains upstream traffic and the response will contain downstream
  292. // traffic.
  293. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  294. // Note: no longer requiring that the request method is POST
  295. // Check for required headers and values. For fronting, required headers
  296. // may be used to identify a CDN edge. When this check fails,
  297. // TerminateHTTPConnection is called instead of handleError, so any
  298. // persistent connection is always closed.
  299. if len(server.support.Config.MeekRequiredHeaders) > 0 {
  300. for header, value := range server.support.Config.MeekRequiredHeaders {
  301. requestValue := request.Header.Get(header)
  302. if requestValue != value {
  303. log.WithTraceFields(LogFields{
  304. "header": header,
  305. "value": requestValue,
  306. }).Warning("invalid required meek header")
  307. common.TerminateHTTPConnection(responseWriter, request)
  308. return
  309. }
  310. }
  311. }
  312. // Check for the expected meek/session ID cookie.
  313. // Also check for prohibited HTTP headers.
  314. var meekCookie *http.Cookie
  315. for _, c := range request.Cookies() {
  316. meekCookie = c
  317. break
  318. }
  319. if meekCookie == nil || len(meekCookie.Value) == 0 {
  320. log.WithTrace().Warning("missing meek cookie")
  321. common.TerminateHTTPConnection(responseWriter, request)
  322. return
  323. }
  324. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  325. for _, header := range server.support.Config.MeekProhibitedHeaders {
  326. value := request.Header.Get(header)
  327. if header != "" {
  328. log.WithTraceFields(LogFields{
  329. "header": header,
  330. "value": value,
  331. }).Warning("prohibited meek header")
  332. server.handleError(responseWriter, request)
  333. return
  334. }
  335. }
  336. }
  337. // A valid meek cookie indicates which class of request this is:
  338. //
  339. // 1. A new meek session. Create a new session ID and proceed with
  340. // relaying tunnel traffic.
  341. //
  342. // 2. An existing meek session. Resume relaying tunnel traffic.
  343. //
  344. // 3. A request to an endpoint. This meek connection is not for relaying
  345. // tunnel traffic. Instead, the request is handed off to a custom handler.
  346. sessionID,
  347. session,
  348. underlyingConn,
  349. endPoint,
  350. endPointGeoIPData,
  351. err := server.getSessionOrEndpoint(request, meekCookie)
  352. if err != nil {
  353. // Debug since session cookie errors commonly occur during
  354. // normal operation.
  355. log.WithTraceFields(LogFields{"error": err}).Debug("session lookup failed")
  356. server.handleError(responseWriter, request)
  357. return
  358. }
  359. if endPoint != "" {
  360. // Endpoint mode. Currently, this means it's handled by the tactics
  361. // request handler.
  362. handled := server.support.TacticsServer.HandleEndPoint(
  363. endPoint, common.GeoIPData(*endPointGeoIPData), responseWriter, request)
  364. if !handled {
  365. log.WithTraceFields(LogFields{"endPoint": endPoint}).Info("unhandled endpoint")
  366. server.handleError(responseWriter, request)
  367. }
  368. return
  369. }
  370. // Tunnel relay mode.
  371. // Ensure that there's only one concurrent request handler per client
  372. // session. Depending on the nature of a network disruption, it can
  373. // happen that a client detects a failure and retries while the server
  374. // is still streaming response in the handler for the _previous_ client
  375. // request.
  376. //
  377. // Even if the session.cachedResponse were safe for concurrent
  378. // use (it is not), concurrent handling could lead to loss of session
  379. // since upstream data read by the first request may not reach the
  380. // cached response before the second request reads the cached data.
  381. //
  382. // The existing handler will stream response data, holding the lock,
  383. // for no more than MEEK_EXTENDED_TURN_AROUND_TIMEOUT.
  384. //
  385. // TODO: interrupt an existing handler? The existing handler will be
  386. // sending data to the cached response, but if that buffer fills, the
  387. // session will be lost.
  388. requestNumber := atomic.AddInt64(&session.requestCount, 1)
  389. // Wait for the existing request to complete.
  390. session.lock.Lock()
  391. defer session.lock.Unlock()
  392. // Count this metric once the lock is acquired, to avoid concurrent and
  393. // potentially incorrect session.underlyingConn updates.
  394. //
  395. // It should never be the case that a new underlyingConn has the same
  396. // value as the previous session.underlyingConn, as each is a net.Conn
  397. // interface which includes a pointer, and the previous value cannot
  398. // be garbage collected until session.underlyingConn is updated.
  399. if session.underlyingConn != underlyingConn {
  400. atomic.AddInt64(&session.metricUnderlyingConnCount, 1)
  401. session.underlyingConn = underlyingConn
  402. }
  403. // If a newer request has arrived while waiting, discard this one.
  404. // Do not delay processing the newest request.
  405. //
  406. // If the session expired and was deleted while this request was waiting,
  407. // discard this request. The session is no longer valid, and the final call
  408. // to session.cachedResponse.Reset may have already occured, so any further
  409. // session.cachedResponse access may deplete resources (fail to refill the pool).
  410. if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
  411. common.TerminateHTTPConnection(responseWriter, request)
  412. return
  413. }
  414. // pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  415. // read the request body as upstream traffic.
  416. // TODO: run pumpReads and pumpWrites concurrently?
  417. // pumpReads checksums the request payload and skips relaying it when
  418. // it matches the immediately previous request payload. This allows
  419. // clients to resend request payloads, when retrying due to connection
  420. // interruption, without knowing whether the server has received or
  421. // relayed the data.
  422. requestSize, err := session.clientConn.pumpReads(request.Body)
  423. if err != nil {
  424. if err != io.EOF {
  425. // Debug since errors such as "i/o timeout" occur during normal operation;
  426. // also, golang network error messages may contain client IP.
  427. log.WithTraceFields(LogFields{"error": err}).Debug("read request failed")
  428. }
  429. common.TerminateHTTPConnection(responseWriter, request)
  430. // Note: keep session open to allow client to retry
  431. return
  432. }
  433. // The extended turn around mechanism optimizes for downstream flows by
  434. // sending more data in the response as long as it's available. As a
  435. // heuristic, when the request size meets a threshold, optimize instead
  436. // of upstream flows by skipping the extended turn around.
  437. skipExtendedTurnAround := requestSize >= int64(server.skipExtendedTurnAroundThreshold)
  438. // Set cookie before writing the response.
  439. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && !session.sessionIDSent {
  440. // Replace the meek cookie with the session ID.
  441. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  442. // session ID value replaces the original meek cookie value.
  443. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  444. session.sessionIDSent = true
  445. }
  446. // When streaming data into the response body, a copy is
  447. // retained in the cachedResponse buffer. This allows the
  448. // client to retry and request that the response be resent
  449. // when the HTTP connection is interrupted.
  450. //
  451. // If a Range header is present, the client is retrying,
  452. // possibly after having received a partial response. In
  453. // this case, use any cached response to attempt to resend
  454. // the response, starting from the resend position the client
  455. // indicates.
  456. //
  457. // When the resend position is not available -- because the
  458. // cachedResponse buffer could not hold it -- the client session
  459. // is closed, as there's no way to resume streaming the payload
  460. // uninterrupted.
  461. //
  462. // The client may retry before a cached response is prepared,
  463. // so a cached response is not always used when a Range header
  464. // is present.
  465. //
  466. // TODO: invalid Range header is ignored; should it be otherwise?
  467. position, isRetry := checkRangeHeader(request)
  468. if isRetry {
  469. atomic.AddInt64(&session.metricClientRetries, 1)
  470. }
  471. hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
  472. // The client is not expected to send position > 0 when there is
  473. // no cached response; let that case fall through to the next
  474. // HasPosition check which will fail and close the session.
  475. var responseSize int
  476. var responseError error
  477. if isRetry && (hasCompleteCachedResponse || position > 0) {
  478. if !session.cachedResponse.HasPosition(position) {
  479. greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
  480. server.handleError(responseWriter, request)
  481. session.delete(true)
  482. return
  483. }
  484. responseWriter.WriteHeader(http.StatusPartialContent)
  485. // TODO:
  486. // - enforce a max extended buffer count per client, for
  487. // fairness? Throttling may make this unnecessary.
  488. // - cachedResponse can now start releasing extended buffers,
  489. // as response bytes before "position" will never be requested
  490. // again?
  491. responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
  492. greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
  493. // The client may again fail to receive the payload and may again
  494. // retry, so not yet releasing cachedResponse buffers.
  495. } else {
  496. // _Now_ we release buffers holding data from the previous
  497. // response. And then immediately stream the new response into
  498. // newly acquired buffers.
  499. session.cachedResponse.Reset()
  500. // Note: this code depends on an implementation detail of
  501. // io.MultiWriter: a Write() to the MultiWriter writes first
  502. // to the cache, and then to the response writer. So if the
  503. // write to the response writer fails, the payload is cached.
  504. multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
  505. // The client expects 206, not 200, whenever it sets a Range header,
  506. // which it may do even when no cached response is prepared.
  507. if isRetry {
  508. responseWriter.WriteHeader(http.StatusPartialContent)
  509. }
  510. // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  511. // write its downstream traffic through to the response body.
  512. responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround)
  513. greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
  514. greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
  515. }
  516. // responseError is the result of writing the body either from CopyFromPosition or pumpWrites
  517. if responseError != nil {
  518. if responseError != io.EOF {
  519. // Debug since errors such as "i/o timeout" occur during normal operation;
  520. // also, golang network error messages may contain client IP.
  521. log.WithTraceFields(LogFields{"error": responseError}).Debug("write response failed")
  522. }
  523. server.handleError(responseWriter, request)
  524. // Note: keep session open to allow client to retry
  525. return
  526. }
  527. }
  528. func (server *MeekServer) handleError(responseWriter http.ResponseWriter, request *http.Request) {
  529. // When fronted, keep the persistent connection open since it may be used
  530. // by many clients coming through the same edge. For performance reasons,
  531. // an error, including invalid input, from one client shouldn't close the
  532. // persistent connection used by other clients.
  533. if server.isFronted {
  534. http.NotFound(responseWriter, request)
  535. return
  536. }
  537. common.TerminateHTTPConnection(responseWriter, request)
  538. }
  539. func checkRangeHeader(request *http.Request) (int, bool) {
  540. rangeHeader := request.Header.Get("Range")
  541. if rangeHeader == "" {
  542. return 0, false
  543. }
  544. prefix := "bytes="
  545. suffix := "-"
  546. if !strings.HasPrefix(rangeHeader, prefix) ||
  547. !strings.HasSuffix(rangeHeader, suffix) {
  548. return 0, false
  549. }
  550. rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
  551. rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
  552. position, err := strconv.Atoi(rangeHeader)
  553. if err != nil {
  554. return 0, false
  555. }
  556. return position, true
  557. }
  558. // getSessionOrEndpoint checks if the cookie corresponds to an existing tunnel
  559. // relay session ID. If no session is found, the cookie must be an obfuscated
  560. // meek cookie. A new session is created when the meek cookie indicates relay
  561. // mode; or the endpoint is returned when the meek cookie indicates endpoint
  562. // mode.
  563. func (server *MeekServer) getSessionOrEndpoint(
  564. request *http.Request, meekCookie *http.Cookie) (string, *meekSession, net.Conn, string, *GeoIPData, error) {
  565. underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
  566. // Check for an existing session.
  567. server.sessionsLock.RLock()
  568. existingSessionID := meekCookie.Value
  569. session, ok := server.sessions[existingSessionID]
  570. server.sessionsLock.RUnlock()
  571. if ok {
  572. // TODO: can multiple http client connections using same session cookie
  573. // cause race conditions on session struct?
  574. session.touch()
  575. return existingSessionID, session, underlyingConn, "", nil, nil
  576. }
  577. // Determine the client remote address, which is used for geolocation
  578. // stats, rate limiting, anti-probing, discovery, and tactics selection
  579. // logic.
  580. //
  581. // When an intermediate proxy or CDN is in use, we may be
  582. // able to determine the original client address by inspecting HTTP
  583. // headers such as X-Forwarded-For.
  584. //
  585. // We trust only headers provided by CDNs. Fronted Psiphon server hosts
  586. // should be configured to accept tunnel connections only from CDN edges.
  587. // When the CDN passes along a chain of IPs, as in X-Forwarded-For, we
  588. // trust only the right-most IP, which is provided by the CDN.
  589. clientIP, _, err := net.SplitHostPort(request.RemoteAddr)
  590. if err != nil {
  591. return "", nil, nil, "", nil, errors.Trace(err)
  592. }
  593. if net.ParseIP(clientIP) == nil {
  594. return "", nil, nil, "", nil, errors.TraceNew("invalid IP address")
  595. }
  596. if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
  597. len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  598. // When there are multiple header names in MeekProxyForwardedForHeaders,
  599. // the first valid match is preferred. MeekProxyForwardedForHeaders should be
  600. // configured to use header names that are always provided by the CDN(s) and
  601. // not header names that may be passed through from clients.
  602. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  603. // In the case where there are multiple headers,
  604. // request.Header.Get returns the first header, but we want the
  605. // last header; so use request.Header.Values and select the last
  606. // value. As per RFC 2616 section 4.2, a proxy must not change
  607. // the order of field values, which implies that it should append
  608. // values to the last header.
  609. values := request.Header.Values(header)
  610. if len(values) > 0 {
  611. value := values[len(values)-1]
  612. // Some headers, such as X-Forwarded-For, are a comma-separated
  613. // list of IPs (each proxy in a chain). Select the last IP.
  614. IPs := strings.Split(value, ",")
  615. IP := IPs[len(IPs)-1]
  616. // Remove optional whitespace surrounding the commas.
  617. IP = strings.TrimSpace(IP)
  618. if net.ParseIP(IP) != nil {
  619. clientIP = IP
  620. break
  621. }
  622. }
  623. }
  624. }
  625. geoIPData := server.support.GeoIPService.Lookup(clientIP)
  626. // The session is new (or expired). Treat the cookie value as a new meek
  627. // cookie, extract the payload, and create a new session.
  628. // Limitation: when the cookie is a session ID for an expired session, we
  629. // still attempt to treat it as a meek cookie. As it stands, that yields
  630. // either base64 decoding errors (RawStdEncoding vs. StdEncoding) or
  631. // length errors. We could log cleaner errors ("session is expired") by
  632. // checking that the cookie is a well-formed (base64.RawStdEncoding) value
  633. // between MEEK_MIN_SESSION_ID_LENGTH and MEEK_MAX_SESSION_ID_LENGTH
  634. // bytes -- assuming that MEEK_MAX_SESSION_ID_LENGTH is too short to be a
  635. // valid meek cookie.
  636. payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookie.Value)
  637. if err != nil {
  638. return "", nil, nil, "", nil, errors.Trace(err)
  639. }
  640. // Note: this meek server ignores legacy values PsiphonClientSessionId
  641. // and PsiphonServerAddress.
  642. var clientSessionData protocol.MeekCookieData
  643. err = json.Unmarshal(payloadJSON, &clientSessionData)
  644. if err != nil {
  645. return "", nil, nil, "", nil, errors.Trace(err)
  646. }
  647. tunnelProtocol := server.listenerTunnelProtocol
  648. if clientSessionData.ClientTunnelProtocol != "" {
  649. if !protocol.IsValidClientTunnelProtocol(
  650. clientSessionData.ClientTunnelProtocol,
  651. server.listenerTunnelProtocol,
  652. server.support.Config.GetRunningProtocols()) {
  653. return "", nil, nil, "", nil, errors.Tracef(
  654. "invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
  655. }
  656. tunnelProtocol = clientSessionData.ClientTunnelProtocol
  657. }
  658. // Any rate limit is enforced after the meek cookie is validated, so a prober
  659. // without the obfuscation secret will be unable to fingerprint the server
  660. // based on response time combined with the rate limit configuration. The
  661. // rate limit is primarily intended to limit memory resource consumption and
  662. // not the overhead incurred by cookie validation.
  663. if server.rateLimit(clientIP, geoIPData, tunnelProtocol) {
  664. return "", nil, nil, "", nil, errors.TraceNew("rate limit exceeded")
  665. }
  666. // Handle endpoints before enforcing CheckEstablishTunnels.
  667. // Currently, endpoints are tactics requests, and we allow these to be
  668. // handled by servers which would otherwise reject new tunnels.
  669. if clientSessionData.EndPoint != "" {
  670. return "", nil, nil, clientSessionData.EndPoint, &geoIPData, nil
  671. }
  672. // Don't create new sessions when not establishing. A subsequent SSH handshake
  673. // will not succeed, so creating a meek session just wastes resources.
  674. if server.support.TunnelServer != nil &&
  675. !server.support.TunnelServer.CheckEstablishTunnels() {
  676. return "", nil, nil, "", nil, errors.TraceNew("not establishing tunnels")
  677. }
  678. // Disconnect immediately if the tactics for the client restricts usage of
  679. // the fronting provider ID. The probability may be used to influence
  680. // usage of a given fronting provider; but when only that provider works
  681. // for a given client, and the probability is less than 1.0, the client
  682. // can retry until it gets a successful coin flip.
  683. //
  684. // Clients will also skip candidates with restricted fronting provider IDs.
  685. // The client-side probability, RestrictFrontingProviderIDsClientProbability,
  686. // is applied independently of the server-side coin flip here.
  687. //
  688. // At this stage, GeoIP tactics filters are active, but handshake API
  689. // parameters are not.
  690. //
  691. // See the comment in server.LoadConfig regarding fronting provider ID
  692. // limitations.
  693. if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
  694. server.support.ServerTacticsParametersCache != nil {
  695. p, err := server.support.ServerTacticsParametersCache.Get(geoIPData)
  696. if err != nil {
  697. return "", nil, nil, "", nil, errors.Trace(err)
  698. }
  699. if !p.IsNil() &&
  700. common.Contains(
  701. p.Strings(parameters.RestrictFrontingProviderIDs),
  702. server.support.Config.GetFrontingProviderID()) {
  703. if p.WeightedCoinFlip(
  704. parameters.RestrictFrontingProviderIDsServerProbability) {
  705. return "", nil, nil, "", nil, errors.TraceNew("restricted fronting provider")
  706. }
  707. }
  708. }
  709. // Create a new session
  710. bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
  711. if server.support.Config.MeekCachedResponseBufferSize != 0 {
  712. bufferLength = server.support.Config.MeekCachedResponseBufferSize
  713. }
  714. cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
  715. session = &meekSession{
  716. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  717. sessionIDSent: false,
  718. cachedResponse: cachedResponse,
  719. cookieName: meekCookie.Name,
  720. contentType: request.Header.Get("Content-Type"),
  721. }
  722. session.touch()
  723. // Create a new meek conn that will relay the payload
  724. // between meek request/responses and the tunnel server client
  725. // handler. The client IP is also used to initialize the
  726. // meek conn with a useful value to return when the tunnel
  727. // server calls conn.RemoteAddr() to get the client's IP address.
  728. // Assumes clientIP is a valid IP address; the port value is a stub
  729. // and is expected to be ignored.
  730. clientConn := newMeekConn(
  731. server,
  732. session,
  733. underlyingConn,
  734. &net.TCPAddr{
  735. IP: net.ParseIP(clientIP),
  736. Port: 0,
  737. },
  738. clientSessionData.MeekProtocolVersion)
  739. session.clientConn = clientConn
  740. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  741. // meek cookie to a session ID; v1 clients always send the
  742. // original meek cookie value with each request. The issue with
  743. // v1 is that clients which wake after a device sleep will attempt
  744. // to resume a meek session and the server can't differentiate
  745. // between resuming a session and creating a new session. This
  746. // causes the v1 client connection to hang/timeout.
  747. sessionID := meekCookie.Value
  748. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  749. sessionID, err = makeMeekSessionID()
  750. if err != nil {
  751. return "", nil, nil, "", nil, errors.Trace(err)
  752. }
  753. }
  754. server.sessionsLock.Lock()
  755. server.sessions[sessionID] = session
  756. server.sessionsLock.Unlock()
  757. // Note: from the tunnel server's perspective, this client connection
  758. // will close when session.delete calls Close() on the meekConn.
  759. server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
  760. return sessionID, session, underlyingConn, "", nil, nil
  761. }
  762. func (server *MeekServer) rateLimit(
  763. clientIP string, geoIPData GeoIPData, tunnelProtocol string) bool {
  764. historySize,
  765. thresholdSeconds,
  766. tunnelProtocols,
  767. regions,
  768. ISPs,
  769. ASNs,
  770. cities,
  771. GCTriggerCount, _, _ :=
  772. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  773. if historySize == 0 {
  774. return false
  775. }
  776. if len(tunnelProtocols) > 0 {
  777. if !common.Contains(tunnelProtocols, tunnelProtocol) {
  778. return false
  779. }
  780. }
  781. if len(regions) > 0 || len(ISPs) > 0 || len(ASNs) > 0 || len(cities) > 0 {
  782. if len(regions) > 0 {
  783. if !common.Contains(regions, geoIPData.Country) {
  784. return false
  785. }
  786. }
  787. if len(ISPs) > 0 {
  788. if !common.Contains(ISPs, geoIPData.ISP) {
  789. return false
  790. }
  791. }
  792. if len(ASNs) > 0 {
  793. if !common.Contains(ASNs, geoIPData.ASN) {
  794. return false
  795. }
  796. }
  797. if len(cities) > 0 {
  798. if !common.Contains(cities, geoIPData.City) {
  799. return false
  800. }
  801. }
  802. }
  803. // With IPv6, individual users or sites are users commonly allocated a /64
  804. // or /56, so rate limit by /56.
  805. rateLimitIP := clientIP
  806. IP := net.ParseIP(clientIP)
  807. if IP != nil && IP.To4() == nil {
  808. rateLimitIP = IP.Mask(net.CIDRMask(56, 128)).String()
  809. }
  810. // go-cache-lru is safe for concurrent access, but lacks an atomic
  811. // compare-and-set type operations to check if an entry exists before
  812. // adding a new one. This mutex ensures the Get and Add are atomic
  813. // (as well as synchronizing access to rateLimitCount).
  814. server.rateLimitLock.Lock()
  815. var rateLimiter *ratelimit.Bucket
  816. entry, ok := server.rateLimitHistory.Get(rateLimitIP)
  817. if ok {
  818. rateLimiter = entry.(*ratelimit.Bucket)
  819. } else {
  820. rateLimiter = ratelimit.NewBucketWithQuantum(
  821. time.Duration(thresholdSeconds)*time.Second,
  822. int64(historySize),
  823. int64(historySize))
  824. server.rateLimitHistory.Set(
  825. rateLimitIP,
  826. rateLimiter,
  827. time.Duration(thresholdSeconds)*time.Second)
  828. }
  829. limit := rateLimiter.TakeAvailable(1) < 1
  830. triggerGC := false
  831. if limit {
  832. server.rateLimitCount += 1
  833. if server.rateLimitCount >= GCTriggerCount {
  834. triggerGC = true
  835. server.rateLimitCount = 0
  836. }
  837. }
  838. server.rateLimitLock.Unlock()
  839. if triggerGC {
  840. select {
  841. case server.rateLimitSignalGC <- struct{}{}:
  842. default:
  843. }
  844. }
  845. return limit
  846. }
  847. func (server *MeekServer) rateLimitWorker() {
  848. for {
  849. select {
  850. case <-server.rateLimitSignalGC:
  851. runtime.GC()
  852. case <-server.stopBroadcast:
  853. return
  854. }
  855. }
  856. }
  857. func (server *MeekServer) deleteSession(sessionID string) {
  858. // Don't obtain the server.sessionsLock write lock until modifying
  859. // server.sessions, as the session.delete can block for up to
  860. // MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
  861. // concurrently.
  862. //
  863. // Since a lock isn't held for the duration, concurrent calls to
  864. // deleteSession with the same sessionID could happen; this is
  865. // not expected since only the reaper goroutine calls deleteExpiredSessions
  866. // (and in any case concurrent execution of the ok block is not an issue).
  867. server.sessionsLock.RLock()
  868. session, ok := server.sessions[sessionID]
  869. server.sessionsLock.RUnlock()
  870. if ok {
  871. session.delete(false)
  872. server.sessionsLock.Lock()
  873. delete(server.sessions, sessionID)
  874. server.sessionsLock.Unlock()
  875. }
  876. }
  877. func (server *MeekServer) deleteExpiredSessions() {
  878. // A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  879. // so grab a snapshot list of expired sessions and do not hold a lock for
  880. // the duration of deleteExpiredSessions. This allows new sessions to be
  881. // added concurrently.
  882. //
  883. // New sessions added after the snapshot is taken will be checked for
  884. // expiry on subsequent periodic calls to deleteExpiredSessions.
  885. //
  886. // To avoid long delays in releasing resources, individual deletes are
  887. // performed concurrently.
  888. server.sessionsLock.Lock()
  889. expiredSessionIDs := make([]string, 0)
  890. for sessionID, session := range server.sessions {
  891. if session.expired() {
  892. expiredSessionIDs = append(expiredSessionIDs, sessionID)
  893. }
  894. }
  895. server.sessionsLock.Unlock()
  896. start := time.Now()
  897. deleteWaitGroup := new(sync.WaitGroup)
  898. for _, sessionID := range expiredSessionIDs {
  899. deleteWaitGroup.Add(1)
  900. go func(sessionID string) {
  901. defer deleteWaitGroup.Done()
  902. server.deleteSession(sessionID)
  903. }(sessionID)
  904. }
  905. deleteWaitGroup.Wait()
  906. log.WithTraceFields(
  907. LogFields{"elapsed time": time.Since(start)}).Debug("deleted expired sessions")
  908. }
  909. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  910. // meek server.
  911. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  912. switch connState {
  913. case http.StateNew:
  914. server.openConns.Add(conn)
  915. case http.StateHijacked, http.StateClosed:
  916. server.openConns.Remove(conn)
  917. }
  918. }
  919. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  920. // payload is base64 encoded, obfuscated, and NaCl encrypted.
  921. func (server *MeekServer) getMeekCookiePayload(
  922. clientIP string, cookieValue string) ([]byte, error) {
  923. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  924. if err != nil {
  925. return nil, errors.Trace(err)
  926. }
  927. // The data consists of an obfuscated seed message prepended
  928. // to the obfuscated, encrypted payload. The server obfuscator
  929. // will read the seed message, leaving the remaining encrypted
  930. // data in the reader.
  931. reader := bytes.NewReader(decodedValue[:])
  932. obfuscator, err := obfuscator.NewServerObfuscator(
  933. &obfuscator.ObfuscatorConfig{
  934. Keyword: server.support.Config.MeekObfuscatedKey,
  935. SeedHistory: server.obfuscatorSeedHistory,
  936. IrregularLogger: func(clientIP string, err error, logFields common.LogFields) {
  937. logIrregularTunnel(
  938. server.support,
  939. server.listenerTunnelProtocol,
  940. server.listenerPort,
  941. clientIP,
  942. errors.Trace(err),
  943. LogFields(logFields))
  944. },
  945. },
  946. clientIP,
  947. reader)
  948. if err != nil {
  949. return nil, errors.Trace(err)
  950. }
  951. offset, err := reader.Seek(0, 1)
  952. if err != nil {
  953. return nil, errors.Trace(err)
  954. }
  955. encryptedPayload := decodedValue[offset:]
  956. obfuscator.ObfuscateClientToServer(encryptedPayload)
  957. var nonce [24]byte
  958. var privateKey, ephemeralPublicKey [32]byte
  959. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  960. server.support.Config.MeekCookieEncryptionPrivateKey)
  961. if err != nil {
  962. return nil, errors.Trace(err)
  963. }
  964. copy(privateKey[:], decodedPrivateKey)
  965. if len(encryptedPayload) < 32 {
  966. return nil, errors.TraceNew("unexpected encrypted payload size")
  967. }
  968. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  969. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  970. if !ok {
  971. return nil, errors.TraceNew("open box failed")
  972. }
  973. return payload, nil
  974. }
  975. // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
  976. // Currently, this config is optimized for fronted meek where the nature
  977. // of the connection is non-circumvention; it's optimized for performance
  978. // assuming the peer is an uncensored CDN.
  979. func (server *MeekServer) makeMeekTLSConfig(
  980. isFronted bool, useObfuscatedSessionTickets bool) (*tris.Config, error) {
  981. certificate, privateKey, err := common.GenerateWebServerCertificate(values.GetHostName())
  982. if err != nil {
  983. return nil, errors.Trace(err)
  984. }
  985. tlsCertificate, err := tris.X509KeyPair(
  986. []byte(certificate), []byte(privateKey))
  987. if err != nil {
  988. return nil, errors.Trace(err)
  989. }
  990. // Vary the minimum version to frustrate scanning/fingerprinting of unfronted servers.
  991. // Limitation: like the certificate, this value changes on restart.
  992. minVersionCandidates := []uint16{tris.VersionTLS10, tris.VersionTLS11, tris.VersionTLS12}
  993. minVersion := minVersionCandidates[prng.Intn(len(minVersionCandidates))]
  994. config := &tris.Config{
  995. Certificates: []tris.Certificate{tlsCertificate},
  996. NextProtos: []string{"http/1.1"},
  997. MinVersion: minVersion,
  998. UseExtendedMasterSecret: true,
  999. }
  1000. if isFronted {
  1001. // This is a reordering of the supported CipherSuites in golang 1.6[*]. Non-ephemeral key
  1002. // CipherSuites greatly reduce server load, and we try to select these since the meek
  1003. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  1004. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  1005. // by ephemeral key CipherSuites.
  1006. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  1007. //
  1008. // This optimization is applied only when there's a CDN in front of the meek server; in
  1009. // unfronted cases we prefer a more natural TLS handshake.
  1010. //
  1011. // [*] the list has since been updated, removing CipherSuites using RC4 and 3DES.
  1012. config.CipherSuites = []uint16{
  1013. tris.TLS_RSA_WITH_AES_128_GCM_SHA256,
  1014. tris.TLS_RSA_WITH_AES_256_GCM_SHA384,
  1015. tris.TLS_RSA_WITH_AES_128_CBC_SHA,
  1016. tris.TLS_RSA_WITH_AES_256_CBC_SHA,
  1017. tris.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  1018. tris.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  1019. tris.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  1020. tris.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  1021. tris.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  1022. tris.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  1023. tris.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  1024. tris.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  1025. }
  1026. config.PreferServerCipherSuites = true
  1027. }
  1028. if useObfuscatedSessionTickets {
  1029. // See obfuscated session ticket overview
  1030. // in NewObfuscatedClientSessionCache.
  1031. config.UseObfuscatedSessionTickets = true
  1032. var obfuscatedSessionTicketKey [32]byte
  1033. key, err := hex.DecodeString(server.support.Config.MeekObfuscatedKey)
  1034. if err == nil && len(key) != 32 {
  1035. err = std_errors.New("invalid obfuscated session key length")
  1036. }
  1037. if err != nil {
  1038. return nil, errors.Trace(err)
  1039. }
  1040. copy(obfuscatedSessionTicketKey[:], key)
  1041. var standardSessionTicketKey [32]byte
  1042. _, err = rand.Read(standardSessionTicketKey[:])
  1043. if err != nil {
  1044. return nil, errors.Trace(err)
  1045. }
  1046. // Note: SessionTicketKey needs to be set, or else, it appears,
  1047. // tris.Config.serverInit() will clobber the value set by
  1048. // SetSessionTicketKeys.
  1049. config.SessionTicketKey = obfuscatedSessionTicketKey
  1050. config.SetSessionTicketKeys([][32]byte{
  1051. standardSessionTicketKey,
  1052. obfuscatedSessionTicketKey})
  1053. }
  1054. // When configured, initialize passthrough mode, an anti-probing defense.
  1055. // Clients must prove knowledge of the obfuscated key via a message sent in
  1056. // the TLS ClientHello random field.
  1057. //
  1058. // When clients fail to provide a valid message, the client connection is
  1059. // relayed to the designated passthrough address, typically another web site.
  1060. // The entire flow is relayed, including the original ClientHello, so the
  1061. // client will perform a TLS handshake with the passthrough target.
  1062. //
  1063. // Irregular events are logged for invalid client activity.
  1064. if server.passthroughAddress != "" {
  1065. config.PassthroughAddress = server.passthroughAddress
  1066. config.PassthroughVerifyMessage = func(
  1067. message []byte) bool {
  1068. return obfuscator.VerifyTLSPassthroughMessage(
  1069. !server.support.Config.LegacyPassthrough,
  1070. server.support.Config.MeekObfuscatedKey,
  1071. message)
  1072. }
  1073. config.PassthroughLogInvalidMessage = func(
  1074. clientIP string) {
  1075. logIrregularTunnel(
  1076. server.support,
  1077. server.listenerTunnelProtocol,
  1078. server.listenerPort,
  1079. clientIP,
  1080. errors.TraceNew("invalid passthrough message"),
  1081. nil)
  1082. }
  1083. config.PassthroughHistoryAddNew = func(
  1084. clientIP string,
  1085. clientRandom []byte) bool {
  1086. // Use a custom, shorter TTL based on the validity period of the
  1087. // passthrough message.
  1088. TTL := obfuscator.TLS_PASSTHROUGH_TIME_PERIOD
  1089. if server.support.Config.LegacyPassthrough {
  1090. TTL = obfuscator.HISTORY_SEED_TTL
  1091. }
  1092. // strictMode is true as, unlike with meek cookies, legitimate meek clients
  1093. // never retry TLS connections using a previous random value.
  1094. ok, logFields := server.obfuscatorSeedHistory.AddNewWithTTL(
  1095. true,
  1096. clientIP,
  1097. "client-random",
  1098. clientRandom,
  1099. TTL)
  1100. if logFields != nil {
  1101. logIrregularTunnel(
  1102. server.support,
  1103. server.listenerTunnelProtocol,
  1104. server.listenerPort,
  1105. clientIP,
  1106. errors.TraceNew("duplicate passthrough message"),
  1107. LogFields(*logFields))
  1108. }
  1109. return ok
  1110. }
  1111. }
  1112. return config, nil
  1113. }
  1114. type meekSession struct {
  1115. // Note: 64-bit ints used with atomic operations are placed
  1116. // at the start of struct to ensure 64-bit alignment.
  1117. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  1118. lastActivity int64
  1119. requestCount int64
  1120. metricClientRetries int64
  1121. metricPeakResponseSize int64
  1122. metricPeakCachedResponseSize int64
  1123. metricPeakCachedResponseHitSize int64
  1124. metricCachedResponseMissPosition int64
  1125. metricUnderlyingConnCount int64
  1126. lock sync.Mutex
  1127. deleted bool
  1128. underlyingConn net.Conn
  1129. clientConn *meekConn
  1130. meekProtocolVersion int
  1131. sessionIDSent bool
  1132. cachedResponse *CachedResponse
  1133. cookieName string
  1134. contentType string
  1135. }
  1136. func (session *meekSession) touch() {
  1137. atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
  1138. }
  1139. func (session *meekSession) expired() bool {
  1140. if session.clientConn == nil {
  1141. // Not fully initialized. meekSession.clientConn will be set before adding
  1142. // the session to MeekServer.sessions.
  1143. return false
  1144. }
  1145. lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
  1146. return monotime.Since(lastActivity) >
  1147. session.clientConn.meekServer.maxSessionStaleness
  1148. }
  1149. // delete releases all resources allocated by a session.
  1150. func (session *meekSession) delete(haveLock bool) {
  1151. // TODO: close the persistent HTTP client connection, if one exists?
  1152. // This final call session.cachedResponse.Reset releases shared resources.
  1153. //
  1154. // This call requires exclusive access. session.lock is be obtained before
  1155. // calling session.cachedResponse.Reset. Once the lock is obtained, no
  1156. // request for this session is being processed concurrently, and pending
  1157. // requests will block at session.lock.
  1158. //
  1159. // This logic assumes that no further session.cachedResponse access occurs,
  1160. // or else resources may deplete (buffers won't be returned to the pool).
  1161. // These requirements are achieved by obtaining the lock, setting
  1162. // session.deleted, and any subsequent request handlers checking
  1163. // session.deleted immediately after obtaining the lock.
  1164. //
  1165. // session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  1166. // the timeout for any active request handler processing a session
  1167. // request.
  1168. //
  1169. // When the lock must be acquired, clientConn.Close is called first, to
  1170. // interrupt any existing request handler blocking on pumpReads or pumpWrites.
  1171. session.clientConn.Close()
  1172. if !haveLock {
  1173. session.lock.Lock()
  1174. }
  1175. // Release all extended buffers back to the pool.
  1176. // session.cachedResponse.Reset is not safe for concurrent calls.
  1177. session.cachedResponse.Reset()
  1178. session.deleted = true
  1179. if !haveLock {
  1180. session.lock.Unlock()
  1181. }
  1182. }
  1183. // GetMetrics implements the common.MetricsSource interface.
  1184. func (session *meekSession) GetMetrics() common.LogFields {
  1185. logFields := make(common.LogFields)
  1186. logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
  1187. logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
  1188. logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
  1189. logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
  1190. logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
  1191. logFields["meek_underlying_connection_count"] = atomic.LoadInt64(&session.metricUnderlyingConnCount)
  1192. logFields["meek_cookie_name"] = session.cookieName
  1193. logFields["meek_content_type"] = session.contentType
  1194. return logFields
  1195. }
  1196. // makeMeekSessionID creates a new session ID. The variable size is intended to
  1197. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  1198. func makeMeekSessionID() (string, error) {
  1199. size := MEEK_MIN_SESSION_ID_LENGTH +
  1200. prng.Intn(MEEK_MAX_SESSION_ID_LENGTH-MEEK_MIN_SESSION_ID_LENGTH)
  1201. sessionID, err := common.MakeSecureRandomBytes(size)
  1202. if err != nil {
  1203. return "", errors.Trace(err)
  1204. }
  1205. // Omit padding to maximize variable size space. To the client, the session
  1206. // ID is an opaque string cookie value.
  1207. return base64.RawStdEncoding.EncodeToString(sessionID), nil
  1208. }
  1209. // meekConn implements the net.Conn interface and is to be used as a client
  1210. // connection by the tunnel server (being passed to sshServer.handleClient).
  1211. // meekConn bridges net/http request/response payload readers and writers
  1212. // and goroutines calling Read()s and Write()s.
  1213. type meekConn struct {
  1214. meekServer *MeekServer
  1215. meekSession *meekSession
  1216. firstUnderlyingConn net.Conn
  1217. remoteAddr net.Addr
  1218. protocolVersion int
  1219. closeBroadcast chan struct{}
  1220. closed int32
  1221. lastReadChecksum *uint64
  1222. readLock sync.Mutex
  1223. emptyReadBuffer chan *bytes.Buffer
  1224. partialReadBuffer chan *bytes.Buffer
  1225. fullReadBuffer chan *bytes.Buffer
  1226. writeLock sync.Mutex
  1227. nextWriteBuffer chan []byte
  1228. writeResult chan error
  1229. }
  1230. func newMeekConn(
  1231. meekServer *MeekServer,
  1232. meekSession *meekSession,
  1233. underlyingConn net.Conn,
  1234. remoteAddr net.Addr,
  1235. protocolVersion int) *meekConn {
  1236. // In order to inspect its properties, meekConn will hold a reference to
  1237. // firstUnderlyingConn, the _first_ underlying TCP conn, for the full
  1238. // lifetime of meekConn, which may exceed the lifetime of firstUnderlyingConn
  1239. // and include subsequent underlying TCP conns. In this case, it is expected
  1240. // that firstUnderlyingConn will be closed by "net/http", so no OS resources
  1241. // (e.g., a socket) are retained longer than necessary.
  1242. conn := &meekConn{
  1243. meekServer: meekServer,
  1244. meekSession: meekSession,
  1245. firstUnderlyingConn: underlyingConn,
  1246. remoteAddr: remoteAddr,
  1247. protocolVersion: protocolVersion,
  1248. closeBroadcast: make(chan struct{}),
  1249. closed: 0,
  1250. emptyReadBuffer: make(chan *bytes.Buffer, 1),
  1251. partialReadBuffer: make(chan *bytes.Buffer, 1),
  1252. fullReadBuffer: make(chan *bytes.Buffer, 1),
  1253. nextWriteBuffer: make(chan []byte, 1),
  1254. writeResult: make(chan error, 1),
  1255. }
  1256. // Read() calls and pumpReads() are synchronized by exchanging control
  1257. // of a single readBuffer. This is the same scheme used in and described
  1258. // in psiphon.MeekConn.
  1259. conn.emptyReadBuffer <- new(bytes.Buffer)
  1260. return conn
  1261. }
  1262. // GetMetrics implements the common.MetricsSource interface. The metrics are
  1263. // maintained in the meek session type; but logTunnel, which calls
  1264. // MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
  1265. // through to the session.
  1266. func (conn *meekConn) GetMetrics() common.LogFields {
  1267. logFields := conn.meekSession.GetMetrics()
  1268. if conn.meekServer.passthroughAddress != "" {
  1269. logFields["passthrough_address"] = conn.meekServer.passthroughAddress
  1270. }
  1271. // Include metrics, such as fragmentor metrics, from the _first_ underlying
  1272. // TCP conn. Properties of subsequent underlying TCP conns are not reflected
  1273. // in these metrics; we assume that the first TCP conn, which most likely
  1274. // transits the various protocol handshakes, is most significant.
  1275. underlyingMetrics, ok := conn.firstUnderlyingConn.(common.MetricsSource)
  1276. if ok {
  1277. logFields.Add(underlyingMetrics.GetMetrics())
  1278. }
  1279. return logFields
  1280. }
  1281. // GetUnderlyingTCPAddrs implements the common.UnderlyingTCPAddrSource
  1282. // interface, returning the TCP addresses for the _first_ underlying TCP
  1283. // connection in the meek tunnel.
  1284. func (conn *meekConn) GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool) {
  1285. localAddr, ok := conn.firstUnderlyingConn.LocalAddr().(*net.TCPAddr)
  1286. if !ok {
  1287. return nil, nil, false
  1288. }
  1289. remoteAddr, ok := conn.firstUnderlyingConn.RemoteAddr().(*net.TCPAddr)
  1290. if !ok {
  1291. return nil, nil, false
  1292. }
  1293. return localAddr, remoteAddr, true
  1294. }
  1295. // SetReplay implements the common.FragmentorReplayAccessor interface, applying
  1296. // the inputs to the _first_ underlying TCP connection in the meek tunnel. If
  1297. // the underlying connection is closed, the SetSeed call will have no effect.
  1298. func (conn *meekConn) SetReplay(PRNG *prng.PRNG) {
  1299. fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorReplayAccessor)
  1300. if ok {
  1301. fragmentor.SetReplay(PRNG)
  1302. }
  1303. }
  1304. // GetReplay implements the FragmentorReplayAccessor interface, getting the
  1305. // outputs from the _first_ underlying TCP connection in the meek tunnel.
  1306. //
  1307. // We assume that the first TCP conn is most significant: the initial TCP
  1308. // connection most likely fragments protocol handshakes; and, in the case the
  1309. // packet manipulation, any selected packet manipulation spec would have been
  1310. // successful.
  1311. func (conn *meekConn) GetReplay() (*prng.Seed, bool) {
  1312. fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorReplayAccessor)
  1313. if ok {
  1314. return fragmentor.GetReplay()
  1315. }
  1316. return nil, false
  1317. }
  1318. // pumpReads causes goroutines blocking on meekConn.Read() to read
  1319. // from the specified reader. This function blocks until the reader
  1320. // is fully consumed or the meekConn is closed. A read buffer allows
  1321. // up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
  1322. // without a Read() immediately consuming the bytes, but there's still
  1323. // a possibility of a stall if no Read() calls are made after this
  1324. // read buffer is full.
  1325. // Returns the number of request bytes read.
  1326. // Note: assumes only one concurrent call to pumpReads
  1327. func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
  1328. // Use either an empty or partial buffer. By using a partial
  1329. // buffer, pumpReads will not block if the Read() caller has
  1330. // not fully drained the read buffer.
  1331. var readBuffer *bytes.Buffer
  1332. select {
  1333. case readBuffer = <-conn.emptyReadBuffer:
  1334. case readBuffer = <-conn.partialReadBuffer:
  1335. case <-conn.closeBroadcast:
  1336. return 0, io.EOF
  1337. }
  1338. newDataOffset := readBuffer.Len()
  1339. // Since we need to read the full request payload in order to
  1340. // take its checksum before relaying it, the read buffer can
  1341. // grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
  1342. // +1 allows for an explicit check for request payloads that
  1343. // exceed the maximum permitted length.
  1344. limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
  1345. n, err := readBuffer.ReadFrom(limitReader)
  1346. if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
  1347. err = std_errors.New("invalid request payload length")
  1348. }
  1349. // If the request read fails, don't relay the new data. This allows
  1350. // the client to retry and resend its request payload without
  1351. // interrupting/duplicating the payload flow.
  1352. if err != nil {
  1353. readBuffer.Truncate(newDataOffset)
  1354. conn.replaceReadBuffer(readBuffer)
  1355. return 0, errors.Trace(err)
  1356. }
  1357. // Check if request payload checksum matches immediately
  1358. // previous payload. On match, assume this is a client retry
  1359. // sending payload that was already relayed and skip this
  1360. // payload. Payload is OSSH ciphertext and almost surely
  1361. // will not repeat. In the highly unlikely case that it does,
  1362. // the underlying SSH connection will fail and the client
  1363. // must reconnect.
  1364. checksum := crc64.Checksum(
  1365. readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
  1366. if conn.lastReadChecksum == nil {
  1367. conn.lastReadChecksum = new(uint64)
  1368. } else if *conn.lastReadChecksum == checksum {
  1369. readBuffer.Truncate(newDataOffset)
  1370. }
  1371. *conn.lastReadChecksum = checksum
  1372. conn.replaceReadBuffer(readBuffer)
  1373. return n, nil
  1374. }
  1375. var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")
  1376. // Read reads from the meekConn into buffer. Read blocks until
  1377. // some data is read or the meekConn closes. Under the hood, it
  1378. // waits for pumpReads to submit a reader to read from.
  1379. // Note: lock is to conform with net.Conn concurrency semantics
  1380. func (conn *meekConn) Read(buffer []byte) (int, error) {
  1381. conn.readLock.Lock()
  1382. defer conn.readLock.Unlock()
  1383. var readBuffer *bytes.Buffer
  1384. select {
  1385. case readBuffer = <-conn.partialReadBuffer:
  1386. case readBuffer = <-conn.fullReadBuffer:
  1387. case <-conn.closeBroadcast:
  1388. return 0, errors.Trace(errMeekConnectionHasClosed)
  1389. }
  1390. n, err := readBuffer.Read(buffer)
  1391. conn.replaceReadBuffer(readBuffer)
  1392. return n, err
  1393. }
  1394. func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
  1395. length := readBuffer.Len()
  1396. if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
  1397. conn.fullReadBuffer <- readBuffer
  1398. } else if length == 0 {
  1399. conn.emptyReadBuffer <- readBuffer
  1400. } else {
  1401. conn.partialReadBuffer <- readBuffer
  1402. }
  1403. }
  1404. // pumpWrites causes goroutines blocking on meekConn.Write() to write
  1405. // to the specified writer. This function blocks until the meek response
  1406. // body limits (size for protocol v1, turn around time for protocol v2+)
  1407. // are met, or the meekConn is closed.
  1408. //
  1409. // Note: channel scheme assumes only one concurrent call to pumpWrites
  1410. func (conn *meekConn) pumpWrites(
  1411. writer io.Writer, skipExtendedTurnAround bool) (int, error) {
  1412. startTime := time.Now()
  1413. timeout := time.NewTimer(conn.meekServer.turnAroundTimeout)
  1414. defer timeout.Stop()
  1415. n := 0
  1416. for {
  1417. select {
  1418. case buffer := <-conn.nextWriteBuffer:
  1419. written, err := writer.Write(buffer)
  1420. n += written
  1421. // Assumes that writeResult won't block.
  1422. // Note: always send the err to writeResult,
  1423. // as the Write() caller is blocking on this.
  1424. conn.writeResult <- err
  1425. if err != nil {
  1426. return n, err
  1427. }
  1428. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
  1429. // Pre-protocol version 1 clients expect at most
  1430. // MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
  1431. return n, nil
  1432. }
  1433. if skipExtendedTurnAround {
  1434. // When fast turn around is indicated, skip the extended turn
  1435. // around timeout. This optimizes for upstream flows.
  1436. return n, nil
  1437. }
  1438. totalElapsedTime := time.Since(startTime) / time.Millisecond
  1439. if totalElapsedTime >= conn.meekServer.extendedTurnAroundTimeout {
  1440. return n, nil
  1441. }
  1442. timeout.Reset(conn.meekServer.turnAroundTimeout)
  1443. case <-timeout.C:
  1444. return n, nil
  1445. case <-conn.closeBroadcast:
  1446. return n, errors.Trace(errMeekConnectionHasClosed)
  1447. }
  1448. }
  1449. }
  1450. // Write writes the buffer to the meekConn. It blocks until the
  1451. // entire buffer is written to or the meekConn closes. Under the
  1452. // hood, it waits for sufficient pumpWrites calls to consume the
  1453. // write buffer.
  1454. // Note: lock is to conform with net.Conn concurrency semantics
  1455. func (conn *meekConn) Write(buffer []byte) (int, error) {
  1456. conn.writeLock.Lock()
  1457. defer conn.writeLock.Unlock()
  1458. // TODO: may be more efficient to send whole buffer
  1459. // and have pumpWrites stash partial buffer when can't
  1460. // send it all.
  1461. n := 0
  1462. for n < len(buffer) {
  1463. end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
  1464. if end > len(buffer) {
  1465. end = len(buffer)
  1466. }
  1467. // Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
  1468. // to ensure compatibility with v1 protocol.
  1469. chunk := buffer[n:end]
  1470. select {
  1471. case conn.nextWriteBuffer <- chunk:
  1472. case <-conn.closeBroadcast:
  1473. return n, errors.Trace(errMeekConnectionHasClosed)
  1474. }
  1475. // Wait for the buffer to be processed.
  1476. select {
  1477. case <-conn.writeResult:
  1478. // The err from conn.writeResult comes from the
  1479. // io.MultiWriter used in pumpWrites, which writes
  1480. // to both the cached response and the HTTP response.
  1481. //
  1482. // Don't stop on error here, since only writing
  1483. // to the HTTP response will fail, and the client
  1484. // may retry and use the cached response.
  1485. //
  1486. // It's possible that the cached response buffer
  1487. // is too small for the client to successfully
  1488. // retry, but that cannot be determined. In this
  1489. // case, the meek connection will eventually fail.
  1490. //
  1491. // err is already logged in ServeHTTP.
  1492. case <-conn.closeBroadcast:
  1493. return n, errors.Trace(errMeekConnectionHasClosed)
  1494. }
  1495. n += len(chunk)
  1496. }
  1497. return n, nil
  1498. }
  1499. // Close closes the meekConn. This will interrupt any blocked
  1500. // Read, Write, pumpReads, and pumpWrites.
  1501. func (conn *meekConn) Close() error {
  1502. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  1503. close(conn.closeBroadcast)
  1504. // In general, we rely on "net/http" to close underlying TCP conns. In
  1505. // this case, we can directly close the first once, if it's still
  1506. // open. Don't close a persistent connection when fronted, as it may
  1507. // be still be used by other clients.
  1508. if !conn.meekServer.isFronted {
  1509. conn.firstUnderlyingConn.Close()
  1510. }
  1511. }
  1512. return nil
  1513. }
  1514. // Stub implementation of net.Conn.LocalAddr
  1515. func (conn *meekConn) LocalAddr() net.Addr {
  1516. return nil
  1517. }
  1518. // RemoteAddr returns the remoteAddr specified in newMeekConn. This
  1519. // acts as a proxy for the actual remote address, which is either a
  1520. // direct HTTP/HTTPS connection remote address, or in the case of
  1521. // downstream proxy of CDN fronts, some other value determined via
  1522. // HTTP headers.
  1523. func (conn *meekConn) RemoteAddr() net.Addr {
  1524. return conn.remoteAddr
  1525. }
  1526. // SetDeadline is not a true implementation of net.Conn.SetDeadline. It
  1527. // merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
  1528. // period. When it does, and the session is idle, the meekConn Read/Write will
  1529. // be interrupted and return an error (not a timeout error) before the deadline.
  1530. // In other words, this conn will approximate the desired functionality of
  1531. // timing out on idle on or before the requested deadline.
  1532. func (conn *meekConn) SetDeadline(t time.Time) error {
  1533. // Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
  1534. if time.Now().Add(conn.meekServer.maxSessionStaleness).Before(t) {
  1535. return nil
  1536. }
  1537. return errors.TraceNew("not supported")
  1538. }
  1539. // Stub implementation of net.Conn.SetReadDeadline
  1540. func (conn *meekConn) SetReadDeadline(t time.Time) error {
  1541. return errors.TraceNew("not supported")
  1542. }
  1543. // Stub implementation of net.Conn.SetWriteDeadline
  1544. func (conn *meekConn) SetWriteDeadline(t time.Time) error {
  1545. return errors.TraceNew("not supported")
  1546. }