|
|
@@ -41,38 +41,40 @@ import (
|
|
|
// connect to; establishes and monitors tunnels; and runs local proxies which
|
|
|
// route traffic through the tunnels.
|
|
|
type Controller struct {
|
|
|
- config *Config
|
|
|
- sessionId string
|
|
|
- componentFailureSignal chan struct{}
|
|
|
- shutdownBroadcast chan struct{}
|
|
|
- runWaitGroup *sync.WaitGroup
|
|
|
- establishedTunnels chan *Tunnel
|
|
|
- failedTunnels chan *Tunnel
|
|
|
- tunnelMutex sync.Mutex
|
|
|
- establishedOnce bool
|
|
|
- tunnels []*Tunnel
|
|
|
- nextTunnel int
|
|
|
- startedConnectedReporter bool
|
|
|
- isEstablishing bool
|
|
|
- concurrentEstablishTunnelsMutex sync.Mutex
|
|
|
- concurrentEstablishTunnels int32
|
|
|
- peakConcurrentEstablishTunnels int32
|
|
|
- establishWaitGroup *sync.WaitGroup
|
|
|
- stopEstablishingBroadcast chan struct{}
|
|
|
- candidateServerEntries chan *candidateServerEntry
|
|
|
- establishPendingConns *common.Conns
|
|
|
- untunneledPendingConns *common.Conns
|
|
|
- untunneledDialConfig *DialConfig
|
|
|
- splitTunnelClassifier *SplitTunnelClassifier
|
|
|
- signalFetchCommonRemoteServerList chan struct{}
|
|
|
- signalFetchObfuscatedServerLists chan struct{}
|
|
|
- signalDownloadUpgrade chan string
|
|
|
- impairedProtocolClassification map[string]int
|
|
|
- signalReportConnected chan struct{}
|
|
|
- serverAffinityDoneBroadcast chan struct{}
|
|
|
- newClientVerificationPayload chan string
|
|
|
- packetTunnelClient *tun.Client
|
|
|
- packetTunnelTransport *PacketTunnelTransport
|
|
|
+ config *Config
|
|
|
+ sessionId string
|
|
|
+ componentFailureSignal chan struct{}
|
|
|
+ shutdownBroadcast chan struct{}
|
|
|
+ runWaitGroup *sync.WaitGroup
|
|
|
+ establishedTunnels chan *Tunnel
|
|
|
+ failedTunnels chan *Tunnel
|
|
|
+ tunnelMutex sync.Mutex
|
|
|
+ establishedOnce bool
|
|
|
+ tunnels []*Tunnel
|
|
|
+ nextTunnel int
|
|
|
+ startedConnectedReporter bool
|
|
|
+ isEstablishing bool
|
|
|
+ concurrentEstablishTunnelsMutex sync.Mutex
|
|
|
+ concurrentEstablishTunnels int
|
|
|
+ concurrentMeekEstablishTunnels int
|
|
|
+ peakConcurrentEstablishTunnels int
|
|
|
+ peakConcurrentMeekEstablishTunnels int
|
|
|
+ establishWaitGroup *sync.WaitGroup
|
|
|
+ stopEstablishingBroadcast chan struct{}
|
|
|
+ candidateServerEntries chan *candidateServerEntry
|
|
|
+ establishPendingConns *common.Conns
|
|
|
+ untunneledPendingConns *common.Conns
|
|
|
+ untunneledDialConfig *DialConfig
|
|
|
+ splitTunnelClassifier *SplitTunnelClassifier
|
|
|
+ signalFetchCommonRemoteServerList chan struct{}
|
|
|
+ signalFetchObfuscatedServerLists chan struct{}
|
|
|
+ signalDownloadUpgrade chan string
|
|
|
+ impairedProtocolClassification map[string]int
|
|
|
+ signalReportConnected chan struct{}
|
|
|
+ serverAffinityDoneBroadcast chan struct{}
|
|
|
+ newClientVerificationPayload chan string
|
|
|
+ packetTunnelClient *tun.Client
|
|
|
+ packetTunnelTransport *PacketTunnelTransport
|
|
|
}
|
|
|
|
|
|
type candidateServerEntry struct {
|
|
|
@@ -629,6 +631,10 @@ loop:
|
|
|
|
|
|
controller.classifyImpairedProtocol(failedTunnel)
|
|
|
|
|
|
+ // Clear the reference to this tunnel before calling startEstablishing,
|
|
|
+ // which will invoke a garbage collection.
|
|
|
+ failedTunnel = nil
|
|
|
+
|
|
|
// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing
|
|
|
// and access isEstablishing.
|
|
|
if !controller.isEstablishing {
|
|
|
@@ -654,6 +660,12 @@ loop:
|
|
|
if !registered {
|
|
|
// Already fully established, so discard.
|
|
|
controller.discardTunnel(establishedTunnel)
|
|
|
+
|
|
|
+ // Clear the reference to this discarded tunnel and immediately run
|
|
|
+ // a garbage collection to reclaim its memory.
|
|
|
+ establishedTunnel = nil
|
|
|
+ aggressiveGarbageCollection()
|
|
|
+
|
|
|
break
|
|
|
}
|
|
|
|
|
|
@@ -1022,35 +1034,13 @@ func (controller *Controller) startEstablishing() {
|
|
|
|
|
|
controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
controller.concurrentEstablishTunnels = 0
|
|
|
+ controller.concurrentMeekEstablishTunnels = 0
|
|
|
controller.peakConcurrentEstablishTunnels = 0
|
|
|
+ controller.peakConcurrentMeekEstablishTunnels = 0
|
|
|
controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
|
|
|
- workerCount := controller.config.ConnectionWorkerPoolSize
|
|
|
-
|
|
|
- if controller.config.LimitedMemoryEnvironment {
|
|
|
- aggressiveGarbageCollection()
|
|
|
- totalMemory := emitMemoryMetrics()
|
|
|
-
|
|
|
- // When total memory size exceeds the threshold, minimize
|
|
|
- // the number of concurrent connection workers.
|
|
|
- //
|
|
|
- // Limitations:
|
|
|
- // - totalMemory is, at this time, runtime.MemStats.Sys,
|
|
|
- // which is virtual memory, not RSS; and which may not
|
|
|
- // shrink; so this trigger could be premature and
|
|
|
- // permanent.
|
|
|
- // - Only 1 concurrent worker means a candidate that is
|
|
|
- // slow to fail will severely delay the establishment;
|
|
|
- // and that it may take significant time to cycle through
|
|
|
- // all protocols to find one that works when network
|
|
|
- // conditions change.
|
|
|
-
|
|
|
- if controller.config.LimitedMemorySingleConnectionWorkerThreshold > 0 &&
|
|
|
- totalMemory >= uint64(controller.config.LimitedMemorySingleConnectionWorkerThreshold) {
|
|
|
-
|
|
|
- workerCount = 1
|
|
|
- }
|
|
|
- }
|
|
|
+ aggressiveGarbageCollection()
|
|
|
+ emitMemoryMetrics()
|
|
|
|
|
|
controller.isEstablishing = true
|
|
|
controller.establishWaitGroup = new(sync.WaitGroup)
|
|
|
@@ -1085,7 +1075,7 @@ func (controller *Controller) startEstablishing() {
|
|
|
// TODO: should not favor the first server in this case
|
|
|
controller.serverAffinityDoneBroadcast = make(chan struct{})
|
|
|
|
|
|
- for i := 0; i < workerCount; i++ {
|
|
|
+ for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
|
|
|
controller.establishWaitGroup.Add(1)
|
|
|
go controller.establishTunnelWorker()
|
|
|
}
|
|
|
@@ -1119,15 +1109,17 @@ func (controller *Controller) stopEstablishing() {
|
|
|
|
|
|
controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
peakConcurrent := controller.peakConcurrentEstablishTunnels
|
|
|
+ peakConcurrentMeek := controller.peakConcurrentMeekEstablishTunnels
|
|
|
controller.concurrentEstablishTunnels = 0
|
|
|
+ controller.concurrentMeekEstablishTunnels = 0
|
|
|
controller.peakConcurrentEstablishTunnels = 0
|
|
|
+ controller.peakConcurrentMeekEstablishTunnels = 0
|
|
|
controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
NoticeInfo("peak concurrent establish tunnels: %d", peakConcurrent)
|
|
|
+ NoticeInfo("peak concurrent meek establish tunnels: %d", peakConcurrentMeek)
|
|
|
|
|
|
- if controller.config.LimitedMemoryEnvironment {
|
|
|
- emitMemoryMetrics()
|
|
|
- standardGarbageCollection()
|
|
|
- }
|
|
|
+ emitMemoryMetrics()
|
|
|
+ standardGarbageCollection()
|
|
|
}
|
|
|
|
|
|
// establishCandidateGenerator populates the candidate queue with server entries
|
|
|
@@ -1206,7 +1198,7 @@ loop:
|
|
|
// stored or reused.
|
|
|
if i == 0 {
|
|
|
serverEntry.DisableImpairedProtocols(impairedProtocols)
|
|
|
- if len(serverEntry.GetSupportedProtocols()) == 0 {
|
|
|
+ if len(serverEntry.GetSupportedProtocols(false)) == 0 {
|
|
|
// Skip this server entry, as it has no supported
|
|
|
// protocols after disabling the impaired ones
|
|
|
// TODO: modify ServerEntryIterator to skip these?
|
|
|
@@ -1223,6 +1215,8 @@ loop:
|
|
|
adjustedEstablishStartTime: establishStartTime.Add(networkWaitDuration),
|
|
|
}
|
|
|
|
|
|
+ wasServerAffinityCandidate := isServerAffinityCandidate
|
|
|
+
|
|
|
// Note: there must be only one server affinity candidate, as it
|
|
|
// closes the serverAffinityDoneBroadcast channel.
|
|
|
isServerAffinityCandidate = false
|
|
|
@@ -1244,12 +1238,27 @@ loop:
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- if controller.config.LimitedMemoryEnvironment &&
|
|
|
- controller.config.LimitedMemoryStaggerConnectionWorkersMilliseconds != 0 {
|
|
|
+ if wasServerAffinityCandidate {
|
|
|
+
|
|
|
+ // Don't start the next candidate until either the server affinity
|
|
|
+ // candidate has completed (success or failure) or is still working
|
|
|
+ // and the grace period has elapsed.
|
|
|
+
|
|
|
+ timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
|
|
|
+ select {
|
|
|
+ case <-timer.C:
|
|
|
+ case <-controller.serverAffinityDoneBroadcast:
|
|
|
+ case <-controller.stopEstablishingBroadcast:
|
|
|
+ break loop
|
|
|
+ case <-controller.shutdownBroadcast:
|
|
|
+ break loop
|
|
|
+ }
|
|
|
+ } else if controller.config.StaggerConnectionWorkersMilliseconds != 0 {
|
|
|
+
|
|
|
+ // Stagger concurrent connection workers.
|
|
|
|
|
|
- timer := time.NewTimer(
|
|
|
- time.Duration(
|
|
|
- controller.config.LimitedMemoryStaggerConnectionWorkersMilliseconds) * time.Millisecond)
|
|
|
+ timer := time.NewTimer(time.Millisecond * time.Duration(
|
|
|
+ controller.config.StaggerConnectionWorkersMilliseconds))
|
|
|
select {
|
|
|
case <-timer.C:
|
|
|
case <-controller.stopEstablishingBroadcast:
|
|
|
@@ -1332,39 +1341,104 @@ loop:
|
|
|
|
|
|
// EstablishTunnel will allocate significant memory, so first attempt to
|
|
|
// reclaim as much as possible.
|
|
|
- if controller.config.LimitedMemoryEnvironment && !controller.isStopEstablishingBroadcast() {
|
|
|
- emitMemoryMetrics()
|
|
|
- aggressiveGarbageCollection()
|
|
|
- }
|
|
|
+ aggressiveGarbageCollection()
|
|
|
+
|
|
|
+ // Select the tunnel protocol. Unless config.TunnelProtocol is set, the
|
|
|
+ // selection will be made at random from protocols supported by the
|
|
|
+ // server entry.
|
|
|
+ //
|
|
|
+ // When limiting concurrent meek connection workers, and at the limit,
|
|
|
+ // do not select meek since otherwise the candidate must be skipped.
|
|
|
+ //
|
|
|
+ // If at the limit and unabled to select a non-meek protocol, skip the
|
|
|
+ // candidate entirely and move on to the next. Since candidates are shuffled
|
|
|
+ // it's probable that the next candidate is not meek. In this case, a
|
|
|
+ // StaggerConnectionWorkersMilliseconds delay may still be incurred.
|
|
|
|
|
|
+ excludeMeek := false
|
|
|
controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
- controller.concurrentEstablishTunnels += 1
|
|
|
- if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
|
|
|
- controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
|
|
|
+ if controller.config.LimitMeekConnectionWorkers > 0 &&
|
|
|
+ controller.concurrentMeekEstablishTunnels >=
|
|
|
+ controller.config.LimitMeekConnectionWorkers {
|
|
|
+ excludeMeek = true
|
|
|
}
|
|
|
controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
|
|
|
- tunnel, err := EstablishTunnel(
|
|
|
- controller.config,
|
|
|
- controller.untunneledDialConfig,
|
|
|
- controller.sessionId,
|
|
|
- controller.establishPendingConns,
|
|
|
- candidateServerEntry.serverEntry,
|
|
|
- candidateServerEntry.adjustedEstablishStartTime,
|
|
|
- controller) // TunnelOwner
|
|
|
+ selectedProtocol, err := selectProtocol(
|
|
|
+ controller.config, candidateServerEntry.serverEntry, excludeMeek)
|
|
|
|
|
|
- controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
- controller.concurrentEstablishTunnels -= 1
|
|
|
- controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
+ if err == errProtocolNotSupported {
|
|
|
+ // selectProtocol returns errProtocolNotSupported when excludeMeek
|
|
|
+ // is set and the server entry only supports meek protocols.
|
|
|
+ // Skip this candidate.
|
|
|
+ continue
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
+ var tunnel *Tunnel
|
|
|
+ if err == nil {
|
|
|
|
|
|
- // Immediately reclaim memory allocated by the failed establishment.
|
|
|
- if controller.config.LimitedMemoryEnvironment && !controller.isStopEstablishingBroadcast() {
|
|
|
- tunnel = nil
|
|
|
- emitMemoryMetrics()
|
|
|
- aggressiveGarbageCollection()
|
|
|
+ isMeek := protocol.TunnelProtocolUsesMeek(selectedProtocol) ||
|
|
|
+ protocol.TunnelProtocolUsesMeek(selectedProtocol)
|
|
|
+
|
|
|
+ controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
+ if isMeek {
|
|
|
+
|
|
|
+ // Recheck the limit now that we know we're selecting meek and
|
|
|
+ // adjusting concurrentMeekEstablishTunnels.
|
|
|
+ if controller.config.LimitMeekConnectionWorkers > 0 &&
|
|
|
+ controller.concurrentMeekEstablishTunnels >=
|
|
|
+ controller.config.LimitMeekConnectionWorkers {
|
|
|
+
|
|
|
+ // Skip this candidate.
|
|
|
+ controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ controller.concurrentMeekEstablishTunnels += 1
|
|
|
+ if controller.concurrentMeekEstablishTunnels > controller.peakConcurrentMeekEstablishTunnels {
|
|
|
+ controller.peakConcurrentMeekEstablishTunnels = controller.concurrentMeekEstablishTunnels
|
|
|
+ }
|
|
|
+ }
|
|
|
+ controller.concurrentEstablishTunnels += 1
|
|
|
+ if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
|
|
|
+ controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
|
|
|
+ }
|
|
|
+ controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
+
|
|
|
+ tunnel, err = EstablishTunnel(
|
|
|
+ controller.config,
|
|
|
+ controller.untunneledDialConfig,
|
|
|
+ controller.sessionId,
|
|
|
+ controller.establishPendingConns,
|
|
|
+ candidateServerEntry.serverEntry,
|
|
|
+ selectedProtocol,
|
|
|
+ candidateServerEntry.adjustedEstablishStartTime,
|
|
|
+ controller) // TunnelOwner
|
|
|
+
|
|
|
+ controller.concurrentEstablishTunnelsMutex.Lock()
|
|
|
+ if isMeek {
|
|
|
+ controller.concurrentMeekEstablishTunnels -= 1
|
|
|
}
|
|
|
+ controller.concurrentEstablishTunnels -= 1
|
|
|
+ controller.concurrentEstablishTunnelsMutex.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Periodically emit memory metrics during the establishment cycle.
|
|
|
+ if !controller.isStopEstablishingBroadcast() {
|
|
|
+ emitMemoryMetrics()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Immediately reclaim memory allocated by the establishment. In the case
|
|
|
+ // of failure, first clear the reference to the tunnel. In the case of
|
|
|
+ // success, the garbage collection may still be effective as the initial
|
|
|
+ // phases of some protocols involve significant memory allocation that
|
|
|
+ // could now be reclaimed.
|
|
|
+ if err != nil {
|
|
|
+ tunnel = nil
|
|
|
+ }
|
|
|
+
|
|
|
+ aggressiveGarbageCollection()
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
|
|
|
// Unblock other candidates immediately when
|
|
|
// server affinity candidate fails.
|
|
|
@@ -1377,20 +1451,11 @@ loop:
|
|
|
if controller.isStopEstablishingBroadcast() {
|
|
|
break loop
|
|
|
}
|
|
|
+
|
|
|
NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // Block for server affinity grace period before delivering.
|
|
|
- if !candidateServerEntry.isServerAffinityCandidate {
|
|
|
- timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
|
|
|
- select {
|
|
|
- case <-timer.C:
|
|
|
- case <-controller.serverAffinityDoneBroadcast:
|
|
|
- case <-controller.stopEstablishingBroadcast:
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// Deliver established tunnel.
|
|
|
// Don't block. Assumes the receiver has a buffer large enough for
|
|
|
// the number of desired tunnels. If there's no room, the tunnel must
|
|
|
@@ -1399,6 +1464,11 @@ loop:
|
|
|
case controller.establishedTunnels <- tunnel:
|
|
|
default:
|
|
|
controller.discardTunnel(tunnel)
|
|
|
+
|
|
|
+ // Clear the reference to this discarded tunnel and immediately run
|
|
|
+ // a garbage collection to reclaim its memory.
|
|
|
+ tunnel = nil
|
|
|
+ aggressiveGarbageCollection()
|
|
|
}
|
|
|
|
|
|
// Unblock other candidates only after delivering when
|