quality.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841
  1. /*
  2. * Copyright (c) 2025, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  29. lrucache "github.com/cognusion/go-cache-lru"
  30. )
  31. const (
  32. proxyQualityMaxEntries = 10000000
  33. proxyQualityTTL = 24 * time.Hour
  34. proxyQualityMaxPendingFailedMatches = 1000000
  35. proxyQualityPendingFailedMatchDeadline = 5 * time.Minute
  36. proxyQualityFailedMatchThreshold = 10
  37. proxyQualityReporterMaxQueueEntries = 5000000
  38. proxyQualityReporterMaxRequestEntries = 1000
  39. proxyQualityReporterRequestDelay = 10 * time.Second
  40. proxyQualityReporterRequestTimeout = 10 * time.Second
  41. proxyQualityReporterRequestRetries = 1
  42. )
// ProxyQualityState records and manages proxy tunnel quality data reported by
// servers and used to prioritize proxies in the broker matching process.
//
// All ProxyQualityState methods access the fields below while holding mutex.
type ProxyQualityState struct {
	// mutex serializes all ProxyQualityState operations.
	mutex sync.Mutex

	// qualityTTL is the TTL applied each time AddQuality sets an entry.
	qualityTTL time.Duration

	// pendingFailedMatchDeadline is the TTL given to a pending failed match
	// added by Matched.
	pendingFailedMatchDeadline time.Duration

	// failedMatchThreshold is the consecutive failed match count at which
	// addFailedMatch deletes a quality entry.
	failedMatchThreshold int

	// entries maps string(ProxyQualityKey) to *proxyQualityEntry.
	entries *lrucache.Cache

	// pendingFailedMatches maps string(ProxyQualityKey) to struct{}{}; when
	// an item is evicted, its OnEvicted callback (addFailedMatch) counts a
	// failed match.
	pendingFailedMatches *lrucache.Cache
}
// proxyQualityEntry is the per-proxy value stored in ProxyQualityState.entries.
type proxyQualityEntry struct {
	// clientASNCounts accumulates, per client ASN, the counts added by
	// AddQuality.
	clientASNCounts ProxyQualityASNCounts

	// failedMatchCount is the consecutive failed match count: reset to 0 by
	// AddQuality and incremented by addFailedMatch.
	failedMatchCount int
}
  57. // NewProxyQuality creates a new ProxyQualityState.
  58. func NewProxyQuality() *ProxyQualityState {
  59. // Limitation: max cache sizes are not dynamically configurable and are
  60. // set to fixed values that are in line with other, indirectly related
  61. // limits, such as matcherAnnouncementQueueMaxSize.
  62. // TODO: lrucache.Cache.DeleteExpired is a linear scan; review the
  63. // performance of scanning up to 10,000,000 entries every 1 minute.
  64. q := &ProxyQualityState{
  65. qualityTTL: proxyQualityTTL,
  66. pendingFailedMatchDeadline: proxyQualityPendingFailedMatchDeadline,
  67. failedMatchThreshold: proxyQualityFailedMatchThreshold,
  68. entries: lrucache.NewWithLRU(
  69. 0, 1*time.Minute, proxyQualityMaxEntries),
  70. pendingFailedMatches: lrucache.NewWithLRU(
  71. 0, 1*time.Minute, proxyQualityMaxPendingFailedMatches),
  72. }
  73. q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
  74. return q
  75. }
  76. // SetProxyQualityRequestParameters overrides default values for proxy quality
  77. // state management parameters.
  78. //
  79. // qualityTTL is the TTL for a proxy's quality entry. Each AddQuality call
  80. // extends an entry's TTL.
  81. //
  82. // pendingFailedMatchDeadline is the elapsed time between calling Match for a
  83. // given proxy, and subsequently incrementing that proxy's failed match
  84. // count, unless an AddQuality call is made in the meantime.
  85. //
  86. // failedMatchThreshold is the threshold failed match count after which a
  87. // proxy's quality entry is deleted.
  88. func (q *ProxyQualityState) SetParameters(
  89. qualityTTL time.Duration,
  90. pendingFailedMatchDeadline time.Duration,
  91. failedMatchThreshold int) {
  92. q.mutex.Lock()
  93. defer q.mutex.Unlock()
  94. q.qualityTTL = qualityTTL
  95. q.pendingFailedMatchDeadline = pendingFailedMatchDeadline
  96. q.failedMatchThreshold = failedMatchThreshold
  97. }
  98. // HasQuality indicates if the specified proxy, defined by its ID and ASN, has
  99. // a quality entry. If the input client ASN is blank, any entry suffices. If
  100. // a client ASN is given the proxy must have a quality tunnel for a client in
  101. // that ASN.
  102. func (q *ProxyQualityState) HasQuality(
  103. proxyID ID, proxyASN string, clientASN string) bool {
  104. q.mutex.Lock()
  105. defer q.mutex.Unlock()
  106. proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
  107. strProxyKey := string(proxyKey[:])
  108. entryValue, ok := q.entries.Get(strProxyKey)
  109. if !ok {
  110. return false
  111. }
  112. entry := entryValue.(*proxyQualityEntry)
  113. // Currently, the actual count value is not used; any count > 0
  114. // is "quality".
  115. if clientASN == "" {
  116. // No specific ASN.
  117. return len(entry.clientASNCounts) > 0
  118. }
  119. return entry.clientASNCounts[clientASN] > 0
  120. }
  121. // AddQuality adds a new quality entry or adds counts to an existing quality
  122. // entry for the specified proxy, defined by its ID and ASN. For an existing
  123. // entry, its TTL is extended, and any failed match count is reset to zero.
  124. // AddQuality deletes any pending failed match, set by Matched, for the
  125. // proxy.
  126. func (q *ProxyQualityState) AddQuality(
  127. proxyKey ProxyQualityKey, counts ProxyQualityASNCounts) {
  128. q.mutex.Lock()
  129. defer q.mutex.Unlock()
  130. strProxyKey := string(proxyKey[:])
  131. entryValue, ok := q.entries.Get(strProxyKey)
  132. var entry *proxyQualityEntry
  133. if ok {
  134. entry = entryValue.(*proxyQualityEntry)
  135. } else {
  136. entry = &proxyQualityEntry{
  137. clientASNCounts: make(ProxyQualityASNCounts),
  138. }
  139. }
  140. // Reset the consecutive failed match count for existing entry.
  141. entry.failedMatchCount = 0
  142. // Add in counts.
  143. for ASN, count := range counts {
  144. entry.clientASNCounts[ASN] += count
  145. }
  146. // Set both updates the value and extends the TTL for any existing entry.
  147. q.entries.Set(strProxyKey, entry, q.qualityTTL)
  148. // Delete any pending failed match. The actual pending match may still be
  149. // in progress and may even fail, but the new quality event is considered
  150. // sufficient to ignore that outcome.
  151. //
  152. // lrucache.Cache.Delete invokes OnEvicted, so OnEvicted is temporarily
  153. // cleared to avoid incrementing the failed match count. In addition,
  154. // avoiding OnEvicted here ensures that addFailedMatch can assume that
  155. // the mutex lock is not held.
  156. q.pendingFailedMatches.OnEvicted(nil)
  157. q.pendingFailedMatches.Delete(strProxyKey)
  158. q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
  159. }
  160. // Matched reports that, for the specified proxy, defined by its ID and ASN, a
  161. // proxy announcement was just matched with a client offer, and an announcement
  162. // response returned to the proxy. Matched begins a "countdown" until a
  163. // subsequent, expected AddQuality call for the same proxy: if too much time
  164. // elapses with no AddQuality, the match is considered to have failed to
  165. // produce a successful tunnel. After exceeding a threshold count of
  166. // consecutive failed matches, a proxy's quality entry is deleted.
  167. //
  168. // Matched/AddQuality do not track the outcome of specific matches -- for a
  169. // given proxy, any successful, quality tunnel will cancel any pending failed
  170. // match.
  171. func (q *ProxyQualityState) Matched(proxyID ID, proxyASN string) {
  172. q.mutex.Lock()
  173. defer q.mutex.Unlock()
  174. // This uses a lrucache.Cache and OnEvicted events as an implementation of
  175. // the failed match deadline without requiring a timer or goroutine per
  176. // pending match. When the cache entry expires due to TTL, the failed
  177. // match deadline is met.
  178. proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
  179. strProxyKey := string(proxyKey[:])
  180. _, ok := q.pendingFailedMatches.Get(strProxyKey)
  181. if ok {
  182. // When there's already a pending failed match, leave the existing
  183. // deadline in place and don't extend it.
  184. return
  185. }
  186. q.pendingFailedMatches.Add(
  187. strProxyKey, struct{}{}, q.pendingFailedMatchDeadline)
  188. }
  189. // Flush clears all quality state.
  190. func (q *ProxyQualityState) Flush() {
  191. q.mutex.Lock()
  192. defer q.mutex.Unlock()
  193. q.entries.Flush()
  194. q.pendingFailedMatches.OnEvicted(nil)
  195. q.pendingFailedMatches.Flush()
  196. q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
  197. }
  198. // addFailedMatch is invoked when a pendingFailedMatches expires, increments
  199. // the failed match count, and removes a quality entry when the failed match
  200. // threshold count is exceeded.
  201. func (q *ProxyQualityState) addFailedMatch(strProxyKey string, _ interface{}) {
  202. // Assumes pendingFailedMatches.OnEvicted is not invoked while already
  203. // holding the mutex lock.
  204. q.mutex.Lock()
  205. defer q.mutex.Unlock()
  206. entryValue, ok := q.entries.Get(strProxyKey)
  207. if !ok {
  208. // No quality to remove.
  209. return
  210. }
  211. entry := entryValue.(*proxyQualityEntry)
  212. entry.failedMatchCount += 1
  213. if entry.failedMatchCount >= q.failedMatchThreshold {
  214. // Remove quality.
  215. q.entries.Delete(strProxyKey)
  216. }
  217. }
// ProxyQualityReporter manages sending proxy quality requests to brokers.
type ProxyQualityReporter struct {
	logger                  common.Logger
	serverBrokerSessions    *ServerBrokerSessions
	serverSessionPrivateKey SessionPrivateKey
	roundTripperMaker       ProxyQualityBrokerRoundTripperMaker

	// runMutex guards runContext and stopRunning, which are set by Start
	// and cleared by Stop. waitGroup tracks the requestScheduler goroutine.
	runMutex    sync.Mutex
	runContext  context.Context
	stopRunning context.CancelFunc
	waitGroup   *sync.WaitGroup

	// queueMutex guards reportQueue, a FIFO of proxyQualityReportQueueEntry
	// values, and proxyIDQueueEntry, which indexes the queued element for
	// each proxy key.
	queueMutex        sync.Mutex
	reportQueue       *list.List
	proxyIDQueueEntry map[ProxyQualityKey]*list.Element

	// brokerPublicKeys holds a []SessionPublicKey and
	// brokerRootObfuscationSecrets a parallel []ObfuscationSecret, both set
	// by SetKnownBrokers.
	//
	// NOTE(review): the two values are stored separately, so a concurrent
	// reader may observe a new keys slice alongside an old secrets slice.
	brokerPublicKeys             atomic.Value
	brokerRootObfuscationSecrets atomic.Value

	// Request parameters, accessed only with sync/atomic. requestDelay and
	// requestTimeout hold time.Duration values.
	requestDelay      int64
	maxRequestEntries int64
	requestTimeout    int64
	requestRetries    int64

	// signalReport (buffer size 1) is signalled by ReportQuality when a new
	// queue entry is added.
	signalReport chan struct{}
}
// ProxyQualityBrokerRoundTripperMaker is a callback which creates a new
// RoundTripper for sending requests to the broker specified by the given
// session public key.
//
// The optional common.APIParameters are broker dial parameter metrics to be
// reported to the broker; they are packed into each ServerProxyQualityRequest
// sent over the returned RoundTripper.
type ProxyQualityBrokerRoundTripperMaker func(SessionPublicKey) (
	RoundTripper, common.APIParameters, error)
// proxyQualityReportQueueEntry is the value of each ProxyQualityReporter
// reportQueue element: the accumulated client ASN counts for one proxy.
type proxyQualityReportQueueEntry struct {
	proxyKey ProxyQualityKey

	// counts is mutated in place by ReportQuality while the entry is queued.
	counts ProxyQualityASNCounts
}
// serverBrokerClient is the per-broker state retained by requestScheduler
// across sendToBrokers calls.
type serverBrokerClient struct {
	publicKey             SessionPublicKey
	rootObfuscationSecret ObfuscationSecret

	// brokerInitiatorID is publicKey converted with ToCurve25519, cached to
	// check against the server's established broker sessions.
	brokerInitiatorID ID

	// sessions, roundTripper, and dialParams are set together once the
	// client is fully initialized; sessions == nil indicates a partially
	// initialized client, and roundTripper may then be nil.
	sessions     *InitiatorSessions
	roundTripper RoundTripper
	dialParams   common.APIParameters
}
  259. // NewProxyQualityReporter creates a new ProxyQualityReporter.
  260. //
  261. // serverBrokerSessions is the server's ServerBrokerSessions instance which
  262. // manages inbound reports from the broker; the ServerBrokerSessions is
  263. // consulted to determine which brokers have recently communicated with the
  264. // server, and are therefore expected to trust the server's public key.
  265. //
  266. // serverSessionPrivateKey is the server's session private key to be used in
  267. // the quality reporting Noise sessions established with the brokers.
  268. // brokerPublicKeys specify the brokers to send to.
  269. //
  270. // roundTripperMaker is a callback which creates RoundTrippers for these
  271. // brokers. The ProxyQualityReporter will invoke roundTripperMaker when
  272. // attempting to send requests to a given broker; each RoundTripper will be
  273. // retained and reused as long as it continues to work successfully.
  274. func NewProxyQualityReporter(
  275. logger common.Logger,
  276. serverBrokerSessions *ServerBrokerSessions,
  277. serverSessionPrivateKey SessionPrivateKey,
  278. brokerPublicKeys []SessionPublicKey,
  279. brokerRootObfuscationSecrets []ObfuscationSecret,
  280. roundTripperMaker ProxyQualityBrokerRoundTripperMaker) (
  281. *ProxyQualityReporter, error) {
  282. r := &ProxyQualityReporter{
  283. logger: logger,
  284. serverBrokerSessions: serverBrokerSessions,
  285. serverSessionPrivateKey: serverSessionPrivateKey,
  286. roundTripperMaker: roundTripperMaker,
  287. waitGroup: new(sync.WaitGroup),
  288. requestDelay: int64(proxyQualityReporterRequestDelay),
  289. maxRequestEntries: proxyQualityReporterMaxRequestEntries,
  290. requestTimeout: int64(proxyQualityReporterRequestTimeout),
  291. requestRetries: proxyQualityReporterRequestRetries,
  292. reportQueue: list.New(),
  293. proxyIDQueueEntry: make(map[ProxyQualityKey]*list.Element),
  294. signalReport: make(chan struct{}, 1),
  295. }
  296. err := r.SetKnownBrokers(brokerPublicKeys, brokerRootObfuscationSecrets)
  297. if err != nil {
  298. return nil, errors.Trace(err)
  299. }
  300. return r, nil
  301. }
  302. // SetKnownBrokers updates the set of brokers to send to.
  303. func (r *ProxyQualityReporter) SetKnownBrokers(
  304. brokerPublicKeys []SessionPublicKey,
  305. brokerRootObfuscationSecrets []ObfuscationSecret) error {
  306. if len(brokerPublicKeys) != len(brokerRootObfuscationSecrets) {
  307. return errors.TraceNew("invalid broker specs")
  308. }
  309. r.brokerPublicKeys.Store(brokerPublicKeys)
  310. r.brokerRootObfuscationSecrets.Store(brokerRootObfuscationSecrets)
  311. return nil
  312. }
  313. // SetRequestParameters overrides default values for request parameters.
  314. func (r *ProxyQualityReporter) SetRequestParameters(
  315. maxRequestEntries int,
  316. requestDelay time.Duration,
  317. requestTimeout time.Duration,
  318. requestRetries int) {
  319. atomic.StoreInt64(&r.requestDelay, int64(requestDelay))
  320. atomic.StoreInt64(&r.maxRequestEntries, int64(maxRequestEntries))
  321. atomic.StoreInt64(&r.requestTimeout, int64(requestTimeout))
  322. atomic.StoreInt64(&r.requestRetries, int64(requestRetries))
  323. }
  324. // Start launches the request workers.
  325. func (r *ProxyQualityReporter) Start() error {
  326. r.runMutex.Lock()
  327. defer r.runMutex.Unlock()
  328. if r.runContext != nil {
  329. return errors.TraceNew("already running")
  330. }
  331. r.runContext, r.stopRunning = context.WithCancel(context.Background())
  332. r.waitGroup.Add(1)
  333. go func() {
  334. defer r.waitGroup.Done()
  335. r.requestScheduler(r.runContext)
  336. }()
  337. return nil
  338. }
  339. // Stop terminates the request workers.
  340. func (r *ProxyQualityReporter) Stop() {
  341. r.runMutex.Lock()
  342. defer r.runMutex.Unlock()
  343. r.stopRunning()
  344. r.waitGroup.Wait()
  345. r.runContext, r.stopRunning = nil, nil
  346. }
  347. // ReportQuality registers a quality tunnel for the specified proxy, defined
  348. // by its ID and ASN, and client ASN. Broker requests are scheduled to be
  349. // sent after a short delay -- intended to batch up additional data -- or
  350. // once sufficient request data is accumulated.
  351. func (r *ProxyQualityReporter) ReportQuality(
  352. proxyID ID, proxyASN string, clientASN string) {
  353. r.queueMutex.Lock()
  354. defer r.queueMutex.Unlock()
  355. proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
  356. // Proxy quality data is stored in a FIFO queue. New reports are merged
  357. // into existing entries for that same proxy ID when possible.
  358. entry, ok := r.proxyIDQueueEntry[proxyKey]
  359. if ok {
  360. entry.Value.(proxyQualityReportQueueEntry).counts[clientASN] += 1
  361. return
  362. }
  363. // Sanity check against an unbounded queue. When the queue is full, new
  364. // reports are simply dropped. There is no back pressure to slow down the
  365. // rate of quality tunnels, since the overall goal is to establish
  366. // quality tunnels.
  367. if r.reportQueue.Len() >= proxyQualityReporterMaxQueueEntries {
  368. r.logger.WithTrace().Warning("proxyQualityReporterMaxQueueEntries exceeded")
  369. return
  370. }
  371. counts := make(ProxyQualityASNCounts)
  372. counts[clientASN] += 1
  373. entry = r.reportQueue.PushBack(
  374. proxyQualityReportQueueEntry{
  375. proxyKey: proxyKey,
  376. counts: counts,
  377. })
  378. r.proxyIDQueueEntry[proxyKey] = entry
  379. // signalReport has a buffer size of 1, so when a signal can't be sent to
  380. // the channel, it's already signalled.
  381. select {
  382. case r.signalReport <- struct{}{}:
  383. default:
  384. }
  385. }
  386. func (r *ProxyQualityReporter) requestScheduler(ctx context.Context) {
  387. // Retain a set of serverBrokerClients, with established round trip
  388. // transports and Noise sessions, for reuse across many requests.
  389. // sendToBrokers will add to and trim this set.
  390. brokerClients := make(map[SessionPublicKey]*serverBrokerClient)
  391. for {
  392. // Await the signal that there is quality data to report.
  393. select {
  394. case <-r.signalReport:
  395. case <-ctx.Done():
  396. return
  397. }
  398. // Delay, for a brief moment, sending requests in an effort to batch
  399. // up more data for the requests.
  400. requestDelay := time.Duration(atomic.LoadInt64(&r.requestDelay))
  401. if requestDelay > 0 {
  402. // TODO: SleepWithContext creates and discards a timer per call;
  403. // instead reuse an inline timer?
  404. common.SleepWithContext(ctx, requestDelay)
  405. }
  406. // Loop and drain the quality data queue, sending the same payload to
  407. // each broker in each iteration. sendToBrokers performs the broker
  408. // requests in parallel, but sendToBrokers doesn't return until all
  409. // requests are complete, meaning no broker will get far ahead of any
  410. // other.
  411. //
  412. // If a certain broker request fails, including retries, that may
  413. // delay the overall schedule, up to requestTimeout * requestRetries.
  414. // Furthermore, after all retries fail, the failing broker simply does
  415. // never receives the payload.
  416. // Future enhancements:
  417. //
  418. // - Use a dynamic request timeout for failing brokers, to avoid
  419. // repeatedly delaying every round when one broker persistently fails?
  420. //
  421. // - Consider skipping sending a quality payload if contains only the
  422. // exact same proxy ID(s) and client ASNs reported in a very recent
  423. // request? Currently, the quality _count_ values aren't used as
  424. // distinguisher, so the primary benefit for sending additional
  425. // counts for the same proxy ID and client ASN are TTL extensions
  426. // in the ProxyQualityState.
  427. for {
  428. requestCounts := r.prepareNextRequest()
  429. if len(requestCounts) == 0 {
  430. break
  431. }
  432. r.sendToBrokers(ctx, brokerClients, requestCounts)
  433. }
  434. }
  435. }
  436. func (r *ProxyQualityReporter) prepareNextRequest() ProxyQualityRequestCounts {
  437. r.queueMutex.Lock()
  438. defer r.queueMutex.Unlock()
  439. // prepareNextRequest should not hold the mutex for a long period, as this
  440. // blocks ReportQuality, which in turn could block tunnel I/O operations.
  441. if r.reportQueue.Len() == 0 {
  442. return nil
  443. }
  444. counts := make(ProxyQualityRequestCounts)
  445. queueEntry := r.reportQueue.Front()
  446. // Limit the size of each request, capping both the memory overhead and
  447. // the amount of data lost in a temporary network disruption.
  448. //
  449. // Limitation: maxRequestEntries doesn't take into account the number of
  450. // different client ASN counts per entry. In practice, there shouldn't be
  451. // an excessive number of client ASNs.
  452. for queueEntry != nil && int64(len(counts)) < atomic.LoadInt64(&r.maxRequestEntries) {
  453. entry := queueEntry.Value.(proxyQualityReportQueueEntry)
  454. // Reuse queueEntry.counts rather than make a copy. As queueEntry is
  455. // removed from the queue here, this should be safe as no subsequent
  456. // ReportQuality will add to the same entry.
  457. counts[entry.proxyKey] = entry.counts
  458. removeEntry := queueEntry
  459. queueEntry = queueEntry.Next()
  460. r.reportQueue.Remove(removeEntry)
  461. delete(r.proxyIDQueueEntry, entry.proxyKey)
  462. }
  463. return counts
  464. }
  465. func (r *ProxyQualityReporter) sendToBrokers(
  466. ctx context.Context,
  467. brokerClients map[SessionPublicKey]*serverBrokerClient,
  468. requestCounts ProxyQualityRequestCounts) {
  469. // Iterate over the current list of brokers, as identified by the public
  470. // keys in brokerPublicKeys. For each broker, reuse any existing broker
  471. // client or create a new one. Spawns short term goroutine workers to
  472. // send requests to each broker in parallel, and await all worker
  473. // completion. Leave all working broker clients in place for future use,
  474. // but prune failed or unused broker clients from brokerClients. Assumes
  475. // only a handful of brokers.
  476. // This implementation is not using BrokerClient, the type used as the
  477. // proxy/client broker client, as BrokerClient uses a BrokerDialCoordinator
  478. // and is oriented to proxy/client functionality.
  479. var sendWaitGroup sync.WaitGroup
  480. var retainBrokerClientsMutex sync.Mutex
  481. retainBrokerClients := make(map[SessionPublicKey]struct{})
  482. brokerPublicKeys := r.brokerPublicKeys.Load().([]SessionPublicKey)
  483. brokerRootObfuscationSecrets := r.brokerRootObfuscationSecrets.Load().([]ObfuscationSecret)
  484. establishedBrokerIDs := r.serverBrokerSessions.sessions.GetEstablishedKnownInitiatorIDs()
  485. for i, brokerPublicKey := range brokerPublicKeys {
  486. // Get or create the brokerClient for brokerPublicKey.
  487. brokerClient, ok := brokerClients[brokerPublicKey]
  488. if !ok {
  489. initiatorID, err := brokerPublicKey.ToCurve25519()
  490. if err != nil {
  491. r.logger.WithTraceFields(
  492. common.LogFields{
  493. "brokerID": brokerPublicKey.String(),
  494. "error": err.Error()},
  495. ).Warning("ToCurve25519 failed")
  496. continue
  497. }
  498. brokerClient = &serverBrokerClient{
  499. publicKey: brokerPublicKey,
  500. rootObfuscationSecret: brokerRootObfuscationSecrets[i],
  501. brokerInitiatorID: ID(initiatorID),
  502. }
  503. // This partially initialized brokerClient will be retained even
  504. // if the following establishedBrokerIDs check fails, as this
  505. // caches the result of the ToCurve25519. The next sendToBrokers
  506. // call will check the same brokerPublicKey again -- unless
  507. // brokerPublicKeys changes.
  508. brokerClients[brokerPublicKey] = brokerClient
  509. }
  510. // Currently, brokers will only trust and allow proxy quality requests
  511. // from servers for which the broker has seen the corresponding
  512. // signed server entries as client proxy destinations. As such, the
  513. // following request is expected to fail unless the broker has
  514. // established a session with this server as indicated in
  515. // establishedBrokerIDs. Skip any broker that's not in
  516. // establishedBrokerIDs; those brokers will not receive this proxy
  517. // quality request payload.
  518. //
  519. // Mitigating factor: due to proxy affinity to a single broker, it's
  520. // likely that the proxy in any local ReportQuality call used and is
  521. // using a broker that has relayed a BrokerServerReport to this server.
  522. //
  523. // Future enhancement: the server could send its own signed server
  524. // entry to a broker, instead of relying on the broker to receive
  525. // that signed server entry in a client offer.
  526. if _, ok := establishedBrokerIDs[brokerClient.brokerInitiatorID]; !ok {
  527. // If there is a brokerClient for brokerPublicKey but the
  528. // establishedBrokerIDs check _no longer_ passes, remove and
  529. // garbage collect any round tripper and Noise session. The
  530. // remaining brokerClient is still retained, for the cached
  531. // ToCurve25519 conversion.
  532. brokerClient.sessions = nil
  533. if brokerClient.roundTripper != nil {
  534. // Close all network connections.
  535. brokerClient.roundTripper.Close()
  536. }
  537. brokerClient.roundTripper = nil
  538. retainBrokerClientsMutex.Lock()
  539. retainBrokerClients[brokerPublicKey] = struct{}{}
  540. retainBrokerClientsMutex.Unlock()
  541. continue
  542. }
  543. if brokerClient.sessions == nil {
  544. // Initialize the rest of the brokerClient: the round tripper and
  545. // the Noise session.
  546. //
  547. // Once initialized, these are retained after a successful round
  548. // trip, so that subsequent sendToBrokers calls can reuse the
  549. // existing, established network transport and Noise session.
  550. //
  551. // This implementation uses one Noise InitiatorSessions per
  552. // broker, instead of sharing a single instance, since
  553. // InitiatorSessions currently lacks an API to discard a
  554. // particular session.
  555. roundTripper, dialParams, err := r.roundTripperMaker(brokerPublicKey)
  556. if err != nil {
  557. r.logger.WithTraceFields(
  558. common.LogFields{
  559. "brokerID": brokerPublicKey.String(),
  560. "error": err.Error()},
  561. ).Warning("roundTripperMaker failed")
  562. continue
  563. }
  564. brokerClient.sessions = NewInitiatorSessions(r.serverSessionPrivateKey)
  565. brokerClient.roundTripper = roundTripper
  566. brokerClient.dialParams = dialParams
  567. }
  568. // Spawn a goroutine to send the request to this brokerClient.
  569. // Spawning goroutines for every request round should be efficient
  570. // enough, and avoids additional complexity in alternatives such as
  571. // maintaining long-running goroutine workers per broker.
  572. sendWaitGroup.Add(1)
  573. go func(brokerClient *serverBrokerClient) {
  574. defer sendWaitGroup.Done()
  575. retries := int(atomic.LoadInt64(&r.requestRetries))
  576. for i := 0; i <= retries; i++ {
  577. err := r.sendBrokerRequest(ctx, brokerClient, requestCounts)
  578. if err != nil {
  579. r.logger.WithTraceFields(
  580. common.LogFields{
  581. "brokerID": brokerClient.publicKey.String(),
  582. "error": err.Error()},
  583. ).Warning("sendBrokerRequest failed")
  584. if i < retries {
  585. // Try again.
  586. continue
  587. }
  588. // No more retries, and don't retain the brokerClient.
  589. return
  590. }
  591. // Exit the retry loop, and retain the successful brokerClient.
  592. break
  593. }
  594. // Retain the successful brokerClient.
  595. retainBrokerClientsMutex.Lock()
  596. retainBrokerClients[brokerClient.publicKey] = struct{}{}
  597. retainBrokerClientsMutex.Unlock()
  598. }(brokerClient)
  599. }
  600. // Await all request worker completion.
  601. //
  602. // Currently there is no backoff for brokers whose requests fail Unlike
  603. // proxies (and to some degree clients), there is only one concurrent
  604. // request, from this server, per broker, so there is less expectation of
  605. // hitting rate limiting by some intermediary, such as a CDN. The
  606. // requestDelay, primarily intended for batching data payloads, should
  607. // also provide a short cool-down period after failures.
  608. sendWaitGroup.Wait()
  609. // Trim the set of broker clients. Broker clients in brokerClients but not
  610. // in retainBrokerClients include cases where the request failed and
  611. // where the broker is no longer in brokerPublicKeys.
  612. for brokerPublicKey, brokerClient := range brokerClients {
  613. if _, ok := retainBrokerClients[brokerPublicKey]; !ok {
  614. // Close all network connections.
  615. brokerClient.roundTripper.Close()
  616. delete(brokerClients, brokerPublicKey)
  617. }
  618. }
  619. }
  620. func (r *ProxyQualityReporter) sendBrokerRequest(
  621. ctx context.Context,
  622. brokerClient *serverBrokerClient,
  623. requestCounts ProxyQualityRequestCounts) error {
  624. requestTimeout := time.Duration(atomic.LoadInt64(&r.requestTimeout))
  625. // While the request payload, requestCounts, is the same for every broker,
  626. // each broker round tripper may have different dial parameters, so each
  627. // request worker encodes and marshals its own request. requestCounts is
  628. // shared across multiple concurrent workers and must not be mutated.
  629. dialParams, err := protocol.EncodePackedAPIParameters(brokerClient.dialParams)
  630. if err != nil {
  631. return errors.Trace(err)
  632. }
  633. request := &ServerProxyQualityRequest{
  634. QualityCounts: requestCounts,
  635. DialParameters: dialParams,
  636. }
  637. requestPayload, err := MarshalServerProxyQualityRequest(request)
  638. if err != nil {
  639. return errors.Trace(err)
  640. }
  641. // Unlike clients and proxies, there is no Noise session sharing, as
  642. // there's only one, sequentially invoked sendBrokerRequest worker per
  643. // broker. The ServerProxyQualityRequest is not a long polling request,
  644. // so there's no special case, shorter Noise handshake timeout. There's
  645. // no request delay at this level.
  646. waitToShareSession := false
  647. sessionHandshakeTimeout := requestTimeout
  648. requestDelay := time.Duration(0)
  649. responsePayload, err := brokerClient.sessions.RoundTrip(
  650. ctx,
  651. brokerClient.roundTripper,
  652. brokerClient.publicKey,
  653. brokerClient.rootObfuscationSecret,
  654. waitToShareSession,
  655. sessionHandshakeTimeout,
  656. requestDelay,
  657. requestTimeout,
  658. requestPayload)
  659. if err != nil {
  660. // TODO: check if the error is a RoundTripperFailedError and,
  661. // if not, potentially retain the RoundTripper? At this time,
  662. // the server.InproxyProxyQualityBrokerRoundTripper.RoundTrip
  663. // implementation always returns RoundTripperFailedError.
  664. return errors.Trace(err)
  665. }
  666. // The response is simply an acknowledgement of the request.
  667. _, err = UnmarshalServerProxyQualityResponse(responsePayload)
  668. if err != nil {
  669. return errors.Trace(err)
  670. }
  671. return nil
  672. }