|
|
@@ -272,13 +272,14 @@ loop:
|
|
|
// proxyOneClient fails. As having no broker clients is a possible
|
|
|
// proxyOneClient failure case, GetBrokerClient errors are ignored here and
|
|
|
// defaults used in that case.
|
|
|
-func (p *Proxy) getAnnounceDelayParameters() (time.Duration, float64) {
|
|
|
+func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
|
|
|
brokerClient, err := p.config.GetBrokerClient()
|
|
|
if err != nil {
|
|
|
- return proxyAnnounceDelay, proxyAnnounceDelayJitter
|
|
|
+ return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
|
|
|
}
|
|
|
brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
|
|
|
return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
|
|
|
+ common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
|
|
|
common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
|
|
|
|
|
|
}
|
|
|
@@ -384,6 +385,10 @@ func (p *Proxy) proxyClients(
|
|
|
backOff, err := p.proxyOneClient(
|
|
|
ctx, logAnnounce, signalAnnounceDone)
|
|
|
|
|
|
+ if !backOff || err == nil {
|
|
|
+ failureDelayFactor = 1
|
|
|
+ }
|
|
|
+
|
|
|
if err != nil && ctx.Err() == nil {
|
|
|
|
|
|
// Apply a simple exponential backoff based on whether
|
|
|
@@ -396,17 +401,12 @@ func (p *Proxy) proxyClients(
|
|
|
// prevents both excess local logging and churning in the former
|
|
|
// case and excessive bad service to clients or unintentionally
|
|
|
// overloading the broker in the latter case.
|
|
|
- //
|
|
|
- // TODO: specific tactics parameters to control this logic.
|
|
|
|
|
|
- delay, jitter := p.getAnnounceDelayParameters()
|
|
|
+ delay, maxBackoffDelay, jitter := p.getAnnounceDelayParameters()
|
|
|
|
|
|
- if !backOff {
|
|
|
- failureDelayFactor = 1
|
|
|
- }
|
|
|
delay = delay * failureDelayFactor
|
|
|
- if delay > proxyAnnounceMaxBackoffDelay {
|
|
|
- delay = proxyAnnounceMaxBackoffDelay
|
|
|
+ if delay > maxBackoffDelay {
|
|
|
+ delay = maxBackoffDelay
|
|
|
}
|
|
|
if failureDelayFactor < 1<<20 {
|
|
|
failureDelayFactor *= 2
|
|
|
@@ -608,7 +608,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
// any deliberate delay.
|
|
|
|
|
|
requestDelay := time.Duration(0)
|
|
|
- announceDelay, announceDelayJitter := p.getAnnounceDelayParameters()
|
|
|
+ announceDelay, _, announceDelayJitter := p.getAnnounceDelayParameters()
|
|
|
p.nextAnnounceMutex.Lock()
|
|
|
nextDelay := prng.JitterDuration(announceDelay, announceDelayJitter)
|
|
|
if p.nextAnnounceBrokerClient != brokerClient {
|