Просмотр исходного кода

Refactor upstreamproxy integration

* Fix: for a successful upstreamproxy.Conn, underlying TCPConn
  was left in pendingConns and as a result was incorrectly closed
  after the tunnel was established
* Fix: leaked resources for abandoned connections
* Simplfied code; no longer multiple ConnectTimeout goroutines
Rod Hynes 10 лет назад
Родитель
Сommit
21f77df973
1 измененных файлов с 35 добавлено и 54 удалено
  1. 35 54
      psiphon/TCPConn.go

+ 35 - 54
psiphon/TCPConn.go

@@ -23,7 +23,6 @@ import (
 	"errors"
 	"net"
 	"sync"
-	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
 )
@@ -52,11 +51,9 @@ func DialTCP(addr string, config *DialConfig) (conn net.Conn, err error) {
 	return makeTCPDialer(config)("tcp", addr)
 }
 
-// makeTCPDialer creates a custom dialer which creates TCPConn. An upstream
-// proxy is used when specified.
+// makeTCPDialer creates a custom dialer which creates TCPConn.
 func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, error) {
-
-	dialer := func(network, addr string) (net.Conn, error) {
+	return func(network, addr string) (net.Conn, error) {
 		if network != "tcp" {
 			return nil, errors.New("unsupported network type in TCPConn dialer")
 		}
@@ -66,59 +63,18 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 		}
 		return conn, nil
 	}
-
-	if config.UpstreamProxyUrl != "" {
-
-		upstreamDialer := upstreamproxy.NewProxyDialFunc(
-			&upstreamproxy.UpstreamProxyConfig{
-				ForwardDialFunc: dialer,
-				ProxyURIString:  config.UpstreamProxyUrl,
-			})
-
-		dialer = func(network, addr string) (conn net.Conn, err error) {
-
-			// The entire upstream dial is wrapped in an explicit timeout. This
-			// may include network connection read and writes when proxy auth negotation
-			// is performed.
-
-			type upstreamDialResult struct {
-				conn net.Conn
-				err  error
-			}
-			if config.ConnectTimeout != 0 {
-				resultChannel := make(chan *upstreamDialResult, 2)
-				time.AfterFunc(config.ConnectTimeout, func() {
-					// TODO: we could "interrupt" the underlying TCPConn at this point, as
-					// it's being abandoned. But we don't have a reference to it. It's left
-					// to the outer DialConfig.PendingConns to track and clean up that TCPConn.
-					resultChannel <- &upstreamDialResult{nil, errors.New("upstreamproxy dial timeout")}
-				})
-				go func() {
-					conn, err := upstreamDialer(network, addr)
-					resultChannel <- &upstreamDialResult{conn, err}
-				}()
-				result := <-resultChannel
-
-				conn, err = result.conn, result.err
-			} else {
-				conn, err = upstreamDialer(network, addr)
-			}
-
-			if _, ok := err.(*upstreamproxy.Error); ok {
-				NoticeUpstreamProxyError(err)
-			}
-			return conn, err
-		}
-	}
-
-	return dialer
 }
 
 // interruptibleTCPDial establishes a TCP network connection. A conn is added
 // to config.PendingConns before blocking on network I/O, which enables interruption.
 // The caller is responsible for removing an established conn from PendingConns.
+// An upstream proxy is used when specified.
+//
+// Note: do not to set a UpstreamProxyUrl in the config when using
+// NewTCPDialer as a custom dialer for NewProxyAuthTransport (or http.Transport
+// with a ProxyUrl), as that would result in double proxy chaining.
 //
-// Note that interruption does not actually cancel a connection in progress; it
+// Note: interruption does not actually cancel a connection in progress; it
 // stops waiting for the goroutine blocking on connect()/Dial.
 func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
 
@@ -131,14 +87,21 @@ func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
 		return nil, ContextError(errors.New("pending connections already closed"))
 	}
 
-	// Call the blocking Connect() in a goroutine
+	// Call the blocking Connect() in a goroutine. ConnectTimeout is handled
+	// in the platform-specific tcpDial helper function.
 	// Note: since this goroutine may be left running after an interrupt, don't
 	// call Notice() or perform other actions unexpected after a Controller stops.
 	// The lifetime of the goroutine may depend on the host OS TCP connect timeout
 	// when tcpDial, amoung other things, when makes a blocking syscall.Connect()
 	// call.
 	go func() {
-		netConn, err := tcpDial(addr, config, conn.dialResult)
+		var netConn net.Conn
+		var err error
+		if config.UpstreamProxyUrl != "" {
+			netConn, err = proxiedTcpDial(addr, config, conn.dialResult)
+		} else {
+			netConn, err = tcpDial(addr, config, conn.dialResult)
+		}
 
 		// Mutex is necessary for referencing conn.isClosed and conn.Conn as
 		// TCPConn.Close may be called while this goroutine is running.
@@ -169,6 +132,24 @@ func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
 	return conn, nil
 }
 
+// proxiedTcpDial wraps a tcpDial call in an upstreamproxy dial.
+func proxiedTcpDial(
+	addr string, config *DialConfig, dialResult chan error) (net.Conn, error) {
+	dialer := func(network, addr string) (net.Conn, error) {
+		return tcpDial(addr, config, dialResult)
+	}
+	upstreamDialer := upstreamproxy.NewProxyDialFunc(
+		&upstreamproxy.UpstreamProxyConfig{
+			ForwardDialFunc: dialer,
+			ProxyURIString:  config.UpstreamProxyUrl,
+		})
+	netConn, err := upstreamDialer("tcp", addr)
+	if _, ok := err.(*upstreamproxy.Error); ok {
+		NoticeUpstreamProxyError(err)
+	}
+	return netConn, err
+}
+
 // Close terminates a connected TCPConn or interrupts a dialing TCPConn.
 func (conn *TCPConn) Close() (err error) {
 	conn.mutex.Lock()