proxy.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "io"
  23. "net"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  29. "github.com/pion/webrtc/v3"
  30. )
  31. // Timeouts should be aligned with Broker timeouts.
  32. const (
  33. proxyAnnounceRequestTimeout = 2 * time.Minute
  34. proxyAnnounceRetryDelay = 1 * time.Second
  35. proxyAnnounceRetryJitter = 0.3
  36. proxyWebRTCAnswerTimeout = 20 * time.Second
  37. proxyAnswerRequestTimeout = 10 * time.Second
  38. proxyClientConnectTimeout = 30 * time.Second
  39. proxyDestinationDialTimeout = 30 * time.Second
  40. )
  41. // Proxy is the in-proxy proxying component, which relays traffic from a
  42. // client to a Psiphon server.
  43. type Proxy struct {
  44. // Note: 64-bit ints used with atomic operations are placed
  45. // at the start of struct to ensure 64-bit alignment.
  46. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  47. bytesUp int64
  48. bytesDown int64
  49. peakBytesUp int64
  50. peakBytesDown int64
  51. connectingClients int32
  52. connectedClients int32
  53. config *ProxyConfig
  54. brokerClient *BrokerClient
  55. activityUpdateWrapper *activityUpdateWrapper
  56. }
  57. // TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
  58. // configured, permanent port mappings.
  59. // ProxyConfig specifies the configuration for a Proxy run.
  60. type ProxyConfig struct {
  61. // Logger is used to log events.
  62. Logger common.Logger
  63. // BaseMetrics should be populated with Psiphon handshake metrics
  64. // parameters. These will be sent to and logger by the Broker.
  65. BaseMetrics common.APIParameters
  66. // OperatorMessageHandler is a callback that is invoked with any user
  67. // message JSON object that is sent to the Proxy from the Broker. This
  68. // facility may be used to alert proxy operators when required. The JSON
  69. // object schema is arbitrary and not defined here.
  70. OperatorMessageHandler func(messageJSON string)
  71. // DialParameters specifies specific broker and WebRTC dial configuration
  72. // and strategies and settings; DialParameters also facilities dial
  73. // replay by receiving callbacks when individual dial steps succeed or
  74. // fail.
  75. //
  76. // As a DialParameters is associated with one network ID, it is expected
  77. // that the proxy will be stopped and restarted when a network change is
  78. // detected.
  79. DialParameters DialParameters
  80. // MaxClients is the maximum number of clients that are allowed to connect
  81. // to the proxy.
  82. MaxClients int
  83. // LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
  84. // a single client. When 0, there is no limit.
  85. LimitUpstreamBytesPerSecond int
  86. // LimitDownstreamBytesPerSecond limits the downstream data transfer rate
  87. // for a single client. When 0, there is no limit.
  88. LimitDownstreamBytesPerSecond int
  89. // ActivityUpdater specifies an ActivityUpdater for activity associated
  90. // with this proxy.
  91. ActivityUpdater ActivityUpdater
  92. }
  93. // ActivityUpdater is a callback that is invoked when clients connect and
  94. // disconnect and periodically with data transfer updates (unless idle). This
  95. // callback may be used to update an activity UI. This callback should post
  96. // this data to another thread or handler and return immediately and not
  97. // block on UI updates.
  98. type ActivityUpdater func(
  99. connectingClients int32,
  100. connectedClients int32,
  101. bytesUp int64,
  102. bytesDown int64,
  103. bytesDuration time.Duration)
  104. // NewProxy initializes a new Proxy with the specified configuration.
  105. func NewProxy(config *ProxyConfig) (*Proxy, error) {
  106. // Create one BrokerClient which will be shared for all requests. When the
  107. // round tripper supports multiplexing -- for example HTTP/2 -- many
  108. // concurrent requests can share the same TLS network connection and
  109. // established session.
  110. brokerClient, err := NewBrokerClient(config.DialParameters)
  111. if err != nil {
  112. return nil, errors.Trace(err)
  113. }
  114. p := &Proxy{
  115. config: config,
  116. brokerClient: brokerClient,
  117. }
  118. p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
  119. return p, nil
  120. }
  121. // activityUpdateWrapper implements the psiphon/common.ActivityUpdater
  122. // interface and is used to receive bytes transferred updates from the
  123. // ActivityConns wrapping proxied traffic. A wrapper is used so that
  124. // UpdateProgress is not exported from Proxy.
  125. type activityUpdateWrapper struct {
  126. p *Proxy
  127. }
  128. func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
  129. atomic.AddInt64(&w.p.bytesUp, bytesWritten)
  130. atomic.AddInt64(&w.p.bytesDown, bytesRead)
  131. }
  132. // Run runs the Proxy. The proxy sends requests to the Broker announcing its
  133. // availability; the Broker matches the proxy with clients, and facilitates
  134. // an exchange of WebRTC connection information; the proxy and each client
  135. // attempt to establish a connection; and the client's traffic is relayed to
  136. // Psiphon server.
  137. //
  138. // Run ends when ctx is Done. When a network change is detected, Run should be
  139. // stopped and a new Proxy configured and started. This minimizes dangling
  140. // client connections running over the previous network; provides an
  141. // opportunity to gather fresh NAT/port mapping metrics for the new network;
  142. // and allows for a new DialParameters, associated with the new network, to
  143. // be configured.
  144. func (p *Proxy) Run(ctx context.Context) {
  145. // Reset and configure port mapper component, as required. See
  146. // initPortMapper comment.
  147. initPortMapper(p.config.DialParameters)
  148. // Gather local network NAT/port mapping metrics before sending any
  149. // announce requests. NAT topology metrics are used by the Broker to
  150. // optimize client and in-proxy matching. Unlike the client, we always
  151. // perform this synchronous step here, since waiting doesn't necessarily
  152. // block a client tunnel dial.
  153. initWaitGroup := new(sync.WaitGroup)
  154. initWaitGroup.Add(1)
  155. go func() {
  156. defer initWaitGroup.Done()
  157. // NATDiscover may use cached NAT type/port mapping values from
  158. // DialParameters, based on the network ID. If discovery is not
  159. // successful, the proxy still proceeds to announce.
  160. NATDiscover(
  161. ctx,
  162. &NATDiscoverConfig{
  163. Logger: p.config.Logger,
  164. DialParameters: p.config.DialParameters,
  165. })
  166. }()
  167. initWaitGroup.Wait()
  168. // Run MaxClient proxying workers. Each worker handles one client at a time.
  169. proxyWaitGroup := new(sync.WaitGroup)
  170. for i := 0; i < p.config.MaxClients; i++ {
  171. proxyWaitGroup.Add(1)
  172. go func() {
  173. defer proxyWaitGroup.Done()
  174. p.proxyClients(ctx)
  175. }()
  176. }
  177. // Capture activity updates every second, which is the required frequency
  178. // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
  179. // frequency for invoking the ActivityUpdater and updating UI widgets.
  180. activityUpdatePeriod := 1 * time.Second
  181. ticker := time.NewTicker(activityUpdatePeriod)
  182. defer ticker.Stop()
  183. loop:
  184. for {
  185. select {
  186. case <-ticker.C:
  187. p.activityUpdate(activityUpdatePeriod)
  188. case <-ctx.Done():
  189. break loop
  190. }
  191. }
  192. proxyWaitGroup.Wait()
  193. }
  194. func (p *Proxy) activityUpdate(period time.Duration) {
  195. connectingClients := atomic.LoadInt32(&p.connectingClients)
  196. connectedClients := atomic.LoadInt32(&p.connectedClients)
  197. bytesUp := atomic.SwapInt64(&p.bytesUp, 0)
  198. bytesDown := atomic.SwapInt64(&p.bytesDown, 0)
  199. greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
  200. greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
  201. if connectingClients == 0 &&
  202. connectedClients == 0 &&
  203. bytesUp == 0 &&
  204. bytesDown == 0 {
  205. // Skip the activity callback on idle.
  206. return
  207. }
  208. p.config.ActivityUpdater(
  209. connectingClients,
  210. connectedClients,
  211. bytesUp,
  212. bytesDown,
  213. period)
  214. }
  215. func greaterThanSwapInt64(addr *int64, new int64) bool {
  216. // Limitation: if there are two concurrent calls, the greater value could
  217. // get overwritten.
  218. old := atomic.LoadInt64(addr)
  219. if new > old {
  220. return atomic.CompareAndSwapInt64(addr, old, new)
  221. }
  222. return false
  223. }
  224. func (p *Proxy) proxyClients(ctx context.Context) {
  225. // Proxy one client, repeating until ctx is done.
  226. //
  227. // This worker starts with posting a long-polling announcement request.
  228. // The broker response with a matched client, and the proxy and client
  229. // attempt to establish a WebRTC connection for relaying traffic.
  230. //
  231. // Limitation: this design may not maximize the utility of the proxy,
  232. // since some proxy/client connections will fail at the WebRTC stage due
  233. // to NAT traversal failure, and at most MaxClient concurrent
  234. // establishments are attempted. Another scenario comes from the Psiphon
  235. // client horse race, which may start in-proxy dials but then abort them
  236. // when some other tunnel protocol succeeds.
  237. //
  238. // As a future enhancement, consider using M announcement goroutines and N
  239. // WebRTC dial goroutines. When an announcement gets a response,
  240. // immediately announce again unless there are already MaxClient active
  241. // connections established. This approach may require the proxy to
  242. // backpedal and reject connections when establishment is too successful.
  243. //
  244. // Another enhancement could be a signal from the client, to the broker,
  245. // relayed to the proxy, when a dial is aborted.
  246. for ctx.Err() == nil {
  247. err := p.proxyOneClient(ctx)
  248. if err != nil && ctx.Err() == nil {
  249. p.config.Logger.WithTraceFields(
  250. common.LogFields{
  251. "error": err.Error(),
  252. }).Error("proxy client failed")
  253. // Delay briefly, to avoid unintentionally overloading the broker
  254. // in some recurring failure case. Use a jitter to avoid a
  255. // regular traffic period.
  256. common.SleepWithJitter(
  257. ctx,
  258. common.ValueOrDefault(p.config.DialParameters.AnnounceRetryDelay(), proxyAnnounceRetryDelay),
  259. common.ValueOrDefault(p.config.DialParameters.AnnounceRetryJitter(), proxyAnnounceRetryJitter))
  260. }
  261. }
  262. }
  263. func (p *Proxy) proxyOneClient(ctx context.Context) error {
  264. // Send the announce request
  265. // At this point, no NAT traversal operations have been performed by the
  266. // proxy, since its announcement may sit idle for the long-polling period
  267. // and NAT hole punches or port mappings could expire before the
  268. // long-polling period.
  269. //
  270. // As a future enhancement, the proxy could begin gathering WebRTC ICE
  271. // candidates while awaiting a client match, reducing the turn around
  272. // time after a match. This would make sense if there's high demand for
  273. // proxies, and so hole punches unlikely to expire while awaiting a client match.
  274. //
  275. // Another possibility may be to prepare and send a full offer SDP in the
  276. // announcment; and have the broker modify either the proxy or client
  277. // offer SDP to produce an answer SDP. In this case, the entire
  278. // ProxyAnswerRequest could be skipped as the WebRTC dial can begin after
  279. // the ProxyAnnounceRequest response (and ClientOfferRequest response).
  280. //
  281. // Furthermore, if a port mapping can be established, instead of using
  282. // WebRTC the proxy could run a Psiphon tunnel protocol listener at the
  283. // mapped port and send the dial information -- including some secret to
  284. // authenticate the client -- in its announcement. The client would then
  285. // receive this direct dial information from the broker and connect. The
  286. // proxy should be able to send keep alives to extend the port mapping
  287. // lifetime.
  288. announceRequestCtx, announceRequestCancelFunc := context.WithTimeout(
  289. ctx, common.ValueOrDefault(p.config.DialParameters.AnnounceRequestTimeout(), proxyAnnounceRequestTimeout))
  290. defer announceRequestCancelFunc()
  291. metrics, err := p.getMetrics()
  292. if err != nil {
  293. return errors.Trace(err)
  294. }
  295. // A proxy ID is implicitly sent with requests; it's the proxy's session
  296. // public key.
  297. announceResponse, err := p.brokerClient.ProxyAnnounce(
  298. announceRequestCtx,
  299. &ProxyAnnounceRequest{
  300. PersonalCompartmentIDs: p.config.DialParameters.PersonalCompartmentIDs(),
  301. Metrics: metrics,
  302. })
  303. if err != nil {
  304. return errors.Trace(err)
  305. }
  306. if announceResponse.ClientProxyProtocolVersion != ProxyProtocolVersion1 {
  307. return errors.Tracef(
  308. "Unsupported proxy protocol version: %d",
  309. announceResponse.ClientProxyProtocolVersion)
  310. }
  311. if announceResponse.OperatorMessageJSON != "" {
  312. p.config.OperatorMessageHandler(announceResponse.OperatorMessageJSON)
  313. }
  314. // For activity updates, indicate that a client connection is now underway.
  315. atomic.AddInt32(&p.connectingClients, 1)
  316. connected := false
  317. defer func() {
  318. if !connected {
  319. atomic.AddInt32(&p.connectingClients, -1)
  320. }
  321. }()
  322. // Initialize WebRTC using the client's offer SDP
  323. webRTCAnswerCtx, webRTCAnswerCancelFunc := context.WithTimeout(
  324. ctx, common.ValueOrDefault(p.config.DialParameters.WebRTCAnswerTimeout(), proxyWebRTCAnswerTimeout))
  325. defer webRTCAnswerCancelFunc()
  326. webRTCConn, SDP, SDPMetrics, webRTCErr := NewWebRTCConnWithAnswer(
  327. webRTCAnswerCtx,
  328. &WebRTCConfig{
  329. Logger: p.config.Logger,
  330. DialParameters: p.config.DialParameters,
  331. ClientRootObfuscationSecret: announceResponse.ClientRootObfuscationSecret,
  332. },
  333. announceResponse.ClientOfferSDP)
  334. var webRTCRequestErr string
  335. if webRTCErr != nil {
  336. webRTCErr = errors.Trace(webRTCErr)
  337. webRTCRequestErr = webRTCErr.Error()
  338. SDP = webrtc.SessionDescription{}
  339. // Continue to report the error to the broker. The broker will respond
  340. // with failure to the client's offer request.
  341. }
  342. defer webRTCConn.Close()
  343. // Send answer request with SDP or error.
  344. answerRequestCtx, answerRequestCancelFunc := context.WithTimeout(
  345. ctx, common.ValueOrDefault(p.config.DialParameters.AnswerRequestTimeout(), proxyAnswerRequestTimeout))
  346. defer answerRequestCancelFunc()
  347. _, err = p.brokerClient.ProxyAnswer(
  348. answerRequestCtx,
  349. &ProxyAnswerRequest{
  350. ConnectionID: announceResponse.ConnectionID,
  351. SelectedProxyProtocolVersion: announceResponse.ClientProxyProtocolVersion,
  352. ProxyAnswerSDP: SDP,
  353. ICECandidateTypes: SDPMetrics.ICECandidateTypes,
  354. AnswerError: webRTCRequestErr,
  355. })
  356. if err != nil {
  357. if webRTCErr != nil {
  358. // Prioritize returning any WebRTC error for logging.
  359. return webRTCErr
  360. }
  361. return errors.Trace(err)
  362. }
  363. // Now that an answer is sent, stop if WebRTC initialization failed.
  364. if webRTCErr != nil {
  365. return webRTCErr
  366. }
  367. // Await the WebRTC connection.
  368. // We could concurrently dial the destination, to have that network
  369. // connection available immediately once the WebRTC channel is
  370. // established. This would work only for TCP, not UDP, network protocols
  371. // and could only include the TCP connection, as client traffic is
  372. // required for all higher layers such as TLS, SSH, etc. This could also
  373. // create wasted load on destination Psiphon servers, particularly when
  374. // WebRTC connections fail.
  375. clientConnectCtx, clientConnectCancelFunc := context.WithTimeout(
  376. ctx, common.ValueOrDefault(p.config.DialParameters.ProxyClientConnectTimeout(), proxyClientConnectTimeout))
  377. defer clientConnectCancelFunc()
  378. err = webRTCConn.AwaitInitialDataChannel(clientConnectCtx)
  379. if err != nil {
  380. return errors.Trace(err)
  381. }
  382. p.config.Logger.WithTraceFields(common.LogFields{
  383. "connectionID": announceResponse.ConnectionID,
  384. }).Info("WebRTC data channel established")
  385. // Dial the destination, a Psiphon server. The broker validates that the
  386. // dial destination is a Psiphon server.
  387. destinationDialContext, destinationDialCancelFunc := context.WithTimeout(
  388. ctx, common.ValueOrDefault(p.config.DialParameters.ProxyDestinationDialTimeout(), proxyDestinationDialTimeout))
  389. defer destinationDialCancelFunc()
  390. // Use the custom resolver when resolving destination hostnames, such as
  391. // those used in domain fronted protocols.
  392. //
  393. // - Resolving at the in-proxy should yield a more optimal CDN edge, vs.
  394. // resolving at the client.
  395. //
  396. // - Sending unresolved hostnames to in-proxies can expose some domain
  397. // fronting configuration. This can be mitigated by enabling domain
  398. // fronting on this 2nd hop only when the in-proxy is located in a
  399. // region that may be censored or blocked; this is to be enforced by
  400. // the broker.
  401. //
  402. // - Any DNSResolverPreresolved tactics applied will be relative to the
  403. // in-proxy location.
  404. destinationAddress, err := p.config.DialParameters.ResolveAddress(ctx, announceResponse.DestinationAddress)
  405. if err != nil {
  406. return errors.Trace(err)
  407. }
  408. var dialer net.Dialer
  409. destinationConn, err := dialer.DialContext(
  410. destinationDialContext,
  411. announceResponse.NetworkProtocol.String(),
  412. destinationAddress)
  413. if err != nil {
  414. return errors.Trace(err)
  415. }
  416. defer destinationConn.Close()
  417. // For activity updates, indicate that a client connection is established.
  418. connected = true
  419. atomic.AddInt32(&p.connectingClients, -1)
  420. atomic.AddInt32(&p.connectedClients, 1)
  421. defer func() {
  422. atomic.AddInt32(&p.connectedClients, -1)
  423. }()
  424. // Throttle the relay connection.
  425. //
  426. // Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
  427. // operators may to want to limit their bandwidth usage with a single
  428. // up/down value, an overall limit. The ProxyConfig can simply be
  429. // generated by dividing the limit by MaxClients. This approach favors
  430. // performance stability: each client gets the same throttling limits
  431. // regardless of how many other clients are connected.
  432. destinationConn = common.NewThrottledConn(
  433. destinationConn,
  434. common.RateLimits{
  435. ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
  436. WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
  437. })
  438. // Hook up bytes transferred counting for activity updates.
  439. // The ActivityMonitoredConn inactivity timeout is not configured, since
  440. // the Psiphon server will close its connection to inactive clients on
  441. // its own schedule.
  442. destinationConn, err = common.NewActivityMonitoredConn(
  443. destinationConn, 0, false, nil, p.activityUpdateWrapper)
  444. if err != nil {
  445. return errors.Trace(err)
  446. }
  447. // Relay the client traffic to the destination. The client traffic is a
  448. // standard Psiphon tunnel protocol destinated to a Psiphon server. Any
  449. // blocking/censorship at the 2nd hop will be mitigated by the use of
  450. // Psiphon circumvention protocols and techniques.
  451. // Limitation: clients may apply fragmentation to traffic relayed over the
  452. // data channel, and there's no guarantee that the fragmentation write
  453. // sizes or delays will carry over to the egress side.
  454. // The proxy operator's ISP may be able to observe that the operator's
  455. // host has nearly matching ingress and egress traffic. The traffic
  456. // content won't be the same: the ingress traffic is wrapped in a WebRTC
  457. // data channel, and the egress traffic is a Psiphon tunnel protocol. But
  458. // the traffic shape will be close to the same. As a future enhancement,
  459. // consider adding data channel padding and decoy traffic, which is
  460. // dropped on egress. For performance, traffic shaping could be ceased
  461. // after some time. Even with this measure, over time the number of bytes
  462. // in and out of the proxy may still indicate proxying.
  463. waitGroup := new(sync.WaitGroup)
  464. relayErrors := make(chan error, 2)
  465. waitGroup.Add(1)
  466. go func() {
  467. defer waitGroup.Done()
  468. // WebRTC data channels are based on SCTP, which is actually
  469. // message-based, not a stream. The (default) max message size for
  470. // pion/sctp is 65536:
  471. // https://github.com/pion/sctp/blob/44ed465396c880e379aae9c1bf81809a9e06b580/association.go#L52.
  472. //
  473. // As io.Copy uses a buffer size of 32K, each relayed message will be
  474. // less than the maximum. Calls to ClientConn.Write are also expected
  475. // to use io.Copy, keeping messages at most 32K in size. Note that
  476. // testing with io.CopyBuffer and a buffer of size 65536 actually
  477. // yielded the pion error io.ErrShortBuffer, "short buffer", while a
  478. // buffer of size 65535 worked.
  479. _, err := io.Copy(webRTCConn, destinationConn)
  480. if err != nil {
  481. relayErrors <- errors.Trace(err)
  482. return
  483. }
  484. }()
  485. waitGroup.Add(1)
  486. go func() {
  487. defer waitGroup.Done()
  488. _, err := io.Copy(destinationConn, webRTCConn)
  489. if err != nil {
  490. relayErrors <- errors.Trace(err)
  491. return
  492. }
  493. }()
  494. select {
  495. case err = <-relayErrors:
  496. case <-ctx.Done():
  497. }
  498. // Interrupt the relay goroutines by closing the connections.
  499. webRTCConn.Close()
  500. destinationConn.Close()
  501. waitGroup.Wait()
  502. p.config.Logger.WithTraceFields(common.LogFields{
  503. "connectionID": announceResponse.ConnectionID,
  504. }).Info("connection closed")
  505. return err
  506. }
  507. func (p *Proxy) getMetrics() (*ProxyMetrics, error) {
  508. baseMetrics, err := EncodeBaseMetrics(p.config.BaseMetrics)
  509. if err != nil {
  510. return nil, errors.Trace(err)
  511. }
  512. return &ProxyMetrics{
  513. BaseMetrics: baseMetrics,
  514. ProxyProtocolVersion: ProxyProtocolVersion1,
  515. NATType: p.config.DialParameters.NATType(),
  516. PortMappingTypes: p.config.DialParameters.PortMappingTypes(),
  517. MaxClients: int32(p.config.MaxClients),
  518. ConnectingClients: atomic.LoadInt32(&p.connectingClients),
  519. ConnectedClients: atomic.LoadInt32(&p.connectedClients),
  520. LimitUpstreamBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
  521. LimitDownstreamBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
  522. PeakUpstreamBytesPerSecond: atomic.LoadInt64(&p.peakBytesUp),
  523. PeakDownstreamBytesPerSecond: atomic.LoadInt64(&p.peakBytesDown),
  524. }, nil
  525. }