Sfoglia il codice sorgente

Fixes related to shutdown delays and hangs

* Handle tunnel connectivity timeouts exclusively at the
  SSH/HTTP level. Simplifies code -- reduce chance of
  deadlocks and race conditions.
  * Don't explicitly close TCPConn on read/write errors.
  * Don't set TCPConn read/write timeouts.
  * Remove ClosedSignal (and psiphon.Conn), eliminating
    Conn signaling to tunnel monitor (and resulting cycle).
* To reduce chance of deadlocks, simplify use of mutex for
  Close() in Tunnel/MeekConn: don't hold lock while waiting
  for goroutine components to shutdown.
* Fix: check pendingConn.Add return value in MeekConn and
  abort connection as required.
Rod Hynes 10 anni fa
parent
commit
015428cd2f
7 ha cambiato i file con 41 aggiunte e 159 eliminazioni
  1. 5 79
      psiphon/TCPConn.go
  2. 1 4
      psiphon/TCPConn_unix.go
  3. 1 3
      psiphon/TCPConn_windows.go
  4. 0 2
      psiphon/config.go
  5. 16 23
      psiphon/meekConn.go
  6. 0 24
      psiphon/net.go
  7. 18 24
      psiphon/tunnel.go

+ 5 - 79
psiphon/TCPConn.go

@@ -31,20 +31,15 @@ import (
 // TCPConn is a customized TCP connection that:
 // - can be interrupted while connecting;
 // - implements a connect timeout;
-// - implements idle read/write timeouts;
 // - uses an upstream proxy when specified, and includes
 //   upstream proxy dialing in the connect timeout;
 // - can be bound to a specific system device (for Android VpnService
 //   routing compatibility, for example);
-// - implements the psiphon.Conn interface
 type TCPConn struct {
 	net.Conn
 	mutex         sync.Mutex
 	isClosed      bool
-	closedSignal  chan struct{}
 	interruptible interruptibleTCPSocket
-	readTimeout   time.Duration
-	writeTimeout  time.Duration
 }
 
 // NewTCPDialer creates a TCPDialer.
@@ -69,14 +64,6 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 		if err != nil {
 			return nil, ContextError(err)
 		}
-		if config.ClosedSignal != nil {
-			if !conn.SetClosedSignal(config.ClosedSignal) {
-				// Conn is already closed. This is not unexpected -- for example,
-				// when establish is interrupted.
-				// TODO: make this not log an error when called from establishTunnelWorker?
-				return nil, ContextError(errors.New("conn already closed"))
-			}
-		}
 		return conn, nil
 	}
 
@@ -122,82 +109,21 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 	return dialer
 }
 
-// SetClosedSignal implements psiphon.Conn.SetClosedSignal.
-func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) bool {
-	conn.mutex.Lock()
-	defer conn.mutex.Unlock()
-	if conn.isClosed {
-		return false
-	}
-	conn.closedSignal = closedSignal
-	return true
-}
-
 // Close terminates a connected (net.Conn) or connecting (socketFd) TCPConn.
-// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+// A mutex is required to support net.Conn concurrency semantics.
+// Note also use of mutex around conn.interruptible and conn.Conn in
+// TCPConn_unix.go.
 func (conn *TCPConn) Close() (err error) {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
+
 	if !conn.isClosed {
+		conn.isClosed = true
 		if conn.Conn == nil {
 			err = interruptibleTCPClose(conn.interruptible)
 		} else {
 			err = conn.Conn.Close()
 		}
-		conn.isClosed = true
-		select {
-		case conn.closedSignal <- *new(struct{}):
-		default:
-		}
 	}
 	return err
 }
-
-// Read wraps standard Read to add an idle timeout. The connection
-// is explicitly closed on timeout.
-func (conn *TCPConn) Read(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.readTimeout access
-	if conn.readTimeout != 0 {
-		err = conn.Conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
-		if err != nil {
-			return 0, ContextError(err)
-		}
-	}
-	n, err = conn.Conn.Read(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
-}
-
-// Write wraps standard Write to add an idle timeout The connection
-// is explicitly closed on timeout.
-func (conn *TCPConn) Write(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.writeTimeout access
-	if conn.writeTimeout != 0 {
-		err = conn.Conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
-		if err != nil {
-			return 0, ContextError(err)
-		}
-	}
-	n, err = conn.Conn.Write(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
-}
-
-// Override implementation of net.Conn.SetDeadline
-func (conn *TCPConn) SetDeadline(t time.Time) error {
-	return errors.New("net.Conn SetDeadline not supported")
-}
-
-// Override implementation of net.Conn.SetReadDeadline
-func (conn *TCPConn) SetReadDeadline(t time.Time) error {
-	return errors.New("net.Conn SetReadDeadline not supported")
-}
-
-// Override implementation of net.Conn.SetWriteDeadline
-func (conn *TCPConn) SetWriteDeadline(t time.Time) error {
-	return errors.New("net.Conn SetWriteDeadline not supported")
-}

+ 1 - 4
psiphon/TCPConn_unix.go

@@ -101,10 +101,7 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	copy(ip[:], ipAddrs[index].To4())
 
 	// Enable interruption
-	conn = &TCPConn{
-		interruptible: interruptibleTCPSocket{socketFd: socketFd},
-		readTimeout:   config.ReadTimeout,
-		writeTimeout:  config.WriteTimeout}
+	conn = &TCPConn{interruptible: interruptibleTCPSocket{socketFd: socketFd}}
 
 	if !config.PendingConns.Add(conn) {
 		return nil, ContextError(errors.New("pending connections already closed"))

+ 1 - 3
psiphon/TCPConn_windows.go

@@ -52,9 +52,7 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 
 	// Enable interruption
 	conn = &TCPConn{
-		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)},
-		readTimeout:   config.ReadTimeout,
-		writeTimeout:  config.WriteTimeout}
+		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)}}
 
 	if !config.PendingConns.Add(conn) {
 		return nil, ContextError(errors.New("pending connections already closed"))

+ 0 - 2
psiphon/config.go

@@ -34,8 +34,6 @@ const (
 	CONNECTION_WORKER_POOL_SIZE                  = 10
 	TUNNEL_POOL_SIZE                             = 1
 	TUNNEL_CONNECT_TIMEOUT                       = 15 * time.Second
-	TUNNEL_READ_TIMEOUT                          = 0 * time.Second
-	TUNNEL_WRITE_TIMEOUT                         = 5 * time.Second
 	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT              = 2 * time.Second
 	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT             = 10 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES      = 256

+ 16 - 23
psiphon/meekConn.go

@@ -77,7 +77,6 @@ type MeekConn struct {
 	transport            transporter
 	mutex                sync.Mutex
 	isClosed             bool
-	closedSignal         chan struct{}
 	broadcastClosed      chan struct{}
 	relayWaitGroup       *sync.WaitGroup
 	emptyReceiveBuffer   chan *bytes.Buffer
@@ -255,46 +254,40 @@ func DialMeek(
 	go meek.relay()
 
 	// Enable interruption
-	config.PendingConns.Add(meek)
+	if !config.PendingConns.Add(meek) {
+		meek.Close()
+		return nil, ContextError(errors.New("pending connections already closed"))
+	}
 
 	return meek, nil
 }
 
-// SetClosedSignal implements psiphon.Conn.SetClosedSignal
-func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) bool {
-	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	if meek.isClosed {
-		return false
-	}
-	meek.closedSignal = closedSignal
-	return true
-}
-
 // Close terminates the meek connection. Close waits for the relay processing goroutine
 // to stop and releases HTTP transport resources.
-// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+// A mutex is required to support net.Conn concurrency semantics.
 func (meek *MeekConn) Close() (err error) {
+
 	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	if !meek.isClosed {
+	isClosed := meek.isClosed
+	meek.isClosed = true
+	meek.mutex.Unlock()
+
+	if !isClosed {
 		close(meek.broadcastClosed)
 		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
 		meek.transport.CloseIdleConnections()
-		meek.isClosed = true
-		select {
-		case meek.closedSignal <- *new(struct{}):
-		default:
-		}
 	}
 	return nil
 }
 
 func (meek *MeekConn) closed() bool {
+
 	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	return meek.isClosed
+	isClosed := meek.isClosed
+	meek.mutex.Unlock()
+
+	return isClosed
 }
 
 // Read reads data from the connection.

+ 0 - 24
psiphon/net.go

@@ -34,13 +34,6 @@ const DNS_PORT = 53
 // of a Psiphon dialer (TCPDial, MeekDial, etc.)
 type DialConfig struct {
 
-	// ClosedSignal is triggered when an underlying TCPConn network
-	// connection is closed. This is used in operateTunnel to detect
-	// an unexpected disconnect. Channel should be have buffer to
-	// receive at least on signal. Sender in TCPConn.Close() does not
-	// block.
-	ClosedSignal chan struct{}
-
 	// UpstreamProxyUrl specifies a proxy to connect through.
 	// E.g., "http://proxyhost:8080"
 	//       "socks5://user:password@proxyhost:1080"
@@ -53,8 +46,6 @@ type DialConfig struct {
 	UpstreamProxyUrl string
 
 	ConnectTimeout time.Duration
-	ReadTimeout    time.Duration
-	WriteTimeout   time.Duration
 
 	// PendingConns is used to interrupt dials in progress.
 	// The dial may be interrupted using PendingConns.CloseAll(): on platforms
@@ -101,21 +92,6 @@ func (TimeoutError) Temporary() bool { return true }
 // Dialer is a custom dialer compatible with http.Transport.Dial.
 type Dialer func(string, string) (net.Conn, error)
 
-// Conn is a net.Conn which supports sending a signal to a channel when
-// it is closed. In Psiphon, this interface is implemented by tunnel
-// connection types (DirectConn and MeekConn) and the close signal is
-// used as one trigger for tearing down the tunnel.
-type Conn interface {
-	net.Conn
-
-	// SetClosedSignal sets the channel which will be signaled
-	// when the connection is closed. This function returns false
-	// if the connection is already closed (and would never send
-	// the signal). SetClosedSignal and Close may be called by
-	// concurrent goroutines.
-	SetClosedSignal(closedSignal chan struct{}) bool
-}
-
 // Conns is a synchronized list of Conns that is used to coordinate
 // interrupting a set of goroutines establishing connections, or
 // close a set of open connections, etc.

+ 18 - 24
psiphon/tunnel.go

@@ -83,7 +83,6 @@ type Tunnel struct {
 	session                  *Session
 	protocol                 string
 	conn                     net.Conn
-	closedSignal             chan struct{}
 	sshClient                *ssh.Client
 	operateWaitGroup         *sync.WaitGroup
 	shutdownOperateBroadcast chan struct{}
@@ -114,7 +113,7 @@ func EstablishTunnel(
 	}
 
 	// Build transport layers and establish SSH connection
-	conn, closedSignal, sshClient, err := dialSsh(
+	conn, sshClient, err := dialSsh(
 		config, pendingConns, serverEntry, selectedProtocol, sessionId)
 	if err != nil {
 		return nil, ContextError(err)
@@ -135,7 +134,6 @@ func EstablishTunnel(
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
 		conn:                     conn,
-		closedSignal:             closedSignal,
 		sshClient:                sshClient,
 		operateWaitGroup:         new(sync.WaitGroup),
 		shutdownOperateBroadcast: make(chan struct{}),
@@ -174,8 +172,13 @@ func EstablishTunnel(
 // Close stops operating the tunnel and closes the underlying connection.
 // Supports multiple and/or concurrent calls to Close().
 func (tunnel *Tunnel) Close() {
+
 	tunnel.mutex.Lock()
-	if !tunnel.isClosed {
+	isClosed := tunnel.isClosed
+	tunnel.isClosed = true
+	tunnel.mutex.Unlock()
+
+	if !isClosed {
 		// Signal operateTunnel to stop before closing the tunnel -- this
 		// allows a final status request to be made in the case of an orderly
 		// shutdown.
@@ -192,8 +195,6 @@ func (tunnel *Tunnel) Close() {
 		// tunnel.conn.Close() may get called twice, which is allowed.
 		tunnel.conn.Close()
 	}
-	tunnel.isClosed = true
-	tunnel.mutex.Unlock()
 }
 
 // Dial establishes a port forward connection through the tunnel
@@ -204,6 +205,7 @@ func (tunnel *Tunnel) Dial(
 	tunnel.mutex.Lock()
 	isClosed := tunnel.isClosed
 	tunnel.mutex.Unlock()
+
 	if isClosed {
 		return nil, errors.New("tunnel is closed")
 	}
@@ -352,7 +354,7 @@ func dialSsh(
 	pendingConns *Conns,
 	serverEntry *ServerEntry,
 	selectedProtocol,
-	sessionId string) (conn net.Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
+	sessionId string) (conn net.Conn, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -383,11 +385,11 @@ func dialSsh(
 		// fronting-capable servers.
 
 		if len(serverEntry.MeekFrontingAddresses) == 0 {
-			return nil, nil, nil, ContextError(errors.New("MeekFrontingAddresses is empty"))
+			return nil, nil, ContextError(errors.New("MeekFrontingAddresses is empty"))
 		}
 		index, err := MakeSecureRandomInt(len(serverEntry.MeekFrontingAddresses))
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 		frontingAddress = serverEntry.MeekFrontingAddresses[index]
 	}
@@ -397,15 +399,10 @@ func dialSsh(
 		selectedProtocol,
 		frontingAddress)
 
-	closedSignal = make(chan struct{}, 1)
-
 	// Create the base transport: meek or direct connection
 	dialConfig := &DialConfig{
-		ClosedSignal:     closedSignal,
 		UpstreamProxyUrl: config.UpstreamProxyUrl,
 		ConnectTimeout:   TUNNEL_CONNECT_TIMEOUT,
-		ReadTimeout:      TUNNEL_READ_TIMEOUT,
-		WriteTimeout:     TUNNEL_WRITE_TIMEOUT,
 		PendingConns:     pendingConns,
 		DeviceBinder:     config.DeviceBinder,
 		DnsServerGetter:  config.DnsServerGetter,
@@ -413,12 +410,12 @@ func dialSsh(
 	if useMeek {
 		conn, err = DialMeek(serverEntry, sessionId, frontingAddress, dialConfig)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	} else {
 		conn, err = DialTCP(fmt.Sprintf("%s:%d", serverEntry.IpAddress, port), dialConfig)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
 
@@ -436,14 +433,14 @@ func dialSsh(
 	if useObfuscatedSsh {
 		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, nil, nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -459,7 +456,7 @@ func dialSsh(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, nil, nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -503,10 +500,10 @@ func dialSsh(
 
 	result := <-resultChannel
 	if result.err != nil {
-		return nil, nil, nil, ContextError(result.err)
+		return nil, nil, ContextError(result.err)
 	}
 
-	return conn, closedSignal, result.sshClient, nil
+	return conn, result.sshClient, nil
 }
 
 // operateTunnel periodically sends status requests (traffic stats updates updates)
@@ -598,9 +595,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
-		case <-tunnel.closedSignal:
-			err = errors.New("tunnel closed unexpectedly")
-
 		case <-tunnel.shutdownOperateBroadcast:
 			// Attempt to send any remaining stats
 			sendStats(tunnel)