Explorar o código

Fix upstreamproxy integration bugs

* Type assertions for upstreamproxy.Error were invalid
* Direct calls to DialTCP missed the upstreamproxy integration
  * Refactored NewTCPDialer/DialTCP to ensure both use upstreamproxy
  * Now DialTCP returns a net.Conn instead of a psiphon.TCPConn, since
    that's what we get from upstreamproxy's dialer
  * Reworked setting of CloseSignal since DialTCP callers no longer
    see psiphon.TCPConn instances
Rod Hynes %!s(int64=10) %!d(string=hai) anos
pai
achega
a296ec01e1
Modificáronse 4 ficheiros con 40 adicións e 31 borrados
  1. 26 13
      psiphon/TCPConn.go
  2. 7 0
      psiphon/net.go
  3. 5 16
      psiphon/tunnel.go
  4. 2 2
      psiphon/upstreamproxy/upstreamproxy.go

+ 26 - 13
psiphon/TCPConn.go

@@ -49,12 +49,35 @@ type TCPConn struct {
 
 // NewTCPDialer creates a TCPDialer.
 func NewTCPDialer(config *DialConfig) Dialer {
+	return makeTCPDialer(config)
+}
+
+// DialTCP creates a new, connected TCPConn.
+func DialTCP(addr string, config *DialConfig) (conn net.Conn, err error) {
+	return makeTCPDialer(config)("tcp", addr)
+}
+
+// makeTCPDialer creates a custom dialer which creates TCPConn. An upstream
+// proxy is used when specified.
+func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, error) {
 
 	dialer := func(network, addr string) (net.Conn, error) {
 		if network != "tcp" {
-			return nil, errors.New("unsupported network type in NewTCPDialer")
+			return nil, errors.New("unsupported network type in TCPConn dialer")
+		}
+		conn, err := interruptibleTCPDial(addr, config)
+		if err != nil {
+			return nil, ContextError(err)
 		}
-		return DialTCP(addr, config)
+		if config.ClosedSignal != nil {
+			if !conn.SetClosedSignal(config.ClosedSignal) {
+				// Conn is already closed. This is not unexpected -- for example,
+				// when establish is interrupted.
+				// TODO: make this not log an error when called from establishTunnelWorker?
+				return nil, ContextError(errors.New("conn already closed"))
+			}
+		}
+		return conn, nil
 	}
 
 	if config.UpstreamProxyUrl != "" {
@@ -88,7 +111,7 @@ func NewTCPDialer(config *DialConfig) Dialer {
 			}()
 			result := <-resultChannel
 
-			if _, ok := result.err.(upstreamproxy.Error); ok {
+			if _, ok := result.err.(*upstreamproxy.Error); ok {
 				NoticeUpstreamProxyError(result.err)
 			}
 
@@ -99,16 +122,6 @@ func NewTCPDialer(config *DialConfig) Dialer {
 	return dialer
 }
 
-// TCPConn creates a new, connected TCPConn. It uses an upstream proxy
-// when specified.
-func DialTCP(addr string, config *DialConfig) (conn *TCPConn, err error) {
-	conn, err = interruptibleTCPDial(addr, config)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	return conn, nil
-}
-
 // SetClosedSignal implements psiphon.Conn.SetClosedSignal.
 func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) bool {
 	conn.mutex.Lock()

+ 7 - 0
psiphon/net.go

@@ -34,6 +34,13 @@ const DNS_PORT = 53
 // of a Psiphon dialer (TCPDial, MeekDial, etc.)
 type DialConfig struct {
 
+	// ClosedSignal is triggered when an underlying TCPConn network
+	// connection is closed. This is used in operateTunnel to detect
+	// an unexpected disconnect. Channel should be have buffer to
+	// receive at least on signal. Sender in TCPConn.Close() does not
+	// block.
+	ClosedSignal chan struct{}
+
 	// UpstreamProxyUrl specifies a proxy to connect through.
 	// E.g., "http://proxyhost:8080"
 	//       "socks5://user:password@proxyhost:1080"

+ 5 - 16
psiphon/tunnel.go

@@ -82,7 +82,7 @@ type Tunnel struct {
 	serverEntry              *ServerEntry
 	session                  *Session
 	protocol                 string
-	conn                     Conn
+	conn                     net.Conn
 	closedSignal             chan struct{}
 	sshClient                *ssh.Client
 	operateWaitGroup         *sync.WaitGroup
@@ -337,7 +337,7 @@ func dialSsh(
 	pendingConns *Conns,
 	serverEntry *ServerEntry,
 	selectedProtocol,
-	sessionId string) (conn Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
+	sessionId string) (conn net.Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -382,8 +382,11 @@ func dialSsh(
 		selectedProtocol,
 		frontingAddress)
 
+	closedSignal = make(chan struct{}, 1)
+
 	// Create the base transport: meek or direct connection
 	dialConfig := &DialConfig{
+		ClosedSignal:     closedSignal,
 		UpstreamProxyUrl: config.UpstreamProxyUrl,
 		ConnectTimeout:   TUNNEL_CONNECT_TIMEOUT,
 		ReadTimeout:      TUNNEL_READ_TIMEOUT,
@@ -412,20 +415,6 @@ func dialSsh(
 		}
 	}()
 
-	// Create signal which is triggered when the underlying network connection is closed,
-	// this is used in operateTunnel to detect an unexpected disconnect. SetClosedSignal
-	// is called here, well before operateTunnel, so that we don't need to handle the
-	// "already closed" with a tunnelOwner.SignalTunnelFailure() in operateTunnel (this
-	// was previously the order of events, which caused the establish process to sometimes
-	// run briefly when not needed).
-	closedSignal = make(chan struct{}, 1)
-	if !conn.SetClosedSignal(closedSignal) {
-		// Conn is already closed. This is not unexpected -- for example,
-		// when establish is interrupted.
-		// TODO: make this not log an error when called from establishTunnelWorker?
-		return nil, nil, nil, ContextError(errors.New("conn already closed"))
-	}
-
 	// Add obfuscated SSH layer
 	var sshConn net.Conn
 	sshConn = conn

+ 2 - 2
psiphon/upstreamproxy/upstreamproxy.go

@@ -33,8 +33,8 @@ type Error struct {
 }
 
 func proxyError(err error) error {
-	//Avoid multiple upstream.Error wrapping
-	if _, ok := err.(Error); ok {
+	// Avoid multiple upstream.Error wrapping
+	if _, ok := err.(*Error); ok {
 		return err
 	}
 	return &Error{error: fmt.Errorf("upstreamproxy error: %s", err)}