ソースを参照

Ensure that a SetClosedSignal "already closed" condition is handled in EstablishTunnel

This avoids hitting the condition in operateTunnel and having to send a failed tunnel
to the controller, which could cause a spurious establish run. This also somewhat reverts
the change in f08aa60, as "already closed" once again becomes logged as an error (although
now it's performed much sooner in the connection lifetime, so less likely to fail.)
Rod Hynes 11 年 前
コミット
101383acd6
1 ファイル変更31 行追加19 行削除
  1. 31 19
      psiphon/tunnel.go

+ 31 - 19
psiphon/tunnel.go

@@ -75,6 +75,7 @@ type Tunnel struct {
 	session                  *Session
 	protocol                 string
 	conn                     Conn
+	closedSignal             chan struct{}
 	sshClient                *ssh.Client
 	operateWaitGroup         *sync.WaitGroup
 	shutdownOperateBroadcast chan struct{}
@@ -114,7 +115,8 @@ func EstablishTunnel(
 	}
 
 	// Build transport layers and establish SSH connection
-	conn, sshClient, err := dialSsh(config, pendingConns, serverEntry, selectedProtocol, sessionId)
+	conn, closedSignal, sshClient, err := dialSsh(
+		config, pendingConns, serverEntry, selectedProtocol, sessionId)
 	if err != nil {
 		return nil, ContextError(err)
 	}
@@ -133,6 +135,7 @@ func EstablishTunnel(
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
 		conn:                     conn,
+		closedSignal:             closedSignal,
 		sshClient:                sshClient,
 		operateWaitGroup:         new(sync.WaitGroup),
 		shutdownOperateBroadcast: make(chan struct{}),
@@ -268,8 +271,11 @@ func selectProtocol(config *Config, serverEntry *ServerEntry) (selectedProtocol
 
 // dialSsh is a helper that builds the transport layers and establishes the SSH connection
 func dialSsh(
-	config *Config, pendingConns *Conns, serverEntry *ServerEntry,
-	selectedProtocol, sessionId string) (conn Conn, sshClient *ssh.Client, err error) {
+	config *Config,
+	pendingConns *Conns,
+	serverEntry *ServerEntry,
+	selectedProtocol,
+	sessionId string) (conn Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -305,12 +311,12 @@ func dialSsh(
 	if useMeek {
 		conn, err = DialMeek(serverEntry, sessionId, useFronting, dialConfig)
 		if err != nil {
-			return nil, nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
 	} else {
 		conn, err = DialTCP(fmt.Sprintf("%s:%d", serverEntry.IpAddress, port), dialConfig)
 		if err != nil {
-			return nil, nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
 	}
 
@@ -322,20 +328,34 @@ func dialSsh(
 		}
 	}()
 
+	// Create signal which is triggered when the underlying network connection is closed,
+	// this is used in operateTunnel to detect an unexpected disconnect. SetClosedSignal
+	// is called here, well before operateTunnel, so that we don't need to handle the
+	// "already closed" with a tunnelOwner.SignalTunnelFailure() in operateTunnel (this
+	// was previously the order of events, which caused the establish process to sometimes
+	// run briefly when not needed).
+	closedSignal = make(chan struct{}, 1)
+	if !conn.SetClosedSignal(closedSignal) {
+		// Conn is already closed. This is not unexpected -- for example,
+		// when establish is interrupted.
+		// TODO: make this not log an error when called from establishTunnelWorker?
+		return nil, nil, nil, ContextError(errors.New("conn already closed"))
+	}
+
 	// Add obfuscated SSH layer
 	var sshConn net.Conn
 	sshConn = conn
 	if useObfuscatedSsh {
 		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -351,7 +371,7 @@ func dialSsh(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -365,11 +385,11 @@ func dialSsh(
 	sshAddress := ""
 	sshClientConn, sshChans, sshReqs, err := ssh.NewClientConn(sshConn, sshAddress, sshClientConfig)
 	if err != nil {
-		return nil, nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
 	sshClient = ssh.NewClient(sshClientConn, sshChans, sshReqs)
 
-	return conn, sshClient, nil
+	return conn, closedSignal, sshClient, nil
 }
 
 // operateTunnel periodically sends stats updates to the Psiphon API and
@@ -406,14 +426,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
 	defer sshKeepAliveTicker.Stop()
 
-	tunnelClosedSignal := make(chan struct{}, 1)
-	if !tunnel.conn.SetClosedSignal(tunnelClosedSignal) {
-		// Tunnel is already closed. This is not unexpected -- for example,
-		// when establish is interrupted.
-		Notice(NOTICE_INFO, "shutdown operate tunnel (tunnel already closed)")
-		return
-	}
-
 	var err error
 	for err == nil {
 		select {
@@ -435,7 +447,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				err = errors.New("tunnel exceeded port forward failure threshold")
 			}
 
-		case <-tunnelClosedSignal:
+		case <-tunnel.closedSignal:
 			err = errors.New("tunnel closed unexpectedly")
 
 		case <-tunnel.shutdownOperateBroadcast: