|
|
@@ -75,6 +75,7 @@ type InproxyBrokerClientManager struct {
|
|
|
isProxy bool
|
|
|
|
|
|
mutex sync.Mutex
|
|
|
+ brokerSelectCount int
|
|
|
networkID string
|
|
|
brokerClientInstance *InproxyBrokerClientInstance
|
|
|
}
|
|
|
@@ -101,6 +102,7 @@ func NewInproxyBrokerClientManager(
|
|
|
// called when tactics have changed, which triggers a broker client reset in
|
|
|
// order to apply potentially changed parameters.
|
|
|
func (b *InproxyBrokerClientManager) TacticsApplied() error {
|
|
|
+
|
|
|
b.mutex.Lock()
|
|
|
defer b.mutex.Unlock()
|
|
|
|
|
|
@@ -113,7 +115,7 @@ func (b *InproxyBrokerClientManager) TacticsApplied() error {
|
|
|
// TODO: as a future future enhancement, don't reset when the tactics
|
|
|
// brokerSpecs.Hash() is unchanged?
|
|
|
|
|
|
- return errors.Trace(b.reset())
|
|
|
+ return errors.Trace(b.reset(resetBrokerClientReasonTacticsApplied))
|
|
|
}
|
|
|
|
|
|
// GetBrokerClient returns the current, shared broker client and its
|
|
|
@@ -128,7 +130,7 @@ func (b *InproxyBrokerClientManager) GetBrokerClient(
|
|
|
defer b.mutex.Unlock()
|
|
|
|
|
|
if b.brokerClientInstance == nil || b.networkID != networkID {
|
|
|
- err := b.reset()
|
|
|
+ err := b.reset(resetBrokerClientReasonInit)
|
|
|
if err != nil {
|
|
|
return nil, nil, errors.Trace(err)
|
|
|
}
|
|
|
@@ -155,10 +157,53 @@ func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed(
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- return errors.Trace(b.reset())
|
|
|
+ return errors.Trace(b.reset(resetBrokerClientReasonRoundTripperFailed))
|
|
|
+}
|
|
|
+
|
|
|
+func (b *InproxyBrokerClientManager) resetBrokerClientOnNoMatch(
|
|
|
+ brokerClientInstance *InproxyBrokerClientInstance) error {
|
|
|
+
|
|
|
+ // Ignore the no match callback for proxies. For personal pairing, the
|
|
|
+ // broker rotation scheme has clients moving brokers to find relatively
|
|
|
+ // static proxies. For common pairing, we want to achieve balanced supply
|
|
|
+ // across brokers.
|
|
|
+ //
|
|
|
+ // Currently, inproxy.BrokerDialCoordinator.BrokerClientNoMatch is only
|
|
|
+ // wired up for clients, but this check ensures it'll still be ignored in
|
|
|
+ // case that changes.
|
|
|
+ if b.isProxy {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if b.brokerClientInstance != brokerClientInstance {
|
|
|
+ // See comment for same logic in resetBrokerClientOnRoundTripperFailed.
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ p := b.config.GetParameters().Get()
|
|
|
+ defer p.Close()
|
|
|
+
|
|
|
+ probability := parameters.InproxyClientNoMatchFailoverProbability
|
|
|
+ if b.config.IsInproxyPersonalPairingMode() {
|
|
|
+ probability = parameters.InproxyClientNoMatchFailoverPersonalProbability
|
|
|
+ }
|
|
|
+ if !p.WeightedCoinFlip(probability) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return errors.Trace(b.reset(resetBrokerClientReasonRoundNoMatch))
|
|
|
}
|
|
|
|
|
|
-func (b *InproxyBrokerClientManager) reset() error {
|
|
|
+type resetBrokerClientReason int
|
|
|
+
|
|
|
+const (
|
|
|
+ resetBrokerClientReasonInit resetBrokerClientReason = iota + 1
|
|
|
+ resetBrokerClientReasonTacticsApplied
|
|
|
+ resetBrokerClientReasonRoundTripperFailed
|
|
|
+ resetBrokerClientReasonRoundNoMatch
|
|
|
+)
|
|
|
+
|
|
|
+func (b *InproxyBrokerClientManager) reset(reason resetBrokerClientReason) error {
|
|
|
|
|
|
// Assumes b.mutex lock is held.
|
|
|
|
|
|
@@ -173,6 +218,20 @@ func (b *InproxyBrokerClientManager) reset() error {
|
|
|
b.brokerClientInstance.Close()
|
|
|
}
|
|
|
|
|
|
+ // b.brokerSelectCount tracks the number of broker resets and is used to
|
|
|
+ // iterate over the brokers in a deterministic rotation when running in
|
|
|
+ // personal pairing mode.
|
|
|
+
|
|
|
+ switch reason {
|
|
|
+ case resetBrokerClientReasonInit,
|
|
|
+ resetBrokerClientReasonTacticsApplied:
|
|
|
+ b.brokerSelectCount = 0
|
|
|
+
|
|
|
+ case resetBrokerClientReasonRoundTripperFailed,
|
|
|
+ resetBrokerClientReasonRoundNoMatch:
|
|
|
+ b.brokerSelectCount += 1
|
|
|
+ }
|
|
|
+
|
|
|
// Any existing broker client is removed, even if
|
|
|
// NewInproxyBrokerClientInstance fails. This ensures, for example, that
|
|
|
// an existing broker client is removed when its spec is no longer
|
|
|
@@ -183,7 +242,12 @@ func (b *InproxyBrokerClientManager) reset() error {
|
|
|
networkID := b.config.GetNetworkID()
|
|
|
|
|
|
brokerClientInstance, err := NewInproxyBrokerClientInstance(
|
|
|
- b.config, b, networkID, b.isProxy)
|
|
|
+ b.config,
|
|
|
+ b,
|
|
|
+ networkID,
|
|
|
+ b.isProxy,
|
|
|
+ b.brokerSelectCount,
|
|
|
+ reason == resetBrokerClientReasonRoundNoMatch)
|
|
|
if err != nil {
|
|
|
return errors.Trace(err)
|
|
|
}
|
|
|
@@ -218,6 +282,7 @@ type InproxyBrokerClientInstance struct {
|
|
|
announceDelayJitter float64
|
|
|
answerRequestTimeout time.Duration
|
|
|
offerRequestTimeout time.Duration
|
|
|
+ offerRequestPersonalTimeout time.Duration
|
|
|
offerRetryDelay time.Duration
|
|
|
offerRetryJitter float64
|
|
|
relayedPacketRequestTimeout time.Duration
|
|
|
@@ -236,7 +301,9 @@ func NewInproxyBrokerClientInstance(
|
|
|
config *Config,
|
|
|
brokerClientManager *InproxyBrokerClientManager,
|
|
|
networkID string,
|
|
|
- isProxy bool) (*InproxyBrokerClientInstance, error) {
|
|
|
+ isProxy bool,
|
|
|
+ brokerSelectCount int,
|
|
|
+ resetReasonNoMatch bool) (*InproxyBrokerClientInstance, error) {
|
|
|
|
|
|
p := config.GetParameters().Get()
|
|
|
defer p.Close()
|
|
|
@@ -251,6 +318,9 @@ func NewInproxyBrokerClientInstance(
|
|
|
if !isProxy && len(commonCompartmentIDs) == 0 && len(personalCompartmentIDs) == 0 {
|
|
|
return nil, errors.TraceNew("no compartment IDs")
|
|
|
}
|
|
|
+ if len(personalCompartmentIDs) > 1 {
|
|
|
+ return nil, errors.TraceNew("unexpected multiple personal compartment IDs")
|
|
|
+ }
|
|
|
|
|
|
// Select the broker to use, optionally favoring brokers with replay data.
|
|
|
// In the InproxyBrokerSpecs calls, the first non-empty tactics parameter
|
|
|
@@ -290,45 +360,109 @@ func NewInproxyBrokerClientInstance(
|
|
|
return nil, errors.TraceNew("no broker specs")
|
|
|
}
|
|
|
|
|
|
- // To ensure personal compartment ID client/proxy rendezvous at same
|
|
|
- // broker, simply pick the first configured broker.
|
|
|
+ // Select a broker.
|
|
|
+
|
|
|
+ // In common pairing mode, the available brokers are shuffled before
|
|
|
+ // selection, for random load balancing. Brokers with available dial
|
|
|
+ // parameter replay data are preferred. When rotating brokers due to a no
|
|
|
+ // match, the available replay data is ignored to increase the chance of
|
|
|
+ // selecting a different broker.
|
|
|
+ //
|
|
|
+ // In personal pairing mode, arrange for the proxy and client to
|
|
|
+ // rendezvous at the same broker by shuffling based on the shared
|
|
|
+ // personal compartment ID. Both the client and proxy will select the
|
|
|
+ // same initial broker, and fail over to other brokers in the same order.
|
|
|
+ // By design, clients will move between brokers aggressively, rotating on
|
|
|
+ // no-match responses and applying a shorter client offer timeout; while
|
|
|
+ // proxies will remain in place in order to be found. Since rendezvous
|
|
|
+ // depends on the ordering, each broker is selected in shuffle order;
|
|
|
+ // dial parameter replay data is used when available but not considered
|
|
|
+ // in selection ordering. The brokerSelectCount input is used to
|
|
|
+ // progressively index into the list of shuffled brokers.
|
|
|
+ //
|
|
|
+ // Potential future enhancements:
|
|
|
//
|
|
|
- // Limitations: there's no failover or load balancing for the personal
|
|
|
- // compartment ID case; and this logic assumes that the broker spec
|
|
|
- // tactics are the same for the client and proxy.
|
|
|
+ // - Use brokerSelectCount in the common pairing case as well, to ensure
|
|
|
+ // that a no-match reset always selects a different broker; but, unlike
|
|
|
+ // the personal pairing logic, still prefer brokers with replay rather
|
|
|
+ // than following a strict shuffle order.
|
|
|
+ //
|
|
|
+ // - The common pairing no match broker rotation is intended to partially
|
|
|
+ // mitigate poor common proxy load balancing that can leave a broker
|
|
|
+ // with little proxy supply. A more robust mitigation would be to make
|
|
|
+ // proxies distribute announcements across multiple or even all brokers.
|
|
|
+
|
|
|
+ personalPairing := len(personalCompartmentIDs) > 0
|
|
|
|
|
|
- if len(personalCompartmentIDs) > 0 {
|
|
|
- brokerSpecs = brokerSpecs[:1]
|
|
|
+ // In the following cases, don't shuffle or otherwise mutate the original
|
|
|
+ // broker spec slice, as it is a tactics parameter.
|
|
|
+
|
|
|
+ if personalPairing {
|
|
|
+
|
|
|
+ if len(personalCompartmentIDs[0]) < prng.SEED_LENGTH {
|
|
|
+ // Both inproxy.ID and prng.SEED_LENGTH are 32 bytes.
|
|
|
+ return nil, errors.TraceNew("unexpected ID length")
|
|
|
+ }
|
|
|
+
|
|
|
+ seed := prng.Seed(personalCompartmentIDs[0][0:prng.SEED_LENGTH])
|
|
|
+ PRNG := prng.NewPRNGWithSeed(&seed)
|
|
|
+
|
|
|
+ permutedIndexes := PRNG.Perm(len(brokerSpecs))
|
|
|
+ selectedIndex := permutedIndexes[brokerSelectCount%len(permutedIndexes)]
|
|
|
+ brokerSpecs = brokerSpecs[selectedIndex : selectedIndex+1]
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ permutedIndexes := prng.Perm(len(brokerSpecs))
|
|
|
+ shuffledBrokerSpecs := make(parameters.InproxyBrokerSpecsValue, len(brokerSpecs))
|
|
|
+ for i, index := range permutedIndexes {
|
|
|
+ shuffledBrokerSpecs[i] = brokerSpecs[index]
|
|
|
+ }
|
|
|
+ brokerSpecs = shuffledBrokerSpecs
|
|
|
}
|
|
|
|
|
|
- now := time.Now()
|
|
|
+ selectFirstCandidate := resetReasonNoMatch || personalPairing
|
|
|
|
|
|
- // Prefer a broker with replay data.
|
|
|
+ // Replay broker dial parameters.
|
|
|
+
|
|
|
+ // In selectFirstCandidate cases, SelectCandidateWithNetworkReplayParameters
|
|
|
+ // will always select the first candidate, returning corresponding replay
|
|
|
+ // data when available. Otherwise, SelectCandidateWithNetworkReplayParameters
|
|
|
+ // iterates over the shuffled candidates and returns the first with replay data.
|
|
|
+
|
|
|
+ var brokerSpec *parameters.InproxyBrokerSpec
|
|
|
+ var brokerDialParams *InproxyBrokerDialParameters
|
|
|
|
|
|
// Replay is disabled when the TTL, InproxyReplayBrokerDialParametersTTL,
|
|
|
// is 0.
|
|
|
+ now := time.Now()
|
|
|
ttl := p.Duration(parameters.InproxyReplayBrokerDialParametersTTL)
|
|
|
|
|
|
replayEnabled := ttl > 0 &&
|
|
|
!config.DisableReplay &&
|
|
|
prng.FlipWeightedCoin(p.Float(parameters.InproxyReplayBrokerDialParametersProbability))
|
|
|
|
|
|
- brokerSpec, brokerDialParams, err :=
|
|
|
- ShuffleAndGetNetworkReplayParameters[parameters.InproxyBrokerSpec, InproxyBrokerDialParameters](
|
|
|
- networkID,
|
|
|
- replayEnabled,
|
|
|
- brokerSpecs,
|
|
|
- func(spec *parameters.InproxyBrokerSpec) string { return spec.BrokerPublicKey },
|
|
|
- func(spec *parameters.InproxyBrokerSpec, dialParams *InproxyBrokerDialParameters) bool {
|
|
|
- return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
|
|
|
- bytes.Equal(dialParams.LastUsedBrokerSpecHash, hashBrokerSpec(spec))
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- NoticeWarning("ShuffleAndGetNetworkReplayParameters failed: %v", errors.Trace(err))
|
|
|
+ if replayEnabled {
|
|
|
+ brokerSpec, brokerDialParams, err =
|
|
|
+ SelectCandidateWithNetworkReplayParameters[parameters.InproxyBrokerSpec, InproxyBrokerDialParameters](
|
|
|
+ networkID,
|
|
|
+ selectFirstCandidate,
|
|
|
+ brokerSpecs,
|
|
|
+ func(spec *parameters.InproxyBrokerSpec) string { return spec.BrokerPublicKey },
|
|
|
+ func(spec *parameters.InproxyBrokerSpec, dialParams *InproxyBrokerDialParameters) bool {
|
|
|
+ return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
|
|
|
+ bytes.Equal(dialParams.LastUsedBrokerSpecHash, hashBrokerSpec(spec))
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
|
|
|
+ // Continue without replay
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // When there's an error, try to continue, using a random broker spec
|
|
|
- // and no replay dial parameters.
|
|
|
- brokerSpec = brokerSpecs[prng.Intn(len(brokerSpecs)-1)]
|
|
|
+ // Select the first broker in the shuffle when replay is not enabled or in
|
|
|
+ // case SelectCandidateWithNetworkReplayParameters fails.
|
|
|
+ if brokerSpec == nil {
|
|
|
+ brokerSpec = brokerSpecs[0]
|
|
|
}
|
|
|
|
|
|
// Generate new broker dial parameters if not replaying. Later, isReplay
|
|
|
@@ -407,6 +541,7 @@ func NewInproxyBrokerClientInstance(
|
|
|
announceDelayJitter: p.Float(parameters.InproxyProxyAnnounceDelayJitter),
|
|
|
answerRequestTimeout: p.Duration(parameters.InproxyProxyAnswerRequestTimeout),
|
|
|
offerRequestTimeout: p.Duration(parameters.InproxyClientOfferRequestTimeout),
|
|
|
+ offerRequestPersonalTimeout: p.Duration(parameters.InproxyClientOfferRequestPersonalTimeout),
|
|
|
offerRetryDelay: p.Duration(parameters.InproxyClientOfferRetryDelay),
|
|
|
offerRetryJitter: p.Float(parameters.InproxyClientOfferRetryJitter),
|
|
|
relayedPacketRequestTimeout: p.Duration(parameters.InproxyClientRelayedPacketRequestTimeout),
|
|
|
@@ -414,6 +549,22 @@ func NewInproxyBrokerClientInstance(
|
|
|
replayUpdateFrequency: p.Duration(parameters.InproxyReplayBrokerUpdateFrequency),
|
|
|
}
|
|
|
|
|
|
+ // Adjust long-polling request timeouts to respect any maximum request
|
|
|
+ // timeout supported by the provider fronting the request.
|
|
|
+ maxRequestTimeout, ok := p.KeyDurations(
|
|
|
+ parameters.InproxyFrontingProviderClientMaxRequestTimeouts)[brokerDialParams.FrontingProviderID]
|
|
|
+ if ok && maxRequestTimeout > 0 {
|
|
|
+ if b.announceRequestTimeout > maxRequestTimeout {
|
|
|
+ b.announceRequestTimeout = maxRequestTimeout
|
|
|
+ }
|
|
|
+ if b.offerRequestTimeout > maxRequestTimeout {
|
|
|
+ b.offerRequestTimeout = maxRequestTimeout
|
|
|
+ }
|
|
|
+ if b.offerRequestPersonalTimeout > maxRequestTimeout {
|
|
|
+ b.offerRequestPersonalTimeout = maxRequestTimeout
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Initialize broker client. This will start with a fresh broker session.
|
|
|
//
|
|
|
// When resetBrokerClientOnRoundTripperFailed is invoked due to a failure
|
|
|
@@ -462,9 +613,29 @@ func prepareCompartmentIDs(
|
|
|
|
|
|
maxCompartmentIDListLength := p.Int(parameters.InproxyMaxCompartmentIDListLength)
|
|
|
|
|
|
- configPersonalCompartmentIDs := config.InproxyProxyPersonalCompartmentIDs
|
|
|
- if !isProxy {
|
|
|
- configPersonalCompartmentIDs = config.InproxyClientPersonalCompartmentIDs
|
|
|
+ // Personal compartment ID limitations:
|
|
|
+ //
|
|
|
+ // The broker API messages, ProxyAnnounceRequest and ClientOfferRequest,
|
|
|
+ // support lists of personal compartment IDs. However, both the proxy and
|
|
|
+ // the client are currently limited to specifying at most one personal
|
|
|
+ // compartment ID due to the following limitations:
|
|
|
+ //
|
|
|
+ // - On the broker side, the matcher queue implementation supports at most
|
|
|
+ // one proxy personal compartment ID. See inproxy/Matcher.Announce. The
|
|
|
+ // broker currently enforces that at most one personal compartment ID
|
|
|
+ // may be specified per ProxyAnnounceRequest.
|
|
|
+ //
|
|
|
+ // - On the proxy/client side, the personal pairing rendezvous logic --
|
|
|
+ // which aims for proxies and clients to select the same initial broker
|
|
|
+ // and same order of failover to other brokers -- uses a shuffle that
|
|
|
+ // assumes both the proxy and client use the same single, personal
|
|
|
+ // compartment ID
|
|
|
+
|
|
|
+ var configPersonalCompartmentIDs []string
|
|
|
+ if isProxy && len(config.InproxyProxyPersonalCompartmentID) > 0 {
|
|
|
+ configPersonalCompartmentIDs = []string{config.InproxyProxyPersonalCompartmentID}
|
|
|
+ } else if !isProxy && len(config.InproxyClientPersonalCompartmentID) > 0 {
|
|
|
+ configPersonalCompartmentIDs = []string{config.InproxyClientPersonalCompartmentID}
|
|
|
}
|
|
|
personalCompartmentIDs, err := inproxy.IDsFromStrings(configPersonalCompartmentIDs)
|
|
|
if err != nil {
|
|
|
@@ -698,6 +869,27 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTrippe
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Implements the inproxy.BrokerDialCoordinator interface.
|
|
|
+func (b *InproxyBrokerClientInstance) BrokerClientNoMatch(roundTripper inproxy.RoundTripper) {
|
|
|
+ b.mutex.Lock()
|
|
|
+ defer b.mutex.Unlock()
|
|
|
+
|
|
|
+ if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
|
|
|
+ // See roundTripper check comment in BrokerClientRoundTripperFailed.
|
|
|
+ NoticeError("BrokerClientNoMatch: roundTripper instance mismatch")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Any persistent replay dial parameters are retained and not deleted,
|
|
|
+ // since the broker client successfully transacted with the broker.
|
|
|
+
|
|
|
+ err := b.brokerClientManager.resetBrokerClientOnNoMatch(b)
|
|
|
+ if err != nil {
|
|
|
+ NoticeWarning("reset broker client failed: %v", errors.Trace(err))
|
|
|
+ // Continue with old broker client instance.
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// Implements the inproxy.BrokerDialCoordinator interface.
|
|
|
func (b *InproxyBrokerClientInstance) AnnounceRequestTimeout() time.Duration {
|
|
|
return b.announceRequestTimeout
|
|
|
@@ -728,6 +920,11 @@ func (b *InproxyBrokerClientInstance) OfferRequestTimeout() time.Duration {
|
|
|
return b.offerRequestTimeout
|
|
|
}
|
|
|
|
|
|
+// Implements the inproxy.BrokerDialCoordinator interface.
|
|
|
+func (b *InproxyBrokerClientInstance) OfferRequestPersonalTimeout() time.Duration {
|
|
|
+ return b.offerRequestPersonalTimeout
|
|
|
+}
|
|
|
+
|
|
|
// Implements the inproxy.BrokerDialCoordinator interface.
|
|
|
func (b *InproxyBrokerClientInstance) OfferRetryDelay() time.Duration {
|
|
|
return b.offerRetryDelay
|
|
|
@@ -1075,11 +1272,26 @@ func (brokerDialParams *InproxyBrokerDialParameters) prepareDialConfigs(
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// GetBrokerMetrics returns dial parameter log fields to be reported to a
|
|
|
+// broker.
|
|
|
+func (brokerDialParams *InproxyBrokerDialParameters) GetBrokerMetrics() common.LogFields {
|
|
|
+
|
|
|
+ logFields := common.LogFields{}
|
|
|
+
|
|
|
+ // TODO: add additional broker fronting dial parameters to be logged by
|
|
|
+ // the broker -- as successful parameters might not otherwise by logged
|
|
|
+ // via server_tunnel if the subsequent WebRTC dials fail.
|
|
|
+
|
|
|
+ logFields["fronting_provider_id"] = brokerDialParams.FrontingProviderID
|
|
|
+
|
|
|
+ return logFields
|
|
|
+}
|
|
|
+
|
|
|
// GetMetrics implements the common.MetricsSource interface and returns log
|
|
|
// fields detailing the broker dial parameters.
|
|
|
func (brokerDialParams *InproxyBrokerDialParameters) GetMetrics() common.LogFields {
|
|
|
|
|
|
- logFields := make(common.LogFields)
|
|
|
+ logFields := common.LogFields{}
|
|
|
|
|
|
logFields["inproxy_broker_transport"] = brokerDialParams.BrokerTransport
|
|
|
|