|
|
@@ -73,6 +73,7 @@ type Controller struct {
|
|
|
concurrentIntensiveEstablishTunnels int
|
|
|
peakConcurrentEstablishTunnels int
|
|
|
peakConcurrentIntensiveEstablishTunnels int
|
|
|
+ establishInproxyForceSelectionCount int
|
|
|
establishCtx context.Context
|
|
|
stopEstablish context.CancelFunc
|
|
|
establishWaitGroup *sync.WaitGroup
|
|
|
@@ -1560,8 +1561,7 @@ func (p *protocolSelectionConstraints) isInitialCandidate(
|
|
|
p.initialLimitTunnelProtocols,
|
|
|
p.limitTunnelDialPortNumbers,
|
|
|
p.limitQUICVersions,
|
|
|
- excludeIntensive,
|
|
|
- false)) > 0
|
|
|
+ excludeIntensive)) > 0
|
|
|
}
|
|
|
|
|
|
func (p *protocolSelectionConstraints) isCandidate(
|
|
|
@@ -1574,8 +1574,7 @@ func (p *protocolSelectionConstraints) isCandidate(
|
|
|
p.limitTunnelProtocols,
|
|
|
p.limitTunnelDialPortNumbers,
|
|
|
p.limitQUICVersions,
|
|
|
- excludeIntensive,
|
|
|
- false)) > 0
|
|
|
+ excludeIntensive)) > 0
|
|
|
}
|
|
|
|
|
|
func (p *protocolSelectionConstraints) canReplay(
|
|
|
@@ -1590,7 +1589,7 @@ func (p *protocolSelectionConstraints) canReplay(
|
|
|
|
|
|
return common.Contains(
|
|
|
p.supportedProtocols(
|
|
|
- connectTunnelCount, excludeIntensive, false, serverEntry),
|
|
|
+ connectTunnelCount, excludeIntensive, serverEntry),
|
|
|
replayProtocol)
|
|
|
}
|
|
|
|
|
|
@@ -1611,8 +1610,7 @@ func (p *protocolSelectionConstraints) getLimitTunnelProtocols(
|
|
|
func (p *protocolSelectionConstraints) supportedProtocols(
|
|
|
connectTunnelCount int,
|
|
|
excludeIntensive bool,
|
|
|
- excludeInproxy bool,
|
|
|
- serverEntry *protocol.ServerEntry) []string {
|
|
|
+ serverEntry *protocol.ServerEntry) protocol.TunnelProtocols {
|
|
|
|
|
|
return serverEntry.GetSupportedProtocols(
|
|
|
conditionallyEnabledComponents{},
|
|
|
@@ -1620,18 +1618,25 @@ func (p *protocolSelectionConstraints) supportedProtocols(
|
|
|
p.getLimitTunnelProtocols(connectTunnelCount),
|
|
|
p.limitTunnelDialPortNumbers,
|
|
|
p.limitQUICVersions,
|
|
|
- excludeIntensive,
|
|
|
- excludeInproxy)
|
|
|
+ excludeIntensive)
|
|
|
}
|
|
|
|
|
|
func (p *protocolSelectionConstraints) selectProtocol(
|
|
|
connectTunnelCount int,
|
|
|
excludeIntensive bool,
|
|
|
- excludeInproxy bool,
|
|
|
+ preferInproxy bool,
|
|
|
serverEntry *protocol.ServerEntry) (string, time.Duration, bool) {
|
|
|
|
|
|
candidateProtocols := p.supportedProtocols(
|
|
|
- connectTunnelCount, excludeIntensive, excludeInproxy, serverEntry)
|
|
|
+ connectTunnelCount, excludeIntensive, serverEntry)
|
|
|
+
|
|
|
+ // Prefer selecting an in-proxy tunnel protocol when indicated, but fall
|
|
|
+ // back to other protocols when no in-proxy protocol is supported.
|
|
|
+
|
|
|
+ if preferInproxy && candidateProtocols.HasInproxyTunnelProtocols() {
|
|
|
+ NoticeInfo("in-proxy protocol preferred")
|
|
|
+ candidateProtocols = candidateProtocols.PruneNonInproxyTunnelProtocols()
|
|
|
+ }
|
|
|
|
|
|
if len(candidateProtocols) == 0 {
|
|
|
return "", 0, false
|
|
|
@@ -1662,6 +1667,15 @@ func (p *protocolSelectionConstraints) selectProtocol(
|
|
|
// so delay the dial. In other cases, skip the candidate and pick a
|
|
|
// non-in-proxy tunnel protocol.
|
|
|
//
|
|
|
+ // Also delay, rather than skip, when preferring an in-proxy protocol.
|
|
|
+ // Note that in the prefer case, failure to meet requirements, such as
|
|
|
+ // having broker specs, will fail the dial and consume
|
|
|
+ // InproxyTunnelProtocolForceSelectionCount, when that mechanism is
|
|
|
+ // active. These fast failures should eventually lead to selecting
|
|
|
+ // non-in-proxy candidates; as a potential future enhancement, check the
|
|
|
+ // requirements _before_ applying InproxyTunnelProtocolPreferProbability
|
|
|
+ // or InproxyTunnelProtocolForceSelectionCount.
|
|
|
+ //
|
|
|
// The delay is not applied here since the caller is holding the
|
|
|
// concurrentEstablishTunnelsMutex lock, potentially blocking other
|
|
|
// establishment workers. Instead the delay is returned and applied
|
|
|
@@ -1676,7 +1690,8 @@ func (p *protocolSelectionConstraints) selectProtocol(
|
|
|
//
|
|
|
// TODO: replace token on fast failure that doesn't reach the broker?
|
|
|
|
|
|
- if p.config.IsInproxyClientPersonalPairingMode() ||
|
|
|
+ if preferInproxy ||
|
|
|
+ p.config.IsInproxyClientPersonalPairingMode() ||
|
|
|
p.getLimitTunnelProtocols(connectTunnelCount).IsOnlyInproxyTunnelProtocols() {
|
|
|
|
|
|
// Check for missing in-proxy broker request requirements before
|
|
|
@@ -1723,10 +1738,7 @@ func (p *protocolSelectionConstraints) selectProtocol(
|
|
|
|
|
|
if skip {
|
|
|
|
|
|
- excludeInproxy = true
|
|
|
-
|
|
|
- candidateProtocols = p.supportedProtocols(
|
|
|
- connectTunnelCount, excludeIntensive, excludeInproxy, serverEntry)
|
|
|
+ candidateProtocols = candidateProtocols.PruneInproxyTunnelProtocols()
|
|
|
|
|
|
if len(candidateProtocols) == 0 {
|
|
|
return "", 0, false
|
|
|
@@ -1766,6 +1778,7 @@ func (controller *Controller) startEstablishing() {
|
|
|
controller.concurrentIntensiveEstablishTunnels = 0
|
|
|
controller.peakConcurrentEstablishTunnels = 0
|
|
|
controller.peakConcurrentIntensiveEstablishTunnels = 0
|
|
|
+ controller.establishInproxyForceSelectionCount = 0
|
|
|
controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
|
|
|
DoGarbageCollection()
|
|
|
@@ -1807,25 +1820,6 @@ func (controller *Controller) startEstablishing() {
|
|
|
// establish will eventually signal another fetch.
|
|
|
controller.establishSignalForceTacticsFetch = make(chan struct{})
|
|
|
|
|
|
- // Initialize the in-proxy client dial rate limiter. Rate limits are used in
|
|
|
- // protocolSelectionConstraints.selectProtocol. When
|
|
|
- // InproxyClientDialRateLimitQuantity is 0, there is no rate limit.
|
|
|
- //
|
|
|
- // The rate limiter is reset for each establishment, which ensures no
|
|
|
- // delays carry over from a previous establishment run. However, this
|
|
|
- // does mean that very frequent re-establishments may exceed the rate
|
|
|
- // limit overall.
|
|
|
-
|
|
|
- p := controller.config.GetParameters().Get()
|
|
|
- inproxyRateLimitQuantity := p.Int(parameters.InproxyClientDialRateLimitQuantity)
|
|
|
- inproxyRateLimitInterval := p.Duration(parameters.InproxyClientDialRateLimitInterval)
|
|
|
- if inproxyRateLimitQuantity > 0 {
|
|
|
- controller.inproxyClientDialRateLimiter = rate.NewLimiter(
|
|
|
- rate.Limit(float64(inproxyRateLimitQuantity)/inproxyRateLimitInterval.Seconds()),
|
|
|
- inproxyRateLimitQuantity)
|
|
|
- }
|
|
|
- p.Close()
|
|
|
-
|
|
|
controller.establishWaitGroup.Add(1)
|
|
|
go controller.launchEstablishing()
|
|
|
}
|
|
|
@@ -1985,6 +1979,42 @@ func (controller *Controller) launchEstablishing() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Initialize the in-proxy client dial rate limiter, using the latest
|
|
|
+ // tactics. Rate limits are used in
|
|
|
+ // protocolSelectionConstraints.selectProtocol. When
|
|
|
+ // InproxyClientDialRateLimitQuantity is 0, there is no rate limit.
|
|
|
+ //
|
|
|
+ // The rate limiter is reset for each establishment, which ensures no
|
|
|
+ // delays carry over from a previous establishment run. However, this
|
|
|
+ // does mean that very frequent re-establishments may exceed the rate
|
|
|
+ // limit overall.
|
|
|
+
|
|
|
+ inproxyRateLimitQuantity := p.Int(parameters.InproxyClientDialRateLimitQuantity)
|
|
|
+ inproxyRateLimitInterval := p.Duration(parameters.InproxyClientDialRateLimitInterval)
|
|
|
+ if inproxyRateLimitQuantity > 0 {
|
|
|
+ controller.inproxyClientDialRateLimiter = rate.NewLimiter(
|
|
|
+ rate.Limit(float64(inproxyRateLimitQuantity)/inproxyRateLimitInterval.Seconds()),
|
|
|
+ inproxyRateLimitQuantity)
|
|
|
+ }
|
|
|
+
|
|
|
+ // InproxyTunnelProtocolForceSelectionCount forces the specified number of
|
|
|
+ // early candidates to select in-proxy protocols.
|
|
|
+ //
|
|
|
+ // Only server entries with INPROXY capabilities are counted as forced
|
|
|
+ // selection candidates; and, as currently implemented, these server
|
|
|
+ // entries are not sorted to the front of the server entry iterator, so
|
|
|
+ // force selection is applied opportunistically as server entries with
|
|
|
+ // the necessary capabilities are encountered.
|
|
|
+ //
|
|
|
+ // If a forced server entry has existing replay data for a non-in-proxy
|
|
|
+ // protocol, that replay data is ignored for this dial, but not deleted.
|
|
|
+ //
|
|
|
+ // The affinity server entry candidate is a potential candidate for forced
|
|
|
+ // selection.
|
|
|
+
|
|
|
+ controller.establishInproxyForceSelectionCount =
|
|
|
+ p.Int(parameters.InproxyTunnelProtocolForceSelectionCount)
|
|
|
+
|
|
|
// ConnectionWorkerPoolSize may be set by tactics.
|
|
|
//
|
|
|
// In-proxy personal pairing mode uses a distinct parameter which is
|
|
|
@@ -2591,11 +2621,15 @@ loop:
|
|
|
|
|
|
p := controller.config.GetParameters().Get()
|
|
|
limitIntensiveConnectionWorkers := p.Int(parameters.LimitIntensiveConnectionWorkers)
|
|
|
- inproxySelectionProbability := p.Float(parameters.InproxyTunnelProtocolSelectionProbability)
|
|
|
+ inproxyPreferProbability := p.Float(parameters.InproxyTunnelProtocolPreferProbability)
|
|
|
staggerPeriod := p.Duration(parameters.StaggerConnectionWorkersPeriod)
|
|
|
staggerJitter := p.Float(parameters.StaggerConnectionWorkersJitter)
|
|
|
p.Close()
|
|
|
|
|
|
+ // Access to controller fields is synchronized with this lock. The
|
|
|
+ // canReplay and selectProtocol callbacks are intended to be invoked
|
|
|
+ // in MakeDialParameters while lock is held.
|
|
|
+
|
|
|
controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
|
|
|
excludeIntensive := false
|
|
|
@@ -2604,7 +2638,55 @@ loop:
|
|
|
excludeIntensive = true
|
|
|
}
|
|
|
|
|
|
+ // Force in-proxy protocol selection as required, and if the server
|
|
|
+ // entry supports in-proxy protocols. If this candidate happens to be
|
|
|
+ // a replay of an in-proxy protocol, it's still counted as a forced
|
|
|
+ // selection.
|
|
|
+ //
|
|
|
+ // Forced selection is skipped when excluding intensive protocols, as
|
|
|
+ // TunnelProtocolIsResourceIntensive currently includes
|
|
|
+ // TunnelProtocolUsesInproxy.
|
|
|
+
|
|
|
+ inproxyForceSelection := false
|
|
|
+ if !excludeIntensive &&
|
|
|
+ controller.establishInproxyForceSelectionCount > 0 &&
|
|
|
+ controller.protocolSelectionConstraints.supportedProtocols(
|
|
|
+ controller.establishConnectTunnelCount,
|
|
|
+ excludeIntensive,
|
|
|
+ candidateServerEntry.serverEntry).HasInproxyTunnelProtocols() {
|
|
|
+
|
|
|
+ NoticeInfo("in-proxy protocol selection forced")
|
|
|
+ inproxyForceSelection = true
|
|
|
+ controller.establishInproxyForceSelectionCount -= 1
|
|
|
+ }
|
|
|
+
|
|
|
canReplay := func(serverEntry *protocol.ServerEntry, replayProtocol string) bool {
|
|
|
+
|
|
|
+ if inproxyForceSelection {
|
|
|
+ if !protocol.TunnelProtocolUsesInproxy(replayProtocol) {
|
|
|
+
|
|
|
+ // Skip replay when forcing in-proxy protocol selection.
|
|
|
+ // MakeDialParameters will call the following
|
|
|
+ // selectProtocol callback with in-proxy preferred.
|
|
|
+ //
|
|
|
+ // Skipping here retains the existing replay data, as
|
|
|
+ // DialParameters.Failed will only delete it when
|
|
|
+ // IsReplay. However, the old replay data can be replaced
|
|
|
+ // if the in-proxy tunnel is successful.
|
|
|
+
|
|
|
+ return false
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // MakeDialParameters calls canReplay only once it has
|
|
|
+ // replay data for the server entry candidate, so this
|
|
|
+ // will be a replay.
|
|
|
+
|
|
|
+ NoticeInfo("in-proxy protocol selection replayed")
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
return controller.protocolSelectionConstraints.canReplay(
|
|
|
controller.establishConnectTunnelCount,
|
|
|
excludeIntensive,
|
|
|
@@ -2620,19 +2702,15 @@ loop:
|
|
|
|
|
|
var dialRateLimitDelay time.Duration
|
|
|
|
|
|
- selectProtocol := func(serverEntry *protocol.ServerEntry) (string, bool) {
|
|
|
-
|
|
|
- // The in-proxy protocol selection probability allows for
|
|
|
- // tuning/limiting in-proxy usage independent of
|
|
|
- // LimitTunnelProtocol targeting.
|
|
|
+ selectProtocol := func(
|
|
|
+ serverEntry *protocol.ServerEntry) (string, bool) {
|
|
|
|
|
|
- onlyInproxy := controller.config.IsInproxyClientPersonalPairingMode()
|
|
|
- includeInproxy := onlyInproxy || prng.FlipWeightedCoin(inproxySelectionProbability)
|
|
|
+ preferInproxy := inproxyForceSelection || prng.FlipWeightedCoin(inproxyPreferProbability)
|
|
|
|
|
|
selectedProtocol, rateLimitDelay, ok := controller.protocolSelectionConstraints.selectProtocol(
|
|
|
controller.establishConnectTunnelCount,
|
|
|
excludeIntensive,
|
|
|
- !includeInproxy,
|
|
|
+ preferInproxy,
|
|
|
serverEntry)
|
|
|
|
|
|
dialRateLimitDelay = rateLimitDelay
|