| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841 |
- /*
- * Copyright (c) 2025, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package inproxy
- import (
- "container/list"
- "context"
- "sync"
- "sync/atomic"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
- lrucache "github.com/cognusion/go-cache-lru"
- )
- const (
- proxyQualityMaxEntries = 10000000
- proxyQualityTTL = 24 * time.Hour
- proxyQualityMaxPendingFailedMatches = 1000000
- proxyQualityPendingFailedMatchDeadline = 5 * time.Minute
- proxyQualityFailedMatchThreshold = 10
- proxyQualityReporterMaxQueueEntries = 5000000
- proxyQualityReporterMaxRequestEntries = 1000
- proxyQualityReporterRequestDelay = 10 * time.Second
- proxyQualityReporterRequestTimeout = 10 * time.Second
- proxyQualityReporterRequestRetries = 1
- )
- // ProxyQualityState records and manages proxy tunnel quality data reported by
- // servers and used to prioritize proxies in the broker matching process.
- type ProxyQualityState struct {
- mutex sync.Mutex
- qualityTTL time.Duration
- pendingFailedMatchDeadline time.Duration
- failedMatchThreshold int
- entries *lrucache.Cache
- pendingFailedMatches *lrucache.Cache
- }
- type proxyQualityEntry struct {
- clientASNCounts ProxyQualityASNCounts
- failedMatchCount int
- }
- // NewProxyQuality creates a new ProxyQualityState.
- func NewProxyQuality() *ProxyQualityState {
- // Limitation: max cache sizes are not dynamically configurable and are
- // set to fixed values that are in line with other, indirectly related
- // limits, such as matcherAnnouncementQueueMaxSize.
- // TODO: lrucache.Cache.DeleteExpired is a linear scan; review the
- // performance of scanning up to 10,000,000 entries every 1 minute.
- q := &ProxyQualityState{
- qualityTTL: proxyQualityTTL,
- pendingFailedMatchDeadline: proxyQualityPendingFailedMatchDeadline,
- failedMatchThreshold: proxyQualityFailedMatchThreshold,
- entries: lrucache.NewWithLRU(
- 0, 1*time.Minute, proxyQualityMaxEntries),
- pendingFailedMatches: lrucache.NewWithLRU(
- 0, 1*time.Minute, proxyQualityMaxPendingFailedMatches),
- }
- q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
- return q
- }
- // SetProxyQualityRequestParameters overrides default values for proxy quality
- // state management parameters.
- //
- // qualityTTL is the TTL for a proxy's quality entry. Each AddQuality call
- // extends an entry's TTL.
- //
- // pendingFailedMatchDeadline is the elapsed time between calling Match for a
- // given proxy, and subsequently incrementing that proxy's failed match
- // count, unless an AddQuality call is made in the meantime.
- //
- // failedMatchThreshold is the threshold failed match count after which a
- // proxy's quality entry is deleted.
- func (q *ProxyQualityState) SetParameters(
- qualityTTL time.Duration,
- pendingFailedMatchDeadline time.Duration,
- failedMatchThreshold int) {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- q.qualityTTL = qualityTTL
- q.pendingFailedMatchDeadline = pendingFailedMatchDeadline
- q.failedMatchThreshold = failedMatchThreshold
- }
- // HasQuality indicates if the specified proxy, defined by its ID and ASN, has
- // a quality entry. If the input client ASN is blank, any entry suffices. If
- // a client ASN is given the proxy must have a quality tunnel for a client in
- // that ASN.
- func (q *ProxyQualityState) HasQuality(
- proxyID ID, proxyASN string, clientASN string) bool {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
- strProxyKey := string(proxyKey[:])
- entryValue, ok := q.entries.Get(strProxyKey)
- if !ok {
- return false
- }
- entry := entryValue.(*proxyQualityEntry)
- // Currently, the actual count value is not used; any count > 0
- // is "quality".
- if clientASN == "" {
- // No specific ASN.
- return len(entry.clientASNCounts) > 0
- }
- return entry.clientASNCounts[clientASN] > 0
- }
- // AddQuality adds a new quality entry or adds counts to an existing quality
- // entry for the specified proxy, defined by its ID and ASN. For an existing
- // entry, its TTL is extended, and any failed match count is reset to zero.
- // AddQuality deletes any pending failed match, set by Matched, for the
- // proxy.
- func (q *ProxyQualityState) AddQuality(
- proxyKey ProxyQualityKey, counts ProxyQualityASNCounts) {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- strProxyKey := string(proxyKey[:])
- entryValue, ok := q.entries.Get(strProxyKey)
- var entry *proxyQualityEntry
- if ok {
- entry = entryValue.(*proxyQualityEntry)
- } else {
- entry = &proxyQualityEntry{
- clientASNCounts: make(ProxyQualityASNCounts),
- }
- }
- // Reset the consecutive failed match count for existing entry.
- entry.failedMatchCount = 0
- // Add in counts.
- for ASN, count := range counts {
- entry.clientASNCounts[ASN] += count
- }
- // Set both updates the value and extends the TTL for any existing entry.
- q.entries.Set(strProxyKey, entry, q.qualityTTL)
- // Delete any pending failed match. The actual pending match may still be
- // in progress and may even fail, but the new quality event is considered
- // sufficient to ignore that outcome.
- //
- // lrucache.Cache.Delete invokes OnEvicted, so OnEvicted is temporarily
- // cleared to avoid incrementing the failed match count. In addition,
- // avoiding OnEvicted here ensures that addFailedMatch can assume that
- // the mutex lock is not held.
- q.pendingFailedMatches.OnEvicted(nil)
- q.pendingFailedMatches.Delete(strProxyKey)
- q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
- }
- // Matched reports that, for the specified proxy, defined by its ID and ASN, a
- // proxy announcement was just matched with a client offer, and an announcement
- // response returned to the proxy. Matched begins a "countdown" until a
- // subsequent, expected AddQuality call for the same proxy: if too much time
- // elapses with no AddQuality, the match is considered to have failed to
- // produce a successful tunnel. After exceeding a threshold count of
- // consecutive failed matches, a proxy's quality entry is deleted.
- //
- // Matched/AddQuality do not track the outcome of specific matches -- for a
- // given proxy, any successful, quality tunnel will cancel any pending failed
- // match.
- func (q *ProxyQualityState) Matched(proxyID ID, proxyASN string) {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- // This uses a lrucache.Cache and OnEvicted events as an implementation of
- // the failed match deadline without requiring a timer or goroutine per
- // pending match. When the cache entry expires due to TTL, the failed
- // match deadline is met.
- proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
- strProxyKey := string(proxyKey[:])
- _, ok := q.pendingFailedMatches.Get(strProxyKey)
- if ok {
- // When there's already a pending failed match, leave the existing
- // deadline in place and don't extend it.
- return
- }
- q.pendingFailedMatches.Add(
- strProxyKey, struct{}{}, q.pendingFailedMatchDeadline)
- }
- // Flush clears all quality state.
- func (q *ProxyQualityState) Flush() {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- q.entries.Flush()
- q.pendingFailedMatches.OnEvicted(nil)
- q.pendingFailedMatches.Flush()
- q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
- }
- // addFailedMatch is invoked when a pendingFailedMatches expires, increments
- // the failed match count, and removes a quality entry when the failed match
- // threshold count is exceeded.
- func (q *ProxyQualityState) addFailedMatch(strProxyKey string, _ interface{}) {
- // Assumes pendingFailedMatches.OnEvicted is not invoked while already
- // holding the mutex lock.
- q.mutex.Lock()
- defer q.mutex.Unlock()
- entryValue, ok := q.entries.Get(strProxyKey)
- if !ok {
- // No quality to remove.
- return
- }
- entry := entryValue.(*proxyQualityEntry)
- entry.failedMatchCount += 1
- if entry.failedMatchCount >= q.failedMatchThreshold {
- // Remove quality.
- q.entries.Delete(strProxyKey)
- }
- }
- // ProxyQualityReporter manages sending proxy quality requests to brokers.
- type ProxyQualityReporter struct {
- logger common.Logger
- serverBrokerSessions *ServerBrokerSessions
- serverSessionPrivateKey SessionPrivateKey
- roundTripperMaker ProxyQualityBrokerRoundTripperMaker
- runMutex sync.Mutex
- runContext context.Context
- stopRunning context.CancelFunc
- waitGroup *sync.WaitGroup
- queueMutex sync.Mutex
- reportQueue *list.List
- proxyIDQueueEntry map[ProxyQualityKey]*list.Element
- brokerPublicKeys atomic.Value
- brokerRootObfuscationSecrets atomic.Value
- requestDelay atomic.Int64
- maxRequestEntries atomic.Int64
- requestTimeout atomic.Int64
- requestRetries atomic.Int64
- signalReport chan struct{}
- }
- // ProxyQualityBrokerRoundTripperMaker is a callback which creates a new
- // RoundTripper for sending requests to the broker specified by the given
- // session public key.
- //
- // The optional common.APIParameters are broker dial parameter metrics to be
- // reported to the broker.
- type ProxyQualityBrokerRoundTripperMaker func(SessionPublicKey) (
- RoundTripper, common.APIParameters, error)
- type proxyQualityReportQueueEntry struct {
- proxyKey ProxyQualityKey
- counts ProxyQualityASNCounts
- }
- type serverBrokerClient struct {
- publicKey SessionPublicKey
- rootObfuscationSecret ObfuscationSecret
- brokerInitiatorID ID
- sessions *InitiatorSessions
- roundTripper RoundTripper
- dialParams common.APIParameters
- }
- // NewProxyQualityReporter creates a new ProxyQualityReporter.
- //
- // serverBrokerSessions is the server's ServerBrokerSessions instance which
- // manages inbound reports from the broker; the ServerBrokerSessions is
- // consulted to determine which brokers have recently communicated with the
- // server, and are therefore expected to trust the server's public key.
- //
- // serverSessionPrivateKey is the server's session private key to be used in
- // the quality reporting Noise sessions established with the brokers.
- // brokerPublicKeys specify the brokers to send to.
- //
- // roundTripperMaker is a callback which creates RoundTrippers for these
- // brokers. The ProxyQualityReporter will invoke roundTripperMaker when
- // attempting to send requests to a given broker; each RoundTripper will be
- // retained and reused as long as it continues to work successfully.
- func NewProxyQualityReporter(
- logger common.Logger,
- serverBrokerSessions *ServerBrokerSessions,
- serverSessionPrivateKey SessionPrivateKey,
- brokerPublicKeys []SessionPublicKey,
- brokerRootObfuscationSecrets []ObfuscationSecret,
- roundTripperMaker ProxyQualityBrokerRoundTripperMaker) (
- *ProxyQualityReporter, error) {
- r := &ProxyQualityReporter{
- logger: logger,
- serverBrokerSessions: serverBrokerSessions,
- serverSessionPrivateKey: serverSessionPrivateKey,
- roundTripperMaker: roundTripperMaker,
- waitGroup: new(sync.WaitGroup),
- reportQueue: list.New(),
- proxyIDQueueEntry: make(map[ProxyQualityKey]*list.Element),
- signalReport: make(chan struct{}, 1),
- }
- err := r.SetKnownBrokers(brokerPublicKeys, brokerRootObfuscationSecrets)
- if err != nil {
- return nil, errors.Trace(err)
- }
- r.requestDelay.Store(int64(proxyQualityReporterRequestDelay))
- r.maxRequestEntries.Store(int64(proxyQualityReporterMaxRequestEntries))
- r.requestTimeout.Store(int64(proxyQualityReporterRequestTimeout))
- r.requestRetries.Store(int64(proxyQualityReporterRequestRetries))
- return r, nil
- }
- // SetKnownBrokers updates the set of brokers to send to.
- func (r *ProxyQualityReporter) SetKnownBrokers(
- brokerPublicKeys []SessionPublicKey,
- brokerRootObfuscationSecrets []ObfuscationSecret) error {
- if len(brokerPublicKeys) != len(brokerRootObfuscationSecrets) {
- return errors.TraceNew("invalid broker specs")
- }
- r.brokerPublicKeys.Store(brokerPublicKeys)
- r.brokerRootObfuscationSecrets.Store(brokerRootObfuscationSecrets)
- return nil
- }
- // SetRequestParameters overrides default values for request parameters.
- func (r *ProxyQualityReporter) SetRequestParameters(
- maxRequestEntries int,
- requestDelay time.Duration,
- requestTimeout time.Duration,
- requestRetries int) {
- r.requestDelay.Store(int64(requestDelay))
- r.maxRequestEntries.Store(int64(maxRequestEntries))
- r.requestTimeout.Store(int64(requestTimeout))
- r.requestRetries.Store(int64(requestRetries))
- }
- // Start launches the request workers.
- func (r *ProxyQualityReporter) Start() error {
- r.runMutex.Lock()
- defer r.runMutex.Unlock()
- if r.runContext != nil {
- return errors.TraceNew("already running")
- }
- r.runContext, r.stopRunning = context.WithCancel(context.Background())
- r.waitGroup.Add(1)
- go func() {
- defer r.waitGroup.Done()
- r.requestScheduler(r.runContext)
- }()
- return nil
- }
- // Stop terminates the request workers.
- func (r *ProxyQualityReporter) Stop() {
- r.runMutex.Lock()
- defer r.runMutex.Unlock()
- r.stopRunning()
- r.waitGroup.Wait()
- r.runContext, r.stopRunning = nil, nil
- }
- // ReportQuality registers a quality tunnel for the specified proxy, defined
- // by its ID and ASN, and client ASN. Broker requests are scheduled to be
- // sent after a short delay -- intended to batch up additional data -- or
- // once sufficient request data is accumulated.
- func (r *ProxyQualityReporter) ReportQuality(
- proxyID ID, proxyASN string, clientASN string) {
- r.queueMutex.Lock()
- defer r.queueMutex.Unlock()
- proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
- // Proxy quality data is stored in a FIFO queue. New reports are merged
- // into existing entries for that same proxy ID when possible.
- entry, ok := r.proxyIDQueueEntry[proxyKey]
- if ok {
- entry.Value.(proxyQualityReportQueueEntry).counts[clientASN] += 1
- return
- }
- // Sanity check against an unbounded queue. When the queue is full, new
- // reports are simply dropped. There is no back pressure to slow down the
- // rate of quality tunnels, since the overall goal is to establish
- // quality tunnels.
- if r.reportQueue.Len() >= proxyQualityReporterMaxQueueEntries {
- r.logger.WithTrace().Warning("proxyQualityReporterMaxQueueEntries exceeded")
- return
- }
- counts := make(ProxyQualityASNCounts)
- counts[clientASN] += 1
- entry = r.reportQueue.PushBack(
- proxyQualityReportQueueEntry{
- proxyKey: proxyKey,
- counts: counts,
- })
- r.proxyIDQueueEntry[proxyKey] = entry
- // signalReport has a buffer size of 1, so when a signal can't be sent to
- // the channel, it's already signalled.
- select {
- case r.signalReport <- struct{}{}:
- default:
- }
- }
- func (r *ProxyQualityReporter) requestScheduler(ctx context.Context) {
- // Retain a set of serverBrokerClients, with established round trip
- // transports and Noise sessions, for reuse across many requests.
- // sendToBrokers will add to and trim this set.
- brokerClients := make(map[SessionPublicKey]*serverBrokerClient)
- for {
- // Await the signal that there is quality data to report.
- select {
- case <-r.signalReport:
- case <-ctx.Done():
- return
- }
- // Delay, for a brief moment, sending requests in an effort to batch
- // up more data for the requests.
- requestDelay := time.Duration(r.requestDelay.Load())
- if requestDelay > 0 {
- // TODO: SleepWithContext creates and discards a timer per call;
- // instead reuse an inline timer?
- common.SleepWithContext(ctx, requestDelay)
- }
- // Loop and drain the quality data queue, sending the same payload to
- // each broker in each iteration. sendToBrokers performs the broker
- // requests in parallel, but sendToBrokers doesn't return until all
- // requests are complete, meaning no broker will get far ahead of any
- // other.
- //
- // If a certain broker request fails, including retries, that may
- // delay the overall schedule, up to requestTimeout * requestRetries.
- // Furthermore, after all retries fail, the failing broker simply does
- // never receives the payload.
- // Future enhancements:
- //
- // - Use a dynamic request timeout for failing brokers, to avoid
- // repeatedly delaying every round when one broker persistently fails?
- //
- // - Consider skipping sending a quality payload if contains only the
- // exact same proxy ID(s) and client ASNs reported in a very recent
- // request? Currently, the quality _count_ values aren't used as
- // distinguisher, so the primary benefit for sending additional
- // counts for the same proxy ID and client ASN are TTL extensions
- // in the ProxyQualityState.
- for {
- requestCounts := r.prepareNextRequest()
- if len(requestCounts) == 0 {
- break
- }
- r.sendToBrokers(ctx, brokerClients, requestCounts)
- }
- }
- }
- func (r *ProxyQualityReporter) prepareNextRequest() ProxyQualityRequestCounts {
- r.queueMutex.Lock()
- defer r.queueMutex.Unlock()
- // prepareNextRequest should not hold the mutex for a long period, as this
- // blocks ReportQuality, which in turn could block tunnel I/O operations.
- if r.reportQueue.Len() == 0 {
- return nil
- }
- counts := make(ProxyQualityRequestCounts)
- queueEntry := r.reportQueue.Front()
- // Limit the size of each request, capping both the memory overhead and
- // the amount of data lost in a temporary network disruption.
- //
- // Limitation: maxRequestEntries doesn't take into account the number of
- // different client ASN counts per entry. In practice, there shouldn't be
- // an excessive number of client ASNs.
- for queueEntry != nil && int64(len(counts)) < r.maxRequestEntries.Load() {
- entry := queueEntry.Value.(proxyQualityReportQueueEntry)
- // Reuse queueEntry.counts rather than make a copy. As queueEntry is
- // removed from the queue here, this should be safe as no subsequent
- // ReportQuality will add to the same entry.
- counts[entry.proxyKey] = entry.counts
- removeEntry := queueEntry
- queueEntry = queueEntry.Next()
- r.reportQueue.Remove(removeEntry)
- delete(r.proxyIDQueueEntry, entry.proxyKey)
- }
- return counts
- }
- func (r *ProxyQualityReporter) sendToBrokers(
- ctx context.Context,
- brokerClients map[SessionPublicKey]*serverBrokerClient,
- requestCounts ProxyQualityRequestCounts) {
- // Iterate over the current list of brokers, as identified by the public
- // keys in brokerPublicKeys. For each broker, reuse any existing broker
- // client or create a new one. Spawns short term goroutine workers to
- // send requests to each broker in parallel, and await all worker
- // completion. Leave all working broker clients in place for future use,
- // but prune failed or unused broker clients from brokerClients. Assumes
- // only a handful of brokers.
- // This implementation is not using BrokerClient, the type used as the
- // proxy/client broker client, as BrokerClient uses a BrokerDialCoordinator
- // and is oriented to proxy/client functionality.
- var sendWaitGroup sync.WaitGroup
- var retainBrokerClientsMutex sync.Mutex
- retainBrokerClients := make(map[SessionPublicKey]struct{})
- brokerPublicKeys := r.brokerPublicKeys.Load().([]SessionPublicKey)
- brokerRootObfuscationSecrets := r.brokerRootObfuscationSecrets.Load().([]ObfuscationSecret)
- establishedBrokerIDs := r.serverBrokerSessions.sessions.GetEstablishedKnownInitiatorIDs()
- for i, brokerPublicKey := range brokerPublicKeys {
- // Get or create the brokerClient for brokerPublicKey.
- brokerClient, ok := brokerClients[brokerPublicKey]
- if !ok {
- initiatorID, err := brokerPublicKey.ToCurve25519()
- if err != nil {
- r.logger.WithTraceFields(
- common.LogFields{
- "brokerID": brokerPublicKey.String(),
- "error": err.Error()},
- ).Warning("ToCurve25519 failed")
- continue
- }
- brokerClient = &serverBrokerClient{
- publicKey: brokerPublicKey,
- rootObfuscationSecret: brokerRootObfuscationSecrets[i],
- brokerInitiatorID: ID(initiatorID),
- }
- // This partially initialized brokerClient will be retained even
- // if the following establishedBrokerIDs check fails, as this
- // caches the result of the ToCurve25519. The next sendToBrokers
- // call will check the same brokerPublicKey again -- unless
- // brokerPublicKeys changes.
- brokerClients[brokerPublicKey] = brokerClient
- }
- // Currently, brokers will only trust and allow proxy quality requests
- // from servers for which the broker has seen the corresponding
- // signed server entries as client proxy destinations. As such, the
- // following request is expected to fail unless the broker has
- // established a session with this server as indicated in
- // establishedBrokerIDs. Skip any broker that's not in
- // establishedBrokerIDs; those brokers will not receive this proxy
- // quality request payload.
- //
- // Mitigating factor: due to proxy affinity to a single broker, it's
- // likely that the proxy in any local ReportQuality call used and is
- // using a broker that has relayed a BrokerServerReport to this server.
- //
- // Future enhancement: the server could send its own signed server
- // entry to a broker, instead of relying on the broker to receive
- // that signed server entry in a client offer.
- if _, ok := establishedBrokerIDs[brokerClient.brokerInitiatorID]; !ok {
- // If there is a brokerClient for brokerPublicKey but the
- // establishedBrokerIDs check _no longer_ passes, remove and
- // garbage collect any round tripper and Noise session. The
- // remaining brokerClient is still retained, for the cached
- // ToCurve25519 conversion.
- brokerClient.sessions = nil
- if brokerClient.roundTripper != nil {
- // Close all network connections.
- brokerClient.roundTripper.Close()
- }
- brokerClient.roundTripper = nil
- retainBrokerClientsMutex.Lock()
- retainBrokerClients[brokerPublicKey] = struct{}{}
- retainBrokerClientsMutex.Unlock()
- continue
- }
- if brokerClient.sessions == nil {
- // Initialize the rest of the brokerClient: the round tripper and
- // the Noise session.
- //
- // Once initialized, these are retained after a successful round
- // trip, so that subsequent sendToBrokers calls can reuse the
- // existing, established network transport and Noise session.
- //
- // This implementation uses one Noise InitiatorSessions per
- // broker, instead of sharing a single instance, since
- // InitiatorSessions currently lacks an API to discard a
- // particular session.
- roundTripper, dialParams, err := r.roundTripperMaker(brokerPublicKey)
- if err != nil {
- r.logger.WithTraceFields(
- common.LogFields{
- "brokerID": brokerPublicKey.String(),
- "error": err.Error()},
- ).Warning("roundTripperMaker failed")
- continue
- }
- brokerClient.sessions = NewInitiatorSessions(r.serverSessionPrivateKey)
- brokerClient.roundTripper = roundTripper
- brokerClient.dialParams = dialParams
- }
- // Spawn a goroutine to send the request to this brokerClient.
- // Spawning goroutines for every request round should be efficient
- // enough, and avoids additional complexity in alternatives such as
- // maintaining long-running goroutine workers per broker.
- sendWaitGroup.Add(1)
- go func(brokerClient *serverBrokerClient) {
- defer sendWaitGroup.Done()
- retries := int(r.requestRetries.Load())
- for i := 0; i <= retries; i++ {
- err := r.sendBrokerRequest(ctx, brokerClient, requestCounts)
- if err != nil {
- r.logger.WithTraceFields(
- common.LogFields{
- "brokerID": brokerClient.publicKey.String(),
- "error": err.Error()},
- ).Warning("sendBrokerRequest failed")
- if i < retries {
- // Try again.
- continue
- }
- // No more retries, and don't retain the brokerClient.
- return
- }
- // Exit the retry loop, and retain the successful brokerClient.
- break
- }
- // Retain the successful brokerClient.
- retainBrokerClientsMutex.Lock()
- retainBrokerClients[brokerClient.publicKey] = struct{}{}
- retainBrokerClientsMutex.Unlock()
- }(brokerClient)
- }
- // Await all request worker completion.
- //
- // Currently there is no backoff for brokers whose requests fail Unlike
- // proxies (and to some degree clients), there is only one concurrent
- // request, from this server, per broker, so there is less expectation of
- // hitting rate limiting by some intermediary, such as a CDN. The
- // requestDelay, primarily intended for batching data payloads, should
- // also provide a short cool-down period after failures.
- sendWaitGroup.Wait()
- // Trim the set of broker clients. Broker clients in brokerClients but not
- // in retainBrokerClients include cases where the request failed and
- // where the broker is no longer in brokerPublicKeys.
- for brokerPublicKey, brokerClient := range brokerClients {
- if _, ok := retainBrokerClients[brokerPublicKey]; !ok {
- // Close all network connections.
- brokerClient.roundTripper.Close()
- delete(brokerClients, brokerPublicKey)
- }
- }
- }
- func (r *ProxyQualityReporter) sendBrokerRequest(
- ctx context.Context,
- brokerClient *serverBrokerClient,
- requestCounts ProxyQualityRequestCounts) error {
- requestTimeout := time.Duration(r.requestTimeout.Load())
- // While the request payload, requestCounts, is the same for every broker,
- // each broker round tripper may have different dial parameters, so each
- // request worker encodes and marshals its own request. requestCounts is
- // shared across multiple concurrent workers and must not be mutated.
- dialParams, err := protocol.EncodePackedAPIParameters(brokerClient.dialParams)
- if err != nil {
- return errors.Trace(err)
- }
- request := &ServerProxyQualityRequest{
- QualityCounts: requestCounts,
- DialParameters: dialParams,
- }
- requestPayload, err := MarshalServerProxyQualityRequest(request)
- if err != nil {
- return errors.Trace(err)
- }
- // Unlike clients and proxies, there is no Noise session sharing, as
- // there's only one, sequentially invoked sendBrokerRequest worker per
- // broker. The ServerProxyQualityRequest is not a long polling request,
- // so there's no special case, shorter Noise handshake timeout. There's
- // no request delay at this level.
- waitToShareSession := false
- sessionHandshakeTimeout := requestTimeout
- requestDelay := time.Duration(0)
- responsePayload, err := brokerClient.sessions.RoundTrip(
- ctx,
- brokerClient.roundTripper,
- brokerClient.publicKey,
- brokerClient.rootObfuscationSecret,
- waitToShareSession,
- sessionHandshakeTimeout,
- requestDelay,
- requestTimeout,
- requestPayload)
- if err != nil {
- // TODO: check if the error is a RoundTripperFailedError and,
- // if not, potentially retain the RoundTripper? At this time,
- // the server.InproxyProxyQualityBrokerRoundTripper.RoundTrip
- // implementation always returns RoundTripperFailedError.
- return errors.Trace(err)
- }
- // The response is simply an acknowledgement of the request.
- _, err = UnmarshalServerProxyQualityResponse(responsePayload)
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
|