Răsfoiți Sursa

Fix: further revised fix for 3b29161

* The revised fix in a01e135 was still flawed.
* The a01e135 revision passed the interruptibleTCPSocket
  struct by value to interruptibleTCPClose, which created
  a copy of the fd which then created a double-close race
  condition, which interfered with state in other components
  using the reassigned fd value.
* Even with the double-close fixed, it remained possible
  for syscall.Connect to be called on a previously closed
  fd, which would conflict with external state when the fd
  value was already reassigned to some other component.
* Now no longer attempting to use syscall.Close to interrupt
  a syscall.Connect. This means potentially more goroutines
  remain running after a Controller.Stop(), with the
  duration depending on the host OS TCP connect timeout.
* Domain name resolve is now both part of the timeout
  and interruptable (in the remains-running sense).
* Fix: cleanup resources for abandoned net.Dials. This
  applies to Windows platform.
* Needs more review: recheck concurrency issues and
  danging goroutine concerns.
Rod Hynes 10 ani în urmă
părinte
comite
3dbf7e8422

+ 75 - 16
psiphon/TCPConn.go

@@ -21,14 +21,15 @@ package psiphon
 
 import (
 	"errors"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
 	"net"
 	"sync"
 	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
 )
 
 // TCPConn is a customized TCP connection that:
-// - can be interrupted while connecting;
+// - can be interrupted while dialing;
 // - implements a connect timeout;
 // - uses an upstream proxy when specified, and includes
 //   upstream proxy dialing in the connect timeout;
@@ -36,9 +37,9 @@ import (
 //   routing compatibility, for example);
 type TCPConn struct {
 	net.Conn
-	mutex         sync.Mutex
-	isClosed      bool
-	interruptible interruptibleTCPSocket
+	mutex      sync.Mutex
+	isClosed   bool
+	dialResult chan error
 }
 
 // NewTCPDialer creates a TCPDialer.
@@ -113,21 +114,79 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 	return dialer
 }
 
-// Close terminates a connected (net.Conn) or connecting (socketFd) TCPConn.
-// A mutex is required to support net.Conn concurrency semantics.
-// Note also use of mutex around conn.interruptible and conn.Conn in
-// TCPConn_unix.go.
+// interruptibleTCPDial establishes a TCP network connection. A conn is added
+// to config.PendingConns before blocking on network I/O, which enables interruption.
+// The caller is responsible for removing an established conn from PendingConns.
+//
+// Note that interruption does not actually cancel a connection in progress; it
+// stops waiting for the goroutine blocking on connect()/Dial.
+func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
+
+	// Buffers the first result; senders should discard results when
+	// sending would block, as that means the first result is already set.
+	conn := &TCPConn{dialResult: make(chan error, 1)}
+
+	// Enable interruption
+	if !config.PendingConns.Add(conn) {
+		return nil, ContextError(errors.New("pending connections already closed"))
+	}
+
+	// Call the blocking Connect() in a goroutine
+	// Note: since this goroutine may be left running after an interrupt, don't
+	// call Notice() or perform other actions unexpected after a Controller stops.
+	// The lifetime of the goroutine may depend on the host OS TCP connect timeout
+	// when tcpDial, amoung other things, when makes a blocking syscall.Connect()
+	// call.
+	go func() {
+		netConn, err := tcpDial(addr, config, conn.dialResult)
+
+		// Mutex is necessary for referencing conn.isClosed and conn.Conn as
+		// TCPConn.Close may be called while this goroutine is running.
+		conn.mutex.Lock()
+
+		// If already interrupted, cleanup the net.Conn resource and discard.
+		if conn.isClosed && netConn != nil {
+			netConn.Close()
+			conn.mutex.Unlock()
+			return
+		}
+
+		conn.Conn = netConn
+		conn.mutex.Unlock()
+
+		select {
+		case conn.dialResult <- err:
+		default:
+		}
+	}()
+
+	// Wait until Dial completes (or times out) or until interrupt
+	err := <-conn.dialResult
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	return conn, nil
+}
+
+// Close terminates a connected TCPConn or interrupts a dialing TCPConn.
 func (conn *TCPConn) Close() (err error) {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 
-	if !conn.isClosed {
-		conn.isClosed = true
-		if conn.Conn == nil {
-			err = interruptibleTCPClose(conn.interruptible)
-		} else {
-			err = conn.Conn.Close()
-		}
+	if conn.isClosed {
+		return
 	}
+	conn.isClosed = true
+
+	if conn.Conn != nil {
+		err = conn.Conn.Close()
+	}
+
+	select {
+	case conn.dialResult <- errors.New("dial interrupted"):
+	default:
+	}
+
 	return err
 }

+ 117 - 0
psiphon/TCPConn_bind.go

@@ -0,0 +1,117 @@
+// +build !windows
+
+/*
+ * Copyright (c) 2015, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"strconv"
+	"syscall"
+	"time"
+)
+
+// tcpDial is the platform-specific part of interruptibleTCPDial
+//
+// To implement socket device binding, the lower-level syscall APIs are used.
+// The sequence of syscalls in this implementation are taken from:
+// https://code.google.com/p/go/issues/detail?id=6966
+func tcpDial(addr string, config *DialConfig, dialResult chan error) (net.Conn, error) {
+
+	// Like interruption, this timeout doesn't stop this connection goroutine,
+	// it just unblocks the calling interruptibleTCPDial.
+	if config.ConnectTimeout != 0 {
+		time.AfterFunc(config.ConnectTimeout, func() {
+			select {
+			case dialResult <- errors.New("connect timeout"):
+			default:
+			}
+		})
+	}
+
+	// Get the remote IP and port, resolving a domain name if necessary
+	host, strPort, err := net.SplitHostPort(addr)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	port, err := strconv.Atoi(strPort)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	ipAddrs, err := LookupIP(host, config)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	if len(ipAddrs) < 1 {
+		return nil, ContextError(errors.New("no IP address"))
+	}
+
+	// Select an IP at random from the list, so we're not always
+	// trying the same IP (when > 1) which may be blocked.
+	// TODO: retry all IPs until one connects? For now, this retry
+	// will happen on subsequent TCPDial calls, when a different IP
+	// is selected.
+	index, err := MakeSecureRandomInt(len(ipAddrs))
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	// TODO: IPv6 support
+	var ip [4]byte
+	copy(ip[:], ipAddrs[index].To4())
+
+	// Create a socket and bind to device, when configured to do so
+	socketFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	if config.DeviceBinder != nil {
+		// WARNING: this potentially violates the direction to not call into
+		// external components after the Controller may have been stopped.
+		// TODO: rework DeviceBinder as an internal 'service' which can trap
+		// external calls when they should not be made?
+		err = config.DeviceBinder.BindToDevice(socketFd)
+		if err != nil {
+			syscall.Close(socketFd)
+			return nil, ContextError(fmt.Errorf("BindToDevice failed: %s", err))
+		}
+	}
+
+	sockAddr := syscall.SockaddrInet4{Addr: ip, Port: port}
+	err = syscall.Connect(socketFd, &sockAddr)
+	if err != nil {
+		syscall.Close(socketFd)
+		return nil, ContextError(err)
+	}
+
+	// Convert the socket fd to a net.Conn
+	file := os.NewFile(uintptr(socketFd), "")
+	netConn, err := net.FileConn(file) // net.FileConn() dups socketFd
+	file.Close()                       // file.Close() closes socketFd
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	return netConn, nil
+}

+ 37 - 0
psiphon/TCPConn_nobind.go

@@ -0,0 +1,37 @@
+// +build windows
+
+/*
+ * Copyright (c) 2015, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"errors"
+	"net"
+)
+
+// tcpDial is the platform-specific part of interruptibleTCPDial
+func tcpDial(addr string, config *DialConfig, dialResult chan error) (net.Conn, error) {
+
+	if config.DeviceBinder != nil {
+		return nil, ContextError(errors.New("psiphon.interruptibleTCPDial with DeviceBinder not supported"))
+	}
+
+	return net.DialTimeout("tcp", addr, config.ConnectTimeout)
+}

+ 0 - 166
psiphon/TCPConn_unix.go

@@ -1,166 +0,0 @@
-// +build android darwin dragonfly freebsd linux nacl netbsd openbsd solaris
-
-/*
- * Copyright (c) 2015, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-package psiphon
-
-import (
-	"errors"
-	"fmt"
-	"net"
-	"os"
-	"strconv"
-	"syscall"
-	"time"
-)
-
-type interruptibleTCPSocket struct {
-	socketFd int
-}
-
-const _INVALID_FD = -1
-
-// interruptibleTCPDial establishes a TCP network connection. A conn is added
-// to config.PendingConns before blocking on network IO, which enables interruption.
-// The caller is responsible for removing an established conn from PendingConns.
-//
-// To implement socket device binding and interruptible connecting, the lower-level
-// syscall APIs are used. The sequence of syscalls in this implementation are
-// taken from: https://code.google.com/p/go/issues/detail?id=6966
-func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
-
-	// Get the remote IP and port, resolving a domain name if necessary
-	// TODO: domain name resolution isn't interruptible
-	host, strPort, err := net.SplitHostPort(addr)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	port, err := strconv.Atoi(strPort)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	ipAddrs, err := LookupIP(host, config)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	if len(ipAddrs) < 1 {
-		return nil, ContextError(errors.New("no IP address"))
-	}
-
-	// Select an IP at random from the list, so we're not always
-	// trying the same IP (when > 1) which may be blocked.
-	// TODO: retry all IPs until one connects? For now, this retry
-	// will happen on subsequent TCPDial calls, when a different IP
-	// is selected.
-	index, err := MakeSecureRandomInt(len(ipAddrs))
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
-	// TODO: IPv6 support
-	var ip [4]byte
-	copy(ip[:], ipAddrs[index].To4())
-
-	// Create a socket and then, before connecting, add a TCPConn with
-	// the unconnected socket to pendingConns. This allows pendingConns to
-	// interrupt/abort connections in progress.
-	socketFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
-	conn := &TCPConn{interruptible: interruptibleTCPSocket{socketFd: socketFd}}
-
-	// Cleanup on error
-	defer func() {
-		// Mutex required since conn may be in pendingConns, through which
-		// conn.Close() may be called from another goroutine. There are two
-		// risks:
-		// 1. standard race conditions reading/writing conn members.
-		// 2. closing the fd more than once, with the chance that other
-		//    concurrent goroutines or threads may have already reused the fd.
-		conn.mutex.Lock()
-		if err != nil && conn.interruptible.socketFd != _INVALID_FD {
-			syscall.Close(conn.interruptible.socketFd)
-			conn.interruptible.socketFd = _INVALID_FD
-		}
-		conn.mutex.Unlock()
-	}()
-
-	// Enable interruption
-	if !config.PendingConns.Add(conn) {
-		return nil, ContextError(errors.New("pending connections already closed"))
-	}
-
-	if config.DeviceBinder != nil {
-		err = config.DeviceBinder.BindToDevice(conn.interruptible.socketFd)
-		if err != nil {
-			return nil, ContextError(fmt.Errorf("BindToDevice failed: %s", err))
-		}
-	}
-
-	// Connect the socket
-	// TODO: adjust the timeout to account for time spent resolving hostname
-	sockAddr := syscall.SockaddrInet4{Addr: ip, Port: port}
-	if config.ConnectTimeout != 0 {
-		errChannel := make(chan error, 2)
-		time.AfterFunc(config.ConnectTimeout, func() {
-			errChannel <- errors.New("connect timeout")
-		})
-		go func() {
-			errChannel <- syscall.Connect(conn.interruptible.socketFd, &sockAddr)
-		}()
-		err = <-errChannel
-	} else {
-		err = syscall.Connect(conn.interruptible.socketFd, &sockAddr)
-	}
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
-	// Convert the socket fd to a net.Conn
-	// See mutex note above.
-	conn.mutex.Lock()
-
-	file := os.NewFile(uintptr(conn.interruptible.socketFd), "")
-	fileConn, err := net.FileConn(file) // net.FileConn() dups the fd
-	file.Close()                        // file.Close() closes the fd
-	conn.interruptible.socketFd = _INVALID_FD
-	if err != nil {
-		conn.mutex.Unlock()
-		return nil, ContextError(err)
-	}
-	conn.Conn = fileConn
-	conn.mutex.Unlock()
-
-	return conn, nil
-}
-
-func interruptibleTCPClose(interruptible interruptibleTCPSocket) error {
-
-	// Assumes conn.mutex is held
-
-	if interruptible.socketFd == _INVALID_FD {
-		return nil
-	}
-	err := syscall.Close(interruptible.socketFd)
-	interruptible.socketFd = _INVALID_FD
-	return err
-}

+ 0 - 81
psiphon/TCPConn_windows.go

@@ -1,81 +0,0 @@
-// +build windows
-
-/*
- * Copyright (c) 2015, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-package psiphon
-
-import (
-	"errors"
-	"net"
-)
-
-// interruptibleTCPSocket simulates interruptible semantics on Windows. A call
-// to interruptibleTCPClose doesn't actually interrupt a connect in progress,
-// but abandons a dial that's running in a goroutine.
-// Interruptible semantics are required by the controller for timely component
-// state changes.
-// TODO: implement true interruptible semantics on Windows; use syscall and
-// a HANDLE similar to how TCPConn_unix uses a file descriptor?
-type interruptibleTCPSocket struct {
-	results chan *interruptibleDialResult
-}
-
-type interruptibleDialResult struct {
-	netConn net.Conn
-	err     error
-}
-
-// interruptibleTCPDial establishes a TCP network connection. A conn is added
-// to config.PendingConns before blocking on network IO, which enables interruption.
-// The caller is responsible for removing an established conn from PendingConns.
-func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err error) {
-	if config.DeviceBinder != nil {
-		return nil, ContextError(errors.New("psiphon.interruptibleTCPDial with DeviceBinder not supported on Windows"))
-	}
-
-	// Enable interruption
-	conn = &TCPConn{
-		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)}}
-
-	if !config.PendingConns.Add(conn) {
-		return nil, ContextError(errors.New("pending connections already closed"))
-	}
-
-	// Call the blocking Dial in a goroutine
-	results := conn.interruptible.results
-	go func() {
-		netConn, err := net.DialTimeout("tcp", addr, config.ConnectTimeout)
-		results <- &interruptibleDialResult{netConn, err}
-	}()
-
-	// Block until Dial completes (or times out) or until interrupt
-	result := <-conn.interruptible.results
-	if result.err != nil {
-		return nil, ContextError(result.err)
-	}
-	conn.Conn = result.netConn
-
-	return conn, nil
-}
-
-func interruptibleTCPClose(interruptible interruptibleTCPSocket) error {
-	interruptible.results <- &interruptibleDialResult{nil, errors.New("socket interrupted")}
-	return nil
-}

+ 1 - 1
psiphon/controller.go

@@ -742,7 +742,7 @@ func (controller *Controller) stopEstablishing() {
 	}
 	NoticeInfo("stop establishing")
 	close(controller.stopEstablishingBroadcast)
-	// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
+	// Note: interruptibleTCPClose doesn't really interrupt socket connects
 	// and may leave goroutines running for a time after the Wait call.
 	controller.establishPendingConns.CloseAll()
 	// Note: establishCandidateGenerator closes controller.candidateServerEntries

+ 4 - 4
psiphon/net.go

@@ -49,10 +49,10 @@ type DialConfig struct {
 
 	ConnectTimeout time.Duration
 
-	// PendingConns is used to interrupt dials in progress.
-	// The dial may be interrupted using PendingConns.CloseAll(): on platforms
-	// that support this, the new conn is added to pendingConns before the network
-	// connect begins and removed from pendingConns once the connect succeeds or fails.
+	// PendingConns is used to track and interrupt dials in progress.
+	// Dials may be interrupted using PendingConns.CloseAll(). Once instantiated,
+	// a conn is added to pendingConns before the network connect begins and
+	// removed from pendingConns once the connect succeeds or fails.
 	PendingConns *Conns
 
 	// BindToDevice parameters are used to exclude connections and