proxy.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "io"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  30. )
const (
	// Defaults for the delay applied after a failed proxyOneClient call.
	// getAnnounceDelayParameters returns these when no broker client is
	// available, or as fallbacks passed to common.ValueOrDefault for the
	// broker dial coordinator's values.
	proxyAnnounceDelay           = 1 * time.Second
	proxyAnnounceDelayJitter     = 0.5
	proxyAnnounceMaxBackoffDelay = 1 * time.Minute

	// Log sampling in proxyClients: the number of announce timing/error
	// logs emitted per sample period, and the period after which the
	// sample counters are reset.
	proxyAnnounceLogSampleSize   = 2
	proxyAnnounceLogSamplePeriod = 30 * time.Minute

	// Timeouts for the per-client connection stages.
	// NOTE(review): these are not referenced in this part of the file;
	// presumably they are applied in proxyOneClient -- confirm.
	proxyWebRTCAnswerTimeout    = 20 * time.Second
	proxyDestinationDialTimeout = 20 * time.Second
	proxyRelayInactivityTimeout = 5 * time.Minute
)
// Proxy is the in-proxy proxying component, which relays traffic from a
// client to a Psiphon server.
type Proxy struct {

	// bytesUp and bytesDown accumulate bytes transferred, as reported via
	// UpdateProgress, and are swapped back to zero by each activityUpdate
	// call. peakBytesUp and peakBytesDown track the largest per-update
	// values observed by activityUpdate.
	bytesUp       atomic.Int64
	bytesDown     atomic.Int64
	peakBytesUp   atomic.Int64
	peakBytesDown atomic.Int64

	// Worker state counters, read by activityUpdate.
	// NOTE(review): these are updated outside this part of the file;
	// presumably by proxyOneClient -- confirm.
	announcing        atomic.Int32
	connectingClients atomic.Int32
	connectedClients  atomic.Int32

	// config and activityUpdateWrapper are set once, in NewProxy.
	config                *ProxyConfig
	activityUpdateWrapper *activityUpdateWrapper

	// The worker state reported by the previous activityUpdate call; used
	// to detect state changes. Accessed only by the single activity update
	// goroutine created in Run.
	lastAnnouncing        int32
	lastConnectingClients int32
	lastConnectedClients  int32

	// Network discovery state.
	// NOTE(review): not referenced in this part of the file; presumably
	// networkDiscoveryMutex guards the two fields that follow it -- confirm.
	networkDiscoveryMutex     sync.Mutex
	networkDiscoveryRunOnce   bool
	networkDiscoveryNetworkID string

	// Next announce state.
	// NOTE(review): not referenced in this part of the file; presumably
	// nextAnnounceMutex guards the two fields that follow it -- confirm.
	nextAnnounceMutex        sync.Mutex
	nextAnnounceBrokerClient *BrokerClient
	nextAnnounceNotBefore    time.Time

	// Reduced settings, validated and set in NewProxy. The start and end
	// values are minutes since midnight and are compared against the
	// current UTC time in isReducedUntil.
	useReducedSettings bool
	reducedStartMinute int
	reducedEndMinute   int

	// Per-region activity for personal clients, keyed by region.
	// personalStatsMutex guards the map; the RegionActivity counters are
	// atomics.
	personalStatsMutex     sync.Mutex
	personalRegionActivity map[string]*RegionActivity

	// Per-region activity for common clients, keyed by region.
	// commonStatsMutex guards the map; the RegionActivity counters are
	// atomics.
	commonStatsMutex     sync.Mutex
	commonRegionActivity map[string]*RegionActivity
}
// RegionActivity holds metrics per-region for more detailed metric collection.
//
// All fields are atomics, so a holder of a *RegionActivity may update the
// counters without taking the Proxy's region stats mutexes. The byte
// counters are reset to zero each time snapshotAndResetRegionActivity takes
// a snapshot; the client counters are only read by the snapshot.
type RegionActivity struct {
	bytesUp           atomic.Int64
	bytesDown         atomic.Int64
	connectingClients atomic.Int32
	connectedClients  atomic.Int32
}
  77. // TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
  78. // configured, permanent port mappings.
// ProxyConfig specifies the configuration for a Proxy run.
type ProxyConfig struct {

	// Logger is used to log events.
	Logger common.Logger

	// EnableWebRTCDebugLogging indicates whether to emit WebRTC debug logs.
	EnableWebRTCDebugLogging bool

	// WaitForNetworkConnectivity is a callback that should block until there
	// is network connectivity or shutdown. The return value is true when
	// there is network connectivity, and false for shutdown.
	WaitForNetworkConnectivity func() bool

	// GetCurrentNetworkContext is a callback that returns a context tied to
	// the lifetime of the host's current active network interface. If the
	// active network changes, the previous context returned by
	// GetCurrentNetworkContext should cancel. This context is used to
	// immediately cancel/close individual connections when the active
	// network changes.
	GetCurrentNetworkContext func() context.Context

	// GetBrokerClient provides a BrokerClient which the proxy will use for
	// making broker requests. If GetBrokerClient returns a shared
	// BrokerClient instance, the BrokerClient must support multiple,
	// concurrent round trips, as the proxy will use it to concurrently
	// announce many proxy instances. The BrokerClient should be implemented
	// using multiplexing over a shared network connection -- for example,
	// HTTP/2 -- and a shared broker session for optimal performance.
	GetBrokerClient func() (*BrokerClient, error)

	// GetBaseAPIParameters returns Psiphon API parameters to be sent to and
	// logged by the broker. Expected parameters include client/proxy
	// application and build version information. GetBaseAPIParameters also
	// returns the network ID, corresponding to the parameters, to be used in
	// tactics logic; the network ID is not sent to the broker.
	GetBaseAPIParameters func(includeTacticsParameters bool) (
		common.APIParameters, string, error)

	// MakeWebRTCDialCoordinator provides a WebRTCDialCoordinator which
	// specifies WebRTC-related dial parameters, including selected STUN
	// server addresses; network topology information for the current
	// network; NAT logic settings; and other settings.
	//
	// MakeWebRTCDialCoordinator is invoked for each proxy/client connection,
	// and the provider can select new parameters per connection as required.
	MakeWebRTCDialCoordinator func() (WebRTCDialCoordinator, error)

	// HandleTacticsPayload is a callback that receives any tactics payload,
	// provided by the broker in proxy announcement request responses.
	// HandleTacticsPayload must return true when the tacticsPayload includes
	// new tactics, indicating that the proxy should reinitialize components
	// controlled by tactics parameters.
	HandleTacticsPayload func(
		networkID string, compressTactics bool, tacticsPayload []byte) bool

	// MustUpgrade is a callback that is invoked when a MustUpgrade flag is
	// received from the broker. When MustUpgrade is received, the proxy
	// should be stopped and the user should be prompted to upgrade before
	// restarting the proxy.
	MustUpgrade func()

	// MaxCommonClients (formerly MaxClients) is the maximum number of common
	// clients that are allowed to connect to the proxy. NewProxy requires
	// that MaxCommonClients+MaxPersonalClients is > 0. When
	// MaxCommonClients is <= 0, Run launches only personal client workers.
	MaxCommonClients int

	// MaxPersonalClients is the maximum number of personal clients that are
	// allowed to connect to the proxy. NewProxy requires that
	// MaxCommonClients+MaxPersonalClients is > 0.
	MaxPersonalClients int

	// LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
	// a single client. When 0, there is no limit.
	LimitUpstreamBytesPerSecond int

	// LimitDownstreamBytesPerSecond limits the downstream data transfer rate
	// for a single client. When 0, there is no limit.
	LimitDownstreamBytesPerSecond int

	// ReducedStartTime specifies the time of day (HH:MM, 24-hour, UTC) at
	// which reduced client settings begin. The reduced time range is
	// evaluated against the current UTC time, not the host's local time.
	//
	// When any of ReducedStartTime, ReducedEndTime, or
	// ReducedMaxCommonClients is set, NewProxy requires all three to be
	// valid, and requires the start and end times to differ.
	ReducedStartTime string

	// ReducedEndTime specifies the time of day (HH:MM, 24-hour, UTC) at
	// which reduced client settings end. An end time earlier than the start
	// time specifies a range that spans midnight UTC.
	ReducedEndTime string

	// ReducedMaxCommonClients specifies the maximum number of common clients
	// that are allowed to connect to the proxy during the reduced time range.
	// When reduced settings are configured, NewProxy requires a value that
	// is > 0 and no greater than MaxCommonClients.
	//
	// Limitation: We currently do not support ReducedMaxPersonalClients.
	// We assume that due to the importance of personal clients, users
	// always prefer to have them connected.
	//
	// Clients connected when the reduced settings begin will not be
	// disconnected.
	ReducedMaxCommonClients int

	// ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
	// rate for a single client during the reduced time range. When 0,
	// LimitUpstreamBytesPerSecond is the limit.
	//
	// Rates for clients already connected when the reduced settings begin or
	// end will not change.
	ReducedLimitUpstreamBytesPerSecond int

	// ReducedLimitDownstreamBytesPerSecond limits the downstream data
	// transfer rate for a single client during the reduced time range. When
	// 0, LimitDownstreamBytesPerSecond is the limit.
	//
	// Rates for clients already connected when the reduced settings begin or
	// end will not change.
	ReducedLimitDownstreamBytesPerSecond int

	// ActivityUpdater specifies an ActivityUpdater for activity associated
	// with this proxy.
	ActivityUpdater ActivityUpdater
}
// RegionActivitySnapshot holds a point-in-time copy of per-region metrics.
// This is used for the ActivityUpdater callback and notice serialization.
//
// BytesUp and BytesDown are the bytes accumulated since the previous
// snapshot, as the underlying counters are reset to zero when a snapshot is
// taken. ConnectingClients and ConnectedClients are the counter values at
// the time of the snapshot.
type RegionActivitySnapshot struct {
	BytesUp           int64 `json:"bytesUp"`
	BytesDown         int64 `json:"bytesDown"`
	ConnectingClients int32 `json:"connectingClients"`
	ConnectedClients  int32 `json:"connectedClients"`
}
// ActivityUpdater is a callback that is invoked when the proxy announces
// availability, when clients connect and disconnect, and periodically with
// data transfer updates (unless idle). This callback may be used to update
// an activity UI. This callback should post this data to another thread or
// handler and return immediately and not block on UI updates.
//
// The bytesUp and bytesDown parameters are the bytes transferred since the
// previous update, and bytesDuration is the update period over which those
// bytes were accumulated.
//
// The personalRegionActivitySnapshot and commonRegionActivitySnapshot
// parameters contain per-region metrics (bytes transferred,
// connecting/connected counts) segmented by client region, for personal and
// common clients respectively.
type ActivityUpdater func(
	announcing int32,
	connectingClients int32,
	connectedClients int32,
	bytesUp int64,
	bytesDown int64,
	bytesDuration time.Duration,
	personalRegionActivitySnapshot map[string]RegionActivitySnapshot,
	commonRegionActivitySnapshot map[string]RegionActivitySnapshot)
  203. // NewProxy initializes a new Proxy with the specified configuration.
  204. func NewProxy(config *ProxyConfig) (*Proxy, error) {
  205. // Check if there are no clients who can connect
  206. if config.MaxCommonClients+config.MaxPersonalClients <= 0 {
  207. return nil, errors.TraceNew("invalid MaxCommonClients")
  208. }
  209. p := &Proxy{
  210. config: config,
  211. personalRegionActivity: make(map[string]*RegionActivity),
  212. commonRegionActivity: make(map[string]*RegionActivity),
  213. }
  214. if config.ReducedStartTime != "" ||
  215. config.ReducedEndTime != "" ||
  216. config.ReducedMaxCommonClients > 0 {
  217. startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
  218. if err != nil {
  219. return nil, errors.Tracef("invalid ReducedStartTime: %v", err)
  220. }
  221. endMinute, err := common.ParseTimeOfDayMinutes(config.ReducedEndTime)
  222. if err != nil {
  223. return nil, errors.Tracef("invalid ReducedEndTime: %v", err)
  224. }
  225. if startMinute == endMinute {
  226. return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
  227. }
  228. if config.ReducedMaxCommonClients <= 0 ||
  229. config.ReducedMaxCommonClients > config.MaxCommonClients {
  230. return nil, errors.TraceNew("invalid ReducedMaxCommonClients")
  231. }
  232. p.useReducedSettings = true
  233. p.reducedStartMinute = startMinute
  234. p.reducedEndMinute = endMinute
  235. }
  236. p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
  237. return p, nil
  238. }
  239. // activityUpdateWrapper implements the psiphon/common.ActivityUpdater
  240. // interface and is used to receive bytes transferred updates from the
  241. // ActivityConns wrapping proxied traffic. A wrapper is used so that
  242. // UpdateProgress is not exported from Proxy.
  243. type activityUpdateWrapper struct {
  244. p *Proxy
  245. }
  246. func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
  247. w.p.bytesUp.Add(bytesWritten)
  248. w.p.bytesDown.Add(bytesRead)
  249. }
  250. // connectionActivityWrapper implements common.ActivityUpdater for a single
  251. // connection. It caches the RegionActivity pointer to enable atomic updates
  252. // with no mutex locking.
  253. type connectionActivityWrapper struct {
  254. p *Proxy
  255. regionActivity *RegionActivity
  256. }
  257. func (w *connectionActivityWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
  258. w.p.bytesUp.Add(bytesWritten)
  259. w.p.bytesDown.Add(bytesRead)
  260. if w.regionActivity != nil {
  261. w.regionActivity.bytesUp.Add(bytesWritten)
  262. w.regionActivity.bytesDown.Add(bytesRead)
  263. }
  264. }
  265. // Run runs the proxy. The proxy sends requests to the Broker announcing its
  266. // availability; the Broker matches the proxy with clients, and facilitates
  267. // an exchange of WebRTC connection information; the proxy and each client
  268. // attempt to establish a connection; and the client's traffic is relayed to
  269. // Psiphon server.
  270. //
  271. // Run ends when ctx is Done. A proxy run may continue across underlying
  272. // network changes assuming that the ProxyConfig GetBrokerClient and
  273. // MakeWebRTCDialCoordinator callbacks react to network changes and provide
  274. // instances that are reflect network changes.
  275. func (p *Proxy) Run(ctx context.Context) {
  276. // Run MaxClient proxying workers. Each worker handles one client at a time.
  277. proxyWaitGroup := new(sync.WaitGroup)
  278. // Capture activity updates every second, which is the required frequency
  279. // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
  280. // frequency for invoking the ActivityUpdater and updating UI widgets.
  281. proxyWaitGroup.Add(1)
  282. go func() {
  283. defer proxyWaitGroup.Done()
  284. p.lastAnnouncing = 0
  285. p.lastConnectingClients = 0
  286. p.lastConnectedClients = 0
  287. activityUpdatePeriod := 1 * time.Second
  288. ticker := time.NewTicker(activityUpdatePeriod)
  289. defer ticker.Stop()
  290. loop:
  291. for {
  292. select {
  293. case <-ticker.C:
  294. p.activityUpdate(activityUpdatePeriod)
  295. case <-ctx.Done():
  296. break loop
  297. }
  298. }
  299. }()
  300. // Launch the first proxy worker, passing a signal to be triggered once
  301. // the very first announcement round trip is complete. The first round
  302. // trip is awaited so that:
  303. //
  304. // - The first announce response will arrive with any new tactics,
  305. // which may be applied before launching additional workers.
  306. //
  307. // - The first worker gets no announcement delay and is also guaranteed to
  308. // be the shared session establisher. Since the announcement delays are
  309. // applied _after_ waitToShareSession, it would otherwise be possible,
  310. // with a race of MaxClient initial, concurrent announces, for the
  311. // session establisher to be a different worker than the no-delay worker.
  312. //
  313. // The first worker is the only proxy worker which sets
  314. // ProxyAnnounceRequest.CheckTactics/PreCheckTactics. PreCheckTactics is
  315. // used on the first announcement so the request returns immediately
  316. // without awaiting a match. This allows all workers to be launched
  317. // quickly.
  318. commonProxiesToCreate, personalProxiesToCreate :=
  319. p.config.MaxCommonClients, p.config.MaxPersonalClients
  320. // Doing this outside of the go routine to avoid race conditions
  321. firstWorkerIsPersonal := p.config.MaxCommonClients <= 0
  322. if firstWorkerIsPersonal {
  323. personalProxiesToCreate -= 1
  324. } else {
  325. commonProxiesToCreate -= 1
  326. }
  327. signalFirstAnnounceCtx, signalFirstAnnounceDone :=
  328. context.WithCancel(context.Background())
  329. proxyWaitGroup.Add(1)
  330. go func() {
  331. defer proxyWaitGroup.Done()
  332. p.proxyClients(ctx, signalFirstAnnounceDone, false, firstWorkerIsPersonal)
  333. }()
  334. select {
  335. case <-signalFirstAnnounceCtx.Done():
  336. case <-ctx.Done():
  337. return
  338. }
  339. // Launch the remaining workers.
  340. for i := 0; i < commonProxiesToCreate; i++ {
  341. isPersonal := false
  342. // When reduced settings are in effect, a subset of workers will pause
  343. // during the reduced time period. Since ReducedMaxCommonClients > 0 the
  344. // first proxy worker is never paused.
  345. workerNum := i + 1
  346. reducedPause := p.useReducedSettings &&
  347. workerNum >= p.config.ReducedMaxCommonClients
  348. proxyWaitGroup.Add(1)
  349. go func(reducedPause bool) {
  350. defer proxyWaitGroup.Done()
  351. p.proxyClients(ctx, nil, reducedPause, isPersonal)
  352. }(reducedPause)
  353. }
  354. for i := 0; i < personalProxiesToCreate; i++ {
  355. // Limitation: There are no reduced settings for personal proxies
  356. isPersonal := true
  357. proxyWaitGroup.Add(1)
  358. go func() {
  359. defer proxyWaitGroup.Done()
  360. p.proxyClients(ctx, nil, false, isPersonal)
  361. }()
  362. }
  363. proxyWaitGroup.Wait()
  364. }
  365. func (p *Proxy) activityUpdate(period time.Duration) {
  366. // Concurrency: activityUpdate is called by only the single goroutine
  367. // created in Run.
  368. announcing := p.announcing.Load()
  369. connectingClients := p.connectingClients.Load()
  370. connectedClients := p.connectedClients.Load()
  371. bytesUp := p.bytesUp.Swap(0)
  372. bytesDown := p.bytesDown.Swap(0)
  373. greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
  374. greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
  375. personalRegionActivity := p.snapshotAndResetRegionActivity(
  376. &p.personalStatsMutex, p.personalRegionActivity)
  377. commonRegionActivity := p.snapshotAndResetRegionActivity(
  378. &p.commonStatsMutex, p.commonRegionActivity)
  379. stateChanged := announcing != p.lastAnnouncing ||
  380. connectingClients != p.lastConnectingClients ||
  381. connectedClients != p.lastConnectedClients
  382. p.lastAnnouncing = announcing
  383. p.lastConnectingClients = connectingClients
  384. p.lastConnectedClients = connectedClients
  385. if !stateChanged &&
  386. bytesUp == 0 &&
  387. bytesDown == 0 {
  388. // Skip the activity callback on idle bytes or no change in worker state.
  389. return
  390. }
  391. p.config.ActivityUpdater(
  392. announcing,
  393. connectingClients,
  394. connectedClients,
  395. bytesUp,
  396. bytesDown,
  397. period,
  398. personalRegionActivity,
  399. commonRegionActivity)
  400. }
  401. // getOrCreateRegionActivity returns the RegionActivity for a region, creating it
  402. // if needed. This should be called once at connection start to avoid multiple
  403. // lock usage.
  404. func (p *Proxy) getOrCreateRegionActivity(region string, isPersonal bool) *RegionActivity {
  405. var mutex *sync.Mutex
  406. var statsMap map[string]*RegionActivity
  407. if isPersonal {
  408. mutex = &p.personalStatsMutex
  409. statsMap = p.personalRegionActivity
  410. } else {
  411. mutex = &p.commonStatsMutex
  412. statsMap = p.commonRegionActivity
  413. }
  414. mutex.Lock()
  415. defer mutex.Unlock()
  416. stats, exists := statsMap[region]
  417. if !exists {
  418. stats = &RegionActivity{}
  419. statsMap[region] = stats
  420. }
  421. return stats
  422. }
  423. // snapshotAndResetRegionActivity creates a copy of region stats with bytes reset
  424. // to zero, and prunes any entries that have no active connections and zero
  425. // bytes. The snapshot mechanism allows us to avoid holding locks during the
  426. // callback invocation.
  427. func (p *Proxy) snapshotAndResetRegionActivity(
  428. mutex *sync.Mutex,
  429. statsMap map[string]*RegionActivity,
  430. ) map[string]RegionActivitySnapshot {
  431. mutex.Lock()
  432. defer mutex.Unlock()
  433. result := make(map[string]RegionActivitySnapshot, len(statsMap))
  434. regionsToDelete := []string{}
  435. for region, stats := range statsMap {
  436. snapshot := RegionActivitySnapshot{
  437. BytesUp: stats.bytesUp.Swap(0),
  438. BytesDown: stats.bytesDown.Swap(0),
  439. ConnectingClients: stats.connectingClients.Load(),
  440. ConnectedClients: stats.connectedClients.Load(),
  441. }
  442. if snapshot.BytesUp > 0 || snapshot.BytesDown > 0 ||
  443. snapshot.ConnectingClients > 0 || snapshot.ConnectedClients > 0 {
  444. result[region] = snapshot
  445. } else {
  446. regionsToDelete = append(regionsToDelete, region)
  447. }
  448. }
  449. for _, region := range regionsToDelete {
  450. delete(statsMap, region)
  451. }
  452. return result
  453. }
  454. func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
  455. // Limitation: if there are two concurrent calls, the greater value could
  456. // get overwritten.
  457. old := addr.Load()
  458. if new > old {
  459. return addr.CompareAndSwap(old, new)
  460. }
  461. return false
  462. }
  463. func (p *Proxy) isReducedUntil() (int, time.Time) {
  464. if !p.useReducedSettings {
  465. return p.config.MaxCommonClients, time.Time{}
  466. }
  467. now := time.Now().UTC()
  468. minute := now.Hour()*60 + now.Minute()
  469. isReduced := false
  470. if p.reducedStartMinute < p.reducedEndMinute {
  471. isReduced = minute >= p.reducedStartMinute && minute < p.reducedEndMinute
  472. } else {
  473. isReduced = minute >= p.reducedStartMinute || minute < p.reducedEndMinute
  474. }
  475. if !isReduced {
  476. return p.config.MaxCommonClients, time.Time{}
  477. }
  478. endHour := p.reducedEndMinute / 60
  479. endMinute := p.reducedEndMinute % 60
  480. endTime := time.Date(
  481. now.Year(),
  482. now.Month(),
  483. now.Day(),
  484. endHour,
  485. endMinute,
  486. 0,
  487. 0,
  488. now.Location(),
  489. )
  490. if !endTime.After(now) {
  491. endTime = endTime.AddDate(0, 0, 1)
  492. }
  493. return p.config.ReducedMaxCommonClients, endTime
  494. }
  495. func (p *Proxy) getLimits() (int, int, common.RateLimits) {
  496. rateLimits := common.RateLimits{
  497. ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
  498. WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
  499. }
  500. maxCommonClients, reducedUntil := p.isReducedUntil()
  501. if !reducedUntil.IsZero() {
  502. upstream := p.config.ReducedLimitUpstreamBytesPerSecond
  503. if upstream == 0 {
  504. upstream = p.config.LimitUpstreamBytesPerSecond
  505. }
  506. downstream := p.config.ReducedLimitDownstreamBytesPerSecond
  507. if downstream == 0 {
  508. downstream = p.config.LimitDownstreamBytesPerSecond
  509. }
  510. rateLimits = common.RateLimits{
  511. ReadBytesPerSecond: int64(upstream),
  512. WriteBytesPerSecond: int64(downstream),
  513. }
  514. }
  515. maxPersonalClients := p.config.MaxPersonalClients
  516. return maxCommonClients, maxPersonalClients, rateLimits
  517. }
  518. // getAnnounceDelayParameters is a helper that fetches the proxy announcement
  519. // delay parameters from the current broker client.
  520. //
  521. // getAnnounceDelayParameters is used to configure a delay when
  522. // proxyOneClient fails. As having no broker clients is a possible
  523. // proxyOneClient failure case, GetBrokerClient errors are ignored here and
  524. // defaults used in that case.
  525. func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
  526. brokerClient, err := p.config.GetBrokerClient()
  527. if err != nil {
  528. return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
  529. }
  530. brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
  531. return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
  532. common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
  533. common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
  534. }
// proxyClients is the body of one proxy worker. It repeatedly calls
// proxyOneClient until ctx is done or WaitForNetworkConnectivity reports
// shutdown, applying an exponential backoff delay after failures.
//
// signalAnnounceDone is passed through to proxyOneClient; Run supplies it
// for the first worker only and passes nil for all other workers.
//
// When reducedPause is true, the worker sleeps through the reduced time
// range instead of proxying, resuming when the range ends or exiting when
// ctx is done.
//
// isPersonal is passed through to proxyOneClient and selects whether this
// worker serves personal or common clients.
func (p *Proxy) proxyClients(
	ctx context.Context, signalAnnounceDone func(), reducedPause bool, isPersonal bool) {

	// Proxy one client, repeating until ctx is done.
	//
	// This worker starts with posting a long-polling announcement request.
	// The broker responds with a matched client, and the proxy and client
	// attempt to establish a WebRTC connection for relaying traffic.
	//
	// Limitation: this design may not maximize the utility of the proxy,
	// since some proxy/client connections will fail at the WebRTC stage due
	// to NAT traversal failure, and at most MaxClient concurrent
	// establishments are attempted. Another scenario comes from the Psiphon
	// client horse race, which may start in-proxy dials but then abort them
	// when some other tunnel protocol succeeds.
	//
	// As a future enhancement, consider using M announcement goroutines and N
	// WebRTC dial goroutines. When an announcement gets a response,
	// immediately announce again unless there are already MaxClient active
	// connections established. This approach may require the proxy to
	// backpedal and reject connections when establishment is too successful.
	//
	// Another enhancement could be a signal from the client, to the broker,
	// relayed to the proxy, when a dial is aborted.

	// failureDelayFactor is the backoff multiplier applied to the announce
	// delay. It doubles after each failure and is reset to 1 below.
	failureDelayFactor := time.Duration(1)

	// To reduce diagnostic log noise, only log an initial sample of
	// announcement request timings (delays/elapsed time) and a periodic
	// sample of repeating errors such as "no match".
	logAnnounceCount := proxyAnnounceLogSampleSize
	logErrorsCount := proxyAnnounceLogSampleSize
	lastErrMsg := ""
	startLogSampleTime := time.Now()
	// logAnnounce consumes one unit of the announce log sample and reports
	// whether the caller should log.
	logAnnounce := func() bool {
		if logAnnounceCount > 0 {
			logAnnounceCount -= 1
			return true
		}
		return false
	}

	// preCheckTacticsDone is state shared across proxyOneClient calls via
	// the pointer passed below.
	preCheckTacticsDone := false

	for ctx.Err() == nil {

		// WaitForNetworkConnectivity returns false on shutdown.
		if !p.config.WaitForNetworkConnectivity() {
			break
		}

		// Pause designated workers during the reduced time range. In-flight
		// announces are not interrupted and connected clients are not
		// disconnected, so there is a gradual transition into reduced mode.

		if reducedPause {
			// A non-zero reducedUntil means the reduced time range is
			// currently in effect and ends at that time.
			_, reducedUntil := p.isReducedUntil()
			if !reducedUntil.IsZero() {

				pauseDuration := time.Until(reducedUntil)
				p.config.Logger.WithTraceFields(common.LogFields{
					"duration": pauseDuration.String(),
				}).Info("pause worker")

				timer := time.NewTimer(pauseDuration)
				select {
				case <-timer.C:
				case <-ctx.Done():
				}
				timer.Stop()
				if ctx.Err() != nil {
					break
				}
			}
		}

		// Start a new log sample once the sample period has elapsed.
		if time.Since(startLogSampleTime) >= proxyAnnounceLogSamplePeriod {
			logAnnounceCount = proxyAnnounceLogSampleSize
			logErrorsCount = proxyAnnounceLogSampleSize
			lastErrMsg = ""
			startLogSampleTime = time.Now()
		}

		backOff, err := p.proxyOneClient(
			ctx, logAnnounce, &preCheckTacticsDone, signalAnnounceDone, isPersonal)

		// Reset the backoff on success, or on a failure that does not call
		// for backing off. A delay is still applied below for any error.
		if !backOff || err == nil {
			failureDelayFactor = 1
		}

		if err != nil && ctx.Err() == nil {

			// Apply a simple exponential backoff based on whether
			// proxyOneClient either relayed client traffic or got no match,
			// or encountered a failure.
			//
			// The proxyOneClient failure could range from local
			// configuration (no broker clients) to network issues (failure
			// to completely establish WebRTC connection) and this backoff
			// prevents both excess local logging and churning in the former
			// case and excessive bad service to clients or unintentionally
			// overloading the broker in the latter case.

			delay, maxBackoffDelay, jitter := p.getAnnounceDelayParameters()

			delay = delay * failureDelayFactor
			if delay > maxBackoffDelay {
				delay = maxBackoffDelay
			}
			// Cap the multiplier so that it stops growing once it is
			// already very large.
			if failureDelayFactor < 1<<20 {
				failureDelayFactor *= 2
			}

			// Sample error log.
			//
			// Limitation: the lastErrMsg string comparison isn't compatible
			// with errors with minor variations, such as "unexpected
			// response status code %d after %v" from
			// InproxyBrokerRoundTripper.RoundTrip, with a time duration in
			// the second parameter.

			errMsg := err.Error()
			// A different error message starts a fresh error log sample.
			if lastErrMsg != errMsg {
				logErrorsCount = proxyAnnounceLogSampleSize
				lastErrMsg = errMsg
			}

			if logErrorsCount > 0 {
				p.config.Logger.WithTraceFields(
					common.LogFields{
						"error":  errMsg,
						"delay":  delay.String(),
						"jitter": jitter,
					}).Error("proxy client failed")
				logErrorsCount -= 1
			}

			common.SleepWithJitter(ctx, delay, jitter)
		}
	}
}
  654. // resetNetworkDiscovery resets the network discovery state, which will force
  655. // another network discovery when doNetworkDiscovery is invoked.
  656. // resetNetworkDiscovery is called when new tactics have been received from
  657. // the broker, as new tactics may change parameters that control network
  658. // discovery.
  659. func (p *Proxy) resetNetworkDiscovery() {
  660. p.networkDiscoveryMutex.Lock()
  661. defer p.networkDiscoveryMutex.Unlock()
  662. p.networkDiscoveryRunOnce = false
  663. p.networkDiscoveryNetworkID = ""
  664. }
  665. func (p *Proxy) doNetworkDiscovery(
  666. ctx context.Context,
  667. webRTCCoordinator WebRTCDialCoordinator) {
  668. // Allow only one concurrent network discovery. In practise, this may
  669. // block all other proxyOneClient goroutines while one single goroutine
  670. // runs doNetworkDiscovery. Subsequently, all other goroutines will find
  671. // networkDiscoveryRunOnce is true and use the cached results.
  672. p.networkDiscoveryMutex.Lock()
  673. defer p.networkDiscoveryMutex.Unlock()
  674. networkID := webRTCCoordinator.NetworkID()
  675. if p.networkDiscoveryRunOnce &&
  676. p.networkDiscoveryNetworkID == networkID {
  677. // Already ran discovery for this network.
  678. //
  679. // TODO: periodically re-probe for port mapping services?
  680. return
  681. }
  682. // Reset and configure port mapper component, as required. See
  683. // initPortMapper comment.
  684. initPortMapper(webRTCCoordinator)
  685. // Gather local network NAT/port mapping metrics and configuration before
  686. // sending any announce requests. NAT topology metrics are used by the
  687. // Broker to optimize client and in-proxy matching. Unlike the client, we
  688. // always perform this synchronous step here, since waiting doesn't
  689. // necessarily block a client tunnel dial.
  690. waitGroup := new(sync.WaitGroup)
  691. waitGroup.Add(1)
  692. go func() {
  693. defer waitGroup.Done()
  694. // NATDiscover may use cached NAT type/port mapping values from
  695. // DialParameters, based on the network ID. If discovery is not
  696. // successful, the proxy still proceeds to announce.
  697. NATDiscover(
  698. ctx,
  699. &NATDiscoverConfig{
  700. Logger: p.config.Logger,
  701. WebRTCDialCoordinator: webRTCCoordinator,
  702. })
  703. }()
  704. waitGroup.Wait()
  705. p.networkDiscoveryRunOnce = true
  706. p.networkDiscoveryNetworkID = networkID
  707. }
  708. func (p *Proxy) proxyOneClient(
  709. ctx context.Context,
  710. logAnnounce func() bool,
  711. preCheckTacticsDone *bool,
  712. signalAnnounceDone func(),
  713. isPersonal bool) (bool, error) {
  714. // Cancel/close this connection immediately if the network changes.
  715. if p.config.GetCurrentNetworkContext != nil {
  716. var cancelFunc context.CancelFunc
  717. ctx, cancelFunc = common.MergeContextCancel(
  718. ctx, p.config.GetCurrentNetworkContext())
  719. defer cancelFunc()
  720. }
  721. // Do not trigger back-off unless the proxy successfully announces and
  722. // only then performs poorly.
  723. //
  724. // A no-match response should not trigger back-off, nor should broker
  725. // request transport errors which may include non-200 responses due to
  726. // CDN timeout mismatches or TLS errors due to CDN TLS fingerprint
  727. // incompatibility.
  728. backOff := false
  729. // Get a new WebRTCDialCoordinator, which should be configured with the
  730. // latest network tactics.
  731. webRTCCoordinator, err := p.config.MakeWebRTCDialCoordinator()
  732. if err != nil {
  733. return backOff, errors.Trace(err)
  734. }
  735. // Perform network discovery, to determine NAT type and other network
  736. // topology information that is reported to the broker in the proxy
  737. // announcement and used to optimize proxy/client matching. Unlike
  738. // clients, which can't easily delay dials in the tunnel establishment
  739. // horse race, proxies will always perform network discovery.
  740. // doNetworkDiscovery allows only one concurrent discovery and caches
  741. // results for the current network (as determined by
  742. // WebRTCCoordinator.GetNetworkID), so when multiple proxyOneClient
  743. // goroutines call doNetworkDiscovery, at most one discovery is performed
  744. // per network.
  745. p.doNetworkDiscovery(ctx, webRTCCoordinator)
  746. // Send the announce request
  747. // At this point, no NAT traversal operations have been performed by the
  748. // proxy, since its announcement may sit idle for the long-polling period
  749. // and NAT hole punches or port mappings could expire before the
  750. // long-polling period.
  751. //
  752. // As a future enhancement, the proxy could begin gathering WebRTC ICE
  753. // candidates while awaiting a client match, reducing the turn around
  754. // time after a match. This would make sense if there's high demand for
  755. // proxies, and so hole punches unlikely to expire while awaiting a client match.
  756. //
  757. // Another possibility may be to prepare and send a full offer SDP in the
  758. // announcment; and have the broker modify either the proxy or client
  759. // offer SDP to produce an answer SDP. In this case, the entire
  760. // ProxyAnswerRequest could be skipped as the WebRTC dial can begin after
  761. // the ProxyAnnounceRequest response (and ClientOfferRequest response).
  762. //
  763. // Furthermore, if a port mapping can be established, instead of using
  764. // WebRTC the proxy could run a Psiphon tunnel protocol listener at the
  765. // mapped port and send the dial information -- including some secret to
  766. // authenticate the client -- in its announcement. The client would then
  767. // receive this direct dial information from the broker and connect. The
  768. // proxy should be able to send keep alives to extend the port mapping
  769. // lifetime.
  770. brokerClient, err := p.config.GetBrokerClient()
  771. if err != nil {
  772. return backOff, errors.Trace(err)
  773. }
  774. brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
  775. // Only the first worker, which has signalAnnounceDone configured, checks
  776. // for tactics.
  777. checkTactics := signalAnnounceDone != nil && *preCheckTacticsDone
  778. preCheckTactics := signalAnnounceDone != nil && !*preCheckTacticsDone
  779. maxCommonClients, maxPersonalClients, rateLimits := p.getLimits()
  780. // Get the base Psiphon API parameters and additional proxy metrics,
  781. // including performance information, which is sent to the broker in the
  782. // proxy announcment.
  783. //
  784. // tacticsNetworkID is the exact network ID that corresponds to the
  785. // tactics tag sent in the base parameters; this is passed to
  786. // HandleTacticsPayload in order to double check that any tactics
  787. // returned in the proxy announcment response are associated and stored
  788. // with the original network ID.
  789. metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
  790. checkTactics || preCheckTactics,
  791. brokerCoordinator,
  792. webRTCCoordinator,
  793. maxCommonClients,
  794. maxPersonalClients,
  795. rateLimits)
  796. if err != nil {
  797. return backOff, errors.Trace(err)
  798. }
  799. // Set a delay before announcing, to stagger the announce request times.
  800. // The delay helps to avoid triggering rate limits or similar errors from
  801. // any intermediate CDN between the proxy and the broker; and provides a
  802. // nudge towards better load balancing across multiple large
  803. // MaxCommonClients proxies, as the broker primarily matches enqueued
  804. // announces in FIFO order, since older announces expire earlier.
  805. //
  806. // The delay is intended to be applied after doNetworkDiscovery, which has
  807. // no reason to be delayed; and also after any waitToShareSession delay,
  808. // as delaying before waitToShareSession can result in the announce
  809. // request times collapsing back together. Delaying after
  810. // waitToShareSession is handled by brokerClient.ProxyAnnounce, which
  811. // will also extend the base request timeout, as required, to account for
  812. // any deliberate delay.
  813. requestDelay := time.Duration(0)
  814. announceDelay, _, announceDelayJitter := p.getAnnounceDelayParameters()
  815. p.nextAnnounceMutex.Lock()
  816. nextDelay := prng.JitterDuration(announceDelay, announceDelayJitter)
  817. if p.nextAnnounceBrokerClient != brokerClient {
  818. // Reset the delay when the broker client changes.
  819. p.nextAnnounceNotBefore = time.Time{}
  820. p.nextAnnounceBrokerClient = brokerClient
  821. }
  822. if p.nextAnnounceNotBefore.IsZero() {
  823. p.nextAnnounceNotBefore = time.Now().Add(nextDelay)
  824. // No delay for the very first announce request, so leave
  825. // announceRequestDelay set to 0.
  826. } else {
  827. requestDelay = time.Until(p.nextAnnounceNotBefore)
  828. if requestDelay < 0 {
  829. // This announce did not arrive until after the next delay already
  830. // passed, so proceed with no delay.
  831. p.nextAnnounceNotBefore = time.Now().Add(nextDelay)
  832. requestDelay = 0
  833. } else {
  834. p.nextAnnounceNotBefore = p.nextAnnounceNotBefore.Add(nextDelay)
  835. }
  836. }
  837. p.nextAnnounceMutex.Unlock()
  838. // A proxy ID is implicitly sent with requests; it's the proxy's session
  839. // public key.
  840. //
  841. // ProxyAnnounce applies an additional request timeout to facilitate
  842. // long-polling.
  843. p.announcing.Add(1)
  844. announceStartTime := time.Now()
  845. // Ignore the personalCompartmentIDs if this proxy is not personal
  846. var personalCompartmentIDs []ID
  847. if isPersonal {
  848. personalCompartmentIDs = brokerCoordinator.PersonalCompartmentIDs()
  849. }
  850. announceResponse, err := brokerClient.ProxyAnnounce(
  851. ctx,
  852. requestDelay,
  853. &ProxyAnnounceRequest{
  854. PersonalCompartmentIDs: personalCompartmentIDs,
  855. Metrics: metrics,
  856. CheckTactics: checkTactics,
  857. PreCheckTactics: preCheckTactics,
  858. })
  859. if logAnnounce() {
  860. p.config.Logger.WithTraceFields(common.LogFields{
  861. "delay": requestDelay.String(),
  862. "elapsedTime": time.Since(announceStartTime).String(),
  863. }).Info("announcement request")
  864. }
  865. p.announcing.Add(-1)
  866. if err != nil {
  867. return backOff, errors.Trace(err)
  868. }
  869. if len(announceResponse.TacticsPayload) > 0 {
  870. // The TacticsPayload may include new tactics, or may simply signal,
  871. // to the Psiphon client, that its tactics tag remains up-to-date and
  872. // to extend cached tactics TTL. HandleTacticsPayload returns true
  873. // when tactics haved changed; in this case we clear cached network
  874. // discovery but proceed with handling the proxy announcement
  875. // response as there may still be a match.
  876. if p.config.HandleTacticsPayload(
  877. tacticsNetworkID,
  878. compressTactics,
  879. announceResponse.TacticsPayload) {
  880. p.resetNetworkDiscovery()
  881. }
  882. }
  883. // Signal that the announce round trip is complete, allowing other workers
  884. // to launch. At this point, the broker Noise session should be established
  885. // and any fresh tactics applied. Also toggle preCheckTacticsDone since
  886. // there's no need to retry PreCheckTactics once a round trip succeeds.
  887. if signalAnnounceDone != nil {
  888. signalAnnounceDone()
  889. }
  890. if preCheckTactics {
  891. *preCheckTacticsDone = true
  892. }
  893. // MustUpgrade has precedence over other cases, to ensure the callback is
  894. // invoked. Trigger back-off back off when rate/entry limited or must
  895. // upgrade; no back-off for no-match.
  896. if announceResponse.MustUpgrade {
  897. if p.config.MustUpgrade != nil {
  898. p.config.MustUpgrade()
  899. }
  900. backOff = true
  901. return backOff, errors.TraceNew("must upgrade")
  902. } else if announceResponse.Limited {
  903. backOff = true
  904. return backOff, errors.TraceNew("limited")
  905. } else if announceResponse.NoMatch {
  906. // No backoff for no-match.
  907. //
  908. // This is also the expected response for CheckTactics with a tactics
  909. // payload and PreCheckTactics with or without a tactics payload,
  910. // distinct cases which should not back off.
  911. return backOff, errors.TraceNew("no match")
  912. }
  913. if preCheckTactics && !announceResponse.NoMatch {
  914. // Sanity check: the broker should always respond with no-match for
  915. // PreCheckTactics.
  916. return backOff, errors.TraceNew("unexpected PreCheckTactics response")
  917. }
  918. if announceResponse.SelectedProtocolVersion < ProtocolVersion1 ||
  919. (announceResponse.UseMediaStreams &&
  920. announceResponse.SelectedProtocolVersion < ProtocolVersion2) ||
  921. announceResponse.SelectedProtocolVersion > LatestProtocolVersion {
  922. backOff = true
  923. return backOff, errors.Tracef(
  924. "unsupported protocol version: %d",
  925. announceResponse.SelectedProtocolVersion)
  926. }
  927. clientRegion := announceResponse.ClientRegion
  928. var regionActivity *RegionActivity
  929. if clientRegion != "" {
  930. regionActivity = p.getOrCreateRegionActivity(clientRegion, isPersonal)
  931. }
  932. // Create per-connection activity wrapper with cached regionActivity pointer
  933. connActivityWrapper := &connectionActivityWrapper{
  934. p: p,
  935. regionActivity: regionActivity,
  936. }
  937. // Trigger back-off if the following WebRTC operations fail to establish a
  938. // connections.
  939. backOff = true
  940. // For activity updates, indicate that a client connection is now underway.
  941. p.connectingClients.Add(1)
  942. if regionActivity != nil {
  943. regionActivity.connectingClients.Add(1)
  944. }
  945. connected := false
  946. defer func() {
  947. if !connected {
  948. p.connectingClients.Add(-1)
  949. if regionActivity != nil {
  950. regionActivity.connectingClients.Add(-1)
  951. }
  952. }
  953. }()
  954. // Initialize WebRTC using the client's offer SDP
  955. webRTCAnswerCtx, webRTCAnswerCancelFunc := context.WithTimeout(
  956. ctx, common.ValueOrDefault(webRTCCoordinator.WebRTCAnswerTimeout(), proxyWebRTCAnswerTimeout))
  957. defer webRTCAnswerCancelFunc()
  958. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  959. // included in SDPs.
  960. hasPersonalCompartmentIDs := len(personalCompartmentIDs) > 0
  961. webRTCConn, SDP, sdpMetrics, webRTCErr := newWebRTCConnForAnswer(
  962. webRTCAnswerCtx,
  963. &webRTCConfig{
  964. Logger: p.config.Logger,
  965. EnableDebugLogging: p.config.EnableWebRTCDebugLogging,
  966. WebRTCDialCoordinator: webRTCCoordinator,
  967. ClientRootObfuscationSecret: announceResponse.ClientRootObfuscationSecret,
  968. DoDTLSRandomization: announceResponse.DoDTLSRandomization,
  969. UseMediaStreams: announceResponse.UseMediaStreams,
  970. TrafficShapingParameters: announceResponse.TrafficShapingParameters,
  971. // In media stream mode, this flag indicates to the proxy that it
  972. // should add the QUIC-based reliability layer wrapping to media
  973. // streams. In data channel mode, this flag is ignored, since the
  974. // client configures the data channel using
  975. // webrtc.DataChannelInit.Ordered, and this configuration is sent
  976. // to the proxy in the client's SDP.
  977. ReliableTransport: announceResponse.NetworkProtocol == NetworkProtocolTCP,
  978. },
  979. announceResponse.ClientOfferSDP,
  980. hasPersonalCompartmentIDs)
  981. var webRTCRequestErr string
  982. if webRTCErr != nil {
  983. webRTCErr = errors.Trace(webRTCErr)
  984. webRTCRequestErr = webRTCErr.Error()
  985. SDP = WebRTCSessionDescription{}
  986. sdpMetrics = &webRTCSDPMetrics{}
  987. // Continue to report the error to the broker. The broker will respond
  988. // with failure to the client's offer request.
  989. } else {
  990. defer webRTCConn.Close()
  991. }
  992. // Send answer request with SDP or error.
  993. answerResponse, err := brokerClient.ProxyAnswer(
  994. ctx,
  995. &ProxyAnswerRequest{
  996. ConnectionID: announceResponse.ConnectionID,
  997. ProxyAnswerSDP: SDP,
  998. ICECandidateTypes: sdpMetrics.iceCandidateTypes,
  999. AnswerError: webRTCRequestErr,
  1000. })
  1001. if err != nil {
  1002. if webRTCErr != nil {
  1003. // Prioritize returning any WebRTC error for logging.
  1004. return backOff, webRTCErr
  1005. }
  1006. // Don't backoff if the answer request fails due to possible transient
  1007. // request transport errors.
  1008. backOff = false
  1009. return backOff, errors.Trace(err)
  1010. }
  1011. // Now that an answer is sent, stop if WebRTC initialization failed.
  1012. if webRTCErr != nil {
  1013. return backOff, webRTCErr
  1014. }
  1015. // Exit if the client was no longer awaiting the answer. There is no
  1016. // backoff in this case, and there's no error, as the proxy did not fail
  1017. // as it's not an unexpected outcome.
  1018. //
  1019. // Limitation: it's possible that the announce request responds quickly
  1020. // and the matched client offer is already close to timing out. The
  1021. // answer request will also respond quickly. There's an increased chance
  1022. // of hitting rate limits in this fast turn around scenario. This outcome
  1023. // is mitigated by InproxyBrokerMatcherOfferMinimumDeadline.
  1024. if answerResponse.NoAwaitingClient {
  1025. backOff = false
  1026. return backOff, nil
  1027. }
  1028. // Await the WebRTC connection.
  1029. // We could concurrently dial the destination, to have that network
  1030. // connection available immediately once the WebRTC channel is
  1031. // established. This would work only for TCP, not UDP, network protocols
  1032. // and could only include the TCP connection, as client traffic is
  1033. // required for all higher layers such as TLS, SSH, etc. This could also
  1034. // create wasted load on destination Psiphon servers, particularly when
  1035. // WebRTC connections fail.
  1036. awaitReadyToProxyCtx, awaitReadyToProxyCancelFunc := context.WithTimeout(
  1037. ctx,
  1038. common.ValueOrDefault(
  1039. webRTCCoordinator.WebRTCAwaitReadyToProxyTimeout(), readyToProxyAwaitTimeout))
  1040. defer awaitReadyToProxyCancelFunc()
  1041. err = webRTCConn.AwaitReadyToProxy(awaitReadyToProxyCtx, announceResponse.ConnectionID)
  1042. if err != nil {
  1043. return backOff, errors.Trace(err)
  1044. }
  1045. // Dial the destination, a Psiphon server. The broker validates that the
  1046. // dial destination is a Psiphon server.
  1047. destinationDialContext, destinationDialCancelFunc := context.WithTimeout(
  1048. ctx,
  1049. common.ValueOrDefault(
  1050. webRTCCoordinator.ProxyDestinationDialTimeout(), proxyDestinationDialTimeout))
  1051. defer destinationDialCancelFunc()
  1052. // Use the custom resolver when resolving destination hostnames, such as
  1053. // those used in domain fronted protocols.
  1054. //
  1055. // - Resolving at the in-proxy should yield a more optimal CDN edge, vs.
  1056. // resolving at the client.
  1057. //
  1058. // - Sending unresolved hostnames to in-proxies can expose some domain
  1059. // fronting configuration. This can be mitigated by enabling domain
  1060. // fronting on this 2nd hop only when the in-proxy is located in a
  1061. // region that may be censored or blocked; this is to be enforced by
  1062. // the broker.
  1063. //
  1064. // - Any DNSResolverPreresolved tactics applied will be relative to the
  1065. // in-proxy location.
  1066. destinationAddress, err := webRTCCoordinator.ResolveAddress(
  1067. ctx, "ip", announceResponse.DestinationAddress)
  1068. if err != nil {
  1069. return backOff, errors.Trace(err)
  1070. }
  1071. destinationConn, err := webRTCCoordinator.ProxyUpstreamDial(
  1072. destinationDialContext,
  1073. announceResponse.NetworkProtocol.String(),
  1074. destinationAddress)
  1075. if err != nil {
  1076. return backOff, errors.Trace(err)
  1077. }
  1078. defer destinationConn.Close()
  1079. // For activity updates, indicate that a client connection is established.
  1080. connected = true
  1081. p.connectingClients.Add(-1)
  1082. p.connectedClients.Add(1)
  1083. if regionActivity != nil {
  1084. regionActivity.connectingClients.Add(-1)
  1085. regionActivity.connectedClients.Add(1)
  1086. }
  1087. defer func() {
  1088. p.connectedClients.Add(-1)
  1089. if regionActivity != nil {
  1090. regionActivity.connectedClients.Add(-1)
  1091. }
  1092. }()
  1093. // Throttle the relay connection.
  1094. //
  1095. // Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
  1096. // operators may to want to limit their bandwidth usage with a single
  1097. // up/down value, an overall limit. The ProxyConfig can simply be
  1098. // generated by dividing the limit by MaxCommonClients + MaxPersonalClients.
  1099. // This approach favors performance stability: each client gets the
  1100. // same throttling limits regardless of how many other clients are connected.
  1101. //
  1102. // Rate limits are applied only when a client connection is established;
  1103. // connected clients retain their initial limits even when reduced time
  1104. // starts or ends.
  1105. destinationConn = common.NewThrottledConn(
  1106. destinationConn,
  1107. announceResponse.NetworkProtocol.IsStream(),
  1108. rateLimits)
  1109. // Hook up bytes transferred counting for activity updates.
  1110. // The ActivityMonitoredConn inactivity timeout is configured. For
  1111. // upstream TCP connections, the destinationConn will close when the TCP
  1112. // connection to the Psiphon server closes. But for upstream UDP flows,
  1113. // the relay does not know when the upstream "connection" has closed.
  1114. // Well-behaved clients will close the WebRTC half of the relay when
  1115. // those clients know the UDP-based tunnel protocol connection is closed;
  1116. // the inactivity timeout handles the remaining cases.
  1117. inactivityTimeout :=
  1118. common.ValueOrDefault(
  1119. webRTCCoordinator.ProxyRelayInactivityTimeout(),
  1120. proxyRelayInactivityTimeout)
  1121. destinationConn, err = common.NewActivityMonitoredConn(
  1122. destinationConn, inactivityTimeout, false, nil, connActivityWrapper)
  1123. if err != nil {
  1124. return backOff, errors.Trace(err)
  1125. }
  1126. // Relay the client traffic to the destination. The client traffic is a
  1127. // standard Psiphon tunnel protocol destinated to a Psiphon server. Any
  1128. // blocking/censorship at the 2nd hop will be mitigated by the use of
  1129. // Psiphon circumvention protocols and techniques.
  1130. // Limitation: clients may apply fragmentation to traffic relayed over the
  1131. // data channel, and there's no guarantee that the fragmentation write
  1132. // sizes or delays will carry over to the egress side.
  1133. // The proxy operator's ISP may be able to observe that the operator's
  1134. // host has nearly matching ingress and egress traffic. The traffic
  1135. // content won't be the same: the ingress traffic is wrapped in a WebRTC
  1136. // data channel, and the egress traffic is a Psiphon tunnel protocol.
  1137. // With padding and decoy packets, the ingress and egress traffic shape
  1138. // will differ beyond the basic WebRTC overheader. Even with this
  1139. // measure, over time the number of bytes in and out of the proxy may
  1140. // still indicate proxying.
  1141. waitGroup := new(sync.WaitGroup)
  1142. relayErrors := make(chan error, 2)
  1143. var relayedUp, relayedDown int32
  1144. waitGroup.Add(1)
  1145. go func() {
  1146. defer waitGroup.Done()
  1147. // WebRTC data channels are based on SCTP, which is actually
  1148. // message-based, not a stream. The (default) max message size for
  1149. // pion/sctp is 65536:
  1150. // https://github.com/pion/sctp/blob/44ed465396c880e379aae9c1bf81809a9e06b580/association.go#L52.
  1151. //
  1152. // As io.Copy uses a buffer size of 32K, each relayed message will be
  1153. // less than the maximum. Calls to ClientConn.Write are also expected
  1154. // to use io.Copy, keeping messages at most 32K in size.
  1155. // io.Copy doesn't return an error on EOF, but we still want to signal
  1156. // that relaying is done, so in this case a nil error is sent to the
  1157. // channel.
  1158. //
  1159. // Limitation: if one io.Copy goproutine sends nil and the other
  1160. // io.Copy goroutine sends a non-nil error concurrently, the non-nil
  1161. // error isn't prioritized.
  1162. n, err := io.Copy(webRTCConn, destinationConn)
  1163. if n > 0 {
  1164. atomic.StoreInt32(&relayedDown, 1)
  1165. }
  1166. relayErrors <- errors.Trace(err)
  1167. }()
  1168. waitGroup.Add(1)
  1169. go func() {
  1170. defer waitGroup.Done()
  1171. n, err := io.Copy(destinationConn, webRTCConn)
  1172. if n > 0 {
  1173. atomic.StoreInt32(&relayedUp, 1)
  1174. }
  1175. relayErrors <- errors.Trace(err)
  1176. }()
  1177. select {
  1178. case err = <-relayErrors:
  1179. case <-ctx.Done():
  1180. }
  1181. // Interrupt the relay goroutines by closing the connections.
  1182. webRTCConn.Close()
  1183. destinationConn.Close()
  1184. waitGroup.Wait()
  1185. p.config.Logger.WithTraceFields(common.LogFields{
  1186. "connectionID": announceResponse.ConnectionID,
  1187. }).Info("connection closed")
  1188. // Don't apply a back-off delay to the next announcement since this
  1189. // iteration successfully relayed bytes.
  1190. if atomic.LoadInt32(&relayedUp) == 1 || atomic.LoadInt32(&relayedDown) == 1 {
  1191. backOff = false
  1192. }
  1193. return backOff, err
  1194. }
  1195. func (p *Proxy) getMetrics(
  1196. includeTacticsParameters bool,
  1197. brokerCoordinator BrokerDialCoordinator,
  1198. webRTCCoordinator WebRTCDialCoordinator,
  1199. maxCommonClients int,
  1200. maxPersonalClients int,
  1201. rateLimits common.RateLimits) (
  1202. *ProxyMetrics, string, bool, error) {
  1203. // tacticsNetworkID records the exact network ID that corresponds to the
  1204. // tactics tag sent in the base parameters, and is used when applying any
  1205. // new tactics returned by the broker.
  1206. baseParams, tacticsNetworkID, err := p.config.GetBaseAPIParameters(
  1207. includeTacticsParameters)
  1208. if err != nil {
  1209. return nil, "", false, errors.Trace(err)
  1210. }
  1211. apiParams := common.APIParameters{}
  1212. apiParams.Add(baseParams)
  1213. apiParams.Add(common.APIParameters(brokerCoordinator.MetricsForBrokerRequests()))
  1214. compressTactics := protocol.GetCompressTactics(apiParams)
  1215. packedParams, err := protocol.EncodePackedAPIParameters(apiParams)
  1216. if err != nil {
  1217. return nil, "", false, errors.Trace(err)
  1218. }
  1219. return &ProxyMetrics{
  1220. BaseAPIParameters: packedParams,
  1221. ProtocolVersion: LatestProtocolVersion,
  1222. NATType: webRTCCoordinator.NATType(),
  1223. PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
  1224. MaxCommonClients: int32(maxCommonClients),
  1225. MaxPersonalClients: int32(maxPersonalClients),
  1226. ConnectingClients: p.connectingClients.Load(),
  1227. ConnectedClients: p.connectedClients.Load(),
  1228. LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
  1229. LimitDownstreamBytesPerSecond: rateLimits.WriteBytesPerSecond,
  1230. PeakUpstreamBytesPerSecond: p.peakBytesUp.Load(),
  1231. PeakDownstreamBytesPerSecond: p.peakBytesDown.Load(),
  1232. }, tacticsNetworkID, compressTactics, nil
  1233. }