Просмотр исходного кода

Perform handshakes sequentially

- Ensures consistency of homepage
  notices in file mode.

- Saves resources on client and
  server by not performing unnecessary
  handshakes.

- Reduces memory pressure both by
  skipping unnecessary handshake
  requests and operateTunnel runs;
  and by stopping establishment before
  performing the handshake.

- Removed "stopped establish worker"
  notice to avoid stop delay.
Rod Hynes 8 лет назад
Родитель
Коммит
3e55ce3f5d
3 измененных файлов с 201 добавлено и 75 удалено
  1. 126 38
      psiphon/controller.go
  2. 4 5
      psiphon/serverApi.go
  3. 71 32
      psiphon/tunnel.go

+ 126 - 38
psiphon/controller.go

@@ -46,7 +46,7 @@ type Controller struct {
 	componentFailureSignal             chan struct{}
 	shutdownBroadcast                  chan struct{}
 	runWaitGroup                       *sync.WaitGroup
-	establishedTunnels                 chan *Tunnel
+	connectedTunnels                   chan *Tunnel
 	failedTunnels                      chan *Tunnel
 	tunnelMutex                        sync.Mutex
 	establishedOnce                    bool
@@ -120,9 +120,9 @@ func NewController(config *Config) (controller *Controller, err error) {
 		componentFailureSignal: make(chan struct{}, 1),
 		shutdownBroadcast:      make(chan struct{}),
 		runWaitGroup:           new(sync.WaitGroup),
-		// establishedTunnels and failedTunnels buffer sizes are large enough to
+		// connectedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
-		establishedTunnels:             make(chan *Tunnel, config.TunnelPoolSize),
+		connectedTunnels:               make(chan *Tunnel, config.TunnelPoolSize),
 		failedTunnels:                  make(chan *Tunnel, config.TunnelPoolSize),
 		tunnels:                        make([]*Tunnel, 0),
 		establishedOnce:                false,
@@ -631,15 +631,13 @@ loop:
 			// which will invoke a garbage collection.
 			failedTunnel = nil
 
-			// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing
-			// and access isEstablishing.
-			if !controller.isEstablishing {
-				controller.startEstablishing()
-			}
+			// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing,
+			// which reference controller.isEstablishing.
+			controller.startEstablishing()
 
-		case establishedTunnel := <-controller.establishedTunnels:
+		case connectedTunnel := <-controller.connectedTunnels:
 
-			if controller.isImpairedProtocol(establishedTunnel.protocol) {
+			if controller.isImpairedProtocol(connectedTunnel.protocol) {
 
 				// Protocol was classified as impaired while this tunnel established.
 				// This is most likely to occur with TunnelPoolSize > 0. We log the
@@ -649,25 +647,107 @@ loop:
 				// ESTABLISH_TUNNEL_WORK_TIME loop. By not discarding here, a true
 				// impaired protocol may require an extra reconnect.
 
-				NoticeAlert("established tunnel with impaired protocol: %s", establishedTunnel.protocol)
+				NoticeAlert("connected tunnel with impaired protocol: %s", connectedTunnel.protocol)
+			}
+
+			// Tunnel establishment has two phases: connection and activation.
+			//
+			// Connection is run concurrently by the establishTunnelWorkers, to minimize
+			// delay when it's not yet known which server and protocol will be available
+			// and unblocked.
+			//
+			// Activation is run serially, here, to minimize the overhead of making a
+			// handshake request and starting the operateTunnel management worker for a
+			// tunnel which may be discarded.
+			//
+			// When activating this tunnel will fill the last outstanding slot in the
+			// pool, establishment is stopped before activation. This interrupts all
+			// connecting tunnels and garbage collects their memory. The purpose is to
+			// minimize memory pressure when the handshake request is made. In the
+			// unlikely case that the handshake fails, establishment is restarted.
+			//
+			// Any delays in stopEstablishing will delay the handshake for the last
+			// active tunnel.
+			//
+			// In the typical case of TunnelPoolSize of 1, only a single handshake is
+			// performed and the homepage notices file, when used, will not be modified
+			// after the NoticeTunnels(1) [i.e., connected] until NoticeTunnels(0) [i.e.,
+			// disconnected]. For TunnelPoolSize > 1, serial handshakes only ensure that
+			// each set of emitted NoticeHomepages is contiguous.
+
+			active, outstanding := controller.numTunnels()
+
+			discardTunnel := (outstanding <= 0)
+			isFirstTunnel := (active == 0)
+			isLastTunnel := (outstanding == 1)
+
+			if !discardTunnel {
+
+				if isLastTunnel {
+					controller.stopEstablishing()
+				}
+
+				// Call connectedTunnel.Activate in a goroutine, as it blocks on a network
+				// operation and would block shutdown. If the shutdown signal is received,
+				// discard the tunnel, which will interrupt the handshake request that may
+				// be blocking Activate.
+
+				activatedTunnelResult := make(chan error)
+
+				go func() {
+					activatedTunnelResult <- connectedTunnel.Activate(controller)
+				}()
+
+				var err error
+				select {
+				case err = <-activatedTunnelResult:
+				case <-controller.shutdownBroadcast:
+					controller.discardTunnel(connectedTunnel)
+					// Await the interrupted goroutine.
+					<-activatedTunnelResult
+					break loop
+				}
+
+				if err != nil {
+
+					if isLastTunnel {
+						controller.startEstablishing()
+					}
+
+					NoticeAlert("failed to activate %s: %s", connectedTunnel.serverEntry.IpAddress, err)
+					discardTunnel = true
+
+				} else {
+
+					// It's unlikely that registerTunnel will fail, since only this goroutine
+					// calls registerTunnel -- and after checking numTunnels; so failure is not
+					// expected.
+					if !controller.registerTunnel(connectedTunnel) {
+						NoticeAlert("failed to register %s: %s", connectedTunnel.serverEntry.IpAddress)
+						discardTunnel = true
+					}
+				}
 			}
 
-			tunnelCount, registered := controller.registerTunnel(establishedTunnel)
-			if !registered {
+			if discardTunnel {
 				// Already fully established, so discard.
-				controller.discardTunnel(establishedTunnel)
+				controller.discardTunnel(connectedTunnel)
 
 				// Clear the reference to this discarded tunnel and immediately run
 				// a garbage collection to reclaim its memory.
-				establishedTunnel = nil
+				connectedTunnel = nil
 				aggressiveGarbageCollection()
 
+				// Skip the rest of this case
 				break
 			}
 
-			NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress, establishedTunnel.protocol, establishedTunnel.serverEntry.SupportsSSHAPIRequests())
+			NoticeActiveTunnel(
+				connectedTunnel.serverEntry.IpAddress,
+				connectedTunnel.protocol,
+				connectedTunnel.serverEntry.SupportsSSHAPIRequests())
 
-			if tunnelCount == 1 {
+			if isFirstTunnel {
 
 				// The split tunnel classifier is started once the first tunnel is
 				// established. This first tunnel is passed in to be used to make
@@ -680,7 +760,7 @@ loop:
 				// change, and so all tunnels will fail and be re-established. Under
 				// that assumption, the classifier will be re-Start()-ed here when
 				// the region has changed.
-				controller.splitTunnelClassifier.Start(establishedTunnel)
+				controller.splitTunnelClassifier.Start(connectedTunnel)
 
 				// Signal a connected request on each 1st tunnel establishment. For
 				// multi-tunnels, the session is connected as long as at least one
@@ -690,10 +770,10 @@ loop:
 				// If the handshake indicated that a new client version is available,
 				// trigger an upgrade download.
 				// Note: serverContext is nil when DisableApi is set
-				if establishedTunnel.serverContext != nil &&
-					establishedTunnel.serverContext.clientUpgradeVersion != "" {
+				if connectedTunnel.serverContext != nil &&
+					connectedTunnel.serverContext.clientUpgradeVersion != "" {
 
-					handshakeVersion := establishedTunnel.serverContext.clientUpgradeVersion
+					handshakeVersion := connectedTunnel.serverContext.clientUpgradeVersion
 					select {
 					case controller.signalDownloadUpgrade <- handshakeVersion:
 					default:
@@ -713,7 +793,7 @@ loop:
 			// the packet tunnel is used.
 
 			if controller.packetTunnelTransport != nil {
-				controller.packetTunnelTransport.UseTunnel(establishedTunnel)
+				controller.packetTunnelTransport.UseTunnel(connectedTunnel)
 			}
 
 			// TODO: design issue -- might not be enough server entries with region/caps to ever fill tunnel slots;
@@ -736,8 +816,8 @@ loop:
 	controller.terminateAllTunnels()
 
 	// Drain tunnel channels
-	close(controller.establishedTunnels)
-	for tunnel := range controller.establishedTunnels {
+	close(controller.connectedTunnels)
+	for tunnel := range controller.connectedTunnels {
 		controller.discardTunnel(tunnel)
 	}
 	close(controller.failedTunnels)
@@ -858,18 +938,18 @@ func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 // registerTunnel adds the connected tunnel to the pool of active tunnels
 // which are candidates for port forwarding. Returns true if the pool has an
 // empty slot and false if the pool is full (caller should discard the tunnel).
-func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
+func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
 	controller.tunnelMutex.Lock()
 	defer controller.tunnelMutex.Unlock()
 	if len(controller.tunnels) >= controller.config.TunnelPoolSize {
-		return len(controller.tunnels), false
+		return false
 	}
 	// Perform a final check just in case we've established
 	// a duplicate connection.
 	for _, activeTunnel := range controller.tunnels {
 		if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
 			NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
-			return len(controller.tunnels), false
+			return false
 		}
 	}
 	controller.establishedOnce = true
@@ -884,7 +964,7 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 		PromoteServerEntry(tunnel.serverEntry.IpAddress)
 	}
 
-	return len(controller.tunnels), true
+	return true
 }
 
 // hasEstablishedOnce indicates if at least one active tunnel has
@@ -903,6 +983,17 @@ func (controller *Controller) isFullyEstablished() bool {
 	return len(controller.tunnels) >= controller.config.TunnelPoolSize
 }
 
+// numTunnels returns the number of active tunnels and the number of
+// outstanding tunnels, in that order. Outstanding is the number of tunnels
+// still required to fill the pool of active tunnels (TunnelPoolSize).
+func (controller *Controller) numTunnels() (int, int) {
+	controller.tunnelMutex.Lock()
+	defer controller.tunnelMutex.Unlock()
+	active := len(controller.tunnels)
+	outstanding := controller.config.TunnelPoolSize - len(controller.tunnels)
+	return active, outstanding
+}
+
 // terminateTunnel removes a tunnel from the pool of active tunnels
 // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
 // is adjusted as required.
@@ -1108,6 +1199,7 @@ func (controller *Controller) stopEstablishing() {
 	// Note: establishCandidateGenerator closes controller.candidateServerEntries
 	// (as it may be sending to that channel).
 	controller.establishWaitGroup.Wait()
+	NoticeInfo("stopped establishing")
 
 	controller.isEstablishing = false
 	controller.establishWaitGroup = nil
@@ -1326,12 +1418,10 @@ loop:
 
 		iterator.Reset()
 	}
-
-	NoticeInfo("stopped candidate generator")
 }
 
 // establishTunnelWorker pulls candidates from the candidate queue, establishes
-// a connection to the tunnel server, and delivers the established tunnel to a channel.
+// a connection to the tunnel server, and delivers the connected tunnel to a channel.
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
 loop:
@@ -1347,7 +1437,7 @@ loop:
 			continue
 		}
 
-		// EstablishTunnel will allocate significant memory, so first attempt to
+		// ConnectTunnel will allocate significant memory, so first attempt to
 		// reclaim as much as possible.
 		aggressiveGarbageCollection()
 
@@ -1412,15 +1502,14 @@ loop:
 			}
 			controller.concurrentEstablishTunnelsMutex.Unlock()
 
-			tunnel, err = EstablishTunnel(
+			tunnel, err = ConnectTunnel(
 				controller.config,
 				controller.untunneledDialConfig,
 				controller.sessionId,
 				controller.establishPendingConns,
 				candidateServerEntry.serverEntry,
 				selectedProtocol,
-				candidateServerEntry.adjustedEstablishStartTime,
-				controller) // TunnelOwner
+				candidateServerEntry.adjustedEstablishStartTime)
 
 			controller.concurrentEstablishTunnelsMutex.Lock()
 			if isMeek {
@@ -1464,12 +1553,12 @@ loop:
 			continue
 		}
 
-		// Deliver established tunnel.
+		// Deliver connected tunnel.
 		// Don't block. Assumes the receiver has a buffer large enough for
 		// the number of desired tunnels. If there's no room, the tunnel must
 		// not be required so it's discarded.
 		select {
-		case controller.establishedTunnels <- tunnel:
+		case controller.connectedTunnels <- tunnel:
 		default:
 			controller.discardTunnel(tunnel)
 
@@ -1485,7 +1574,6 @@ loop:
 			close(controller.serverAffinityDoneBroadcast)
 		}
 	}
-	NoticeInfo("stopped establish worker")
 }
 
 func (controller *Controller) isStopEstablishingBroadcast() bool {

+ 4 - 5
psiphon/serverApi.go

@@ -80,11 +80,10 @@ func MakeSessionId() (sessionId string, err error) {
 	return hex.EncodeToString(randomId), nil
 }
 
-// NewServerContext makes the tunnelled handshake request to the Psiphon server
+// NewServerContext makes the tunneled handshake request to the Psiphon server
 // and returns a ServerContext struct for use with subsequent Psiphon server API
 // requests (e.g., periodic connected and status requests).
-func NewServerContext(
-	tunnel *Tunnel, sessionId string, ignoreStatsRegexps bool) (*ServerContext, error) {
+func NewServerContext(tunnel *Tunnel) (*ServerContext, error) {
 
 	// For legacy servers, set up psiphonHttpsClient for
 	// accessing the Psiphon API via the web service.
@@ -100,13 +99,13 @@ func NewServerContext(
 	}
 
 	serverContext := &ServerContext{
-		sessionId:          sessionId,
+		sessionId:          tunnel.sessionId,
 		tunnelNumber:       atomic.AddInt64(&nextTunnelNumber, 1),
 		tunnel:             tunnel,
 		psiphonHttpsClient: psiphonHttpsClient,
 	}
 
-	err := serverContext.doHandshakeRequest(ignoreStatsRegexps)
+	err := serverContext.doHandshakeRequest(tunnel.config.IgnoreHandshakeStatsRegexps)
 	if err != nil {
 		return nil, common.ContextError(err)
 	}

+ 71 - 32
psiphon/tunnel.go

@@ -71,8 +71,10 @@ type Tunnel struct {
 	mutex                        *sync.Mutex
 	config                       *Config
 	untunneledDialConfig         *DialConfig
+	isActivated                  bool
 	isDiscarded                  bool
 	isClosed                     bool
+	sessionId                    string
 	serverEntry                  *protocol.ServerEntry
 	serverContext                *ServerContext
 	protocol                     string
@@ -83,6 +85,7 @@ type Tunnel struct {
 	shutdownOperateBroadcast     chan struct{}
 	signalPortForwardFailure     chan struct{}
 	totalPortForwardFailures     int
+	adjustedEstablishStartTime   monotime.Time
 	establishDuration            time.Duration
 	establishedTime              monotime.Time
 	dialStats                    *TunnelDialStats
@@ -111,7 +114,7 @@ type TunnelDialStats struct {
 	TLSProfile                     string
 }
 
-// EstablishTunnel first makes a network transport connection to the
+// ConnectTunnel first makes a network transport connection to the
 // Psiphon server and then establishes an SSH client session on top of
 // that transport. The SSH server is authenticated using the public
 // key in the server entry.
@@ -121,15 +124,25 @@ type TunnelDialStats struct {
 // When requiredProtocol is not blank, that protocol is used. Otherwise,
 // a random supported protocol is used.
 // untunneledDialConfig is used for untunneled final status requests.
-func EstablishTunnel(
+//
+// Call Activate on a connected tunnel to complete its establishment
+// before using.
+//
+// Tunnel establishment is split into two phases: connection, and
+// activation. The Controller will run many ConnectTunnel calls
+// concurrently and then, to avoid unnecessary overhead from making
+// handshake requests and starting operateTunnel for tunnels which
+// may be discarded, call Activate on connected tunnels sequentially
+// as necessary.
+//
+func ConnectTunnel(
 	config *Config,
 	untunneledDialConfig *DialConfig,
 	sessionId string,
 	pendingConns *common.Conns,
 	serverEntry *protocol.ServerEntry,
 	selectedProtocol string,
-	adjustedEstablishStartTime monotime.Time,
-	tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
+	adjustedEstablishStartTime monotime.Time) (*Tunnel, error) {
 
 	if !serverEntry.SupportsProtocol(selectedProtocol) {
 		return nil, common.ContextError(fmt.Errorf("server does not support selected protocol"))
@@ -143,21 +156,15 @@ func EstablishTunnel(
 		return nil, common.ContextError(err)
 	}
 
-	// Cleanup on error
-	defer func() {
-		if err != nil {
-			dialResult.sshClient.Close()
-			dialResult.monitoredConn.Close()
-			pendingConns.Remove(dialResult.dialConn)
-		}
-	}()
+	// Now that connection dials are complete, cancel interruptibility
+	pendingConns.Remove(dialResult.dialConn)
 
 	// The tunnel is now connected
-	tunnel = &Tunnel{
+	return &Tunnel{
 		mutex:                    new(sync.Mutex),
 		config:                   config,
 		untunneledDialConfig:     untunneledDialConfig,
-		isClosed:                 false,
+		sessionId:                sessionId,
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
 		conn:                     dialResult.monitoredConn,
@@ -167,27 +174,41 @@ func EstablishTunnel(
 		shutdownOperateBroadcast: make(chan struct{}),
 		// A buffer allows at least one signal to be sent even when the receiver is
 		// not listening. Senders should not block.
-		signalPortForwardFailure: make(chan struct{}, 1),
-		dialStats:                dialResult.dialStats,
+		signalPortForwardFailure:   make(chan struct{}, 1),
+		adjustedEstablishStartTime: adjustedEstablishStartTime,
+		dialStats:                  dialResult.dialStats,
 		// Buffer allows SetClientVerificationPayload to submit one new payload
 		// without blocking or dropping it.
 		newClientVerificationPayload: make(chan string, 1),
-	}
+	}, nil
+}
+
+// Activate completes the tunnel establishment, performing the handshake
+// request and starting operateTunnel, the worker that monitors the tunnel
+// and handles periodic management.
+func (tunnel *Tunnel) Activate(tunnelOwner TunnelOwner) error {
 
 	// Create a new Psiphon API server context for this tunnel. This includes
-	// performing a handshake request. If the handshake fails, this establishment
+	// performing a handshake request. If the handshake fails, this activation
 	// fails.
-	if !config.DisableApi {
+	var serverContext *ServerContext
+	if !tunnel.config.DisableApi {
 		NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
-		tunnel.serverContext, err = NewServerContext(
-			tunnel, sessionId, config.IgnoreHandshakeStatsRegexps)
+		var err error
+		serverContext, err = NewServerContext(tunnel)
 		if err != nil {
-			return nil, common.ContextError(
+			return common.ContextError(
 				fmt.Errorf("error starting server context for %s: %s",
 					tunnel.serverEntry.IpAddress, err))
 		}
 	}
 
+	tunnel.mutex.Lock()
+
+	tunnel.isActivated = true
+
+	tunnel.serverContext = serverContext
+
 	// establishDuration is the elapsed time between the controller starting tunnel
 	// establishment and this tunnel being established. The reported value represents
 	// how long the user waited between starting the client and having a usable tunnel;
@@ -196,18 +217,19 @@ func EstablishTunnel(
 	//
 	// This time period may include time spent unsuccessfully connecting to other
 	// servers. Time spent waiting for network connectivity is excluded.
-	tunnel.establishDuration = monotime.Since(adjustedEstablishStartTime)
+	tunnel.establishDuration = monotime.Since(
+		tunnel.adjustedEstablishStartTime)
 
 	tunnel.establishedTime = monotime.Now()
 
-	// Now that network operations are complete, cancel interruptibility
-	pendingConns.Remove(dialResult.dialConn)
-
-	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
+	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
+	// stats updates.
 	tunnel.operateWaitGroup.Add(1)
 	go tunnel.operateTunnel(tunnelOwner)
 
-	return tunnel, nil
+	tunnel.mutex.Unlock()
+
+	return nil
 }
 
 // Close stops operating the tunnel and closes the underlying connection.
@@ -218,6 +240,7 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 
 	tunnel.mutex.Lock()
 	tunnel.isDiscarded = isDiscarded
+	isActivated := tunnel.isActivated
 	isClosed := tunnel.isClosed
 	tunnel.isClosed = true
 	tunnel.mutex.Unlock()
@@ -231,10 +254,12 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 		// In effect, the TUNNEL_OPERATE_SHUTDOWN_TIMEOUT value will take
 		// precedence over the PSIPHON_API_SERVER_TIMEOUT http.Client.Timeout
 		// value set in makePsiphonHttpsClient.
-		timer := time.AfterFunc(TUNNEL_OPERATE_SHUTDOWN_TIMEOUT, func() { tunnel.conn.Close() })
-		close(tunnel.shutdownOperateBroadcast)
-		tunnel.operateWaitGroup.Wait()
-		timer.Stop()
+		if isActivated {
+			timer := time.AfterFunc(TUNNEL_OPERATE_SHUTDOWN_TIMEOUT, func() { tunnel.conn.Close() })
+			close(tunnel.shutdownOperateBroadcast)
+			tunnel.operateWaitGroup.Wait()
+			timer.Stop()
+		}
 		tunnel.sshClient.Close()
 		// tunnel.conn.Close() may get called multiple times, which is allowed.
 		tunnel.conn.Close()
@@ -246,6 +271,13 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 	}
 }
 
+// IsActivated reports the tunnel's activated flag, which Activate sets once the handshake succeeds.
+func (tunnel *Tunnel) IsActivated() bool {
+	tunnel.mutex.Lock()
+	defer tunnel.mutex.Unlock()
+	return tunnel.isActivated
+}
+
 // IsDiscarded returns the tunnel's discarded flag.
 func (tunnel *Tunnel) IsDiscarded() bool {
 	tunnel.mutex.Lock()
@@ -279,6 +311,10 @@ func (tunnel *Tunnel) SendAPIRequest(
 func (tunnel *Tunnel) Dial(
 	remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
 
+	if !tunnel.IsActivated() {
+		return nil, common.ContextError(errors.New("tunnel is not activated"))
+	}
+
 	type tunnelDialResult struct {
 		sshPortForwardConn net.Conn
 		err                error
@@ -314,6 +350,9 @@ func (tunnel *Tunnel) Dial(
 
 func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
 
+	if !tunnel.IsActivated() {
+		return nil, common.ContextError(errors.New("tunnel is not activated"))
+	}
 	channel, requests, err := tunnel.sshClient.OpenChannel(
 		protocol.PACKET_TUNNEL_CHANNEL_TYPE, nil)
 	if err != nil {