meek.go 93 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/subtle"
  25. "crypto/tls"
  26. "encoding/base64"
  27. "encoding/hex"
  28. "encoding/json"
  29. std_errors "errors"
  30. "hash/crc64"
  31. "io"
  32. "io/ioutil"
  33. "net"
  34. "net/http"
  35. "runtime"
  36. "strconv"
  37. "strings"
  38. "sync"
  39. "sync/atomic"
  40. "time"
  41. psiphon_tls "github.com/Psiphon-Labs/psiphon-tls"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
  53. lrucache "github.com/cognusion/go-cache-lru"
  54. "golang.org/x/crypto/nacl/box"
  55. "golang.org/x/time/rate"
  56. )
  57. // MeekServer is based on meek-server.go from Tor and Psiphon:
  58. //
  59. // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
  60. // CC0 1.0 Universal
  61. //
  62. // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
  63. const (
  64. // Protocol version 1 clients can handle arbitrary length response bodies. Older clients
  65. // report no version number and expect at most 64K response bodies.
  66. MEEK_PROTOCOL_VERSION_1 = 1
  67. // Protocol version 2 clients initiate a session by sending an encrypted and obfuscated meek
  68. // cookie with their initial HTTP request. Connection information is contained within the
  69. // encrypted cookie payload. The server inspects the cookie and establishes a new session and
  70. // returns a new random session ID back to client via Set-Cookie header. The client uses this
  71. // session ID on all subsequent requests for the remainder of the session.
  72. MEEK_PROTOCOL_VERSION_2 = 2
  73. // Protocol version 3 clients include resiliency enhancements and will add a Range header
  74. // when retrying a request for a partially downloaded response payload.
  75. MEEK_PROTOCOL_VERSION_3 = 3
  76. MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
  77. MEEK_MIN_SESSION_ID_LENGTH = 8
  78. MEEK_MAX_SESSION_ID_LENGTH = 20
  79. MEEK_DEFAULT_TURN_AROUND_TIMEOUT = 10 * time.Millisecond
  80. MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
  81. MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD = 8192
  82. MEEK_DEFAULT_MAX_SESSION_STALENESS = 45 * time.Second
  83. MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
  84. MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT = 360 * time.Second
  85. MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
  86. MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536
  87. MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048
  88. MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT = 32
  89. MEEK_ENDPOINT_MAX_REQUEST_PAYLOAD_LENGTH = 65536
  90. MEEK_MAX_SESSION_COUNT = 1000000
  91. )
  92. // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
  93. // Obfuscated SSH traffic) over HTTP. Meek may be fronted (through a CDN) or direct and may be
  94. // HTTP or HTTPS.
  95. //
  96. // Upstream traffic arrives in HTTP request bodies and downstream traffic is sent in response
  97. // bodies. The sequence of traffic for a given flow is associated using a session ID that's
  98. // set as a HTTP cookie for the client to submit with each request.
  99. //
  100. // MeekServer hooks into TunnelServer via the net.Conn interface by transforming the
  101. // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
  102. // the meekConn struct.
  103. type MeekServer struct {
  104. support *SupportServices
  105. listener net.Listener
  106. listenerTunnelProtocol string
  107. listenerPort int
  108. isFronted bool
  109. passthroughAddress string
  110. turnAroundTimeout time.Duration
  111. extendedTurnAroundTimeout time.Duration
  112. skipExtendedTurnAroundThreshold int
  113. maxSessionStaleness time.Duration
  114. httpClientIOTimeout time.Duration
  115. stdTLSConfig *tls.Config
  116. psiphonTLSConfig *psiphon_tls.Config
  117. obfuscatorSeedHistory *obfuscator.SeedHistory
  118. clientHandler func(clientConn net.Conn, data *additionalTransportData)
  119. openConns *common.Conns[net.Conn]
  120. stopBroadcast <-chan struct{}
  121. sessionsLock sync.RWMutex
  122. sessions map[string]*meekSession
  123. checksumTable *crc64.Table
  124. bufferPool *CachedResponseBufferPool
  125. rateLimitLock sync.Mutex
  126. rateLimitHistory *lrucache.Cache
  127. rateLimitCount int
  128. rateLimitSignalGC chan struct{}
  129. normalizer *transforms.HTTPNormalizerListener
  130. inproxyBroker *inproxy.Broker
  131. inproxyCheckAllowMatch atomic.Value
  132. }
  133. // NewMeekServer initializes a new meek server.
  134. func NewMeekServer(
  135. support *SupportServices,
  136. listener net.Listener,
  137. listenerTunnelProtocol string,
  138. listenerPort int,
  139. useTLS, isFronted, useObfuscatedSessionTickets, useHTTPNormalizer bool,
  140. clientHandler func(clientConn net.Conn, data *additionalTransportData),
  141. stopBroadcast <-chan struct{}) (*MeekServer, error) {
  142. // With fronting, MeekRequiredHeaders can be used to ensure that the
  143. // request is coming through a CDN that's configured to add the
  144. // specified, secret header values. Configuring the MeekRequiredHeaders
  145. // scheme is required when running an in-proxy broker.
  146. if isFronted &&
  147. support.Config.MeekServerRunInproxyBroker &&
  148. len(support.Config.MeekRequiredHeaders) < 1 {
  149. return nil, errors.TraceNew("missing required header")
  150. }
  151. passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
  152. turnAroundTimeout := MEEK_DEFAULT_TURN_AROUND_TIMEOUT
  153. if support.Config.MeekTurnAroundTimeoutMilliseconds != nil {
  154. turnAroundTimeout = time.Duration(
  155. *support.Config.MeekTurnAroundTimeoutMilliseconds) * time.Millisecond
  156. }
  157. extendedTurnAroundTimeout := MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT
  158. if support.Config.MeekExtendedTurnAroundTimeoutMilliseconds != nil {
  159. extendedTurnAroundTimeout = time.Duration(
  160. *support.Config.MeekExtendedTurnAroundTimeoutMilliseconds) * time.Millisecond
  161. }
  162. skipExtendedTurnAroundThreshold := MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD
  163. if support.Config.MeekSkipExtendedTurnAroundThresholdBytes != nil {
  164. skipExtendedTurnAroundThreshold = *support.Config.MeekSkipExtendedTurnAroundThresholdBytes
  165. }
  166. maxSessionStaleness := MEEK_DEFAULT_MAX_SESSION_STALENESS
  167. if support.Config.MeekMaxSessionStalenessMilliseconds != nil {
  168. maxSessionStaleness = time.Duration(
  169. *support.Config.MeekMaxSessionStalenessMilliseconds) * time.Millisecond
  170. }
  171. var httpClientIOTimeout time.Duration
  172. if isFronted {
  173. // Fronted has a distinct timeout, and the default is higher since new
  174. // clients may connect to a CDN edge and start using an existing
  175. // persistent connection.
  176. httpClientIOTimeout = MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT
  177. if support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds != nil {
  178. httpClientIOTimeout = time.Duration(
  179. *support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  180. }
  181. } else {
  182. httpClientIOTimeout = MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
  183. if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
  184. httpClientIOTimeout = time.Duration(
  185. *support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
  186. }
  187. }
  188. checksumTable := crc64.MakeTable(crc64.ECMA)
  189. bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
  190. if support.Config.MeekCachedResponsePoolBufferSize != 0 {
  191. bufferLength = support.Config.MeekCachedResponsePoolBufferSize
  192. }
  193. bufferCount := MEEK_DEFAULT_POOL_BUFFER_COUNT
  194. if support.Config.MeekCachedResponsePoolBufferCount != 0 {
  195. bufferCount = support.Config.MeekCachedResponsePoolBufferCount
  196. }
  197. bufferPoolClientLimit := MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT
  198. if support.Config.MeekCachedResponsePoolBufferClientLimit != 0 {
  199. bufferPoolClientLimit = support.Config.MeekCachedResponsePoolBufferClientLimit
  200. }
  201. _, thresholdSeconds, _, _, _, _, _, _, reapFrequencySeconds, maxEntries :=
  202. support.TrafficRulesSet.GetMeekRateLimiterConfig()
  203. rateLimitHistory := lrucache.NewWithLRU(
  204. time.Duration(thresholdSeconds)*time.Second,
  205. time.Duration(reapFrequencySeconds)*time.Second,
  206. maxEntries)
  207. bufferPool := NewCachedResponseBufferPool(
  208. bufferLength, bufferCount, bufferPoolClientLimit)
  209. // Limitation: rate limiting and resource limiting are handled by external
  210. // components, and MeekServer enforces only a sanity check limit on the
  211. // number the number of entries in MeekServer.sessions.
  212. //
  213. // See comment in newSSHServer for more details.
  214. meekServer := &MeekServer{
  215. support: support,
  216. listener: listener,
  217. listenerTunnelProtocol: listenerTunnelProtocol,
  218. listenerPort: listenerPort,
  219. isFronted: isFronted,
  220. passthroughAddress: passthroughAddress,
  221. turnAroundTimeout: turnAroundTimeout,
  222. extendedTurnAroundTimeout: extendedTurnAroundTimeout,
  223. skipExtendedTurnAroundThreshold: skipExtendedTurnAroundThreshold,
  224. maxSessionStaleness: maxSessionStaleness,
  225. httpClientIOTimeout: httpClientIOTimeout,
  226. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  227. clientHandler: clientHandler,
  228. openConns: common.NewConns[net.Conn](),
  229. stopBroadcast: stopBroadcast,
  230. sessions: make(map[string]*meekSession),
  231. checksumTable: checksumTable,
  232. bufferPool: bufferPool,
  233. rateLimitHistory: rateLimitHistory,
  234. rateLimitSignalGC: make(chan struct{}, 1),
  235. }
  236. if useTLS {
  237. // For fronted meek servers, crypto/tls is used to ensure that
  238. // net/http.Server.Serve will find *crypto/tls.Conn types, as
  239. // required for enabling HTTP/2. The fronted case does not not
  240. // support or require the TLS passthrough or obfuscated session
  241. // ticket mechanisms, which are implemented in psiphon-tls. HTTP/2 is
  242. // preferred for fronted meek servers in order to multiplex many
  243. // concurrent requests, either from many tunnel clients or
  244. // many/individual in-proxy broker clients, over a single network
  245. // connection.
  246. //
  247. // For direct meek servers, psiphon-tls is used to provide the TLS
  248. // passthrough or obfuscated session ticket obfuscation mechanisms.
  249. // Direct meek servers do not enable HTTP/1.1 Each individual meek
  250. // tunnel client will have its own network connection and each client
  251. // has only a single in-flight meek request at a time.
  252. if isFronted {
  253. if useObfuscatedSessionTickets {
  254. return nil, errors.TraceNew("obfuscated session tickets unsupported")
  255. }
  256. if meekServer.passthroughAddress != "" {
  257. return nil, errors.TraceNew("passthrough unsupported")
  258. }
  259. tlsConfig, err := meekServer.makeFrontedMeekTLSConfig()
  260. if err != nil {
  261. return nil, errors.Trace(err)
  262. }
  263. meekServer.stdTLSConfig = tlsConfig
  264. } else {
  265. tlsConfig, err := meekServer.makeDirectMeekTLSConfig(
  266. useObfuscatedSessionTickets)
  267. if err != nil {
  268. return nil, errors.Trace(err)
  269. }
  270. meekServer.psiphonTLSConfig = tlsConfig
  271. }
  272. }
  273. if useHTTPNormalizer && protocol.TunnelProtocolUsesMeekHTTPNormalizer(listenerTunnelProtocol) {
  274. normalizer := meekServer.makeMeekHTTPNormalizerListener()
  275. meekServer.normalizer = normalizer
  276. meekServer.listener = normalizer
  277. }
  278. // Initialize in-proxy broker service
  279. if support.Config.MeekServerRunInproxyBroker {
  280. if !inproxy.Enabled() {
  281. // Note that, technically, it may be possible to allow this case,
  282. // since PSIPHON_ENABLE_INPROXY is currently required only for
  283. // client/proxy-side WebRTC functionality, although that could change.
  284. return nil, errors.TraceNew("inproxy implementation is not enabled")
  285. }
  286. if support.Config.InproxyBrokerAllowCommonASNMatching {
  287. inproxy.SetAllowCommonASNMatching(true)
  288. }
  289. if support.Config.InproxyBrokerAllowBogonWebRTCConnections {
  290. inproxy.SetAllowBogonWebRTCConnections(true)
  291. }
  292. sessionPrivateKey, err := inproxy.SessionPrivateKeyFromString(
  293. support.Config.InproxyBrokerSessionPrivateKey)
  294. if err != nil {
  295. return nil, errors.Trace(err)
  296. }
  297. obfuscationRootSecret, err := inproxy.ObfuscationSecretFromString(
  298. support.Config.InproxyBrokerObfuscationRootSecret)
  299. if err != nil {
  300. return nil, errors.Trace(err)
  301. }
  302. lookupGeoIPData := func(IP string) common.GeoIPData {
  303. return common.GeoIPData(support.GeoIPService.Lookup(IP))
  304. }
  305. inproxyBroker, err := inproxy.NewBroker(
  306. &inproxy.BrokerConfig{
  307. Logger: CommonLogger(log),
  308. AllowProxy: meekServer.inproxyBrokerAllowProxy,
  309. PrioritizeProxy: meekServer.inproxyBrokerPrioritizeProxy,
  310. AllowClient: meekServer.inproxyBrokerAllowClient,
  311. AllowDomainFrontedDestinations: meekServer.inproxyBrokerAllowDomainFrontedDestinations,
  312. AllowMatch: meekServer.inproxyBrokerAllowMatch,
  313. LookupGeoIP: lookupGeoIPData,
  314. APIParameterValidator: getInproxyBrokerAPIParameterValidator(),
  315. APIParameterLogFieldFormatter: getInproxyBrokerAPIParameterLogFieldFormatter(),
  316. IsValidServerEntryTag: support.PsinetDatabase.IsValidServerEntryTag,
  317. GetTacticsPayload: meekServer.inproxyBrokerGetTacticsPayload,
  318. IsLoadLimiting: meekServer.support.TunnelServer.CheckLoadLimiting,
  319. RelayDSLRequest: meekServer.inproxyBrokerRelayDSLRequest,
  320. PrivateKey: sessionPrivateKey,
  321. ObfuscationRootSecret: obfuscationRootSecret,
  322. ServerEntrySignaturePublicKey: support.Config.InproxyBrokerServerEntrySignaturePublicKey,
  323. })
  324. if err != nil {
  325. return nil, errors.Trace(err)
  326. }
  327. meekServer.inproxyBroker = inproxyBroker
  328. // inproxyReloadTactics initializes compartment ID, timeouts, and
  329. // other broker parameter values from tactics.
  330. err = meekServer.inproxyReloadTactics()
  331. if err != nil {
  332. return nil, errors.Trace(err)
  333. }
  334. }
  335. return meekServer, nil
  336. }
  337. // ReloadTactics signals components to reload tactics and reinitialize as
  338. // required when tactics may have changed.
  339. func (server *MeekServer) ReloadTactics() error {
  340. if server.support.Config.MeekServerRunInproxyBroker {
  341. err := server.inproxyReloadTactics()
  342. if err != nil {
  343. return errors.Trace(err)
  344. }
  345. }
  346. return nil
  347. }
  348. type meekContextKey struct {
  349. key string
  350. }
  351. var meekNetConnContextKey = &meekContextKey{"net.Conn"}
  352. // Run runs the meek server; this function blocks while serving HTTP or
  353. // HTTPS connections on the specified listener. This function also runs
  354. // a goroutine which cleans up expired meek client sessions.
  355. //
  356. // To stop the meek server, both Close() the listener and set the stopBroadcast
  357. // signal specified in NewMeekServer.
  358. func (server *MeekServer) Run() error {
  359. waitGroup := new(sync.WaitGroup)
  360. waitGroup.Add(1)
  361. go func() {
  362. defer waitGroup.Done()
  363. ticker := time.NewTicker(server.maxSessionStaleness / 2)
  364. defer ticker.Stop()
  365. for {
  366. select {
  367. case <-ticker.C:
  368. server.deleteExpiredSessions()
  369. case <-server.stopBroadcast:
  370. return
  371. }
  372. }
  373. }()
  374. waitGroup.Add(1)
  375. go func() {
  376. defer waitGroup.Done()
  377. server.rateLimitWorker()
  378. }()
  379. if server.inproxyBroker != nil {
  380. err := server.inproxyBroker.Start()
  381. if err != nil {
  382. return errors.Trace(err)
  383. }
  384. defer server.inproxyBroker.Stop()
  385. }
  386. // Serve HTTP or HTTPS
  387. //
  388. // - WriteTimeout may include time awaiting request, as per:
  389. // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
  390. //
  391. // - Legacy meek-server wrapped each client HTTP connection with an explicit idle
  392. // timeout net.Conn and didn't use http.Server timeouts. We could do the same
  393. // here (use ActivityMonitoredConn) but the stock http.Server timeouts should
  394. // now be sufficient.
  395. //
  396. // - HTTP/2 is enabled (the default), which is required for efficient
  397. // in-proxy broker connection sharing.
  398. //
  399. // - Any CDN fronting a meek server running an in-proxy broker should be
  400. // configured with timeouts that accomodate the proxy announcement
  401. // request long polling.
  402. httpServer := &http.Server{
  403. ReadTimeout: server.httpClientIOTimeout,
  404. WriteTimeout: server.httpClientIOTimeout,
  405. Handler: server,
  406. ConnState: server.httpConnStateCallback,
  407. ConnContext: func(ctx context.Context, conn net.Conn) context.Context {
  408. return context.WithValue(ctx, meekNetConnContextKey, conn)
  409. },
  410. }
  411. // Note: Serve() will be interrupted by server.listener.Close() call
  412. listener := server.listener
  413. if server.stdTLSConfig != nil {
  414. listener = tls.NewListener(server.listener, server.stdTLSConfig)
  415. } else if server.psiphonTLSConfig != nil {
  416. listener = psiphon_tls.NewListener(server.listener, server.psiphonTLSConfig)
  417. // Disable auto HTTP/2
  418. httpServer.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler))
  419. }
  420. err := httpServer.Serve(listener)
  421. // Can't check for the exact error that Close() will cause in Accept(),
  422. // (see: https://code.google.com/p/go/issues/detail?id=4373). So using an
  423. // explicit stop signal to stop gracefully.
  424. select {
  425. case <-server.stopBroadcast:
  426. err = nil
  427. default:
  428. }
  429. // deleteExpiredSessions calls deleteSession which may block waiting
  430. // for active request handlers to complete; timely shutdown requires
  431. // stopping the listener and closing all existing connections before
  432. // awaiting the reaperWaitGroup.
  433. server.listener.Close()
  434. server.openConns.CloseAll()
  435. waitGroup.Wait()
  436. return err
  437. }
  438. func handleServeHTTPPanic() {
  439. // Disable panic recovery, to ensure panics are captured and logged by
  440. // panicwrap.
  441. //
  442. // The net.http ServeHTTP caller will recover any ServeHTTP panic, so
  443. // re-panic in another goroutine after capturing the panicking goroutine
  444. // call stack.
  445. if r := recover(); r != nil {
  446. var stack [4096]byte
  447. n := runtime.Stack(stack[:], false)
  448. err := errors.Tracef("ServeHTTP panic: %v\n%s", r, stack[:n])
  449. go panic(err.Error())
  450. }
  451. }
  452. // ServeHTTP handles meek client HTTP requests, where the request body
  453. // contains upstream traffic and the response will contain downstream
  454. // traffic.
  455. func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
  456. defer handleServeHTTPPanic()
  457. // Note: no longer requiring that the request method is POST
  458. // Check for required headers and values. For fronting, required headers
  459. // may be used to identify a CDN edge. When this check fails,
  460. // TerminateHTTPConnection is called instead of handleError, so any
  461. // persistent connection is always closed.
  462. if len(server.support.Config.MeekRequiredHeaders) > 0 {
  463. for header, value := range server.support.Config.MeekRequiredHeaders {
  464. requestValue := request.Header.Get(header)
  465. // There's no ConstantTimeCompare for strings. While the
  466. // conversion from string to byte slice may leak the length of
  467. // the expected value, ConstantTimeCompare also takes time that's
  468. // a function of the length of the input byte slices; leaking the
  469. // expected value length isn't a vulnerability as long as the
  470. // secret is long enough and random.
  471. if subtle.ConstantTimeCompare([]byte(requestValue), []byte(value)) != 1 {
  472. log.WithTraceFields(LogFields{
  473. "header": header,
  474. "value": requestValue,
  475. }).Warning("invalid required meek header")
  476. common.TerminateHTTPConnection(responseWriter, request)
  477. return
  478. }
  479. }
  480. }
  481. // Check for the expected meek/session ID cookie. in-proxy broker requests
  482. // do not use or expect a meek cookie (the broker session protocol
  483. // encapsulated in the HTTP request/response payloads has its own
  484. // obfuscation and anti-replay mechanisms).
  485. //
  486. // TODO: log irregular tunnels for unexpected cookie cases?
  487. var meekCookie *http.Cookie
  488. for _, c := range request.Cookies() {
  489. meekCookie = c
  490. break
  491. }
  492. if (meekCookie == nil || len(meekCookie.Value) == 0) &&
  493. !server.support.Config.MeekServerRunInproxyBroker {
  494. log.WithTrace().Warning("missing meek cookie")
  495. common.TerminateHTTPConnection(responseWriter, request)
  496. return
  497. }
  498. if meekCookie != nil && server.support.Config.MeekServerInproxyBrokerOnly {
  499. log.WithTrace().Warning("unexpected meek cookie")
  500. common.TerminateHTTPConnection(responseWriter, request)
  501. return
  502. }
  503. // Check for prohibited HTTP headers.
  504. if len(server.support.Config.MeekProhibitedHeaders) > 0 {
  505. for _, header := range server.support.Config.MeekProhibitedHeaders {
  506. value := request.Header.Get(header)
  507. if header != "" {
  508. log.WithTraceFields(LogFields{
  509. "header": header,
  510. "value": value,
  511. }).Warning("prohibited meek header")
  512. server.handleError(responseWriter, request)
  513. return
  514. }
  515. }
  516. }
  517. // A valid meek cookie indicates which class of request this is:
  518. //
  519. // 1. A new meek session. Create a new session ID and proceed with
  520. // relaying tunnel traffic.
  521. //
  522. // 2. An existing meek session. Resume relaying tunnel traffic.
  523. //
  524. // 3. A request to an endpoint. This meek connection is not for relaying
  525. // tunnel traffic. Instead, the request is handed off to a custom handler.
  526. //
  527. // In the in-proxy broker case, there is no meek cookie, which avoids the
  528. // size and resource overhead of sending and processing a meek cookie
  529. // with each endpoint request.
  530. //
  531. // The broker session protocol encapsulated in the HTTP request/response
  532. // payloads has its own obfuscation and anti-replay mechanisms.
  533. //
  534. // In RunInproxyBroker mode, non-meek cookie requests are routed to the
  535. // in-proxy broker. getSessionOrEndpoint is still invoked in all cases,
  536. // to process GeoIP headers, invoke the meek rate limiter, etc.
  537. //
  538. // Limitations:
  539. //
  540. // - Adding arbirary cookies, as camouflage for plain HTTP for example, is
  541. // not supported.
  542. //
  543. // - the HTTP normalizer depends on the meek cookie
  544. // (see makeMeekHTTPNormalizerListener) so RunInproxyBroker mode is
  545. // incompatible with the HTTP normalizer.
  546. sessionID,
  547. session,
  548. underlyingConn,
  549. endPoint,
  550. endPointClientIP,
  551. endPointGeoIPData,
  552. err := server.getSessionOrEndpoint(request, meekCookie)
  553. if err != nil {
  554. // Debug since session cookie errors commonly occur during
  555. // normal operation.
  556. log.WithTraceFields(LogFields{"error": err}).Debug("session lookup failed")
  557. server.handleError(responseWriter, request)
  558. return
  559. }
  560. if endPoint != "" {
  561. // Route to endpoint handlers and return.
  562. handled := false
  563. switch endPoint {
  564. case tactics.TACTICS_END_POINT, tactics.SPEED_TEST_END_POINT:
  565. handled = server.support.TacticsServer.HandleEndPoint(
  566. endPoint,
  567. common.GeoIPData(*endPointGeoIPData),
  568. responseWriter,
  569. request)
  570. // Currently, TacticsServer.HandleEndPoint handles returning a 404 instead
  571. // leaving that up to server.handleError.
  572. //
  573. // TODO: call server.handleError, for its isFronting special case.
  574. case inproxy.BrokerEndPointName:
  575. handled = true
  576. err := server.inproxyBrokerHandler(
  577. endPointClientIP,
  578. common.GeoIPData(*endPointGeoIPData),
  579. responseWriter,
  580. request)
  581. if err != nil {
  582. var brokerLoggedEvent *inproxy.BrokerLoggedEvent
  583. var deobfuscationAnomoly *inproxy.DeobfuscationAnomoly
  584. alreadyLogged := std_errors.As(err, &brokerLoggedEvent) ||
  585. std_errors.As(err, &deobfuscationAnomoly)
  586. if !alreadyLogged {
  587. log.WithTraceFields(
  588. LogFields{"error": err}).Warning("inproxyBrokerHandler failed")
  589. }
  590. server.handleError(responseWriter, request)
  591. }
  592. }
  593. if !handled {
  594. log.WithTraceFields(LogFields{"endPoint": endPoint}).Warning("unhandled endpoint")
  595. server.handleError(responseWriter, request)
  596. }
  597. return
  598. }
  599. // Tunnel relay mode.
  600. // Ensure that there's only one concurrent request handler per client
  601. // session. Depending on the nature of a network disruption, it can
  602. // happen that a client detects a failure and retries while the server
  603. // is still streaming response in the handler for the _previous_ client
  604. // request.
  605. //
  606. // Even if the session.cachedResponse were safe for concurrent
  607. // use (it is not), concurrent handling could lead to loss of session
  608. // since upstream data read by the first request may not reach the
  609. // cached response before the second request reads the cached data.
  610. //
  611. // The existing handler will stream response data, holding the lock,
  612. // for no more than MEEK_EXTENDED_TURN_AROUND_TIMEOUT.
  613. //
  614. // TODO: interrupt an existing handler? The existing handler will be
  615. // sending data to the cached response, but if that buffer fills, the
  616. // session will be lost.
  617. requestNumber := session.requestCount.Add(1)
  618. // Wait for the existing request to complete.
  619. session.lock.Lock()
  620. defer session.lock.Unlock()
  621. // Count this metric once the lock is acquired, to avoid concurrent and
  622. // potentially incorrect session.underlyingConn updates.
  623. //
  624. // It should never be the case that a new underlyingConn has the same
  625. // value as the previous session.underlyingConn, as each is a net.Conn
  626. // interface which includes a pointer, and the previous value cannot
  627. // be garbage collected until session.underlyingConn is updated.
  628. if session.underlyingConn != underlyingConn {
  629. session.metricUnderlyingConnCount.Add(1)
  630. session.underlyingConn = underlyingConn
  631. }
  632. // If a newer request has arrived while waiting, discard this one.
  633. // Do not delay processing the newest request.
  634. //
  635. // If the session expired and was deleted while this request was waiting,
  636. // discard this request. The session is no longer valid, and the final call
  637. // to session.cachedResponse.Reset may have already occured, so any further
  638. // session.cachedResponse access may deplete resources (fail to refill the pool).
  639. if session.requestCount.Load() > requestNumber || session.deleted {
  640. common.TerminateHTTPConnection(responseWriter, request)
  641. return
  642. }
  643. // pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
  644. // read the request body as upstream traffic.
  645. // TODO: run pumpReads and pumpWrites concurrently?
  646. // pumpReads checksums the request payload and skips relaying it when
  647. // it matches the immediately previous request payload. This allows
  648. // clients to resend request payloads, when retrying due to connection
  649. // interruption, without knowing whether the server has received or
  650. // relayed the data.
  651. requestSize, err := session.clientConn.pumpReads(request.Body)
  652. if err != nil {
  653. if err != io.EOF {
  654. // Debug since errors such as "i/o timeout" occur during normal operation;
  655. // also, golang network error messages may contain client IP.
  656. log.WithTraceFields(LogFields{"error": err}).Debug("read request failed")
  657. }
  658. common.TerminateHTTPConnection(responseWriter, request)
  659. // Note: keep session open to allow client to retry
  660. return
  661. }
  662. // The extended turn around mechanism optimizes for downstream flows by
  663. // sending more data in the response as long as it's available. As a
  664. // heuristic, when the request size meets a threshold, optimize instead
  665. // of upstream flows by skipping the extended turn around.
  666. skipExtendedTurnAround := requestSize >= int64(server.skipExtendedTurnAroundThreshold)
  667. // Set cookie before writing the response.
  668. if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && !session.sessionIDSent {
  669. // Replace the meek cookie with the session ID.
  670. // SetCookie for the the session ID cookie is only set once, to reduce overhead. This
  671. // session ID value replaces the original meek cookie value.
  672. http.SetCookie(responseWriter, &http.Cookie{Name: meekCookie.Name, Value: sessionID})
  673. session.sessionIDSent = true
  674. }
  675. // When streaming data into the response body, a copy is
  676. // retained in the cachedResponse buffer. This allows the
  677. // client to retry and request that the response be resent
  678. // when the HTTP connection is interrupted.
  679. //
  680. // If a Range header is present, the client is retrying,
  681. // possibly after having received a partial response. In
  682. // this case, use any cached response to attempt to resend
  683. // the response, starting from the resend position the client
  684. // indicates.
  685. //
  686. // When the resend position is not available -- because the
  687. // cachedResponse buffer could not hold it -- the client session
  688. // is closed, as there's no way to resume streaming the payload
  689. // uninterrupted.
  690. //
  691. // The client may retry before a cached response is prepared,
  692. // so a cached response is not always used when a Range header
  693. // is present.
  694. //
  695. // TODO: invalid Range header is ignored; should it be otherwise?
  696. position, isRetry := checkRangeHeader(request)
  697. if isRetry {
  698. session.metricClientRetries.Add(1)
  699. }
  700. hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
  701. // The client is not expected to send position > 0 when there is
  702. // no cached response; let that case fall through to the next
  703. // HasPosition check which will fail and close the session.
  704. var responseSize int
  705. var responseError error
  706. if isRetry && (hasCompleteCachedResponse || position > 0) {
  707. if !session.cachedResponse.HasPosition(position) {
  708. greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
  709. server.handleError(responseWriter, request)
  710. session.delete(true)
  711. return
  712. }
  713. responseWriter.WriteHeader(http.StatusPartialContent)
  714. // TODO: cachedResponse can now start releasing extended buffers, as
  715. // response bytes before "position" will never be requested again?
  716. responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
  717. greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
  718. // The client may again fail to receive the payload and may again
  719. // retry, so not yet releasing cachedResponse buffers.
  720. } else {
  721. // _Now_ we release buffers holding data from the previous
  722. // response. And then immediately stream the new response into
  723. // newly acquired buffers.
  724. session.cachedResponse.Reset()
  725. // Note: this code depends on an implementation detail of
  726. // io.MultiWriter: a Write() to the MultiWriter writes first
  727. // to the cache, and then to the response writer. So if the
  728. // write to the response writer fails, the payload is cached.
  729. multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
  730. // The client expects 206, not 200, whenever it sets a Range header,
  731. // which it may do even when no cached response is prepared.
  732. if isRetry {
  733. responseWriter.WriteHeader(http.StatusPartialContent)
  734. }
  735. // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
  736. // write its downstream traffic through to the response body.
  737. // Limitation: pumpWrites may write more response bytes than can be
  738. // cached for future retries, either due to no extended buffers
  739. // available, or exceeding the per-client extended buffer limit. In
  740. // practice, with throttling in place and servers running under load
  741. // limiting, metrics indicate that this rarely occurs. A potential
  742. // future enhancement could be for pumpWrites to stop writing and
  743. // send the response once there's no buffers remaining, favoring
  744. // connection resilience over performance.
  745. //
  746. // TODO: use geo-targeted per-client extended buffer limit to reserve
  747. // extended cache buffers for regions or ISPs with active or expected
  748. // network connection interruptions?
  749. responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround)
  750. greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
  751. greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
  752. }
  753. // responseError is the result of writing the body either from CopyFromPosition or pumpWrites
  754. if responseError != nil {
  755. if responseError != io.EOF {
  756. // Debug since errors such as "i/o timeout" occur during normal operation;
  757. // also, golang network error messages may contain client IP.
  758. log.WithTraceFields(LogFields{"error": responseError}).Debug("write response failed")
  759. }
  760. server.handleError(responseWriter, request)
  761. // Note: keep session open to allow client to retry
  762. return
  763. }
  764. }
  765. func (server *MeekServer) handleError(responseWriter http.ResponseWriter, request *http.Request) {
  766. // When fronted, keep the persistent connection open since it may be used
  767. // by many clients coming through the same edge. For performance reasons,
  768. // an error, including invalid input, from one client shouldn't close the
  769. // persistent connection used by other clients.
  770. if server.isFronted {
  771. http.NotFound(responseWriter, request)
  772. return
  773. }
  774. common.TerminateHTTPConnection(responseWriter, request)
  775. }
  776. func checkRangeHeader(request *http.Request) (int, bool) {
  777. rangeHeader := request.Header.Get("Range")
  778. if rangeHeader == "" {
  779. return 0, false
  780. }
  781. prefix := "bytes="
  782. suffix := "-"
  783. if !strings.HasPrefix(rangeHeader, prefix) ||
  784. !strings.HasSuffix(rangeHeader, suffix) {
  785. return 0, false
  786. }
  787. rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
  788. rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
  789. position, err := strconv.Atoi(rangeHeader)
  790. if err != nil {
  791. return 0, false
  792. }
  793. return position, true
  794. }
  795. // getSessionOrEndpoint checks if the cookie corresponds to an existing tunnel
  796. // relay session ID. If no session is found, the cookie must be an obfuscated
  797. // meek cookie. A new session is created when the meek cookie indicates relay
  798. // mode; or the endpoint is returned when the meek cookie indicates endpoint
  799. // mode.
  800. //
  801. // For performance reasons, in-proxy broker requests are allowed to omit the
  802. // meek cookie and pass in nil for meekCookie; getSessionOrEndpoint still
  803. // performs rate limiting and header handling for the in-proxy broker case.
  804. func (server *MeekServer) getSessionOrEndpoint(
  805. request *http.Request,
  806. meekCookie *http.Cookie) (string, *meekSession, net.Conn, string, string, *GeoIPData, error) {
  807. underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
  808. // Check for an existing meek tunnel session.
  809. if meekCookie != nil {
  810. server.sessionsLock.RLock()
  811. existingSessionID := meekCookie.Value
  812. session, ok := server.sessions[existingSessionID]
  813. server.sessionsLock.RUnlock()
  814. if ok {
  815. // TODO: can multiple http client connections using same session cookie
  816. // cause race conditions on session struct?
  817. session.touch()
  818. return existingSessionID, session, underlyingConn, "", "", nil, nil
  819. }
  820. }
  821. // TODO: rename clientIP to peerIP to reflect the new terminology used in
  822. // psiphon/server code where the immediate peer may be an in-proxy proxy,
  823. // not the client.
  824. // Determine the client or peer remote address, which is used for
  825. // geolocation stats, rate limiting, anti-probing, discovery, and tactics
  826. // selection logic.
  827. //
  828. // When an intermediate proxy or CDN is in use, we may be
  829. // able to determine the original client address by inspecting HTTP
  830. // headers such as X-Forwarded-For.
  831. //
  832. // We trust only headers provided by CDNs. Fronted Psiphon server hosts
  833. // should be configured to accept tunnel connections only from CDN edges.
  834. // When the CDN passes along a chain of IPs, as in X-Forwarded-For, we
  835. // trust only the right-most IP, which is provided by the CDN.
  836. clientIP, _, err := net.SplitHostPort(request.RemoteAddr)
  837. if err != nil {
  838. return "", nil, nil, "", "", nil, errors.Trace(err)
  839. }
  840. if net.ParseIP(clientIP) == nil {
  841. return "", nil, nil, "", "", nil, errors.TraceNew("invalid IP address")
  842. }
  843. if server.isFronted && len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
  844. // When there are multiple header names in MeekProxyForwardedForHeaders,
  845. // the first valid match is preferred. MeekProxyForwardedForHeaders should be
  846. // configured to use header names that are always provided by the CDN(s) and
  847. // not header names that may be passed through from clients.
  848. for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
  849. // In the case where there are multiple headers,
  850. // request.Header.Get returns the first header, but we want the
  851. // last header; so use request.Header.Values and select the last
  852. // value. As per RFC 2616 section 4.2, a proxy must not change
  853. // the order of field values, which implies that it should append
  854. // values to the last header.
  855. values := request.Header.Values(header)
  856. if len(values) > 0 {
  857. value := values[len(values)-1]
  858. // Some headers, such as X-Forwarded-For, are a comma-separated
  859. // list of IPs (each proxy in a chain). Select the last IP.
  860. IPs := strings.Split(value, ",")
  861. IP := IPs[len(IPs)-1]
  862. // Remove optional whitespace surrounding the commas.
  863. IP = strings.TrimSpace(IP)
  864. if net.ParseIP(IP) != nil {
  865. clientIP = IP
  866. break
  867. }
  868. }
  869. }
  870. }
  871. geoIPData := server.support.GeoIPService.Lookup(clientIP)
  872. // Check for a steering IP header, which contains an alternate dial IP to
  873. // be returned to the client via the secure API handshake response.
  874. // Steering may be used to load balance CDN traffic.
  875. //
  876. // The steering IP header is added by a CDN or CDN service process. To
  877. // prevent steering IP spoofing, the service process must filter out any
  878. // steering IP headers injected into ingress requests.
  879. //
  880. // Steering IP headers must appear in the first request of a meek session
  881. // in order to be recorded here and relayed to the client.
  882. var steeringIP string
  883. if server.isFronted && server.support.Config.EnableSteeringIPs {
  884. steeringIP = request.Header.Get("X-Psiphon-Steering-Ip")
  885. if steeringIP != "" {
  886. IP := net.ParseIP(steeringIP)
  887. if IP == nil || common.IsBogon(IP) {
  888. steeringIP = ""
  889. log.WithTraceFields(LogFields{"steeringIP": steeringIP}).Warning("invalid steering IP")
  890. }
  891. }
  892. }
  893. // The session is new (or expired). Treat the cookie value as a new meek
  894. // cookie, extract the payload, and create a new session.
  895. // Limitation: when the cookie is a session ID for an expired session, we
  896. // still attempt to treat it as a meek cookie. As it stands, that yields
  897. // either base64 decoding errors (RawStdEncoding vs. StdEncoding) or
  898. // length errors. We could log cleaner errors ("session is expired") by
  899. // checking that the cookie is a well-formed (base64.RawStdEncoding) value
  900. // between MEEK_MIN_SESSION_ID_LENGTH and MEEK_MAX_SESSION_ID_LENGTH
  901. // bytes -- assuming that MEEK_MAX_SESSION_ID_LENGTH is too short to be a
  902. // valid meek cookie.
  903. var payloadJSON []byte
  904. if server.normalizer != nil {
  905. // Limitation: RunInproxyBroker mode with no meek cookies is not
  906. // compatible with the HTTP normalizer.
  907. // NOTE: operates on the assumption that the normalizer is not wrapped
  908. // with a further conn.
  909. underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
  910. normalizedConn := underlyingConn.(*transforms.HTTPNormalizer)
  911. payloadJSON = normalizedConn.ValidateMeekCookieResult
  912. } else {
  913. if meekCookie != nil {
  914. payloadJSON, err = server.getMeekCookiePayload(clientIP, meekCookie.Value)
  915. if err != nil {
  916. return "", nil, nil, "", "", nil, errors.Trace(err)
  917. }
  918. }
  919. }
  920. // Note: this meek server ignores legacy values PsiphonClientSessionId
  921. // and PsiphonServerAddress.
  922. var clientSessionData protocol.MeekCookieData
  923. if meekCookie != nil {
  924. err = json.Unmarshal(payloadJSON, &clientSessionData)
  925. if err != nil {
  926. return "", nil, nil, "", "", nil, errors.Trace(err)
  927. }
  928. } else {
  929. // Assume the in-proxy broker endpoint when there's no meek cookie.
  930. clientSessionData.EndPoint = inproxy.BrokerEndPointName
  931. }
  932. // Any rate limit is enforced after the meek cookie is validated, so a prober
  933. // without the obfuscation secret will be unable to fingerprint the server
  934. // based on response time combined with the rate limit configuration. The
  935. // rate limit is primarily intended to limit memory resource consumption and
  936. // not the overhead incurred by cookie validation.
  937. //
  938. // The meek rate limit is applied to new meek tunnel sessions and tactics
  939. // requests, both of which may reasonably be limited to as low as 1 event
  940. // per time period. The in-proxy broker is excluded from meek rate
  941. // limiting since it has its own rate limiter and in-proxy requests are
  942. // allowed to be more frequent.
  943. if clientSessionData.EndPoint != inproxy.BrokerEndPointName &&
  944. server.rateLimit(clientIP, geoIPData, server.listenerTunnelProtocol) {
  945. return "", nil, nil, "", "", nil, errors.TraceNew("rate limit exceeded")
  946. }
  947. // Handle endpoints before enforcing CheckEstablishTunnels.
  948. // Currently, endpoints are tactics requests, and we allow these to be
  949. // handled by servers which would otherwise reject new tunnels.
  950. if clientSessionData.EndPoint != "" {
  951. return "", nil, nil, clientSessionData.EndPoint, clientIP, &geoIPData, nil
  952. }
  953. // After this point, for the meek tunnel new session case, a meek cookie
  954. // is required and meekCookie must not be nil.
  955. if meekCookie == nil {
  956. return "", nil, nil, "", "", nil, errors.TraceNew("missing meek cookie")
  957. }
  958. // Don't create new sessions when not establishing. A subsequent SSH handshake
  959. // will not succeed, so creating a meek session just wastes resources.
  960. if server.support.TunnelServer != nil &&
  961. !server.support.TunnelServer.CheckEstablishTunnels() {
  962. return "", nil, nil, "", "", nil, errors.TraceNew("not establishing tunnels")
  963. }
  964. // Disconnect immediately if the tactics for the client restricts usage of
  965. // the fronting provider ID. The probability may be used to influence
  966. // usage of a given fronting provider; but when only that provider works
  967. // for a given client, and the probability is less than 1.0, the client
  968. // can retry until it gets a successful coin flip.
  969. //
  970. // Clients will also skip candidates with restricted fronting provider IDs.
  971. // The client-side probability, RestrictFrontingProviderIDsClientProbability,
  972. // is applied independently of the server-side coin flip here.
  973. //
  974. // At this stage, GeoIP tactics filters are active, but handshake API
  975. // parameters are not.
  976. //
  977. // See the comment in server.LoadConfig regarding fronting provider ID
  978. // limitations.
  979. if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
  980. server.support.ServerTacticsParametersCache != nil {
  981. p, err := server.support.ServerTacticsParametersCache.Get(geoIPData)
  982. if err != nil {
  983. return "", nil, nil, "", "", nil, errors.Trace(err)
  984. }
  985. if !p.IsNil() &&
  986. common.Contains(
  987. p.Strings(parameters.RestrictFrontingProviderIDs),
  988. server.support.Config.GetFrontingProviderID()) {
  989. if p.WeightedCoinFlip(
  990. parameters.RestrictFrontingProviderIDsServerProbability) {
  991. return "", nil, nil, "", "", nil, errors.TraceNew("restricted fronting provider")
  992. }
  993. }
  994. }
  995. // The tunnel protocol name is used for stats and traffic rules. In many
  996. // cases, its value is unambiguously determined by the listener port. In
  997. // certain cases, such as multiple fronted protocols with a single
  998. // backend listener, the client's reported tunnel protocol value is used.
  999. // The caller must validate clientTunnelProtocol with
  1000. // protocol.IsValidClientTunnelProtocol.
  1001. var clientTunnelProtocol string
  1002. if clientSessionData.ClientTunnelProtocol != "" {
  1003. if !protocol.IsValidClientTunnelProtocol(
  1004. clientSessionData.ClientTunnelProtocol,
  1005. server.listenerTunnelProtocol,
  1006. server.support.Config.GetRunningProtocols()) {
  1007. return "", nil, nil, "", "", nil, errors.Tracef(
  1008. "invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
  1009. }
  1010. clientTunnelProtocol = clientSessionData.ClientTunnelProtocol
  1011. }
  1012. // Create a new session
  1013. bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
  1014. if server.support.Config.MeekCachedResponseBufferSize != 0 {
  1015. bufferLength = server.support.Config.MeekCachedResponseBufferSize
  1016. }
  1017. cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
  1018. // The cookie name, Content-Type, and HTTP version of the first request in
  1019. // the session are recorded for stats. It's possible, but not expected,
  1020. // that later requests will have different values.
  1021. session := &meekSession{
  1022. meekProtocolVersion: clientSessionData.MeekProtocolVersion,
  1023. sessionIDSent: false,
  1024. cachedResponse: cachedResponse,
  1025. cookieName: meekCookie.Name,
  1026. contentType: request.Header.Get("Content-Type"),
  1027. httpVersion: request.Proto,
  1028. }
  1029. session.touch()
  1030. // Create a new meek conn that will relay the payload
  1031. // between meek request/responses and the tunnel server client
  1032. // handler. The client IP is also used to initialize the
  1033. // meek conn with a useful value to return when the tunnel
  1034. // server calls conn.RemoteAddr() to get the client's IP address.
  1035. // Assumes clientIP is a valid IP address; the port value is a stub
  1036. // and is expected to be ignored.
  1037. clientConn := newMeekConn(
  1038. server,
  1039. session,
  1040. underlyingConn,
  1041. &net.TCPAddr{
  1042. IP: net.ParseIP(clientIP),
  1043. Port: 0,
  1044. },
  1045. clientSessionData.MeekProtocolVersion)
  1046. session.clientConn = clientConn
  1047. // Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
  1048. // meek cookie to a session ID; v1 clients always send the
  1049. // original meek cookie value with each request. The issue with
  1050. // v1 is that clients which wake after a device sleep will attempt
  1051. // to resume a meek session and the server can't differentiate
  1052. // between resuming a session and creating a new session. This
  1053. // causes the v1 client connection to hang/timeout.
  1054. sessionID := meekCookie.Value
  1055. if clientSessionData.MeekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 {
  1056. sessionID, err = makeMeekSessionID()
  1057. if err != nil {
  1058. return "", nil, nil, "", "", nil, errors.Trace(err)
  1059. }
  1060. }
  1061. server.sessionsLock.Lock()
  1062. // MEEK_MAX_SESSION_COUNT is a simple sanity check and failsafe. Load
  1063. // limiting tuned to each server's host resources is provided by external
  1064. // components. See comment in newSSHServer for more details.
  1065. if len(server.sessions) >= MEEK_MAX_SESSION_COUNT {
  1066. server.sessionsLock.Unlock()
  1067. err := std_errors.New("MEEK_MAX_SESSION_COUNT exceeded")
  1068. log.WithTrace().Warning(err.Error())
  1069. return "", nil, nil, "", "", nil, errors.Trace(err)
  1070. }
  1071. server.sessions[sessionID] = session
  1072. server.sessionsLock.Unlock()
  1073. var additionalData *additionalTransportData
  1074. if clientTunnelProtocol != "" || steeringIP != "" {
  1075. additionalData = &additionalTransportData{
  1076. overrideTunnelProtocol: clientTunnelProtocol,
  1077. steeringIP: steeringIP,
  1078. }
  1079. }
  1080. // Note: from the tunnel server's perspective, this client connection
  1081. // will close when session.delete calls Close() on the meekConn.
  1082. server.clientHandler(session.clientConn, additionalData)
  1083. return sessionID, session, underlyingConn, "", "", nil, nil
  1084. }
  1085. func (server *MeekServer) rateLimit(
  1086. clientIP string, geoIPData GeoIPData, tunnelProtocol string) bool {
  1087. historySize,
  1088. thresholdSeconds,
  1089. tunnelProtocols,
  1090. regions,
  1091. ISPs,
  1092. ASNs,
  1093. cities,
  1094. GCTriggerCount, _, _ :=
  1095. server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
  1096. if historySize == 0 {
  1097. return false
  1098. }
  1099. if len(tunnelProtocols) > 0 {
  1100. if !common.Contains(tunnelProtocols, tunnelProtocol) {
  1101. return false
  1102. }
  1103. }
  1104. if len(regions) > 0 || len(ISPs) > 0 || len(ASNs) > 0 || len(cities) > 0 {
  1105. if len(regions) > 0 {
  1106. if !common.Contains(regions, geoIPData.Country) {
  1107. return false
  1108. }
  1109. }
  1110. if len(ISPs) > 0 {
  1111. if !common.Contains(ISPs, geoIPData.ISP) {
  1112. return false
  1113. }
  1114. }
  1115. if len(ASNs) > 0 {
  1116. if !common.Contains(ASNs, geoIPData.ASN) {
  1117. return false
  1118. }
  1119. }
  1120. if len(cities) > 0 {
  1121. if !common.Contains(cities, geoIPData.City) {
  1122. return false
  1123. }
  1124. }
  1125. }
  1126. // With IPv6, individual users or sites are users commonly allocated a /64
  1127. // or /56, so rate limit by /56.
  1128. rateLimitIP := clientIP
  1129. IP := net.ParseIP(clientIP)
  1130. if IP != nil && IP.To4() == nil {
  1131. rateLimitIP = IP.Mask(net.CIDRMask(56, 128)).String()
  1132. }
  1133. // go-cache-lru is safe for concurrent access, but lacks an atomic
  1134. // compare-and-set type operations to check if an entry exists before
  1135. // adding a new one. This mutex ensures the Get and Add are atomic
  1136. // (as well as synchronizing access to rateLimitCount).
  1137. server.rateLimitLock.Lock()
  1138. var rateLimiter *rate.Limiter
  1139. entry, ok := server.rateLimitHistory.Get(rateLimitIP)
  1140. if ok {
  1141. rateLimiter = entry.(*rate.Limiter)
  1142. } else {
  1143. // Set bursts to 1, which is appropriate for new meek tunnels and
  1144. // tactics requests.
  1145. limit := float64(historySize) / float64(thresholdSeconds)
  1146. bursts := 1
  1147. rateLimiter = rate.NewLimiter(rate.Limit(limit), bursts)
  1148. server.rateLimitHistory.Set(
  1149. rateLimitIP,
  1150. rateLimiter,
  1151. time.Duration(thresholdSeconds)*time.Second)
  1152. }
  1153. limit := !rateLimiter.Allow()
  1154. triggerGC := false
  1155. if limit {
  1156. server.rateLimitCount += 1
  1157. if server.rateLimitCount >= GCTriggerCount {
  1158. triggerGC = true
  1159. server.rateLimitCount = 0
  1160. }
  1161. }
  1162. server.rateLimitLock.Unlock()
  1163. if triggerGC {
  1164. select {
  1165. case server.rateLimitSignalGC <- struct{}{}:
  1166. default:
  1167. }
  1168. }
  1169. return limit
  1170. }
  1171. func (server *MeekServer) rateLimitWorker() {
  1172. for {
  1173. select {
  1174. case <-server.rateLimitSignalGC:
  1175. runtime.GC()
  1176. case <-server.stopBroadcast:
  1177. return
  1178. }
  1179. }
  1180. }
  1181. func (server *MeekServer) deleteSession(sessionID string) {
  1182. // Don't obtain the server.sessionsLock write lock until modifying
  1183. // server.sessions, as the session.delete can block for up to
  1184. // MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
  1185. // concurrently.
  1186. //
  1187. // Since a lock isn't held for the duration, concurrent calls to
  1188. // deleteSession with the same sessionID could happen; this is
  1189. // not expected since only the reaper goroutine calls deleteExpiredSessions
  1190. // (and in any case concurrent execution of the ok block is not an issue).
  1191. server.sessionsLock.RLock()
  1192. session, ok := server.sessions[sessionID]
  1193. server.sessionsLock.RUnlock()
  1194. if ok {
  1195. session.delete(false)
  1196. server.sessionsLock.Lock()
  1197. delete(server.sessions, sessionID)
  1198. server.sessionsLock.Unlock()
  1199. }
  1200. }
  1201. func (server *MeekServer) deleteExpiredSessions() {
  1202. // A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  1203. // so grab a snapshot list of expired sessions and do not hold a lock for
  1204. // the duration of deleteExpiredSessions. This allows new sessions to be
  1205. // added concurrently.
  1206. //
  1207. // New sessions added after the snapshot is taken will be checked for
  1208. // expiry on subsequent periodic calls to deleteExpiredSessions.
  1209. //
  1210. // To avoid long delays in releasing resources, individual deletes are
  1211. // performed concurrently.
  1212. server.sessionsLock.Lock()
  1213. expiredSessionIDs := make([]string, 0)
  1214. for sessionID, session := range server.sessions {
  1215. if session.expired() {
  1216. expiredSessionIDs = append(expiredSessionIDs, sessionID)
  1217. }
  1218. }
  1219. server.sessionsLock.Unlock()
  1220. start := time.Now()
  1221. deleteWaitGroup := new(sync.WaitGroup)
  1222. for _, sessionID := range expiredSessionIDs {
  1223. deleteWaitGroup.Add(1)
  1224. go func(sessionID string) {
  1225. defer deleteWaitGroup.Done()
  1226. server.deleteSession(sessionID)
  1227. }(sessionID)
  1228. }
  1229. deleteWaitGroup.Wait()
  1230. log.WithTraceFields(
  1231. LogFields{"elapsed time": time.Since(start)}).Debug("deleted expired sessions")
  1232. }
  1233. // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
  1234. // meek server.
  1235. func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
  1236. switch connState {
  1237. case http.StateNew:
  1238. server.openConns.Add(conn)
  1239. case http.StateHijacked, http.StateClosed:
  1240. server.openConns.Remove(conn)
  1241. }
  1242. }
  1243. // getMeekCookiePayload extracts the payload from a meek cookie. The cookie
  1244. // payload is base64 encoded, obfuscated, and NaCl encrypted.
  1245. func (server *MeekServer) getMeekCookiePayload(
  1246. clientIP string, cookieValue string) ([]byte, error) {
  1247. decodedValue, err := base64.StdEncoding.DecodeString(cookieValue)
  1248. if err != nil {
  1249. return nil, errors.Trace(err)
  1250. }
  1251. // The data consists of an obfuscated seed message prepended
  1252. // to the obfuscated, encrypted payload. The server obfuscator
  1253. // will read the seed message, leaving the remaining encrypted
  1254. // data in the reader.
  1255. reader := bytes.NewReader(decodedValue[:])
  1256. obfuscator, err := obfuscator.NewServerObfuscator(
  1257. &obfuscator.ObfuscatorConfig{
  1258. Keyword: server.support.Config.MeekObfuscatedKey,
  1259. SeedHistory: server.obfuscatorSeedHistory,
  1260. IrregularLogger: func(clientIP string, err error, logFields common.LogFields) {
  1261. logIrregularTunnel(
  1262. server.support,
  1263. server.listenerTunnelProtocol,
  1264. server.listenerPort,
  1265. clientIP,
  1266. errors.Trace(err),
  1267. LogFields(logFields))
  1268. },
  1269. // To allow for meek retries, replay of the same meek cookie is
  1270. // permitted (but only from the same source IP).
  1271. DisableStrictHistoryMode: true,
  1272. },
  1273. clientIP,
  1274. reader)
  1275. if err != nil {
  1276. return nil, errors.Trace(err)
  1277. }
  1278. offset, err := reader.Seek(0, 1)
  1279. if err != nil {
  1280. return nil, errors.Trace(err)
  1281. }
  1282. encryptedPayload := decodedValue[offset:]
  1283. obfuscator.ObfuscateClientToServer(encryptedPayload)
  1284. var nonce [24]byte
  1285. var privateKey, ephemeralPublicKey [32]byte
  1286. decodedPrivateKey, err := base64.StdEncoding.DecodeString(
  1287. server.support.Config.MeekCookieEncryptionPrivateKey)
  1288. if err != nil {
  1289. return nil, errors.Trace(err)
  1290. }
  1291. copy(privateKey[:], decodedPrivateKey)
  1292. if len(encryptedPayload) < 32 {
  1293. return nil, errors.TraceNew("unexpected encrypted payload size")
  1294. }
  1295. copy(ephemeralPublicKey[0:32], encryptedPayload[0:32])
  1296. payload, ok := box.Open(nil, encryptedPayload[32:], &nonce, &ephemeralPublicKey, &privateKey)
  1297. if !ok {
  1298. return nil, errors.TraceNew("open box failed")
  1299. }
  1300. return payload, nil
  1301. }
  1302. func (server *MeekServer) getWebServerCertificate() ([]byte, []byte, error) {
  1303. var certificate, privateKey string
  1304. if server.support.Config.MeekServerCertificate != "" {
  1305. certificate = server.support.Config.MeekServerCertificate
  1306. privateKey = server.support.Config.MeekServerPrivateKey
  1307. } else {
  1308. var err error
  1309. certificate, privateKey, _, err = common.GenerateWebServerCertificate(values.GetHostName())
  1310. if err != nil {
  1311. return nil, nil, errors.Trace(err)
  1312. }
  1313. }
  1314. return []byte(certificate), []byte(privateKey), nil
  1315. }
  1316. // makeFrontedMeekTLSConfig creates a TLS config for a fronted meek HTTPS
  1317. // listener.
  1318. func (server *MeekServer) makeFrontedMeekTLSConfig() (*tls.Config, error) {
  1319. certificate, privateKey, err := server.getWebServerCertificate()
  1320. if err != nil {
  1321. return nil, errors.Trace(err)
  1322. }
  1323. tlsCertificate, err := tls.X509KeyPair(
  1324. []byte(certificate), []byte(privateKey))
  1325. if err != nil {
  1326. return nil, errors.Trace(err)
  1327. }
  1328. // Vary the minimum version to frustrate scanning/fingerprinting of unfronted servers.
  1329. // Limitation: like the certificate, this value changes on restart.
  1330. minVersionCandidates := []uint16{tls.VersionTLS10, tls.VersionTLS11, tls.VersionTLS12}
  1331. minVersion := minVersionCandidates[prng.Intn(len(minVersionCandidates))]
  1332. // This is a reordering of the supported CipherSuites in golang 1.6[*]. Non-ephemeral key
  1333. // CipherSuites greatly reduce server load, and we try to select these since the meek
  1334. // protocol is providing obfuscation, not privacy/integrity (this is provided by the
  1335. // tunneled SSH), so we don't benefit from the perfect forward secrecy property provided
  1336. // by ephemeral key CipherSuites.
  1337. // https://github.com/golang/go/blob/1cb3044c9fcd88e1557eca1bf35845a4108bc1db/src/crypto/tls/cipher_suites.go#L75
  1338. //
  1339. // This optimization is applied only when there's a CDN in front of the meek server; in
  1340. // unfronted cases we prefer a more natural TLS handshake.
  1341. //
  1342. // [*] the list has since been updated, removing CipherSuites using RC4 and 3DES.
  1343. cipherSuites := []uint16{
  1344. tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
  1345. tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
  1346. tls.TLS_RSA_WITH_AES_128_CBC_SHA,
  1347. tls.TLS_RSA_WITH_AES_256_CBC_SHA,
  1348. tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
  1349. tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
  1350. tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  1351. tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
  1352. tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
  1353. tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
  1354. tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  1355. tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
  1356. }
  1357. config := &tls.Config{
  1358. Certificates: []tls.Certificate{tlsCertificate},
  1359. // Offer and prefer "h2" for HTTP/2 support.
  1360. NextProtos: []string{"h2", "http/1.1"},
  1361. MinVersion: minVersion,
  1362. CipherSuites: cipherSuites,
  1363. }
  1364. return config, nil
  1365. }
  1366. // makeDirectMeekTLSConfig creates a TLS config for a direct meek HTTPS
  1367. // listener.
  1368. func (server *MeekServer) makeDirectMeekTLSConfig(
  1369. useObfuscatedSessionTickets bool) (*psiphon_tls.Config, error) {
  1370. certificate, privateKey, err := server.getWebServerCertificate()
  1371. if err != nil {
  1372. return nil, errors.Trace(err)
  1373. }
  1374. tlsCertificate, err := psiphon_tls.X509KeyPair(
  1375. []byte(certificate), []byte(privateKey))
  1376. if err != nil {
  1377. return nil, errors.Trace(err)
  1378. }
  1379. // Vary the minimum version to frustrate scanning/fingerprinting of unfronted servers.
  1380. // Limitation: like the certificate, this value changes on restart.
  1381. minVersionCandidates := []uint16{tls.VersionTLS10, tls.VersionTLS11, tls.VersionTLS12}
  1382. minVersion := minVersionCandidates[prng.Intn(len(minVersionCandidates))]
  1383. config := &psiphon_tls.Config{
  1384. Certificates: []psiphon_tls.Certificate{tlsCertificate},
  1385. // Omit "h2", so HTTP/2 is not negotiated. Note that the
  1386. // negotiated-ALPN extension in the ServerHello is plaintext, even in
  1387. // TLS 1.3.
  1388. NextProtos: []string{"http/1.1"},
  1389. MinVersion: minVersion,
  1390. }
  1391. if useObfuscatedSessionTickets {
  1392. // See obfuscated session ticket overview
  1393. // in NewObfuscatedClientSessionState.
  1394. config.UseObfuscatedSessionTickets = true
  1395. var obfuscatedSessionTicketKey [32]byte
  1396. key, err := hex.DecodeString(server.support.Config.MeekObfuscatedKey)
  1397. if err == nil && len(key) != 32 {
  1398. err = std_errors.New("invalid obfuscated session key length")
  1399. }
  1400. if err != nil {
  1401. return nil, errors.Trace(err)
  1402. }
  1403. copy(obfuscatedSessionTicketKey[:], key)
  1404. var standardSessionTicketKey [32]byte
  1405. _, err = rand.Read(standardSessionTicketKey[:])
  1406. if err != nil {
  1407. return nil, errors.Trace(err)
  1408. }
  1409. // Note: SessionTicketKey needs to be set, or else, it appears,
  1410. // tris.Config.serverInit() will clobber the value set by
  1411. // SetSessionTicketKeys.
  1412. config.SessionTicketKey = obfuscatedSessionTicketKey
  1413. config.SetSessionTicketKeys([][32]byte{
  1414. standardSessionTicketKey,
  1415. obfuscatedSessionTicketKey})
  1416. }
  1417. // When configured, initialize passthrough mode, an anti-probing defense.
  1418. // Clients must prove knowledge of the obfuscated key via a message sent in
  1419. // the TLS ClientHello random field.
  1420. //
  1421. // When clients fail to provide a valid message, the client connection is
  1422. // relayed to the designated passthrough address, typically another web site.
  1423. // The entire flow is relayed, including the original ClientHello, so the
  1424. // client will perform a TLS handshake with the passthrough target.
  1425. //
  1426. // Irregular events are logged for invalid client activity.
  1427. if server.passthroughAddress != "" {
  1428. config.PassthroughAddress = server.passthroughAddress
  1429. config.PassthroughVerifyMessage = func(
  1430. message []byte) bool {
  1431. return obfuscator.VerifyTLSPassthroughMessage(
  1432. !server.support.Config.LegacyPassthrough,
  1433. server.support.Config.MeekObfuscatedKey,
  1434. message)
  1435. }
  1436. config.PassthroughLogInvalidMessage = func(
  1437. clientIP string) {
  1438. logIrregularTunnel(
  1439. server.support,
  1440. server.listenerTunnelProtocol,
  1441. server.listenerPort,
  1442. clientIP,
  1443. errors.TraceNew("invalid passthrough message"),
  1444. nil)
  1445. }
  1446. config.PassthroughHistoryAddNew = func(
  1447. clientIP string,
  1448. clientRandom []byte) bool {
  1449. // Use a custom, shorter TTL based on the validity period of the
  1450. // passthrough message.
  1451. TTL := obfuscator.TLS_PASSTHROUGH_HISTORY_TTL
  1452. if server.support.Config.LegacyPassthrough {
  1453. TTL = obfuscator.HISTORY_SEED_TTL
  1454. }
  1455. // strictMode is true as, unlike with meek cookies, legitimate meek clients
  1456. // never retry TLS connections using a previous random value.
  1457. ok, logFields := server.obfuscatorSeedHistory.AddNewWithTTL(
  1458. true,
  1459. clientIP,
  1460. "client-random",
  1461. clientRandom,
  1462. TTL)
  1463. if logFields != nil {
  1464. logIrregularTunnel(
  1465. server.support,
  1466. server.listenerTunnelProtocol,
  1467. server.listenerPort,
  1468. clientIP,
  1469. errors.TraceNew("duplicate passthrough message"),
  1470. LogFields(*logFields))
  1471. }
  1472. return ok
  1473. }
  1474. }
  1475. return config, nil
  1476. }
  1477. // makeMeekHTTPNormalizerListener returns the meek server listener wrapped in
  1478. // an HTTP normalizer.
  1479. func (server *MeekServer) makeMeekHTTPNormalizerListener() *transforms.HTTPNormalizerListener {
  1480. normalizer := transforms.WrapListenerWithHTTPNormalizer(server.listener)
  1481. normalizer.ProhibitedHeaders = server.support.Config.MeekProhibitedHeaders
  1482. normalizer.MaxReqLineAndHeadersSize = 8192 // max number of header bytes common web servers will read before returning an error
  1483. if server.passthroughAddress != "" {
  1484. normalizer.PassthroughAddress = server.passthroughAddress
  1485. normalizer.PassthroughDialer = net.Dial
  1486. }
  1487. normalizer.PassthroughLogPassthrough = func(
  1488. clientIP string, tunnelError error, logFields map[string]interface{}) {
  1489. logIrregularTunnel(
  1490. server.support,
  1491. server.listenerTunnelProtocol,
  1492. server.listenerPort,
  1493. clientIP,
  1494. errors.Trace(tunnelError),
  1495. logFields)
  1496. }
  1497. // ValidateMeekCookie is invoked by the normalizer with the value of the
  1498. // cookie header (if present), before ServeHTTP gets the request and calls
  1499. // getSessionOrEndpoint; and then any valid meek cookie payload, or meek
  1500. // session ID, extracted in this callback is stored to be fetched by
  1501. // getSessionOrEndpoint.
  1502. // Note: if there are multiple cookie headers, even though prohibited by
  1503. // rfc6265, then ValidateMeekCookie will only be invoked once with the
  1504. // first one received.
  1505. normalizer.ValidateMeekCookie = func(clientIP string, rawCookies []byte) ([]byte, error) {
  1506. // Parse cookie.
  1507. if len(rawCookies) == 0 {
  1508. return nil, errors.TraceNew("no cookies")
  1509. }
  1510. // TODO/perf: readCookies in net/http is not exported, use a local
  1511. // implementation which does not require allocating an http.header
  1512. // each time.
  1513. request := http.Request{
  1514. Header: http.Header{
  1515. "Cookie": []string{string(rawCookies)},
  1516. },
  1517. }
  1518. cookies := request.Cookies()
  1519. if len(rawCookies) == 0 {
  1520. return nil, errors.Tracef("invalid cookies: %s", string(rawCookies))
  1521. }
  1522. // Use value of the first cookie.
  1523. meekCookieValue := cookies[0].Value
  1524. // Check for an existing session.
  1525. server.sessionsLock.RLock()
  1526. existingSessionID := meekCookieValue
  1527. _, ok := server.sessions[existingSessionID]
  1528. server.sessionsLock.RUnlock()
  1529. if ok {
  1530. // The cookie is a session ID for an active (not expired) session.
  1531. // Return it and then it will be stored and later fetched by
  1532. // getSessionOrEndpoint where it will be mapped to the existing
  1533. // session.
  1534. // Note: it's possible for the session to expire between this check
  1535. // and when getSessionOrEndpoint looks up the session.
  1536. return rawCookies, nil
  1537. }
  1538. // The session is new (or expired). Treat the cookie value as a new
  1539. // meek cookie, extract the payload, and return it; and then it will be
  1540. // stored and later fetched by getSessionOrEndpoint.
  1541. payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookieValue)
  1542. if err != nil {
  1543. return nil, errors.Trace(err)
  1544. }
  1545. return payloadJSON, nil
  1546. }
  1547. return normalizer
  1548. }
  1549. func (server *MeekServer) inproxyReloadTactics() error {
  1550. // Assumes no GeoIP targeting for InproxyAllCommonCompartmentIDs, in-proxy
  1551. // quality configuration, and other general broker tactics.
  1552. p, err := server.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  1553. if err != nil {
  1554. return errors.Trace(err)
  1555. }
  1556. defer p.Close()
  1557. if p.IsNil() {
  1558. return nil
  1559. }
  1560. commonCompartmentIDs, err := inproxy.IDsFromStrings(
  1561. p.Strings(parameters.InproxyAllCommonCompartmentIDs))
  1562. if err != nil {
  1563. return errors.Trace(err)
  1564. }
  1565. err = server.inproxyBroker.SetCommonCompartmentIDs(commonCompartmentIDs)
  1566. if err != nil {
  1567. return errors.Trace(err)
  1568. }
  1569. server.inproxyBroker.SetTimeouts(
  1570. p.Duration(parameters.InproxyBrokerProxyAnnounceTimeout),
  1571. p.Duration(parameters.InproxyBrokerClientOfferTimeout),
  1572. p.Duration(parameters.InproxyBrokerClientOfferPersonalTimeout),
  1573. p.Duration(parameters.InproxyBrokerPendingServerRequestsTTL),
  1574. p.KeyDurations(parameters.InproxyFrontingProviderServerMaxRequestTimeouts))
  1575. nonlimitedProxyIDs, err := inproxy.IDsFromStrings(
  1576. p.Strings(parameters.InproxyBrokerMatcherAnnouncementNonlimitedProxyIDs))
  1577. if err != nil {
  1578. return errors.Trace(err)
  1579. }
  1580. server.inproxyBroker.SetLimits(
  1581. p.Int(parameters.InproxyBrokerMatcherAnnouncementLimitEntryCount),
  1582. p.Int(parameters.InproxyBrokerMatcherAnnouncementRateLimitQuantity),
  1583. p.Duration(parameters.InproxyBrokerMatcherAnnouncementRateLimitInterval),
  1584. nonlimitedProxyIDs,
  1585. p.Int(parameters.InproxyBrokerMatcherOfferLimitEntryCount),
  1586. p.Int(parameters.InproxyBrokerMatcherOfferRateLimitQuantity),
  1587. p.Duration(parameters.InproxyBrokerMatcherOfferRateLimitInterval),
  1588. p.Int(parameters.InproxyMaxCompartmentIDListLength),
  1589. p.Int(parameters.InproxyBrokerDSLRequestRateLimitQuantity),
  1590. p.Duration(parameters.InproxyBrokerDSLRequestRateLimitInterval))
  1591. server.inproxyBroker.SetProxyQualityParameters(
  1592. p.Bool(parameters.InproxyEnableProxyQuality),
  1593. p.Duration(parameters.InproxyProxyQualityTTL),
  1594. p.Duration(parameters.InproxyProxyQualityPendingFailedMatchDeadline),
  1595. p.Int(parameters.InproxyProxyQualityFailedMatchThreshold))
  1596. // Configure proxy/client match checklists.
  1597. //
  1598. // When an allow list is set, the client GeoIP data must appear in the
  1599. // proxy's list or the match isn't allowed. When a disallow list is set,
  1600. // the match isn't allowed if the client GeoIP data appears in the
  1601. // proxy's list.
  1602. makeCheckListLookup := func(
  1603. lists map[string][]string,
  1604. isAllowList bool) func(string, string) bool {
  1605. if len(lists) == 0 {
  1606. return func(string, string) bool {
  1607. // Allow when no list
  1608. return true
  1609. }
  1610. }
  1611. lookup := make(map[string]map[string]struct{})
  1612. for key, items := range lists {
  1613. // TODO: use linear search for lists below stringLookupThreshold?
  1614. itemLookup := make(map[string]struct{})
  1615. for _, item := range items {
  1616. itemLookup[item] = struct{}{}
  1617. }
  1618. lookup[key] = itemLookup
  1619. }
  1620. return func(key, item string) bool {
  1621. itemLookup := lookup[key]
  1622. if itemLookup == nil {
  1623. // Allow when no list
  1624. return true
  1625. }
  1626. _, found := itemLookup[item]
  1627. // Allow or disallow based on list type
  1628. return found == isAllowList
  1629. }
  1630. }
  1631. inproxyCheckAllowMatchByRegion := makeCheckListLookup(p.KeyStringsValue(
  1632. parameters.InproxyAllowMatchByRegion), true)
  1633. inproxyCheckAllowMatchByASN := makeCheckListLookup(p.KeyStringsValue(
  1634. parameters.InproxyAllowMatchByASN), true)
  1635. inproxyCheckDisallowMatchByRegion := makeCheckListLookup(p.KeyStringsValue(
  1636. parameters.InproxyDisallowMatchByRegion), false)
  1637. inproxyCheckDisallowMatchByASN := makeCheckListLookup(p.KeyStringsValue(
  1638. parameters.InproxyDisallowMatchByASN), false)
  1639. checkAllowMatch := func(proxyGeoIPData, clientGeoIPData common.GeoIPData) bool {
  1640. return inproxyCheckAllowMatchByRegion(proxyGeoIPData.Country, clientGeoIPData.Country) &&
  1641. inproxyCheckAllowMatchByASN(proxyGeoIPData.ASN, clientGeoIPData.ASN) &&
  1642. inproxyCheckDisallowMatchByRegion(proxyGeoIPData.Country, clientGeoIPData.Country) &&
  1643. inproxyCheckDisallowMatchByASN(proxyGeoIPData.ASN, clientGeoIPData.ASN)
  1644. }
  1645. server.inproxyCheckAllowMatch.Store(checkAllowMatch)
  1646. return nil
  1647. }
  1648. func (server *MeekServer) lookupAllowTactic(geoIPData common.GeoIPData, parameterName string) bool {
  1649. // Fallback to not-allow on failure or nil tactics.
  1650. p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(geoIPData))
  1651. if err != nil {
  1652. log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
  1653. return false
  1654. }
  1655. defer p.Close()
  1656. if p.IsNil() {
  1657. return false
  1658. }
  1659. return p.Bool(parameterName)
  1660. }
  1661. func (server *MeekServer) inproxyBrokerAllowProxy(proxyGeoIPData common.GeoIPData) bool {
  1662. return server.lookupAllowTactic(proxyGeoIPData, parameters.InproxyAllowProxy)
  1663. }
  1664. func (server *MeekServer) inproxyBrokerAllowClient(clientGeoIPData common.GeoIPData) bool {
  1665. return server.lookupAllowTactic(clientGeoIPData, parameters.InproxyAllowClient)
  1666. }
  1667. func (server *MeekServer) inproxyBrokerAllowDomainFrontedDestinations(clientGeoIPData common.GeoIPData) bool {
  1668. return server.lookupAllowTactic(clientGeoIPData, parameters.InproxyAllowDomainFrontedDestinations)
  1669. }
  1670. func (server *MeekServer) inproxyBrokerAllowMatch(
  1671. proxyGeoIPData common.GeoIPData, clientGeoIPData common.GeoIPData) bool {
  1672. return server.inproxyCheckAllowMatch.Load().(func(proxy, client common.GeoIPData) bool)(
  1673. proxyGeoIPData, clientGeoIPData)
  1674. }
  1675. func (server *MeekServer) inproxyBrokerPrioritizeProxy(
  1676. proxyInproxyProtocolVersion int,
  1677. proxyGeoIPData common.GeoIPData,
  1678. proxyAPIParams common.APIParameters) bool {
  1679. // Fallback to not-prioritized on failure or nil tactics.
  1680. p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(proxyGeoIPData))
  1681. if err != nil {
  1682. log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
  1683. return false
  1684. }
  1685. defer p.Close()
  1686. if p.IsNil() {
  1687. return false
  1688. }
  1689. // As API parameter filtering currently does not support range matching, the minimum version
  1690. // constraint is specified in a seperate parameter.
  1691. minProtocolVersion := p.Int(parameters.InproxyBrokerMatcherPrioritizeProxiesMinVersion)
  1692. if proxyInproxyProtocolVersion < minProtocolVersion {
  1693. return false
  1694. }
  1695. filter := p.KeyStringsValue(parameters.InproxyBrokerMatcherPrioritizeProxiesFilter)
  1696. if len(filter) == 0 {
  1697. // When InproxyBrokerMatcherPrioritizeProxiesFilter is empty, the
  1698. // default value, no proxies are prioritized.
  1699. return false
  1700. }
  1701. for name, values := range filter {
  1702. proxyValue, err := getStringRequestParam(proxyAPIParams, name)
  1703. if err != nil || !common.ContainsWildcard(values, proxyValue) {
  1704. return false
  1705. }
  1706. }
  1707. if !p.WeightedCoinFlip(parameters.InproxyBrokerMatcherPrioritizeProxiesProbability) {
  1708. return false
  1709. }
  1710. return true
  1711. }
  1712. // inproxyBrokerGetTacticsPayload is a callback used by the in-proxy broker to
  1713. // provide tactics to proxies.
  1714. //
  1715. // The proxy sends its current tactics tag in apiParameters, and, when there
  1716. // are new tactics, inproxyBrokerGetTacticsPayload returns the payload and the new
  1717. // tactics tag. The broker should log new_tactics_tag in its ProxyAnnounce
  1718. // handler.
  1719. func (server *MeekServer) inproxyBrokerGetTacticsPayload(
  1720. geoIPData common.GeoIPData,
  1721. apiParameters common.APIParameters) ([]byte, string, error) {
  1722. // When compressed tactics are requested, use CBOR binary encoding for the
  1723. // response.
  1724. var responseMarshaler func(any) ([]byte, error)
  1725. responseMarshaler = json.Marshal
  1726. compressTactics := protocol.GetCompressTactics(apiParameters)
  1727. if compressTactics {
  1728. responseMarshaler = protocol.CBOREncoding.Marshal
  1729. }
  1730. tacticsPayload, err := server.support.TacticsServer.GetTacticsPayload(
  1731. geoIPData, apiParameters, compressTactics)
  1732. if err != nil {
  1733. return nil, "", errors.Trace(err)
  1734. }
  1735. var marshaledTacticsPayload []byte
  1736. newTacticsTag := ""
  1737. if tacticsPayload != nil {
  1738. marshaledTacticsPayload, err = responseMarshaler(tacticsPayload)
  1739. if err != nil {
  1740. return nil, "", errors.Trace(err)
  1741. }
  1742. if len(tacticsPayload.Tactics) > 0 {
  1743. newTacticsTag = tacticsPayload.Tag
  1744. }
  1745. }
  1746. return marshaledTacticsPayload, newTacticsTag, nil
  1747. }
  1748. // inproxyBrokerRelayDSLRequest is a callback used by the in-proxy broker to
  1749. // relay client DSL requests.
  1750. func (server *MeekServer) inproxyBrokerRelayDSLRequest(
  1751. ctx context.Context,
  1752. extendTimeout inproxy.ExtendTransportTimeout,
  1753. clientIP string,
  1754. clientGeoIPData common.GeoIPData,
  1755. requestPayload []byte) ([]byte, error) {
  1756. responsePayload, err := dslHandleRequest(
  1757. ctx,
  1758. server.support,
  1759. extendTimeout,
  1760. clientIP,
  1761. clientGeoIPData,
  1762. false, // client request is untunneled
  1763. requestPayload)
  1764. return responsePayload, errors.Trace(err)
  1765. }
  1766. // inproxyBrokerHandler reads an in-proxy broker session protocol message from
  1767. // the HTTP request body, dispatches the message to the broker, and writes
  1768. // the broker session response message to the HTTP response body.
  1769. //
  1770. // The HTTP response write timeout may be extended be the broker, as required.
  1771. // Error cases can return without writing any HTTP response. The caller
  1772. // should invoke server.handleError when an error is returned.
  1773. func (server *MeekServer) inproxyBrokerHandler(
  1774. clientIP string,
  1775. geoIPData common.GeoIPData,
  1776. w http.ResponseWriter,
  1777. r *http.Request) (retErr error) {
  1778. // Don't read more than MEEK_ENDPOINT_MAX_REQUEST_PAYLOAD_LENGTH bytes, as
  1779. // a sanity check and defense against potential resource exhaustion.
  1780. packet, err := ioutil.ReadAll(http.MaxBytesReader(
  1781. w, r.Body, MEEK_ENDPOINT_MAX_REQUEST_PAYLOAD_LENGTH))
  1782. if err != nil {
  1783. return errors.Trace(err)
  1784. }
  1785. extendTimeout := func(timeout time.Duration) {
  1786. // Extend the HTTP response write timeout to accomodate the timeout
  1787. // specified by the broker, such as in the case of the ProxyAnnounce
  1788. // request long poll. The base httpClientIOTimeout value is added, as
  1789. // it covers HTTP transport network operations, which are not
  1790. // necessarily included in the broker's timeouts.
  1791. //
  1792. // Note that any existing write timeout of httpClientIOTimeout would
  1793. // have been set before the body read, which may have consumed time,
  1794. // so adding the full httpClientIOTimeout value again may exceed the
  1795. // original httpClientIOTimeout target.
  1796. http.NewResponseController(w).SetWriteDeadline(
  1797. time.Now().Add(server.httpClientIOTimeout + timeout))
  1798. }
  1799. // Per https://pkg.go.dev/net/http#Request.Context, the request context is
  1800. // canceled when the client's connection closes or an HTTP/2 request is
  1801. // canceled. So it is expected that the broker operation will abort and
  1802. // stop waiting (in the case of long polling) if the client disconnects
  1803. // for any reason before a response is sent.
  1804. //
  1805. // When fronted by a CDN using persistent connections used to multiplex
  1806. // many clients, it is expected that CDNs will perform an HTTP/3 request
  1807. // cancellation in this scenario.
  1808. transportLogFields := common.LogFields{
  1809. "meek_server_http_version": r.Proto,
  1810. }
  1811. packet, err = server.inproxyBroker.HandleSessionPacket(
  1812. r.Context(),
  1813. extendTimeout,
  1814. transportLogFields,
  1815. clientIP,
  1816. geoIPData,
  1817. packet)
  1818. if err != nil {
  1819. var deobfuscationAnomoly *inproxy.DeobfuscationAnomoly
  1820. isAnomolous := std_errors.As(err, &deobfuscationAnomoly)
  1821. if isAnomolous {
  1822. logIrregularTunnel(
  1823. server.support,
  1824. server.listenerTunnelProtocol,
  1825. server.listenerPort,
  1826. clientIP,
  1827. errors.Trace(err),
  1828. nil)
  1829. }
  1830. return errors.Trace(err)
  1831. }
  1832. w.WriteHeader(http.StatusOK)
  1833. _, err = w.Write(packet)
  1834. if err != nil {
  1835. return errors.Trace(err)
  1836. }
  1837. return nil
  1838. }
// meekSession holds the per-session state tracked by the meek server.
type meekSession struct {
	// lastActivity holds a monotime.Time value as an int64. It is written
	// by touch and read by expired.
	lastActivity atomic.Int64

	// NOTE(review): the following counters are presumably request
	// bookkeeping and metrics updated by the request handlers; their
	// writers are not visible here, so confirm before relying on this.
	requestCount                     atomic.Int64
	metricClientRetries              atomic.Int64
	metricPeakResponseSize           atomic.Int64
	metricPeakCachedResponseSize     atomic.Int64
	metricPeakCachedResponseHitSize  atomic.Int64
	metricCachedResponseMissPosition atomic.Int64
	metricUnderlyingConnCount        atomic.Int64

	// lock is acquired by delete, unless the caller already holds it,
	// before cachedResponse is reset and deleted is set.
	lock sync.Mutex

	// deleted is set to true by delete.
	deleted bool

	underlyingConn net.Conn

	// clientConn is closed by delete. expired treats a nil clientConn as a
	// session that is not yet fully initialized.
	clientConn *meekConn

	meekProtocolVersion int
	sessionIDSent       bool

	// cachedResponse is reset by delete; Reset is not safe for concurrent
	// calls.
	cachedResponse *CachedResponse

	cookieName  string
	contentType string
	httpVersion string
}
  1859. func (session *meekSession) touch() {
  1860. session.lastActivity.Store(int64(monotime.Now()))
  1861. }
  1862. func (session *meekSession) expired() bool {
  1863. if session.clientConn == nil {
  1864. // Not fully initialized. meekSession.clientConn will be set before adding
  1865. // the session to MeekServer.sessions.
  1866. return false
  1867. }
  1868. lastActivity := monotime.Time(session.lastActivity.Load())
  1869. return monotime.Since(lastActivity) >
  1870. session.clientConn.meekServer.maxSessionStaleness
  1871. }
  1872. // delete releases all resources allocated by a session.
  1873. func (session *meekSession) delete(haveLock bool) {
  1874. // TODO: close the persistent HTTP client connection, if one exists?
  1875. // This final call session.cachedResponse.Reset releases shared resources.
  1876. //
  1877. // This call requires exclusive access. session.lock is be obtained before
  1878. // calling session.cachedResponse.Reset. Once the lock is obtained, no
  1879. // request for this session is being processed concurrently, and pending
  1880. // requests will block at session.lock.
  1881. //
  1882. // This logic assumes that no further session.cachedResponse access occurs,
  1883. // or else resources may deplete (buffers won't be returned to the pool).
  1884. // These requirements are achieved by obtaining the lock, setting
  1885. // session.deleted, and any subsequent request handlers checking
  1886. // session.deleted immediately after obtaining the lock.
  1887. //
  1888. // session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
  1889. // the timeout for any active request handler processing a session
  1890. // request.
  1891. //
  1892. // When the lock must be acquired, clientConn.Close is called first, to
  1893. // interrupt any existing request handler blocking on pumpReads or pumpWrites.
  1894. session.clientConn.Close()
  1895. if !haveLock {
  1896. session.lock.Lock()
  1897. }
  1898. // Release all extended buffers back to the pool.
  1899. // session.cachedResponse.Reset is not safe for concurrent calls.
  1900. session.cachedResponse.Reset()
  1901. session.deleted = true
  1902. if !haveLock {
  1903. session.lock.Unlock()
  1904. }
  1905. }
  1906. // GetMetrics implements the common.MetricsSource interface.
  1907. func (session *meekSession) GetMetrics() common.LogFields {
  1908. logFields := make(common.LogFields)
  1909. logFields["meek_client_retries"] = session.metricClientRetries.Load()
  1910. logFields["meek_peak_response_size"] = session.metricPeakResponseSize.Load()
  1911. logFields["meek_peak_cached_response_size"] = session.metricPeakCachedResponseSize.Load()
  1912. logFields["meek_peak_cached_response_hit_size"] = session.metricPeakCachedResponseHitSize.Load()
  1913. logFields["meek_cached_response_miss_position"] = session.metricCachedResponseMissPosition.Load()
  1914. logFields["meek_underlying_connection_count"] = session.metricUnderlyingConnCount.Load()
  1915. logFields["meek_cookie_name"] = session.cookieName
  1916. logFields["meek_content_type"] = session.contentType
  1917. logFields["meek_server_http_version"] = session.httpVersion
  1918. return logFields
  1919. }
  1920. // makeMeekSessionID creates a new session ID. The variable size is intended to
  1921. // frustrate traffic analysis of both plaintext and TLS meek traffic.
  1922. func makeMeekSessionID() (string, error) {
  1923. size := MEEK_MIN_SESSION_ID_LENGTH +
  1924. prng.Intn(MEEK_MAX_SESSION_ID_LENGTH-MEEK_MIN_SESSION_ID_LENGTH)
  1925. sessionID, err := common.MakeSecureRandomBytes(size)
  1926. if err != nil {
  1927. return "", errors.Trace(err)
  1928. }
  1929. // Omit padding to maximize variable size space. To the client, the session
  1930. // ID is an opaque string cookie value.
  1931. return base64.RawStdEncoding.EncodeToString(sessionID), nil
  1932. }
  1933. // meekConn implements the net.Conn interface and is to be used as a client
  1934. // connection by the tunnel server (being passed to sshServer.handleClient).
  1935. // meekConn bridges net/http request/response payload readers and writers
  1936. // and goroutines calling Read()s and Write()s.
  1937. type meekConn struct {
  1938. meekServer *MeekServer
  1939. meekSession *meekSession
  1940. firstUnderlyingConn net.Conn
  1941. remoteAddr net.Addr
  1942. protocolVersion int
  1943. closeBroadcast chan struct{}
  1944. closed int32
  1945. lastReadChecksum *uint64
  1946. readLock sync.Mutex
  1947. emptyReadBuffer chan *bytes.Buffer
  1948. partialReadBuffer chan *bytes.Buffer
  1949. fullReadBuffer chan *bytes.Buffer
  1950. writeLock sync.Mutex
  1951. nextWriteBuffer chan []byte
  1952. writeResult chan error
  1953. }
  1954. func newMeekConn(
  1955. meekServer *MeekServer,
  1956. meekSession *meekSession,
  1957. underlyingConn net.Conn,
  1958. remoteAddr net.Addr,
  1959. protocolVersion int) *meekConn {
  1960. // In order to inspect its properties, meekConn will hold a reference to
  1961. // firstUnderlyingConn, the _first_ underlying TCP conn, for the full
  1962. // lifetime of meekConn, which may exceed the lifetime of firstUnderlyingConn
  1963. // and include subsequent underlying TCP conns. In this case, it is expected
  1964. // that firstUnderlyingConn will be closed by "net/http", so no OS resources
  1965. // (e.g., a socket) are retained longer than necessary.
  1966. conn := &meekConn{
  1967. meekServer: meekServer,
  1968. meekSession: meekSession,
  1969. firstUnderlyingConn: underlyingConn,
  1970. remoteAddr: remoteAddr,
  1971. protocolVersion: protocolVersion,
  1972. closeBroadcast: make(chan struct{}),
  1973. closed: 0,
  1974. emptyReadBuffer: make(chan *bytes.Buffer, 1),
  1975. partialReadBuffer: make(chan *bytes.Buffer, 1),
  1976. fullReadBuffer: make(chan *bytes.Buffer, 1),
  1977. nextWriteBuffer: make(chan []byte, 1),
  1978. writeResult: make(chan error, 1),
  1979. }
  1980. // Read() calls and pumpReads() are synchronized by exchanging control
  1981. // of a single readBuffer. This is the same scheme used in and described
  1982. // in psiphon.MeekConn.
  1983. conn.emptyReadBuffer <- new(bytes.Buffer)
  1984. return conn
  1985. }
  1986. // GetMetrics implements the common.MetricsSource interface. The metrics are
  1987. // maintained in the meek session type; but logTunnel, which calls
  1988. // MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
  1989. // through to the session.
  1990. func (conn *meekConn) GetMetrics() common.LogFields {
  1991. logFields := conn.meekSession.GetMetrics()
  1992. if conn.meekServer.passthroughAddress != "" {
  1993. logFields["passthrough_address"] = conn.meekServer.passthroughAddress
  1994. }
  1995. // Include metrics, such as fragmentor metrics, from the _first_ underlying
  1996. // TCP conn. Properties of subsequent underlying TCP conns are not reflected
  1997. // in these metrics; we assume that the first TCP conn, which most likely
  1998. // transits the various protocol handshakes, is most significant.
  1999. underlyingMetrics, ok := conn.firstUnderlyingConn.(common.MetricsSource)
  2000. if ok {
  2001. logFields.Add(underlyingMetrics.GetMetrics())
  2002. }
  2003. return logFields
  2004. }
  2005. // GetUnderlyingTCPAddrs implements the common.UnderlyingTCPAddrSource
  2006. // interface, returning the TCP addresses for the _first_ underlying TCP
  2007. // connection in the meek tunnel.
  2008. func (conn *meekConn) GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool) {
  2009. localAddr, ok := conn.firstUnderlyingConn.LocalAddr().(*net.TCPAddr)
  2010. if !ok {
  2011. return nil, nil, false
  2012. }
  2013. remoteAddr, ok := conn.firstUnderlyingConn.RemoteAddr().(*net.TCPAddr)
  2014. if !ok {
  2015. return nil, nil, false
  2016. }
  2017. return localAddr, remoteAddr, true
  2018. }
  2019. // SetReplay implements the common.FragmentorReplayAccessor interface, applying
  2020. // the inputs to the _first_ underlying TCP connection in the meek tunnel. If
  2021. // the underlying connection is closed, then SetSeed call will have no effect.
  2022. func (conn *meekConn) SetReplay(PRNG *prng.PRNG) {
  2023. underlyingConn := conn.firstUnderlyingConn
  2024. if conn.meekServer.normalizer != nil {
  2025. // The underlying conn is wrapped with a normalizer.
  2026. normalizer, ok := underlyingConn.(*transforms.HTTPNormalizer)
  2027. if ok {
  2028. underlyingConn = normalizer.Conn
  2029. }
  2030. }
  2031. fragmentor, ok := underlyingConn.(common.FragmentorAccessor)
  2032. if ok {
  2033. fragmentor.SetReplay(PRNG)
  2034. }
  2035. }
  2036. // GetReplay implements the FragmentorReplayAccessor interface, getting the
  2037. // outputs from the _first_ underlying TCP connection in the meek tunnel.
  2038. //
  2039. // We assume that the first TCP conn is most significant: the initial TCP
  2040. // connection most likely fragments protocol handshakes; and, in the case the
  2041. // packet manipulation, any selected packet manipulation spec would have been
  2042. // successful.
  2043. func (conn *meekConn) GetReplay() (*prng.Seed, bool) {
  2044. underlyingConn := conn.firstUnderlyingConn
  2045. if conn.meekServer.normalizer != nil {
  2046. // The underlying conn is wrapped with a normalizer.
  2047. normalizer, ok := underlyingConn.(*transforms.HTTPNormalizer)
  2048. if ok {
  2049. underlyingConn = normalizer.Conn
  2050. }
  2051. }
  2052. fragmentor, ok := underlyingConn.(common.FragmentorAccessor)
  2053. if ok {
  2054. return fragmentor.GetReplay()
  2055. }
  2056. return nil, false
  2057. }
  2058. func (conn *meekConn) StopFragmenting() {
  2059. fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorAccessor)
  2060. if ok {
  2061. fragmentor.StopFragmenting()
  2062. }
  2063. }
  2064. // pumpReads causes goroutines blocking on meekConn.Read() to read
  2065. // from the specified reader. This function blocks until the reader
  2066. // is fully consumed or the meekConn is closed. A read buffer allows
  2067. // up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
  2068. // without a Read() immediately consuming the bytes, but there's still
  2069. // a possibility of a stall if no Read() calls are made after this
  2070. // read buffer is full.
  2071. // Returns the number of request bytes read.
  2072. // Note: assumes only one concurrent call to pumpReads
  2073. func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
  2074. // Use either an empty or partial buffer. By using a partial
  2075. // buffer, pumpReads will not block if the Read() caller has
  2076. // not fully drained the read buffer.
  2077. var readBuffer *bytes.Buffer
  2078. select {
  2079. case readBuffer = <-conn.emptyReadBuffer:
  2080. case readBuffer = <-conn.partialReadBuffer:
  2081. case <-conn.closeBroadcast:
  2082. return 0, io.EOF
  2083. }
  2084. newDataOffset := readBuffer.Len()
  2085. // Since we need to read the full request payload in order to
  2086. // take its checksum before relaying it, the read buffer can
  2087. // grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
  2088. // +1 allows for an explicit check for request payloads that
  2089. // exceed the maximum permitted length.
  2090. limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
  2091. n, err := readBuffer.ReadFrom(limitReader)
  2092. if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
  2093. err = std_errors.New("invalid request payload length")
  2094. }
  2095. // If the request read fails, don't relay the new data. This allows
  2096. // the client to retry and resend its request payload without
  2097. // interrupting/duplicating the payload flow.
  2098. if err != nil {
  2099. readBuffer.Truncate(newDataOffset)
  2100. conn.replaceReadBuffer(readBuffer)
  2101. return 0, errors.Trace(err)
  2102. }
  2103. // Check if request payload checksum matches immediately
  2104. // previous payload. On match, assume this is a client retry
  2105. // sending payload that was already relayed and skip this
  2106. // payload. Payload is OSSH ciphertext and almost surely
  2107. // will not repeat. In the highly unlikely case that it does,
  2108. // the underlying SSH connection will fail and the client
  2109. // must reconnect.
  2110. checksum := crc64.Checksum(
  2111. readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
  2112. if conn.lastReadChecksum == nil {
  2113. conn.lastReadChecksum = new(uint64)
  2114. } else if *conn.lastReadChecksum == checksum {
  2115. readBuffer.Truncate(newDataOffset)
  2116. }
  2117. *conn.lastReadChecksum = checksum
  2118. conn.replaceReadBuffer(readBuffer)
  2119. return n, nil
  2120. }
  2121. var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")
  2122. // Read reads from the meekConn into buffer. Read blocks until
  2123. // some data is read or the meekConn closes. Under the hood, it
  2124. // waits for pumpReads to submit a reader to read from.
  2125. // Note: lock is to conform with net.Conn concurrency semantics
  2126. func (conn *meekConn) Read(buffer []byte) (int, error) {
  2127. conn.readLock.Lock()
  2128. defer conn.readLock.Unlock()
  2129. var readBuffer *bytes.Buffer
  2130. select {
  2131. case readBuffer = <-conn.partialReadBuffer:
  2132. case readBuffer = <-conn.fullReadBuffer:
  2133. case <-conn.closeBroadcast:
  2134. return 0, errors.Trace(errMeekConnectionHasClosed)
  2135. }
  2136. n, err := readBuffer.Read(buffer)
  2137. conn.replaceReadBuffer(readBuffer)
  2138. return n, err
  2139. }
  2140. func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
  2141. length := readBuffer.Len()
  2142. if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
  2143. conn.fullReadBuffer <- readBuffer
  2144. } else if length == 0 {
  2145. conn.emptyReadBuffer <- readBuffer
  2146. } else {
  2147. conn.partialReadBuffer <- readBuffer
  2148. }
  2149. }
  2150. // pumpWrites causes goroutines blocking on meekConn.Write() to write
  2151. // to the specified writer. This function blocks until the meek response
  2152. // body limits (size for protocol v1, turn around time for protocol v2+)
  2153. // are met, or the meekConn is closed.
  2154. //
  2155. // Note: channel scheme assumes only one concurrent call to pumpWrites
  2156. func (conn *meekConn) pumpWrites(
  2157. writer io.Writer, skipExtendedTurnAround bool) (int, error) {
  2158. startTime := time.Now()
  2159. timeout := time.NewTimer(conn.meekServer.turnAroundTimeout)
  2160. defer timeout.Stop()
  2161. n := 0
  2162. for {
  2163. select {
  2164. case buffer := <-conn.nextWriteBuffer:
  2165. written, err := writer.Write(buffer)
  2166. n += written
  2167. // Assumes that writeResult won't block.
  2168. // Note: always send the err to writeResult,
  2169. // as the Write() caller is blocking on this.
  2170. conn.writeResult <- err
  2171. if err != nil {
  2172. return n, err
  2173. }
  2174. if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
  2175. // Pre-protocol version 1 clients expect at most
  2176. // MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
  2177. return n, nil
  2178. }
  2179. if skipExtendedTurnAround {
  2180. // When fast turn around is indicated, skip the extended turn
  2181. // around timeout. This optimizes for upstream flows.
  2182. return n, nil
  2183. }
  2184. totalElapsedTime := time.Since(startTime) / time.Millisecond
  2185. if totalElapsedTime >= conn.meekServer.extendedTurnAroundTimeout {
  2186. return n, nil
  2187. }
  2188. timeout.Reset(conn.meekServer.turnAroundTimeout)
  2189. case <-timeout.C:
  2190. return n, nil
  2191. case <-conn.closeBroadcast:
  2192. return n, errors.Trace(errMeekConnectionHasClosed)
  2193. }
  2194. }
  2195. }
  2196. // Write writes the buffer to the meekConn. It blocks until the
  2197. // entire buffer is written to or the meekConn closes. Under the
  2198. // hood, it waits for sufficient pumpWrites calls to consume the
  2199. // write buffer.
  2200. // Note: lock is to conform with net.Conn concurrency semantics
  2201. func (conn *meekConn) Write(buffer []byte) (int, error) {
  2202. conn.writeLock.Lock()
  2203. defer conn.writeLock.Unlock()
  2204. // TODO: may be more efficient to send whole buffer
  2205. // and have pumpWrites stash partial buffer when can't
  2206. // send it all.
  2207. n := 0
  2208. for n < len(buffer) {
  2209. end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
  2210. if end > len(buffer) {
  2211. end = len(buffer)
  2212. }
  2213. // Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
  2214. // to ensure compatibility with v1 protocol.
  2215. chunk := buffer[n:end]
  2216. select {
  2217. case conn.nextWriteBuffer <- chunk:
  2218. case <-conn.closeBroadcast:
  2219. return n, errors.Trace(errMeekConnectionHasClosed)
  2220. }
  2221. // Wait for the buffer to be processed.
  2222. select {
  2223. case <-conn.writeResult:
  2224. // The err from conn.writeResult comes from the
  2225. // io.MultiWriter used in pumpWrites, which writes
  2226. // to both the cached response and the HTTP response.
  2227. //
  2228. // Don't stop on error here, since only writing
  2229. // to the HTTP response will fail, and the client
  2230. // may retry and use the cached response.
  2231. //
  2232. // It's possible that the cached response buffer
  2233. // is too small for the client to successfully
  2234. // retry, but that cannot be determined. In this
  2235. // case, the meek connection will eventually fail.
  2236. //
  2237. // err is already logged in ServeHTTP.
  2238. case <-conn.closeBroadcast:
  2239. return n, errors.Trace(errMeekConnectionHasClosed)
  2240. }
  2241. n += len(chunk)
  2242. }
  2243. return n, nil
  2244. }
  2245. // Close closes the meekConn. This will interrupt any blocked
  2246. // Read, Write, pumpReads, and pumpWrites.
  2247. func (conn *meekConn) Close() error {
  2248. if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
  2249. close(conn.closeBroadcast)
  2250. // In general, we rely on "net/http" to close underlying TCP conns. In
  2251. // this case, we can directly close the first once, if it's still
  2252. // open. Don't close a persistent connection when fronted, as it may
  2253. // be still be used by other clients.
  2254. if !conn.meekServer.isFronted {
  2255. conn.firstUnderlyingConn.Close()
  2256. }
  2257. }
  2258. return nil
  2259. }
  2260. // Stub implementation of net.Conn.LocalAddr
  2261. func (conn *meekConn) LocalAddr() net.Addr {
  2262. return nil
  2263. }
  2264. // RemoteAddr returns the remoteAddr specified in newMeekConn. This
  2265. // acts as a proxy for the actual remote address, which is either a
  2266. // direct HTTP/HTTPS connection remote address, or in the case of
  2267. // downstream proxy of CDN fronts, some other value determined via
  2268. // HTTP headers.
  2269. func (conn *meekConn) RemoteAddr() net.Addr {
  2270. return conn.remoteAddr
  2271. }
  2272. // SetDeadline is not a true implementation of net.Conn.SetDeadline. It
  2273. // merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
  2274. // period. When it does, and the session is idle, the meekConn Read/Write will
  2275. // be interrupted and return an error (not a timeout error) before the deadline.
  2276. // In other words, this conn will approximate the desired functionality of
  2277. // timing out on idle on or before the requested deadline.
  2278. func (conn *meekConn) SetDeadline(t time.Time) error {
  2279. // Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
  2280. if time.Now().Add(conn.meekServer.maxSessionStaleness).Before(t) {
  2281. return nil
  2282. }
  2283. return errors.TraceNew("not supported")
  2284. }
  2285. // Stub implementation of net.Conn.SetReadDeadline
  2286. func (conn *meekConn) SetReadDeadline(t time.Time) error {
  2287. return errors.TraceNew("not supported")
  2288. }
  2289. // Stub implementation of net.Conn.SetWriteDeadline
  2290. func (conn *meekConn) SetWriteDeadline(t time.Time) error {
  2291. return errors.TraceNew("not supported")
  2292. }