Pārlūkot izejas kodu

Added local connection shutdown to SOCKS proxy; refactored PendingConns into a general conn tracker and closer used for pending conns and also HTTP and SOCKS proxy open conn management

Rod Hynes 11 gadi atpakaļ
vecāks
revīzija
ccde887e24

+ 1 - 3
README.md

@@ -21,12 +21,10 @@ This project is currently at the proof-of-concept stage. Current production Psip
 * log noise(?): 'Unsolicited response received on idle HTTP channel starting with "H"'
 * use ContextError in more places
 * build/test on Android and iOS
-* disconnect all local proxy clients when tunnel disconnected
-* add connection and idle timeouts to proxied connections where appropriate
+* reconnection busy loop when no network available (ex. close laptop)
 
 ### TODO (future)
 
-* SOCKS5 support
 * SSH compression
 * preemptive reconnect functionality
   * unfronted meek almost makes this obsolete, since meek sessions survive underlying

+ 21 - 21
psiphon/conn.go

@@ -42,34 +42,34 @@ type Conn interface {
 	SetClosedSignal(closedSignal chan struct{}) (err error)
 }
 
-// PendingConns is a synchronized list of Conns that is used to coordinate
-// interrupting a set of goroutines establishing connections.
-type PendingConns struct {
+// Conns is a synchronized list of Conns that is used to coordinate
+// interrupting a set of goroutines establishing connections, or
+// close a set of open connections, etc.
+type Conns struct {
 	mutex sync.Mutex
-	conns []Conn
+	conns map[net.Conn]bool
 }
 
-func (pendingConns *PendingConns) Add(conn Conn) {
-	pendingConns.mutex.Lock()
-	defer pendingConns.mutex.Unlock()
-	pendingConns.conns = append(pendingConns.conns, conn)
+func (conns *Conns) Add(conn net.Conn) {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	if conns.conns == nil {
+		conns.conns = make(map[net.Conn]bool)
+	}
+	conns.conns[conn] = true
 }
 
-func (pendingConns *PendingConns) Remove(conn Conn) {
-	pendingConns.mutex.Lock()
-	defer pendingConns.mutex.Unlock()
-	for index, pendingConn := range pendingConns.conns {
-		if conn == pendingConn {
-			pendingConns.conns = append(pendingConns.conns[:index], pendingConns.conns[index+1:]...)
-			break
-		}
-	}
+func (conns *Conns) Remove(conn net.Conn) {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	delete(conns.conns, conn)
 }
 
-func (pendingConns *PendingConns) Interrupt() {
-	pendingConns.mutex.Lock()
-	defer pendingConns.mutex.Unlock()
-	for _, conn := range pendingConns.conns {
+func (conns *Conns) CloseAll() {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	for conn, _ := range conns.conns {
 		conn.Close()
 	}
+	conns.conns = make(map[net.Conn]bool)
 }

+ 3 - 3
psiphon/directConn.go

@@ -45,7 +45,7 @@ type DirectConn struct {
 // NewDirectDialer creates a DirectDialer.
 func NewDirectDialer(
 	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) Dialer {
+	pendingConns *Conns) Dialer {
 
 	return func(network, addr string) (net.Conn, error) {
 		if network != "tcp" {
@@ -59,13 +59,13 @@ func NewDirectDialer(
 }
 
 // DirectDial creates a new, connected DirectConn. The connection may be
-// interrupted using pendingConns.interrupt(): on platforms that support this,
+// interrupted using pendingConns.CloseAll(): on platforms that support this,
 // the new DirectConn is added to pendingConns before the socket connect begins
 // and removed from pendingConns once the connect succeeds or fails.
 func DirectDial(
 	addr string,
 	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) (conn *DirectConn, err error) {
+	pendingConns *Conns) (conn *DirectConn, err error) {
 
 	conn, err = interruptibleDial(addr, connectTimeout, readTimeout, writeTimeout, pendingConns)
 	if err != nil {

+ 1 - 1
psiphon/directConn_unix.go

@@ -41,7 +41,7 @@ type interruptibleConn struct {
 func interruptibleDial(
 	addr string,
 	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) (conn *DirectConn, err error) {
+	pendingConns *Conns) (conn *DirectConn, err error) {
 	// Create a socket and then, before connecting, add a DirectConn with
 	// the unconnected socket to pendingConns. This allows pendingConns to
 	// abort connections in progress.

+ 1 - 1
psiphon/directConn_windows.go

@@ -32,7 +32,7 @@ type interruptibleConn struct {
 func interruptibleDial(
 	addr string,
 	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) (conn *DirectConn, err error) {
+	pendingConns *Conns) (conn *DirectConn, err error) {
 	// Note: using net.Dial(); interruptible connections not supported on Windows
 	netConn, err := net.DialTimeout("tcp", addr, connectTimeout)
 	if err != nil {

+ 7 - 22
psiphon/httpProxy.go

@@ -36,7 +36,7 @@ type HttpProxy struct {
 	listener      net.Listener
 	waitGroup     *sync.WaitGroup
 	httpRelay     *http.Transport
-	openConns     map[net.Conn]bool
+	openConns     *Conns
 }
 
 // NewHttpProxy initializes and runs a new HTTP proxy server.
@@ -60,7 +60,7 @@ func NewHttpProxy(listenPort int, tunnel *Tunnel, stoppedSignal chan struct{}) (
 		listener:      listener,
 		waitGroup:     new(sync.WaitGroup),
 		httpRelay:     transport,
-		openConns:     make(map[net.Conn]bool),
+		openConns:     new(Conns),
 	}
 	proxy.waitGroup.Add(1)
 	go proxy.serveHttpRequests()
@@ -73,7 +73,7 @@ func (proxy *HttpProxy) Close() {
 	proxy.listener.Close()
 	proxy.waitGroup.Wait()
 	// Close local->proxy persistent connections
-	proxy.closeOpenConns()
+	proxy.openConns.CloseAll()
 	// Close idle proxy->origin persistent connections
 	// TODO: also close active connections
 	proxy.httpRelay.CloseIdleConnections()
@@ -181,8 +181,8 @@ var hopHeaders = []string{
 
 func (proxy *HttpProxy) httpConnectHandler(tunnel *Tunnel, localHttpConn net.Conn, target string) (err error) {
 	defer localHttpConn.Close()
-	defer proxy.removeOpenConn(localHttpConn)
-	proxy.addOpenConn(localHttpConn)
+	defer proxy.openConns.Remove(localHttpConn)
+	proxy.openConns.Add(localHttpConn)
 	remoteSshForward, err := tunnel.sshClient.Dial("tcp", target)
 	if err != nil {
 		return ContextError(err)
@@ -205,29 +205,14 @@ func (proxy *HttpProxy) httpConnectHandler(tunnel *Tunnel, localHttpConn net.Con
 func (proxy *HttpProxy) httpConnStateCallback(conn net.Conn, connState http.ConnState) {
 	switch connState {
 	case http.StateNew:
-		proxy.addOpenConn(conn)
+		proxy.openConns.Add(conn)
 	case http.StateActive, http.StateIdle:
 		// No action
 	case http.StateHijacked, http.StateClosed:
-		proxy.removeOpenConn(conn)
+		proxy.openConns.Remove(conn)
 	}
 }
 
-func (proxy *HttpProxy) addOpenConn(conn net.Conn) {
-	proxy.openConns[conn] = true
-}
-
-func (proxy *HttpProxy) removeOpenConn(conn net.Conn) {
-	delete(proxy.openConns, conn)
-}
-
-func (proxy *HttpProxy) closeOpenConns() {
-	for conn, _ := range proxy.openConns {
-		conn.Close()
-	}
-	proxy.openConns = make(map[net.Conn]bool)
-}
-
 func (proxy *HttpProxy) serveHttpRequests() {
 	defer proxy.listener.Close()
 	defer proxy.waitGroup.Done()

+ 5 - 5
psiphon/meekConn.go

@@ -67,7 +67,7 @@ const (
 type MeekConn struct {
 	url                  *url.URL
 	cookie               *http.Cookie
-	pendingConns         *PendingConns
+	pendingConns         *Conns
 	transport            *http.Transport
 	mutex                sync.Mutex
 	isClosed             bool
@@ -96,7 +96,7 @@ func NewMeekConn(
 	// which may be interrupted on MeekConn.Close(). This code previously used the establishTunnel
 	// pendingConns here, but that was a lifecycle mismatch: we don't want to abort HTTP transport
 	// connections while MeekConn is still in use
-	pendingConns := new(PendingConns)
+	pendingConns := new(Conns)
 	directDialer := NewDirectDialer(connectTimeout, readTimeout, writeTimeout, pendingConns)
 	var host string
 	var dialer Dialer
@@ -192,11 +192,11 @@ func (meek *MeekConn) Close() (err error) {
 	defer meek.mutex.Unlock()
 	if !meek.isClosed {
 		close(meek.broadcastClosed)
-		meek.pendingConns.Interrupt()
+		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
 		// TODO: meek.transport.CancelRequest() for current in-flight request?
-		// (pendingConns will abort establishing connections, but not established
-		// persistent connections)
+		// (currently pendingConns will abort establishing connections, but not
+		// established persistent connections)
 		meek.transport.CloseIdleConnections()
 		meek.isClosed = true
 		select {

+ 3 - 3
psiphon/runTunnel.go

@@ -41,7 +41,7 @@ func establishTunnelWorker(
 	workerWaitGroup *sync.WaitGroup,
 	candidateServerEntries chan *ServerEntry,
 	broadcastStopWorkers chan struct{},
-	pendingConns *PendingConns,
+	pendingConns *Conns,
 	establishedTunnels chan *Tunnel) {
 
 	defer workerWaitGroup.Done()
@@ -83,7 +83,7 @@ func discardTunnel(tunnel *Tunnel) {
 func establishTunnel(config *Config, sessionId string) (tunnel *Tunnel, err error) {
 	workerWaitGroup := new(sync.WaitGroup)
 	candidateServerEntries := make(chan *ServerEntry)
-	pendingConns := new(PendingConns)
+	pendingConns := new(Conns)
 	establishedTunnels := make(chan *Tunnel, 1)
 	timeout := time.After(ESTABLISH_TUNNEL_TIMEOUT)
 	broadcastStopWorkers := make(chan struct{})
@@ -122,7 +122,7 @@ func establishTunnel(config *Config, sessionId string) (tunnel *Tunnel, err erro
 	go func() {
 		// Interrupt any partial connections in progress, so that
 		// the worker will terminate immediately
-		pendingConns.Interrupt()
+		pendingConns.CloseAll()
 		workerWaitGroup.Wait()
 		// Drain any excess tunnels
 		close(establishedTunnels)

+ 3 - 4
psiphon/serverApi.go

@@ -37,7 +37,7 @@ type Session struct {
 	sessionId          string
 	config             *Config
 	tunnel             *Tunnel
-	pendingConns       *PendingConns
+	pendingConns       *Conns
 	psiphonHttpsClient *http.Client
 }
 
@@ -50,7 +50,7 @@ func NewSession(
 	tunnel *Tunnel,
 	localHttpProxyAddress, sessionId string) (session *Session, err error) {
 
-	pendingConns := new(PendingConns)
+	pendingConns := new(Conns)
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel, pendingConns, localHttpProxyAddress)
 	if err != nil {
 		return nil, ContextError(err)
@@ -260,8 +260,7 @@ func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, er
 // http.Client should use the "http://" scheme. Otherwise http.Transport will try to do another TLS
 // handshake inside the explicit TLS session.
 func makePsiphonHttpsClient(
-	tunnel *Tunnel,
-	pendingConns *PendingConns,
+	tunnel *Tunnel, pendingConns *Conns,
 	localHttpProxyAddress string) (httpsClient *http.Client, err error) {
 
 	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)

+ 9 - 3
psiphon/socksProxy.go

@@ -36,6 +36,7 @@ type SocksProxy struct {
 	stoppedSignal chan struct{}
 	listener      *socks.SocksListener
 	waitGroup     *sync.WaitGroup
+	openConns     *Conns
 }
 
 // NewSocksProxy initializes a new SOCKS server. It begins listening for
@@ -50,7 +51,9 @@ func NewSocksProxy(listenPort int, tunnel *Tunnel, stoppedSignal chan struct{})
 		tunnel:        tunnel,
 		stoppedSignal: stoppedSignal,
 		listener:      listener,
-		waitGroup:     new(sync.WaitGroup)}
+		waitGroup:     new(sync.WaitGroup),
+		openConns:     new(Conns),
+	}
 	proxy.waitGroup.Add(1)
 	go proxy.acceptSocksConnections()
 	Notice(NOTICE_SOCKS_PROXY, "local SOCKS proxy running at address %s", proxy.listener.Addr().String())
@@ -62,10 +65,13 @@ func NewSocksProxy(listenPort int, tunnel *Tunnel, stoppedSignal chan struct{})
 func (proxy *SocksProxy) Close() {
 	proxy.listener.Close()
 	proxy.waitGroup.Wait()
+	proxy.openConns.CloseAll()
 }
 
-func socksConnectionHandler(tunnel *Tunnel, localSocksConn *socks.SocksConn) (err error) {
+func (proxy *SocksProxy) socksConnectionHandler(tunnel *Tunnel, localSocksConn *socks.SocksConn) (err error) {
 	defer localSocksConn.Close()
+	defer proxy.openConns.Remove(localSocksConn)
+	proxy.openConns.Add(localSocksConn)
 	remoteSshForward, err := tunnel.sshClient.Dial("tcp", localSocksConn.Req.Target)
 	if err != nil {
 		return ContextError(err)
@@ -119,7 +125,7 @@ func (proxy *SocksProxy) acceptSocksConnections() {
 			continue
 		}
 		go func() {
-			err := socksConnectionHandler(proxy.tunnel, socksConnection)
+			err := proxy.socksConnectionHandler(proxy.tunnel, socksConnection)
 			if err != nil {
 				Notice(NOTICE_ALERT, "%s", ContextError(err))
 			}

+ 1 - 1
psiphon/tunnel.go

@@ -79,7 +79,7 @@ func (tunnel *Tunnel) Close() {
 func EstablishTunnel(
 	requiredProtocol, sessionId string,
 	serverEntry *ServerEntry,
-	pendingConns *PendingConns) (tunnel *Tunnel, err error) {
+	pendingConns *Conns) (tunnel *Tunnel, err error) {
 	// Select the protocol
 	var selectedProtocol string
 	// TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})