Browse Source

MeekConn uses its own pendingConn to respect the lifetime of its underlying direct connections; comprehensive read timeout for serverApi requests to detect when response body reading hangs.

Rod Hynes 11 years ago
parent
commit
fc9b3e9d7e
5 changed files with 42 additions and 20 deletions
  1. 5 9
      README.md
  2. 1 1
      psiphon/defaults.go
  3. 13 4
      psiphon/meekConn.go
  4. 19 4
      psiphon/serverApi.go
  5. 4 2
      psiphon/tunnel.go

+ 5 - 9
README.md

@@ -15,14 +15,7 @@ This project is currently at the proof-of-concept stage. Current production Psip
 
 
 ### TODO (proof-of-concept)
 ### TODO (proof-of-concept)
 
 
-* pendingConns lifecycle issue: MeekConn's dialer uses the establishTunnel pendingConns, which means there's
-  a chance that the asynchronous pendingConns.Interrupt() by establishTunnel will close a meek HTTPS conn. Also,
-  MeekConn is holding a reference to this pendingConns long after establishTunnel finishes.
-  fix: MeekConn should keep its own PendingConns for underlying HTTPS connections; MeekConn should go into the
-  establish pendingConns and when it closes, it should in turn close its own pendingConns.
-  ...And the same for serverApi.
-* ResponseHeaderTimeout is not sufficient to detect dead tunneled web requests
-  fix: use DirectDialer with all timeouts set, use dedicated pendingConns (see above)
+* PendingConns: is interrupting connection establishment worth the extra code complexity?
 * prefilter entries by capability; don't log "server does not have sufficient capabilities"
 * prefilter entries by capability; don't log "server does not have sufficient capabilities"
 * log noise: "use of closed network connection"
 * log noise: "use of closed network connection"
 * log noise(?): 'Unsolicited response received on idle HTTP channel starting with "H"'
 * log noise(?): 'Unsolicited response received on idle HTTP channel starting with "H"'
@@ -36,8 +29,11 @@ This project is currently at the proof-of-concept stage. Current production Psip
 * SOCKS5 support
 * SOCKS5 support
 * SSH compression
 * SSH compression
 * preemptive reconnect functionality
 * preemptive reconnect functionality
+  * unfronted meek almost makes this obsolete, since meek sessions survive underlying
+     HTTP transport socket disconnects. The client could prefer unfronted meek protocol
+     when handshake returns a preemptive_reconnect_lifetime_milliseconds.
 * implement page view stats
 * implement page view stats
-* implement local traffic stats (e.g., to display bytes sent/received
+* implement local traffic stats (e.g., to display bytes sent/received)
 * control interface (w/ event messages)?
 * control interface (w/ event messages)?
 * VpnService compatibility
 * VpnService compatibility
 * upstream proxy support
 * upstream proxy support

+ 1 - 1
psiphon/defaults.go

@@ -37,6 +37,6 @@ const (
 	FETCH_REMOTE_SERVER_LIST_RETRY_TIMEOUT   = 5 * time.Second
 	FETCH_REMOTE_SERVER_LIST_RETRY_TIMEOUT   = 5 * time.Second
 	FETCH_REMOTE_SERVER_LIST_STALE_TIMEOUT   = 6 * time.Hour
 	FETCH_REMOTE_SERVER_LIST_STALE_TIMEOUT   = 6 * time.Hour
 	PSIPHON_API_CLIENT_SESSION_ID_LENGTH     = 16
 	PSIPHON_API_CLIENT_SESSION_ID_LENGTH     = 16
-	PSIPHON_API_SERVER_TIMEOUT               = 15 * time.Second
+	PSIPHON_API_SERVER_TIMEOUT               = 20 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST = 50
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST = 50
 )
 )

+ 13 - 4
psiphon/meekConn.go

@@ -66,6 +66,7 @@ const (
 type MeekConn struct {
 type MeekConn struct {
 	url                 *url.URL
 	url                 *url.URL
 	cookie              *http.Cookie
 	cookie              *http.Cookie
+	pendingConns        *PendingConns
 	transport           *http.Transport
 	transport           *http.Transport
 	mutex               sync.Mutex
 	mutex               sync.Mutex
 	isClosed            bool
 	isClosed            bool
@@ -85,12 +86,16 @@ type MeekConn struct {
 // useFronting assumes caller has already checked server entry capabilities.
 // useFronting assumes caller has already checked server entry capabilities.
 func NewMeekConn(
 func NewMeekConn(
 	serverEntry *ServerEntry, sessionId string, useFronting bool,
 	serverEntry *ServerEntry, sessionId string, useFronting bool,
-	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) (meek *MeekConn, err error) {
+	connectTimeout, readTimeout, writeTimeout time.Duration) (meek *MeekConn, err error) {
 	// Configure transport
 	// Configure transport
+	// Note: MeekConn has its own PendingConns to manage the underlying HTTP transport connections,
+	// which may be interrupted on MeekConn.Close(). This code previously used the establishTunnel
+	// pendingConns here, but that was a lifecycle mismatch: we don't want to abort HTTP transport
+	// connections while MeekConn is still in use
+	pendingConns := new(PendingConns)
+	directDialer := NewDirectDialer(connectTimeout, readTimeout, writeTimeout, pendingConns)
 	var host string
 	var host string
 	var dialer Dialer
 	var dialer Dialer
-	directDialer := NewDirectDialer(connectTimeout, readTimeout, writeTimeout, pendingConns)
 	if useFronting {
 	if useFronting {
 		// In this case, host is not what is dialed but is what ends up in the HTTP Host header
 		// In this case, host is not what is dialed but is what ends up in the HTTP Host header
 		host = serverEntry.MeekFrontingHost
 		host = serverEntry.MeekFrontingHost
@@ -142,6 +147,7 @@ func NewMeekConn(
 	meek = &MeekConn{
 	meek = &MeekConn{
 		url:                 url,
 		url:                 url,
 		cookie:              cookie,
 		cookie:              cookie,
+		pendingConns:        pendingConns,
 		transport:           transport,
 		transport:           transport,
 		broadcastClosed:     make(chan struct{}),
 		broadcastClosed:     make(chan struct{}),
 		relayWaitGroup:      new(sync.WaitGroup),
 		relayWaitGroup:      new(sync.WaitGroup),
@@ -175,9 +181,12 @@ func (meek *MeekConn) Close() (err error) {
 	meek.mutex.Lock()
 	meek.mutex.Lock()
 	defer meek.mutex.Unlock()
 	defer meek.mutex.Unlock()
 	if !meek.isClosed {
 	if !meek.isClosed {
-		// TODO: meek.transport.CancelRequest() for current request?
 		close(meek.broadcastClosed)
 		close(meek.broadcastClosed)
 		meek.relayWaitGroup.Wait()
 		meek.relayWaitGroup.Wait()
+		meek.pendingConns.Interrupt()
+		// TODO: meek.transport.CancelRequest() for current in-flight request?
+		// (pendingConns will abort establishing connections, but not established
+		// persistent connections)
 		meek.transport.CloseIdleConnections()
 		meek.transport.CloseIdleConnections()
 		meek.isClosed = true
 		meek.isClosed = true
 		select {
 		select {

+ 19 - 4
psiphon/serverApi.go

@@ -25,7 +25,6 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
-	"net"
 	"net/http"
 	"net/http"
 	"strconv"
 	"strconv"
 )
 )
@@ -38,6 +37,7 @@ type Session struct {
 	sessionId          string
 	sessionId          string
 	config             *Config
 	config             *Config
 	tunnel             *Tunnel
 	tunnel             *Tunnel
+	pendingConns       *PendingConns
 	psiphonHttpsClient *http.Client
 	psiphonHttpsClient *http.Client
 }
 }
 
 
@@ -50,7 +50,8 @@ func NewSession(
 	tunnel *Tunnel,
 	tunnel *Tunnel,
 	localHttpProxyAddress, sessionId string) (session *Session, err error) {
 	localHttpProxyAddress, sessionId string) (session *Session, err error) {
 
 
-	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel, localHttpProxyAddress)
+	pendingConns := new(PendingConns)
+	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel, pendingConns, localHttpProxyAddress)
 	if err != nil {
 	if err != nil {
 		return nil, ContextError(err)
 		return nil, ContextError(err)
 	}
 	}
@@ -58,6 +59,7 @@ func NewSession(
 		sessionId:          sessionId,
 		sessionId:          sessionId,
 		config:             config,
 		config:             config,
 		tunnel:             tunnel,
 		tunnel:             tunnel,
+		pendingConns:       pendingConns,
 		psiphonHttpsClient: psiphonHttpsClient,
 		psiphonHttpsClient: psiphonHttpsClient,
 	}
 	}
 	// Sending two seperate requests is a legacy from when the handshake was
 	// Sending two seperate requests is a legacy from when the handshake was
@@ -257,14 +259,27 @@ func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, er
 // As the custom dialer makes an explicit TLS connection, URLs submitted to the returned
 // As the custom dialer makes an explicit TLS connection, URLs submitted to the returned
 // http.Client should use the "http://" scheme. Otherwise http.Transport will try to do another TLS
 // http.Client should use the "http://" scheme. Otherwise http.Transport will try to do another TLS
 // handshake inside the explicit TLS session.
 // handshake inside the explicit TLS session.
-func makePsiphonHttpsClient(tunnel *Tunnel, localHttpProxyAddress string) (httpsClient *http.Client, err error) {
+func makePsiphonHttpsClient(
+	tunnel *Tunnel,
+	pendingConns *PendingConns,
+	localHttpProxyAddress string) (httpsClient *http.Client, err error) {
+
 	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
 	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
 	if err != nil {
 	if err != nil {
 		return nil, ContextError(err)
 		return nil, ContextError(err)
 	}
 	}
+	// Note: This use of readTimeout will tear down persistent HTTP connections, which is not the
+	// intended purpose. The readTimeout is to abort NewSession when the Psiphon server responds to
+	// handshake/connected requests but fails to deliver the response body (e.g., ResponseHeaderTimeout
+	// is not sufficient to timeout this case).
+	directDialer := NewDirectDialer(
+		PSIPHON_API_SERVER_TIMEOUT,
+		PSIPHON_API_SERVER_TIMEOUT,
+		PSIPHON_API_SERVER_TIMEOUT,
+		pendingConns)
 	dialer := NewCustomTLSDialer(
 	dialer := NewCustomTLSDialer(
 		&CustomTLSConfig{
 		&CustomTLSConfig{
-			Dial:                    new(net.Dialer).Dial,
+			Dial:                    directDialer,
 			Timeout:                 PSIPHON_API_SERVER_TIMEOUT,
 			Timeout:                 PSIPHON_API_SERVER_TIMEOUT,
 			HttpProxyAddress:        localHttpProxyAddress,
 			HttpProxyAddress:        localHttpProxyAddress,
 			SendServerName:          false,
 			SendServerName:          false,

+ 4 - 2
psiphon/tunnel.go

@@ -131,11 +131,13 @@ func EstablishTunnel(
 	if useMeek {
 	if useMeek {
 		conn, err = NewMeekConn(
 		conn, err = NewMeekConn(
 			serverEntry, sessionId, useFronting,
 			serverEntry, sessionId, useFronting,
-			TUNNEL_CONNECT_TIMEOUT, TUNNEL_READ_TIMEOUT, TUNNEL_WRITE_TIMEOUT,
-			pendingConns)
+			TUNNEL_CONNECT_TIMEOUT, TUNNEL_READ_TIMEOUT, TUNNEL_WRITE_TIMEOUT)
 		if err != nil {
 		if err != nil {
 			return nil, ContextError(err)
 			return nil, ContextError(err)
 		}
 		}
+		// TODO: MeekConn doesn't go into pendingConns since there's no direct connection to
+		// interrupt; underlying HTTP connections may be candidates for interruption, but only
+		// after relay starts polling...
 	} else {
 	} else {
 		conn, err = DirectDial(
 		conn, err = DirectDial(
 			fmt.Sprintf("%s:%d", serverEntry.IpAddress, port),
 			fmt.Sprintf("%s:%d", serverEntry.IpAddress, port),