Просмотр исходного кода

Support configuring TunnelPoolSize via tactics

Rod Hynes 5 лет назад
Родитель
Сommit
d10827a138
4 измененных файлов с 95 добавлено и 19 удалено
  1. 2 0
      psiphon/common/parameters/parameters.go
  2. 8 2
      psiphon/config.go
  3. 68 17
      psiphon/controller.go
  4. 17 0
      psiphon/controller_test.go

+ 2 - 0
psiphon/common/parameters/parameters.go

@@ -78,6 +78,7 @@ const (
 	TacticsRetryPeriodJitter                         = "TacticsRetryPeriodJitter"
 	TacticsTimeout                                   = "TacticsTimeout"
 	ConnectionWorkerPoolSize                         = "ConnectionWorkerPoolSize"
+	TunnelPoolSize                                   = "TunnelPoolSize"
 	TunnelConnectTimeout                             = "TunnelConnectTimeout"
 	EstablishTunnelTimeout                           = "EstablishTunnelTimeout"
 	EstablishTunnelWorkTime                          = "EstablishTunnelWorkTime"
@@ -302,6 +303,7 @@ var defaultParameters = map[string]struct {
 	TacticsTimeout:           {value: 2 * time.Minute, minimum: 1 * time.Second, flags: useNetworkLatencyMultiplier},
 
 	ConnectionWorkerPoolSize:                 {value: 10, minimum: 1},
+	TunnelPoolSize:                           {value: 1, minimum: 1},
 	TunnelConnectTimeout:                     {value: 20 * time.Second, minimum: 1 * time.Second, flags: useNetworkLatencyMultiplier},
 	EstablishTunnelTimeout:                   {value: 300 * time.Second, minimum: time.Duration(0)},
 	EstablishTunnelWorkTime:                  {value: 60 * time.Second, minimum: 1 * time.Second},

+ 8 - 2
psiphon/config.go

@@ -42,7 +42,8 @@ import (
 )
 
 const (
-	TUNNEL_POOL_SIZE = 1
+	TUNNEL_POOL_SIZE     = 1
+	MAX_TUNNEL_POOL_SIZE = 32
 
 	// Psiphon data directory name, relative to config.DataRootDirectory.
 	// See config.GetPsiphonDataDirectory().
@@ -215,7 +216,8 @@ type Config struct {
 
 	// TunnelPoolSize specifies how many tunnels to run in parallel. Port
 	// forwards are multiplexed over multiple tunnels. If omitted or when 0,
-	// the default is TUNNEL_POOL_SIZE, which is recommended.
+	// the default is TUNNEL_POOL_SIZE, which is recommended. Any value over
+	// MAX_TUNNEL_POOL_SIZE is treated as MAX_TUNNEL_POOL_SIZE.
 	TunnelPoolSize int
 
 	// StaggerConnectionWorkersMilliseconds adds a specified delay before
@@ -1415,6 +1417,10 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.ConnectionWorkerPoolSize] = config.ConnectionWorkerPoolSize
 	}
 
+	if config.TunnelPoolSize != 0 {
+		applyParameters[parameters.TunnelPoolSize] = config.TunnelPoolSize
+	}
+
 	if config.StaggerConnectionWorkersMilliseconds > 0 {
 		applyParameters[parameters.StaggerConnectionWorkersPeriod] = fmt.Sprintf("%dms", config.StaggerConnectionWorkersMilliseconds)
 	}

+ 68 - 17
psiphon/controller.go

@@ -52,6 +52,7 @@ type Controller struct {
 	failedTunnels                           chan *Tunnel
 	tunnelMutex                             sync.Mutex
 	establishedOnce                         bool
+	tunnelPoolSize                          int
 	tunnels                                 []*Tunnel
 	nextTunnel                              int
 	isEstablishing                          bool
@@ -108,8 +109,9 @@ func NewController(config *Config) (controller *Controller, err error) {
 		runWaitGroup: new(sync.WaitGroup),
 		// connectedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
-		connectedTunnels:     make(chan *Tunnel, config.TunnelPoolSize),
-		failedTunnels:        make(chan *Tunnel, config.TunnelPoolSize),
+		connectedTunnels:     make(chan *Tunnel, MAX_TUNNEL_POOL_SIZE),
+		failedTunnels:        make(chan *Tunnel, MAX_TUNNEL_POOL_SIZE),
+		tunnelPoolSize:       TUNNEL_POOL_SIZE,
 		tunnels:              make([]*Tunnel, 0),
 		establishedOnce:      false,
 		isEstablishing:       false,
@@ -684,10 +686,10 @@ loop:
 			// Any delays in stopEstablishing will delay the handshake for the last
 			// active tunnel.
 			//
-			// In the typical case of TunnelPoolSize of 1, only a single handshake is
+			// In the typical case of tunnelPoolSize of 1, only a single handshake is
 			// performed and the homepages notices file, when used, will not be modifed
 			// after the NoticeTunnels(1) [i.e., connected] until NoticeTunnels(0) [i.e.,
-			// disconnected]. For TunnelPoolSize > 1, serial handshakes only ensures that
+			// disconnected]. For tunnelPoolSize > 1, serial handshakes only ensures that
 			// each set of emitted NoticeHomepages is contiguous.
 
 			active, outstanding := controller.numTunnels()
@@ -787,17 +789,15 @@ loop:
 			// channel over the new SSH tunnel and configure the packet tunnel client to use
 			// the new SSH channel as its transport.
 			//
-			// Note: as is, this logic is suboptimal for TunnelPoolSize > 1, as this would
+			// Note: as is, this logic is suboptimal for tunnelPoolSize > 1, as this would
 			// continuously initialize new packet tunnel sessions for each established
-			// server. For now, config validation requires TunnelPoolSize == 1 when
+			// server. For now, config validation requires tunnelPoolSize == 1 when
 			// the packet tunnel is used.
 
 			if controller.packetTunnelTransport != nil {
 				controller.packetTunnelTransport.UseTunnel(connectedTunnel)
 			}
 
-			// TODO: design issue -- might not be enough server entries with region/caps to ever fill tunnel slots;
-			// possible solution is establish target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
 			if controller.isFullyEstablished() {
 				controller.stopEstablishing()
 			}
@@ -868,7 +868,7 @@ func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
 	controller.tunnelMutex.Lock()
 	defer controller.tunnelMutex.Unlock()
-	if len(controller.tunnels) >= controller.config.TunnelPoolSize {
+	if len(controller.tunnels) >= controller.tunnelPoolSize {
 		return false
 	}
 	// Perform a final check just in case we've established
@@ -909,7 +909,7 @@ func (controller *Controller) hasEstablishedOnce() bool {
 func (controller *Controller) isFullyEstablished() bool {
 	controller.tunnelMutex.Lock()
 	defer controller.tunnelMutex.Unlock()
-	return len(controller.tunnels) >= controller.config.TunnelPoolSize
+	return len(controller.tunnels) >= controller.tunnelPoolSize
 }
 
 // numTunnels returns the number of active and outstanding tunnels.
@@ -919,7 +919,7 @@ func (controller *Controller) numTunnels() (int, int) {
 	controller.tunnelMutex.Lock()
 	defer controller.tunnelMutex.Unlock()
 	active := len(controller.tunnels)
-	outstanding := controller.config.TunnelPoolSize - len(controller.tunnels)
+	outstanding := controller.tunnelPoolSize - len(controller.tunnels)
 	return active, outstanding
 }
 
@@ -998,6 +998,24 @@ func (controller *Controller) isActiveTunnelServerEntry(
 	return false
 }
 
+func (controller *Controller) setTunnelPoolSize(tunnelPoolSize int) {
+	controller.tunnelMutex.Lock()
+	defer controller.tunnelMutex.Unlock()
+	if tunnelPoolSize < 1 {
+		tunnelPoolSize = 1
+	}
+	if tunnelPoolSize > MAX_TUNNEL_POOL_SIZE {
+		tunnelPoolSize = MAX_TUNNEL_POOL_SIZE
+	}
+	controller.tunnelPoolSize = tunnelPoolSize
+}
+
+func (controller *Controller) getTunnelPoolSize() int {
+	controller.tunnelMutex.Lock()
+	defer controller.tunnelMutex.Unlock()
+	return controller.tunnelPoolSize
+}
+
 // Dial selects an active tunnel and establishes a port forward
 // connection through the selected tunnel. Failure to connect is considered
 // a port forward failure, for the purpose of monitoring tunnel health.
@@ -1275,9 +1293,8 @@ func (controller *Controller) launchEstablishing() {
 		tacticsWaitPeriod.Stop()
 
 		if controller.isStopEstablishing() {
-			// This check isn't strictly required by avoids the
-			// overhead of launching workers if establishment
-			// stopped while awaiting a tactics request.
+			// This check isn't strictly required but avoids the overhead of launching
+			// workers if establishment stopped while awaiting a tactics request.
 			return
 		}
 	}
@@ -1303,8 +1320,6 @@ func (controller *Controller) launchEstablishing() {
 
 	workerPoolSize := p.Int(parameters.ConnectionWorkerPoolSize)
 
-	p.Close()
-
 	// When TargetServerEntry is used, override any worker pool size config or
 	// tactic parameter and use a pool size of 1. The typical use case for
 	// TargetServerEntry is to test a specific server with a single connection
@@ -1314,6 +1329,42 @@ func (controller *Controller) launchEstablishing() {
 		workerPoolSize = 1
 	}
 
+	// TunnelPoolSize may be set by tactics, subject to local constraints. A pool
+	// size of one is forced in packet tunnel mode or when using a
+	// TargetServerEntry. The tunnel pool size is reduced when there are
+	// insufficent known server entries, within the set region and protocol
+	// constraints, to satisfy the target.
+	//
+	// Limitations, to simplify concurrent access to shared state: a ceiling of
+	// MAX_TUNNEL_POOL_SIZE is enforced by setTunnelPoolSize; the tunnel pool
+	// size target is not re-adjusted after an API handshake, even though the
+	// handshake response may deliver new tactics, or prune server entries which
+	// were potential candidates; nor is the target re-adjusted after fetching
+	// new server entries during this establishment.
+
+	tunnelPoolSize := p.Int(parameters.TunnelPoolSize)
+	if controller.config.PacketTunnelTunFileDescriptor > 0 ||
+		controller.config.TargetServerEntry != "" {
+		tunnelPoolSize = 1
+	}
+	if tunnelPoolSize > 1 {
+		// Initial count is ignored as count candidates will eventually become
+		// available.
+		_, count := CountServerEntriesWithConstraints(
+			controller.config.UseUpstreamProxy(),
+			controller.config.EgressRegion,
+			controller.protocolSelectionConstraints)
+		if count < tunnelPoolSize {
+			if count < 1 {
+				count = 1
+			}
+			tunnelPoolSize = count
+		}
+	}
+	controller.setTunnelPoolSize(tunnelPoolSize)
+
+	p.Close()
+
 	// If InitialLimitTunnelProtocols is configured but cannot be satisfied,
 	// skip the initial phase in this establishment. This avoids spinning,
 	// unable to connect, in this case. InitialLimitTunnelProtocols is
@@ -1459,7 +1510,7 @@ func (controller *Controller) establishCandidateGenerator() {
 	defer iterator.Close()
 
 	// TODO: reconcile server affinity scheme with multi-tunnel mode
-	if controller.config.TunnelPoolSize > 1 {
+	if controller.getTunnelPoolSize() > 1 {
 		applyServerAffinity = false
 	}
 

+ 17 - 0
psiphon/controller_test.go

@@ -463,6 +463,23 @@ func TestFrontedQUIC(t *testing.T) {
 		})
 }
 
+func TestTunnelPool(t *testing.T) {
+	controllerRun(t,
+		&controllerRunConfig{
+			expectNoServerEntries:    false,
+			protocol:                 protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
+			clientIsLatestVersion:    false,
+			disableUntunneledUpgrade: true,
+			disableEstablishing:      false,
+			disableApi:               false,
+			tunnelPoolSize:           2,
+			useUpstreamProxy:         false,
+			disruptNetwork:           false,
+			transformHostNames:       false,
+			useFragmentor:            false,
+		})
+}
+
 type controllerRunConfig struct {
 	expectNoServerEntries    bool
 	protocol                 string