proxy.go 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "io"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  30. )
  31. const (
  32. proxyAnnounceDelay = 1 * time.Second
  33. proxyAnnounceDelayJitter = 0.5
  34. proxyAnnounceMaxBackoffDelay = 1 * time.Minute
  35. proxyAnnounceLogSampleSize = 2
  36. proxyAnnounceLogSamplePeriod = 30 * time.Minute
  37. proxyWebRTCAnswerTimeout = 20 * time.Second
  38. proxyDestinationDialTimeout = 20 * time.Second
  39. proxyRelayInactivityTimeout = 5 * time.Minute
  40. )
  41. // Proxy is the in-proxy proxying component, which relays traffic from a
  42. // client to a Psiphon server.
  43. type Proxy struct {
  44. bytesUp atomic.Int64
  45. bytesDown atomic.Int64
  46. peakBytesUp atomic.Int64
  47. peakBytesDown atomic.Int64
  48. announcing atomic.Int32
  49. connectingClients atomic.Int32
  50. connectedClients atomic.Int32
  51. config *ProxyConfig
  52. activityUpdateWrapper *activityUpdateWrapper
  53. lastAnnouncing int32
  54. lastConnectingClients int32
  55. lastConnectedClients int32
  56. networkDiscoveryMutex sync.Mutex
  57. networkDiscoveryRunOnce bool
  58. networkDiscoveryNetworkID string
  59. nextAnnounceMutex sync.Mutex
  60. nextAnnounceBrokerClient *BrokerClient
  61. nextAnnounceNotBefore time.Time
  62. useReducedSettings bool
  63. reducedStartMinute int
  64. reducedEndMinute int
  65. }
  66. // TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
  67. // configured, permanent port mappings.
  68. // ProxyConfig specifies the configuration for a Proxy run.
  69. type ProxyConfig struct {
  70. // Logger is used to log events.
  71. Logger common.Logger
  72. // EnableWebRTCDebugLogging indicates whether to emit WebRTC debug logs.
  73. EnableWebRTCDebugLogging bool
  74. // WaitForNetworkConnectivity is a callback that should block until there
  75. // is network connectivity or shutdown. The return value is true when
  76. // there is network connectivity, and false for shutdown.
  77. WaitForNetworkConnectivity func() bool
  78. // GetCurrentNetworkContext is a callback that returns a context tied to
  79. // the lifetime of the host's current active network interface. If the
  80. // active network changes, the previous context returned by
  81. // GetCurrentNetworkContext should cancel. This context is used to
  82. // immediately cancel/close individual connections when the active
  83. // network changes.
  84. GetCurrentNetworkContext func() context.Context
  85. // GetBrokerClient provides a BrokerClient which the proxy will use for
  86. // making broker requests. If GetBrokerClient returns a shared
  87. // BrokerClient instance, the BrokerClient must support multiple,
  88. // concurrent round trips, as the proxy will use it to concurrently
  89. // announce many proxy instances. The BrokerClient should be implemented
  90. // using multiplexing over a shared network connection -- for example,
  91. // HTTP/2 -- and a shared broker session for optimal performance.
  92. GetBrokerClient func() (*BrokerClient, error)
  93. // GetBaseAPIParameters returns Psiphon API parameters to be sent to and
  94. // logged by the broker. Expected parameters include client/proxy
  95. // application and build version information. GetBaseAPIParameters also
  96. // returns the network ID, corresponding to the parameters, to be used in
  97. // tactics logic; the network ID is not sent to the broker.
  98. GetBaseAPIParameters func(includeTacticsParameters bool) (
  99. common.APIParameters, string, error)
  100. // MakeWebRTCDialCoordinator provides a WebRTCDialCoordinator which
  101. // specifies WebRTC-related dial parameters, including selected STUN
  102. // server addresses; network topology information for the current netork;
  103. // NAT logic settings; and other settings.
  104. //
  105. // MakeWebRTCDialCoordinator is invoked for each proxy/client connection,
  106. // and the provider can select new parameters per connection as reqired.
  107. MakeWebRTCDialCoordinator func() (WebRTCDialCoordinator, error)
  108. // HandleTacticsPayload is a callback that receives any tactics payload,
  109. // provided by the broker in proxy announcement request responses.
  110. // HandleTacticsPayload must return true when the tacticsPayload includes
  111. // new tactics, indicating that the proxy should reinitialize components
  112. // controlled by tactics parameters.
  113. HandleTacticsPayload func(
  114. networkID string, compressTactics bool, tacticsPayload []byte) bool
  115. // MustUpgrade is a callback that is invoked when a MustUpgrade flag is
  116. // received from the broker. When MustUpgrade is received, the proxy
  117. // should be stopped and the user should be prompted to upgrade before
  118. // restarting the proxy.
  119. MustUpgrade func()
  120. // MaxClients is the maximum number of clients that are allowed to connect
  121. // to the proxy. Must be > 0.
  122. MaxClients int
  123. // LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
  124. // a single client. When 0, there is no limit.
  125. LimitUpstreamBytesPerSecond int
  126. // LimitDownstreamBytesPerSecond limits the downstream data transfer rate
  127. // for a single client. When 0, there is no limit.
  128. LimitDownstreamBytesPerSecond int
  129. // ReducedStartTime specifies the local time of day (HH:MM, 24-hour, UTC)
  130. // at which reduced client settings begin.
  131. ReducedStartTime string
  132. // ReducedEndTime specifies the local time of day (HH:MM, 24-hour, UTC) at
  133. // which reduced client settings end.
  134. ReducedEndTime string
  135. // ReducedMaxClients specifies the maximum number of clients that are
  136. // allowed to connect to the proxy during the reduced time range.
  137. //
  138. // Clients connected when the reduced settings begin will not be
  139. // disconnected.
  140. ReducedMaxClients int
  141. // ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
  142. // rate for a single client during the reduced time range. When 0,
  143. // LimitUpstreamBytesPerSecond is the limit.
  144. //
  145. // Rates for clients already connected when the reduced settings begin or
  146. // end will not change.
  147. ReducedLimitUpstreamBytesPerSecond int
  148. // ReducedLimitDownstreamBytesPerSecond limits the downstream data
  149. // transfer rate for a single client during the reduced time range. When
  150. // 0, LimitDownstreamBytesPerSecond is the limit.
  151. //
  152. // Rates for clients already connected when the reduced settings begin or
  153. // end will not change.
  154. ReducedLimitDownstreamBytesPerSecond int
  155. // ActivityUpdater specifies an ActivityUpdater for activity associated
  156. // with this proxy.
  157. ActivityUpdater ActivityUpdater
  158. }
  159. // ActivityUpdater is a callback that is invoked when the proxy announces
  160. // availability, when clients connect and disconnect, and periodically with
  161. // data transfer updates (unless idle). This callback may be used to update
  162. // an activity UI. This callback should post this data to another thread or
  163. // handler and return immediately and not block on UI updates.
  164. type ActivityUpdater func(
  165. announcing int32,
  166. connectingClients int32,
  167. connectedClients int32,
  168. bytesUp int64,
  169. bytesDown int64,
  170. bytesDuration time.Duration)
  171. // NewProxy initializes a new Proxy with the specified configuration.
  172. func NewProxy(config *ProxyConfig) (*Proxy, error) {
  173. if config.MaxClients <= 0 {
  174. return nil, errors.TraceNew("invalid MaxClients")
  175. }
  176. p := &Proxy{
  177. config: config,
  178. }
  179. if config.ReducedStartTime != "" ||
  180. config.ReducedEndTime != "" ||
  181. config.ReducedMaxClients > 0 {
  182. startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
  183. if err != nil {
  184. return nil, errors.Tracef("invalid ReducedStartTime: %v", err)
  185. }
  186. endMinute, err := common.ParseTimeOfDayMinutes(config.ReducedEndTime)
  187. if err != nil {
  188. return nil, errors.Tracef("invalid ReducedEndTime: %v", err)
  189. }
  190. if startMinute == endMinute {
  191. return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
  192. }
  193. if config.ReducedMaxClients <= 0 ||
  194. config.ReducedMaxClients > config.MaxClients {
  195. return nil, errors.TraceNew("invalid ReducedMaxClients")
  196. }
  197. p.useReducedSettings = true
  198. p.reducedStartMinute = startMinute
  199. p.reducedEndMinute = endMinute
  200. }
  201. p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
  202. return p, nil
  203. }
  204. // activityUpdateWrapper implements the psiphon/common.ActivityUpdater
  205. // interface and is used to receive bytes transferred updates from the
  206. // ActivityConns wrapping proxied traffic. A wrapper is used so that
  207. // UpdateProgress is not exported from Proxy.
  208. type activityUpdateWrapper struct {
  209. p *Proxy
  210. }
  211. func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
  212. w.p.bytesUp.Add(bytesWritten)
  213. w.p.bytesDown.Add(bytesRead)
  214. }
  215. // Run runs the proxy. The proxy sends requests to the Broker announcing its
  216. // availability; the Broker matches the proxy with clients, and facilitates
  217. // an exchange of WebRTC connection information; the proxy and each client
  218. // attempt to establish a connection; and the client's traffic is relayed to
  219. // Psiphon server.
  220. //
  221. // Run ends when ctx is Done. A proxy run may continue across underlying
  222. // network changes assuming that the ProxyConfig GetBrokerClient and
  223. // MakeWebRTCDialCoordinator callbacks react to network changes and provide
  224. // instances that are reflect network changes.
  225. func (p *Proxy) Run(ctx context.Context) {
  226. // Run MaxClient proxying workers. Each worker handles one client at a time.
  227. proxyWaitGroup := new(sync.WaitGroup)
  228. // Capture activity updates every second, which is the required frequency
  229. // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
  230. // frequency for invoking the ActivityUpdater and updating UI widgets.
  231. proxyWaitGroup.Add(1)
  232. go func() {
  233. defer proxyWaitGroup.Done()
  234. p.lastAnnouncing = 0
  235. p.lastConnectingClients = 0
  236. p.lastConnectedClients = 0
  237. activityUpdatePeriod := 1 * time.Second
  238. ticker := time.NewTicker(activityUpdatePeriod)
  239. defer ticker.Stop()
  240. loop:
  241. for {
  242. select {
  243. case <-ticker.C:
  244. p.activityUpdate(activityUpdatePeriod)
  245. case <-ctx.Done():
  246. break loop
  247. }
  248. }
  249. }()
  250. // Launch the first proxy worker, passing a signal to be triggered once
  251. // the very first announcement round trip is complete. The first round
  252. // trip is awaited so that:
  253. //
  254. // - The first announce response will arrive with any new tactics,
  255. // which may be applied before launching additional workers.
  256. //
  257. // - The first worker gets no announcement delay and is also guaranteed to
  258. // be the shared session establisher. Since the announcement delays are
  259. // applied _after_ waitToShareSession, it would otherwise be possible,
  260. // with a race of MaxClient initial, concurrent announces, for the
  261. // session establisher to be a different worker than the no-delay worker.
  262. //
  263. // The first worker is the only proxy worker which sets
  264. // ProxyAnnounceRequest.CheckTactics.
  265. signalFirstAnnounceCtx, signalFirstAnnounceDone :=
  266. context.WithCancel(context.Background())
  267. proxyWaitGroup.Add(1)
  268. go func() {
  269. defer proxyWaitGroup.Done()
  270. p.proxyClients(ctx, signalFirstAnnounceDone, false)
  271. }()
  272. select {
  273. case <-signalFirstAnnounceCtx.Done():
  274. case <-ctx.Done():
  275. return
  276. }
  277. // Launch the remaining workers.
  278. for i := 0; i < p.config.MaxClients-1; i++ {
  279. // When reduced settings are in effect, a subset of workers will pause
  280. // during the reduced time period. Since ReducedMaxClients > 0 the
  281. // first proxy worker is never paused.
  282. workerNum := i + 1
  283. reducedPause := p.useReducedSettings &&
  284. workerNum >= p.config.ReducedMaxClients
  285. proxyWaitGroup.Add(1)
  286. go func(reducedPause bool) {
  287. defer proxyWaitGroup.Done()
  288. p.proxyClients(ctx, nil, reducedPause)
  289. }(reducedPause)
  290. }
  291. proxyWaitGroup.Wait()
  292. }
  293. func (p *Proxy) activityUpdate(period time.Duration) {
  294. // Concurrency: activityUpdate is called by only the single goroutine
  295. // created in Run.
  296. announcing := p.announcing.Load()
  297. connectingClients := p.connectingClients.Load()
  298. connectedClients := p.connectedClients.Load()
  299. bytesUp := p.bytesUp.Swap(0)
  300. bytesDown := p.bytesDown.Swap(0)
  301. greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
  302. greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
  303. stateChanged := announcing != p.lastAnnouncing ||
  304. connectingClients != p.lastConnectingClients ||
  305. connectedClients != p.lastConnectedClients
  306. p.lastAnnouncing = announcing
  307. p.lastConnectingClients = connectingClients
  308. p.lastConnectedClients = connectedClients
  309. if !stateChanged &&
  310. bytesUp == 0 &&
  311. bytesDown == 0 {
  312. // Skip the activity callback on idle bytes or no change in worker state.
  313. return
  314. }
  315. p.config.ActivityUpdater(
  316. announcing,
  317. connectingClients,
  318. connectedClients,
  319. bytesUp,
  320. bytesDown,
  321. period)
  322. }
  323. func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
  324. // Limitation: if there are two concurrent calls, the greater value could
  325. // get overwritten.
  326. old := addr.Load()
  327. if new > old {
  328. return addr.CompareAndSwap(old, new)
  329. }
  330. return false
  331. }
  332. func (p *Proxy) isReducedUntil() (int, time.Time) {
  333. if !p.useReducedSettings {
  334. return p.config.MaxClients, time.Time{}
  335. }
  336. now := time.Now().UTC()
  337. minute := now.Hour()*60 + now.Minute()
  338. isReduced := false
  339. if p.reducedStartMinute < p.reducedEndMinute {
  340. isReduced = minute >= p.reducedStartMinute && minute < p.reducedEndMinute
  341. } else {
  342. isReduced = minute >= p.reducedStartMinute || minute < p.reducedEndMinute
  343. }
  344. if !isReduced {
  345. return p.config.MaxClients, time.Time{}
  346. }
  347. endHour := p.reducedEndMinute / 60
  348. endMinute := p.reducedEndMinute % 60
  349. endTime := time.Date(
  350. now.Year(),
  351. now.Month(),
  352. now.Day(),
  353. endHour,
  354. endMinute,
  355. 0,
  356. 0,
  357. now.Location(),
  358. )
  359. if !endTime.After(now) {
  360. endTime = endTime.AddDate(0, 0, 1)
  361. }
  362. return p.config.ReducedMaxClients, endTime
  363. }
  364. func (p *Proxy) getLimits() (int, common.RateLimits) {
  365. rateLimits := common.RateLimits{
  366. ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
  367. WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
  368. }
  369. maxClients, reducedUntil := p.isReducedUntil()
  370. if !reducedUntil.IsZero() {
  371. upstream := p.config.ReducedLimitUpstreamBytesPerSecond
  372. if upstream == 0 {
  373. upstream = p.config.LimitUpstreamBytesPerSecond
  374. }
  375. downstream := p.config.ReducedLimitDownstreamBytesPerSecond
  376. if downstream == 0 {
  377. downstream = p.config.LimitDownstreamBytesPerSecond
  378. }
  379. rateLimits = common.RateLimits{
  380. ReadBytesPerSecond: int64(upstream),
  381. WriteBytesPerSecond: int64(downstream),
  382. }
  383. }
  384. return maxClients, rateLimits
  385. }
  386. // getAnnounceDelayParameters is a helper that fetches the proxy announcement
  387. // delay parameters from the current broker client.
  388. //
  389. // getAnnounceDelayParameters is used to configure a delay when
  390. // proxyOneClient fails. As having no broker clients is a possible
  391. // proxyOneClient failure case, GetBrokerClient errors are ignored here and
  392. // defaults used in that case.
  393. func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
  394. brokerClient, err := p.config.GetBrokerClient()
  395. if err != nil {
  396. return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
  397. }
  398. brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
  399. return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
  400. common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
  401. common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
  402. }
  403. func (p *Proxy) proxyClients(
  404. ctx context.Context, signalAnnounceDone func(), reducedPause bool) {
  405. // Proxy one client, repeating until ctx is done.
  406. //
  407. // This worker starts with posting a long-polling announcement request.
  408. // The broker response with a matched client, and the proxy and client
  409. // attempt to establish a WebRTC connection for relaying traffic.
  410. //
  411. // Limitation: this design may not maximize the utility of the proxy,
  412. // since some proxy/client connections will fail at the WebRTC stage due
  413. // to NAT traversal failure, and at most MaxClient concurrent
  414. // establishments are attempted. Another scenario comes from the Psiphon
  415. // client horse race, which may start in-proxy dials but then abort them
  416. // when some other tunnel protocol succeeds.
  417. //
  418. // As a future enhancement, consider using M announcement goroutines and N
  419. // WebRTC dial goroutines. When an announcement gets a response,
  420. // immediately announce again unless there are already MaxClient active
  421. // connections established. This approach may require the proxy to
  422. // backpedal and reject connections when establishment is too successful.
  423. //
  424. // Another enhancement could be a signal from the client, to the broker,
  425. // relayed to the proxy, when a dial is aborted.
  426. failureDelayFactor := time.Duration(1)
  427. // To reduce diagnostic log noise, only log an initial sample of
  428. // announcement request timings (delays/elapsed time) and a periodic
  429. // sample of repeating errors such as "no match".
  430. logAnnounceCount := proxyAnnounceLogSampleSize
  431. logErrorsCount := proxyAnnounceLogSampleSize
  432. lastErrMsg := ""
  433. startLogSampleTime := time.Now()
  434. logAnnounce := func() bool {
  435. if logAnnounceCount > 0 {
  436. logAnnounceCount -= 1
  437. return true
  438. }
  439. return false
  440. }
  441. for ctx.Err() == nil {
  442. if !p.config.WaitForNetworkConnectivity() {
  443. break
  444. }
  445. // Pause designated workers during the reduced time range. In-flight
  446. // announces are not interrupted and connected clients are not
  447. // disconnected, so there is a gradual transition into reduced mode.
  448. if reducedPause {
  449. _, reducedUntil := p.isReducedUntil()
  450. if !reducedUntil.IsZero() {
  451. pauseDuration := time.Until(reducedUntil)
  452. p.config.Logger.WithTraceFields(common.LogFields{
  453. "duration": pauseDuration.String(),
  454. }).Info("pause worker")
  455. timer := time.NewTimer(pauseDuration)
  456. select {
  457. case <-timer.C:
  458. case <-ctx.Done():
  459. }
  460. timer.Stop()
  461. if ctx.Err() != nil {
  462. break
  463. }
  464. }
  465. }
  466. if time.Since(startLogSampleTime) >= proxyAnnounceLogSamplePeriod {
  467. logAnnounceCount = proxyAnnounceLogSampleSize
  468. logErrorsCount = proxyAnnounceLogSampleSize
  469. lastErrMsg = ""
  470. startLogSampleTime = time.Now()
  471. }
  472. backOff, err := p.proxyOneClient(
  473. ctx, logAnnounce, signalAnnounceDone)
  474. if !backOff || err == nil {
  475. failureDelayFactor = 1
  476. }
  477. if err != nil && ctx.Err() == nil {
  478. // Apply a simple exponential backoff based on whether
  479. // proxyOneClient either relayed client traffic or got no match,
  480. // or encountered a failure.
  481. //
  482. // The proxyOneClient failure could range from local
  483. // configuration (no broker clients) to network issues(failure to
  484. // completely establish WebRTC connection) and this backoff
  485. // prevents both excess local logging and churning in the former
  486. // case and excessive bad service to clients or unintentionally
  487. // overloading the broker in the latter case.
  488. delay, maxBackoffDelay, jitter := p.getAnnounceDelayParameters()
  489. delay = delay * failureDelayFactor
  490. if delay > maxBackoffDelay {
  491. delay = maxBackoffDelay
  492. }
  493. if failureDelayFactor < 1<<20 {
  494. failureDelayFactor *= 2
  495. }
  496. // Sample error log.
  497. //
  498. // Limitation: the lastErrMsg string comparison isn't compatible
  499. // with errors with minor variations, such as "unexpected
  500. // response status code %d after %v" from
  501. // InproxyBrokerRoundTripper.RoundTrip, with a time duration in
  502. // the second parameter.
  503. errMsg := err.Error()
  504. if lastErrMsg != errMsg {
  505. logErrorsCount = proxyAnnounceLogSampleSize
  506. lastErrMsg = errMsg
  507. }
  508. if logErrorsCount > 0 {
  509. p.config.Logger.WithTraceFields(
  510. common.LogFields{
  511. "error": errMsg,
  512. "delay": delay.String(),
  513. "jitter": jitter,
  514. }).Error("proxy client failed")
  515. logErrorsCount -= 1
  516. }
  517. common.SleepWithJitter(ctx, delay, jitter)
  518. }
  519. }
  520. }
  521. // resetNetworkDiscovery resets the network discovery state, which will force
  522. // another network discovery when doNetworkDiscovery is invoked.
  523. // resetNetworkDiscovery is called when new tactics have been received from
  524. // the broker, as new tactics may change parameters that control network
  525. // discovery.
  526. func (p *Proxy) resetNetworkDiscovery() {
  527. p.networkDiscoveryMutex.Lock()
  528. defer p.networkDiscoveryMutex.Unlock()
  529. p.networkDiscoveryRunOnce = false
  530. p.networkDiscoveryNetworkID = ""
  531. }
  532. func (p *Proxy) doNetworkDiscovery(
  533. ctx context.Context,
  534. webRTCCoordinator WebRTCDialCoordinator) {
  535. // Allow only one concurrent network discovery. In practise, this may
  536. // block all other proxyOneClient goroutines while one single goroutine
  537. // runs doNetworkDiscovery. Subsequently, all other goroutines will find
  538. // networkDiscoveryRunOnce is true and use the cached results.
  539. p.networkDiscoveryMutex.Lock()
  540. defer p.networkDiscoveryMutex.Unlock()
  541. networkID := webRTCCoordinator.NetworkID()
  542. if p.networkDiscoveryRunOnce &&
  543. p.networkDiscoveryNetworkID == networkID {
  544. // Already ran discovery for this network.
  545. //
  546. // TODO: periodically re-probe for port mapping services?
  547. return
  548. }
  549. // Reset and configure port mapper component, as required. See
  550. // initPortMapper comment.
  551. initPortMapper(webRTCCoordinator)
  552. // Gather local network NAT/port mapping metrics and configuration before
  553. // sending any announce requests. NAT topology metrics are used by the
  554. // Broker to optimize client and in-proxy matching. Unlike the client, we
  555. // always perform this synchronous step here, since waiting doesn't
  556. // necessarily block a client tunnel dial.
  557. waitGroup := new(sync.WaitGroup)
  558. waitGroup.Add(1)
  559. go func() {
  560. defer waitGroup.Done()
  561. // NATDiscover may use cached NAT type/port mapping values from
  562. // DialParameters, based on the network ID. If discovery is not
  563. // successful, the proxy still proceeds to announce.
  564. NATDiscover(
  565. ctx,
  566. &NATDiscoverConfig{
  567. Logger: p.config.Logger,
  568. WebRTCDialCoordinator: webRTCCoordinator,
  569. })
  570. }()
  571. waitGroup.Wait()
  572. p.networkDiscoveryRunOnce = true
  573. p.networkDiscoveryNetworkID = networkID
  574. }
  575. func (p *Proxy) proxyOneClient(
  576. ctx context.Context,
  577. logAnnounce func() bool,
  578. signalAnnounceDone func()) (bool, error) {
  579. // Cancel/close this connection immediately if the network changes.
  580. if p.config.GetCurrentNetworkContext != nil {
  581. var cancelFunc context.CancelFunc
  582. ctx, cancelFunc = common.MergeContextCancel(
  583. ctx, p.config.GetCurrentNetworkContext())
  584. defer cancelFunc()
  585. }
  586. // Do not trigger back-off unless the proxy successfully announces and
  587. // only then performs poorly.
  588. //
  589. // A no-match response should not trigger back-off, nor should broker
  590. // request transport errors which may include non-200 responses due to
  591. // CDN timeout mismatches or TLS errors due to CDN TLS fingerprint
  592. // incompatibility.
  593. backOff := false
  594. // Get a new WebRTCDialCoordinator, which should be configured with the
  595. // latest network tactics.
  596. webRTCCoordinator, err := p.config.MakeWebRTCDialCoordinator()
  597. if err != nil {
  598. return backOff, errors.Trace(err)
  599. }
  600. // Perform network discovery, to determine NAT type and other network
  601. // topology information that is reported to the broker in the proxy
  602. // announcement and used to optimize proxy/client matching. Unlike
  603. // clients, which can't easily delay dials in the tunnel establishment
  604. // horse race, proxies will always perform network discovery.
  605. // doNetworkDiscovery allows only one concurrent discovery and caches
  606. // results for the current network (as determined by
  607. // WebRTCCoordinator.GetNetworkID), so when multiple proxyOneClient
  608. // goroutines call doNetworkDiscovery, at most one discovery is performed
  609. // per network.
  610. p.doNetworkDiscovery(ctx, webRTCCoordinator)
  611. // Send the announce request
  612. // At this point, no NAT traversal operations have been performed by the
  613. // proxy, since its announcement may sit idle for the long-polling period
  614. // and NAT hole punches or port mappings could expire before the
  615. // long-polling period.
  616. //
  617. // As a future enhancement, the proxy could begin gathering WebRTC ICE
  618. // candidates while awaiting a client match, reducing the turn around
  619. // time after a match. This would make sense if there's high demand for
  620. // proxies, and so hole punches unlikely to expire while awaiting a client match.
  621. //
  622. // Another possibility may be to prepare and send a full offer SDP in the
  623. // announcment; and have the broker modify either the proxy or client
  624. // offer SDP to produce an answer SDP. In this case, the entire
  625. // ProxyAnswerRequest could be skipped as the WebRTC dial can begin after
  626. // the ProxyAnnounceRequest response (and ClientOfferRequest response).
  627. //
  628. // Furthermore, if a port mapping can be established, instead of using
  629. // WebRTC the proxy could run a Psiphon tunnel protocol listener at the
  630. // mapped port and send the dial information -- including some secret to
  631. // authenticate the client -- in its announcement. The client would then
  632. // receive this direct dial information from the broker and connect. The
  633. // proxy should be able to send keep alives to extend the port mapping
  634. // lifetime.
  635. brokerClient, err := p.config.GetBrokerClient()
  636. if err != nil {
  637. return backOff, errors.Trace(err)
  638. }
  639. brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
  640. // Only the first worker, which has signalAnnounceDone configured, checks
  641. // for tactics.
  642. checkTactics := signalAnnounceDone != nil
  643. maxClients, rateLimits := p.getLimits()
  644. // Get the base Psiphon API parameters and additional proxy metrics,
  645. // including performance information, which is sent to the broker in the
  646. // proxy announcment.
  647. //
  648. // tacticsNetworkID is the exact network ID that corresponds to the
  649. // tactics tag sent in the base parameters; this is passed to
  650. // HandleTacticsPayload in order to double check that any tactics
  651. // returned in the proxy announcment response are associated and stored
  652. // with the original network ID.
  653. metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
  654. checkTactics, brokerCoordinator, webRTCCoordinator, maxClients, rateLimits)
  655. if err != nil {
  656. return backOff, errors.Trace(err)
  657. }
  658. // Set a delay before announcing, to stagger the announce request times.
  659. // The delay helps to avoid triggering rate limits or similar errors from
  660. // any intermediate CDN between the proxy and the broker; and provides a
  661. // nudge towards better load balancing across multiple large MaxClients
  662. // proxies, as the broker primarily matches enqueued announces in FIFO
  663. // order, since older announces expire earlier.
  664. //
  665. // The delay is intended to be applied after doNetworkDiscovery, which has
  666. // no reason to be delayed; and also after any waitToShareSession delay,
  667. // as delaying before waitToShareSession can result in the announce
  668. // request times collapsing back together. Delaying after
  669. // waitToShareSession is handled by brokerClient.ProxyAnnounce, which
  670. // will also extend the base request timeout, as required, to account for
  671. // any deliberate delay.
  672. requestDelay := time.Duration(0)
  673. announceDelay, _, announceDelayJitter := p.getAnnounceDelayParameters()
  674. p.nextAnnounceMutex.Lock()
  675. nextDelay := prng.JitterDuration(announceDelay, announceDelayJitter)
  676. if p.nextAnnounceBrokerClient != brokerClient {
  677. // Reset the delay when the broker client changes.
  678. p.nextAnnounceNotBefore = time.Time{}
  679. p.nextAnnounceBrokerClient = brokerClient
  680. }
  681. if p.nextAnnounceNotBefore.IsZero() {
  682. p.nextAnnounceNotBefore = time.Now().Add(nextDelay)
  683. // No delay for the very first announce request, so leave
  684. // announceRequestDelay set to 0.
  685. } else {
  686. requestDelay = time.Until(p.nextAnnounceNotBefore)
  687. if requestDelay < 0 {
  688. // This announce did not arrive until after the next delay already
  689. // passed, so proceed with no delay.
  690. p.nextAnnounceNotBefore = time.Now().Add(nextDelay)
  691. requestDelay = 0
  692. } else {
  693. p.nextAnnounceNotBefore = p.nextAnnounceNotBefore.Add(nextDelay)
  694. }
  695. }
  696. p.nextAnnounceMutex.Unlock()
  697. // A proxy ID is implicitly sent with requests; it's the proxy's session
  698. // public key.
  699. //
  700. // ProxyAnnounce applies an additional request timeout to facilitate
  701. // long-polling.
  702. p.announcing.Add(1)
  703. announceStartTime := time.Now()
  704. personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
  705. announceResponse, err := brokerClient.ProxyAnnounce(
  706. ctx,
  707. requestDelay,
  708. &ProxyAnnounceRequest{
  709. PersonalCompartmentIDs: personalCompartmentIDs,
  710. Metrics: metrics,
  711. CheckTactics: checkTactics,
  712. })
  713. if logAnnounce() {
  714. p.config.Logger.WithTraceFields(common.LogFields{
  715. "delay": requestDelay.String(),
  716. "elapsedTime": time.Since(announceStartTime).String(),
  717. }).Info("announcement request")
  718. }
  719. p.announcing.Add(-1)
  720. if err != nil {
  721. return backOff, errors.Trace(err)
  722. }
  723. if len(announceResponse.TacticsPayload) > 0 {
  724. // The TacticsPayload may include new tactics, or may simply signal,
  725. // to the Psiphon client, that its tactics tag remains up-to-date and
  726. // to extend cached tactics TTL. HandleTacticsPayload returns true
  727. // when tactics haved changed; in this case we clear cached network
  728. // discovery but proceed with handling the proxy announcement
  729. // response as there may still be a match.
  730. if p.config.HandleTacticsPayload(
  731. tacticsNetworkID,
  732. compressTactics,
  733. announceResponse.TacticsPayload) {
  734. p.resetNetworkDiscovery()
  735. }
  736. }
  737. // Signal that the announce round trip is complete. At this point, the
  738. // broker Noise session should be established and any fresh tactics
  739. // applied.
  740. if signalAnnounceDone != nil {
  741. signalAnnounceDone()
  742. }
  743. // MustUpgrade has precedence over other cases, to ensure the callback is
  744. // invoked. Trigger back-off back off when rate/entry limited or must
  745. // upgrade; no back-off for no-match.
  746. if announceResponse.MustUpgrade {
  747. if p.config.MustUpgrade != nil {
  748. p.config.MustUpgrade()
  749. }
  750. backOff = true
  751. return backOff, errors.TraceNew("must upgrade")
  752. } else if announceResponse.Limited {
  753. backOff = true
  754. return backOff, errors.TraceNew("limited")
  755. } else if announceResponse.NoMatch {
  756. return backOff, errors.TraceNew("no match")
  757. }
  758. if announceResponse.SelectedProtocolVersion < ProtocolVersion1 ||
  759. (announceResponse.UseMediaStreams &&
  760. announceResponse.SelectedProtocolVersion < ProtocolVersion2) ||
  761. announceResponse.SelectedProtocolVersion > LatestProtocolVersion {
  762. backOff = true
  763. return backOff, errors.Tracef(
  764. "unsupported protocol version: %d",
  765. announceResponse.SelectedProtocolVersion)
  766. }
  767. // Trigger back-off if the following WebRTC operations fail to establish a
  768. // connections.
  769. //
  770. // Limitation: the proxy answer request to the broker may fail due to the
  771. // non-back-off reasons documented above for the proxy announcment request;
  772. // however, these should be unlikely assuming that the broker client is
  773. // using a persistent transport connection.
  774. backOff = true
  775. // For activity updates, indicate that a client connection is now underway.
  776. p.connectingClients.Add(1)
  777. connected := false
  778. defer func() {
  779. if !connected {
  780. p.connectingClients.Add(-1)
  781. }
  782. }()
  783. // Initialize WebRTC using the client's offer SDP
  784. webRTCAnswerCtx, webRTCAnswerCancelFunc := context.WithTimeout(
  785. ctx, common.ValueOrDefault(webRTCCoordinator.WebRTCAnswerTimeout(), proxyWebRTCAnswerTimeout))
  786. defer webRTCAnswerCancelFunc()
  787. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  788. // included in SDPs.
  789. hasPersonalCompartmentIDs := len(personalCompartmentIDs) > 0
  790. webRTCConn, SDP, sdpMetrics, webRTCErr := newWebRTCConnForAnswer(
  791. webRTCAnswerCtx,
  792. &webRTCConfig{
  793. Logger: p.config.Logger,
  794. EnableDebugLogging: p.config.EnableWebRTCDebugLogging,
  795. WebRTCDialCoordinator: webRTCCoordinator,
  796. ClientRootObfuscationSecret: announceResponse.ClientRootObfuscationSecret,
  797. DoDTLSRandomization: announceResponse.DoDTLSRandomization,
  798. UseMediaStreams: announceResponse.UseMediaStreams,
  799. TrafficShapingParameters: announceResponse.TrafficShapingParameters,
  800. // In media stream mode, this flag indicates to the proxy that it
  801. // should add the QUIC-based reliability layer wrapping to media
  802. // streams. In data channel mode, this flag is ignored, since the
  803. // client configures the data channel using
  804. // webrtc.DataChannelInit.Ordered, and this configuration is sent
  805. // to the proxy in the client's SDP.
  806. ReliableTransport: announceResponse.NetworkProtocol == NetworkProtocolTCP,
  807. },
  808. announceResponse.ClientOfferSDP,
  809. hasPersonalCompartmentIDs)
  810. var webRTCRequestErr string
  811. if webRTCErr != nil {
  812. webRTCErr = errors.Trace(webRTCErr)
  813. webRTCRequestErr = webRTCErr.Error()
  814. SDP = WebRTCSessionDescription{}
  815. sdpMetrics = &webRTCSDPMetrics{}
  816. // Continue to report the error to the broker. The broker will respond
  817. // with failure to the client's offer request.
  818. } else {
  819. defer webRTCConn.Close()
  820. }
  821. // Send answer request with SDP or error.
  822. _, err = brokerClient.ProxyAnswer(
  823. ctx,
  824. &ProxyAnswerRequest{
  825. ConnectionID: announceResponse.ConnectionID,
  826. ProxyAnswerSDP: SDP,
  827. ICECandidateTypes: sdpMetrics.iceCandidateTypes,
  828. AnswerError: webRTCRequestErr,
  829. })
  830. if err != nil {
  831. if webRTCErr != nil {
  832. // Prioritize returning any WebRTC error for logging.
  833. return backOff, webRTCErr
  834. }
  835. return backOff, errors.Trace(err)
  836. }
  837. // Now that an answer is sent, stop if WebRTC initialization failed.
  838. if webRTCErr != nil {
  839. return backOff, webRTCErr
  840. }
  841. // Await the WebRTC connection.
  842. // We could concurrently dial the destination, to have that network
  843. // connection available immediately once the WebRTC channel is
  844. // established. This would work only for TCP, not UDP, network protocols
  845. // and could only include the TCP connection, as client traffic is
  846. // required for all higher layers such as TLS, SSH, etc. This could also
  847. // create wasted load on destination Psiphon servers, particularly when
  848. // WebRTC connections fail.
  849. awaitReadyToProxyCtx, awaitReadyToProxyCancelFunc := context.WithTimeout(
  850. ctx,
  851. common.ValueOrDefault(
  852. webRTCCoordinator.WebRTCAwaitReadyToProxyTimeout(), readyToProxyAwaitTimeout))
  853. defer awaitReadyToProxyCancelFunc()
  854. err = webRTCConn.AwaitReadyToProxy(awaitReadyToProxyCtx, announceResponse.ConnectionID)
  855. if err != nil {
  856. return backOff, errors.Trace(err)
  857. }
  858. // Dial the destination, a Psiphon server. The broker validates that the
  859. // dial destination is a Psiphon server.
  860. destinationDialContext, destinationDialCancelFunc := context.WithTimeout(
  861. ctx,
  862. common.ValueOrDefault(
  863. webRTCCoordinator.ProxyDestinationDialTimeout(), proxyDestinationDialTimeout))
  864. defer destinationDialCancelFunc()
  865. // Use the custom resolver when resolving destination hostnames, such as
  866. // those used in domain fronted protocols.
  867. //
  868. // - Resolving at the in-proxy should yield a more optimal CDN edge, vs.
  869. // resolving at the client.
  870. //
  871. // - Sending unresolved hostnames to in-proxies can expose some domain
  872. // fronting configuration. This can be mitigated by enabling domain
  873. // fronting on this 2nd hop only when the in-proxy is located in a
  874. // region that may be censored or blocked; this is to be enforced by
  875. // the broker.
  876. //
  877. // - Any DNSResolverPreresolved tactics applied will be relative to the
  878. // in-proxy location.
  879. destinationAddress, err := webRTCCoordinator.ResolveAddress(
  880. ctx, "ip", announceResponse.DestinationAddress)
  881. if err != nil {
  882. return backOff, errors.Trace(err)
  883. }
  884. destinationConn, err := webRTCCoordinator.ProxyUpstreamDial(
  885. destinationDialContext,
  886. announceResponse.NetworkProtocol.String(),
  887. destinationAddress)
  888. if err != nil {
  889. return backOff, errors.Trace(err)
  890. }
  891. defer destinationConn.Close()
  892. // For activity updates, indicate that a client connection is established.
  893. connected = true
  894. p.connectingClients.Add(-1)
  895. p.connectedClients.Add(1)
  896. defer func() {
  897. p.connectedClients.Add(-1)
  898. }()
  899. // Throttle the relay connection.
  900. //
  901. // Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
  902. // operators may to want to limit their bandwidth usage with a single
  903. // up/down value, an overall limit. The ProxyConfig can simply be
  904. // generated by dividing the limit by MaxClients. This approach favors
  905. // performance stability: each client gets the same throttling limits
  906. // regardless of how many other clients are connected.
  907. //
  908. // Rate limits are applied only when a client connection is established;
  909. // connected clients retain their initial limits even when reduced time
  910. // starts or ends.
  911. destinationConn = common.NewThrottledConn(
  912. destinationConn,
  913. announceResponse.NetworkProtocol.IsStream(),
  914. rateLimits)
  915. // Hook up bytes transferred counting for activity updates.
  916. // The ActivityMonitoredConn inactivity timeout is configured. For
  917. // upstream TCP connections, the destinationConn will close when the TCP
  918. // connection to the Psiphon server closes. But for upstream UDP flows,
  919. // the relay does not know when the upstream "connection" has closed.
  920. // Well-behaved clients will close the WebRTC half of the relay when
  921. // those clients know the UDP-based tunnel protocol connection is closed;
  922. // the inactivity timeout handles the remaining cases.
  923. inactivityTimeout :=
  924. common.ValueOrDefault(
  925. webRTCCoordinator.ProxyRelayInactivityTimeout(),
  926. proxyRelayInactivityTimeout)
  927. destinationConn, err = common.NewActivityMonitoredConn(
  928. destinationConn, inactivityTimeout, false, nil, p.activityUpdateWrapper)
  929. if err != nil {
  930. return backOff, errors.Trace(err)
  931. }
  932. // Relay the client traffic to the destination. The client traffic is a
  933. // standard Psiphon tunnel protocol destinated to a Psiphon server. Any
  934. // blocking/censorship at the 2nd hop will be mitigated by the use of
  935. // Psiphon circumvention protocols and techniques.
  936. // Limitation: clients may apply fragmentation to traffic relayed over the
  937. // data channel, and there's no guarantee that the fragmentation write
  938. // sizes or delays will carry over to the egress side.
  939. // The proxy operator's ISP may be able to observe that the operator's
  940. // host has nearly matching ingress and egress traffic. The traffic
  941. // content won't be the same: the ingress traffic is wrapped in a WebRTC
  942. // data channel, and the egress traffic is a Psiphon tunnel protocol.
  943. // With padding and decoy packets, the ingress and egress traffic shape
  944. // will differ beyond the basic WebRTC overheader. Even with this
  945. // measure, over time the number of bytes in and out of the proxy may
  946. // still indicate proxying.
  947. waitGroup := new(sync.WaitGroup)
  948. relayErrors := make(chan error, 2)
  949. var relayedUp, relayedDown int32
  950. waitGroup.Add(1)
  951. go func() {
  952. defer waitGroup.Done()
  953. // WebRTC data channels are based on SCTP, which is actually
  954. // message-based, not a stream. The (default) max message size for
  955. // pion/sctp is 65536:
  956. // https://github.com/pion/sctp/blob/44ed465396c880e379aae9c1bf81809a9e06b580/association.go#L52.
  957. //
  958. // As io.Copy uses a buffer size of 32K, each relayed message will be
  959. // less than the maximum. Calls to ClientConn.Write are also expected
  960. // to use io.Copy, keeping messages at most 32K in size.
  961. // io.Copy doesn't return an error on EOF, but we still want to signal
  962. // that relaying is done, so in this case a nil error is sent to the
  963. // channel.
  964. //
  965. // Limitation: if one io.Copy goproutine sends nil and the other
  966. // io.Copy goroutine sends a non-nil error concurrently, the non-nil
  967. // error isn't prioritized.
  968. n, err := io.Copy(webRTCConn, destinationConn)
  969. if n > 0 {
  970. atomic.StoreInt32(&relayedDown, 1)
  971. }
  972. relayErrors <- errors.Trace(err)
  973. }()
  974. waitGroup.Add(1)
  975. go func() {
  976. defer waitGroup.Done()
  977. n, err := io.Copy(destinationConn, webRTCConn)
  978. if n > 0 {
  979. atomic.StoreInt32(&relayedUp, 1)
  980. }
  981. relayErrors <- errors.Trace(err)
  982. }()
  983. select {
  984. case err = <-relayErrors:
  985. case <-ctx.Done():
  986. }
  987. // Interrupt the relay goroutines by closing the connections.
  988. webRTCConn.Close()
  989. destinationConn.Close()
  990. waitGroup.Wait()
  991. p.config.Logger.WithTraceFields(common.LogFields{
  992. "connectionID": announceResponse.ConnectionID,
  993. }).Info("connection closed")
  994. // Don't apply a back-off delay to the next announcement since this
  995. // iteration successfully relayed bytes.
  996. if atomic.LoadInt32(&relayedUp) == 1 || atomic.LoadInt32(&relayedDown) == 1 {
  997. backOff = false
  998. }
  999. return backOff, err
  1000. }
  1001. func (p *Proxy) getMetrics(
  1002. includeTacticsParameters bool,
  1003. brokerCoordinator BrokerDialCoordinator,
  1004. webRTCCoordinator WebRTCDialCoordinator,
  1005. maxClients int,
  1006. rateLimits common.RateLimits) (
  1007. *ProxyMetrics, string, bool, error) {
  1008. // tacticsNetworkID records the exact network ID that corresponds to the
  1009. // tactics tag sent in the base parameters, and is used when applying any
  1010. // new tactics returned by the broker.
  1011. baseParams, tacticsNetworkID, err := p.config.GetBaseAPIParameters(
  1012. includeTacticsParameters)
  1013. if err != nil {
  1014. return nil, "", false, errors.Trace(err)
  1015. }
  1016. apiParams := common.APIParameters{}
  1017. apiParams.Add(baseParams)
  1018. apiParams.Add(common.APIParameters(brokerCoordinator.MetricsForBrokerRequests()))
  1019. compressTactics := protocol.GetCompressTactics(apiParams)
  1020. packedParams, err := protocol.EncodePackedAPIParameters(apiParams)
  1021. if err != nil {
  1022. return nil, "", false, errors.Trace(err)
  1023. }
  1024. return &ProxyMetrics{
  1025. BaseAPIParameters: packedParams,
  1026. ProtocolVersion: LatestProtocolVersion,
  1027. NATType: webRTCCoordinator.NATType(),
  1028. PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
  1029. MaxClients: int32(maxClients),
  1030. ConnectingClients: p.connectingClients.Load(),
  1031. ConnectedClients: p.connectedClients.Load(),
  1032. LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
  1033. LimitDownstreamBytesPerSecond: rateLimits.WriteBytesPerSecond,
  1034. PeakUpstreamBytesPerSecond: p.peakBytesUp.Load(),
  1035. PeakDownstreamBytesPerSecond: p.peakBytesDown.Load(),
  1036. }, tacticsNetworkID, compressTactics, nil
  1037. }