Procházet zdrojové kódy

- Add Closer interface and implement in Conns
- When any port forward error occurs, the tunnel
operator will first check if the underlying
Conn is closed and, if so, bypass the SSH keep
alive probe and immediately terminate the tunnel
- Fix: log DialPluginProtocol init failures

Rod Hynes před 9 roky
rodič
revize
63c9d31ab7
6 změnil soubory, kde provedl 58 přidání a 20 odebrání
  1. 8 0
      psiphon/TCPConn.go
  2. 16 0
      psiphon/common/net.go
  3. 5 3
      psiphon/meekConn.go
  4. 0 7
      psiphon/net.go
  5. 1 1
      psiphon/tlsDialer.go
  6. 28 9
      psiphon/tunnel.go

+ 8 - 0
psiphon/TCPConn.go

@@ -212,6 +212,14 @@ func (conn *TCPConn) Close() (err error) {
 	return err
 	return err
 }
 }
 
 
+// IsClosed implements the Closer iterface. The return value
+// indicates whether the TCPConn has been closed.
+func (conn *TCPConn) IsClosed() bool {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+	return conn.isClosed
+}
+
 // CloseWrite calls net.TCPConn.CloseWrite when the underlying
 // CloseWrite calls net.TCPConn.CloseWrite when the underlying
 // conn is a *net.TCPConn.
 // conn is a *net.TCPConn.
 func (conn *TCPConn) CloseWrite() (err error) {
 func (conn *TCPConn) CloseWrite() (err error) {

+ 16 - 0
psiphon/common/net.go

@@ -174,6 +174,12 @@ func (entry *LRUConnsEntry) Touch() {
 	entry.lruConns.list.MoveToFront(entry.element)
 	entry.lruConns.list.MoveToFront(entry.element)
 }
 }
 
 
+// Closer defines the interface to a type, typically
+// a net.Conn, that can be closed.
+type Closer interface {
+	IsClosed() bool
+}
+
 // ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
 // ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
 // events triggered by I/O activity.
 // events triggered by I/O activity.
 //
 //
@@ -310,3 +316,13 @@ func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
 	// Note: no context error to preserve error type
 	// Note: no context error to preserve error type
 	return n, err
 	return n, err
 }
 }
+
+// IsClosed implements the Closer iterface. The return value
+// indicates whether the underlying conn has been closed.
+func (conn *ActivityMonitoredConn) IsClosed() bool {
+	closer, ok := conn.Conn.(Closer)
+	if !ok {
+		return false
+	}
+	return closer.IsClosed()
+}

+ 5 - 3
psiphon/meekConn.go

@@ -353,7 +353,9 @@ func (meek *MeekConn) Close() (err error) {
 	return nil
 	return nil
 }
 }
 
 
-func (meek *MeekConn) closed() bool {
+// IsClosed implements the Closer iterface. The return value
+// indicates whether the MeekConn has been closed.
+func (meek *MeekConn) IsClosed() bool {
 
 
 	meek.mutex.Lock()
 	meek.mutex.Lock()
 	isClosed := meek.isClosed
 	isClosed := meek.isClosed
@@ -365,7 +367,7 @@ func (meek *MeekConn) closed() bool {
 // Read reads data from the connection.
 // Read reads data from the connection.
 // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
 // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
 func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
 func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
-	if meek.closed() {
+	if meek.IsClosed() {
 		return 0, common.ContextError(errors.New("meek connection is closed"))
 		return 0, common.ContextError(errors.New("meek connection is closed"))
 	}
 	}
 	// Block until there is received data to consume
 	// Block until there is received data to consume
@@ -384,7 +386,7 @@ func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
 // Write writes data to the connection.
 // Write writes data to the connection.
 // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
 // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
 func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
 func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
-	if meek.closed() {
+	if meek.IsClosed() {
 		return 0, common.ContextError(errors.New("meek connection is closed"))
 		return 0, common.ContextError(errors.New("meek connection is closed"))
 	}
 	}
 	// Repeats until all n bytes are written
 	// Repeats until all n bytes are written

+ 0 - 7
psiphon/net.go

@@ -129,13 +129,6 @@ type IPv6Synthesizer interface {
 	IPv6Synthesize(IPv4Addr string) string
 	IPv6Synthesize(IPv4Addr string) string
 }
 }
 
 
-// TimeoutError implements the error interface
-type TimeoutError struct{}
-
-func (TimeoutError) Error() string   { return "timed out" }
-func (TimeoutError) Timeout() bool   { return true }
-func (TimeoutError) Temporary() bool { return true }
-
 // Dialer is a custom dialer compatible with http.Transport.Dial.
 // Dialer is a custom dialer compatible with http.Transport.Dial.
 type Dialer func(string, string) (net.Conn, error)
 type Dialer func(string, string) (net.Conn, error)
 
 

+ 1 - 1
psiphon/tlsDialer.go

@@ -155,7 +155,7 @@ func CustomTLSDial(network, addr string, config *CustomTLSConfig) (net.Conn, err
 	if config.Timeout != 0 {
 	if config.Timeout != 0 {
 		errChannel = make(chan error, 2)
 		errChannel = make(chan error, 2)
 		time.AfterFunc(config.Timeout, func() {
 		time.AfterFunc(config.Timeout, func() {
-			errChannel <- TimeoutError{}
+			errChannel <- errors.New("timed out")
 		})
 		})
 	}
 	}
 
 

+ 28 - 9
psiphon/tunnel.go

@@ -709,8 +709,12 @@ func dialSsh(
 			},
 			},
 			directTCPDialAddress)
 			directTCPDialAddress)
 
 
+		if !dialedPlugin && err != nil {
+			NoticeInfo("DialPluginProtocol intialization failed: %s", err)
+		}
+
 		if dialedPlugin {
 		if dialedPlugin {
-			NoticeInfo("dialed plugin protocol for %s", serverEntry.IpAddress)
+			NoticeInfo("using DialPluginProtocol for %s", serverEntry.IpAddress)
 		} else {
 		} else {
 			// Standard direct connection.
 			// Standard direct connection.
 			dialConn, err = DialTCP(directTCPDialAddress, dialConfig)
 			dialConn, err = DialTCP(directTCPDialAddress, dialConfig)
@@ -721,6 +725,12 @@ func dialSsh(
 		}
 		}
 	}
 	}
 
 
+	// If dialConn is not a Closer, tunnel failure detection may be slower
+	_, ok := dialConn.(common.Closer)
+	if !ok {
+		NoticeAlert("tunnel.dialSsh: dialConn is not a Closer")
+	}
+
 	cleanupConn := dialConn
 	cleanupConn := dialConn
 	defer func() {
 	defer func() {
 		// Cleanup on error
 		// Cleanup on error
@@ -1075,14 +1085,23 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 			NoticeInfo("port forward failures for %s: %d",
 			NoticeInfo("port forward failures for %s: %d",
 				tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
 				tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
 
 
-			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(monotime.Now()) {
-				select {
-				case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
-				default:
+			// If the underlying Conn has closed (meek and other plugin protocols may close
+			// themselves in certain error conditions), the tunnel has certainly failed.
+			// Otherwise, probe with an SSH keep alive.
+
+			if tunnel.conn.IsClosed() {
+				err = errors.New("underlying conn is closed")
+			} else {
+				if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(monotime.Now()) {
+					select {
+					case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
+					default:
+					}
 				}
 				}
-			}
-			if !tunnel.config.DisablePeriodicSshKeepAlive {
-				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
+				if !tunnel.config.DisablePeriodicSshKeepAlive {
+					sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
+				}
+
 			}
 			}
 
 
 		case err = <-sshKeepAliveError:
 		case err = <-sshKeepAliveError:
@@ -1214,7 +1233,7 @@ func sendSshKeepAlive(
 	errChannel := make(chan error, 2)
 	errChannel := make(chan error, 2)
 	if timeout > 0 {
 	if timeout > 0 {
 		time.AfterFunc(timeout, func() {
 		time.AfterFunc(timeout, func() {
-			errChannel <- TimeoutError{}
+			errChannel <- errors.New("timed out")
 		})
 		})
 	}
 	}