Procházet zdrojové kódy

Implemented meek protocol support; integrated meek, with refactoring of conn, dialer, and tunnel code; added config parameter to specify tunnel protocol

Rod Hynes před 11 roky
rodič
revize
69dc5fe9f8

+ 1 - 1
README.md

@@ -17,9 +17,9 @@ This project is currently at the proof-of-concept stage. Current production Psip
 
 
 * prefilter entries by capability; don't log "server does not have sufficient capabilities"
 * prefilter entries by capability; don't log "server does not have sufficient capabilities"
 * log noise: "use of closed network connection"
 * log noise: "use of closed network connection"
+* log noise(?): 'Unsolicited response received on idle HTTP channel starting with "H"'
 * use ContextError in more places
 * use ContextError in more places
 * build/test on Android and iOS
 * build/test on Android and iOS
-* integrate meek-client
 * disconnect all local proxy clients when tunnel disconnected
 * disconnect all local proxy clients when tunnel disconnected
 * add connection and idle timeouts to proxied connections where appropriate
 * add connection and idle timeouts to proxied connections where appropriate
 
 

+ 8 - 0
psiphon/config.go

@@ -35,6 +35,7 @@ type Config struct {
 	ClientPlatform                     string
 	ClientPlatform                     string
 	TunnelWholeDevice                  int
 	TunnelWholeDevice                  int
 	EgressRegion                       string
 	EgressRegion                       string
+	TunnelProtocol                     string
 	LocalSocksProxyPort                int
 	LocalSocksProxyPort                int
 	LocalHttpProxyPort                 int
 	LocalHttpProxyPort                 int
 }
 }
@@ -65,5 +66,12 @@ func LoadConfig(filename string) (*Config, error) {
 	if config.RemoteServerListSignaturePublicKey == "" {
 	if config.RemoteServerListSignaturePublicKey == "" {
 		return nil, errors.New("remote server list signature public key is missing from the configuration file")
 		return nil, errors.New("remote server list signature public key is missing from the configuration file")
 	}
 	}
+
+	if config.TunnelProtocol != "" {
+		if !Contains(SupportedTunnelProtocols, config.TunnelProtocol) {
+			return nil, errors.New("invalid tunnel protocol")
+		}
+	}
+
 	return &config, nil
 	return &config, nil
 }
 }

+ 18 - 114
psiphon/conn.go

@@ -20,138 +20,42 @@
 package psiphon
 package psiphon
 
 
 import (
 import (
-	"errors"
 	"net"
 	"net"
 	"sync"
 	"sync"
-	"time"
 )
 )
 
 
-// Conn is a customized network connection that:
-// - can be interrupted while connecting;
-// - turns on TCP keep alive;
-// - implements idle read/write timeouts;
-// - supports sending a signal to a channel when it disconnects;
-// - can be bound to a specific system device (for Android VpnService
-//   routing compatibility, for example).
-type Conn struct {
-	net.Conn
-	mutex         sync.Mutex
-	interruptible interruptibleConn
-	isClosed      bool
-	closedSignal  chan bool
-	readTimeout   time.Duration
-	writeTimeout  time.Duration
-}
+// Dialer is a custom dialer compatible with http.Transport.Dial.
+type Dialer func(string, string) (net.Conn, error)
 
 
-// Dial creates a new, connected Conn. The connection may be interrupted
-// using pendingConns.interrupt(): on platforms that support this, the new
-// Conn is added to pendingConns before the socket connect begins.
-// The caller is responsible for removing any Conns added to pendingConns,
-// even when Dial returns an error.
-// To implement device binding and interruptible connecting, the lower-level
-// syscall APIs are used. The sequence of syscalls in this implementation are
-// taken from: https://code.google.com/p/go/issues/detail?id=6966
-// connectTimeout is rounded up to the nearest second on some platforms.
-func Dial(
-	ipAddress string, port int,
-	connectTimeout, readTimeout, writeTimeout time.Duration,
-	pendingConns *PendingConns) (conn *Conn, err error) {
-
-	conn, err = interruptibleDial(ipAddress, port, connectTimeout, readTimeout, writeTimeout, pendingConns)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	return conn, nil
-}
-
-// SetClosedSignal sets the channel which will be signaled
-// when the connection is closed. This function returns an error
-// if the connection is already closed (and would never send
-// the signal).
-func (conn *Conn) SetClosedSignal(closedSignal chan bool) (err error) {
-	conn.mutex.Lock()
-	defer conn.mutex.Unlock()
-	if conn.isClosed {
-		return errors.New("connection is already closed")
-	}
-	conn.closedSignal = closedSignal
-	return nil
-}
-
-// Close terminates a connected (net.Conn) or connecting (socketFd) Conn.
-// A mutex syncs access to conn struct, allowing Close() to be called
-// from a goroutine that wants to interrupt the primary goroutine using
-// the connection.
-func (conn *Conn) Close() (err error) {
-	var closedSignal chan bool
-	conn.mutex.Lock()
-	if !conn.isClosed {
-		if conn.Conn == nil {
-			err = interruptibleClose(conn.interruptible)
-		} else {
-			err = conn.Conn.Close()
-		}
-		closedSignal = conn.closedSignal
-		conn.isClosed = true
-	}
-	conn.mutex.Unlock()
-	if closedSignal != nil {
-		select {
-		case closedSignal <- true:
-		default:
-		}
-	}
-	return err
-}
-
-// Read wraps standard Read to add an idle timeout. The connection
-// is explicitly closed on timeout.
-func (conn *Conn) Read(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.readTimeout access
-	if conn.readTimeout != 0 {
-		err = conn.Conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
-		if err != nil {
-			return 0, err
-		}
-	}
-	n, err = conn.Conn.Read(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
-}
+// Conn is a net.Conn which supports sending a signal to a channel when
+// it is closed. In Psiphon, this interface is implemented by tunnel
+// connection types (DirectConn and MeekConn) and the close signal is
+// used as one trigger for tearing down the tunnel.
+type Conn interface {
+	net.Conn
 
 
-// Write wraps standard Write to add an idle timeout The connection
-// is explicitly closed on timeout.
-func (conn *Conn) Write(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.writeTimeout access
-	if conn.writeTimeout != 0 {
-		err = conn.Conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
-		if err != nil {
-			return 0, err
-		}
-	}
-	n, err = conn.Conn.Write(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
+	// SetClosedSignal sets the channel which will be signaled
+	// when the connection is closed. This function returns an error
+	// if the connection is already closed (and would never send
+	// the signal). SetClosedSignal and Close may be called by
+	// concurrent goroutines.
+	SetClosedSignal(closedSignal chan struct{}) (err error)
 }
 }
 
 
-// PendingConns is a synchronized list of Conns that's used to coordinate
+// PendingConns is a synchronized list of Conns that is used to coordinate
 // interrupting a set of goroutines establishing connections.
 // interrupting a set of goroutines establishing connections.
 type PendingConns struct {
 type PendingConns struct {
 	mutex sync.Mutex
 	mutex sync.Mutex
-	conns []*Conn
+	conns []Conn
 }
 }
 
 
-func (pendingConns *PendingConns) Add(conn *Conn) {
+func (pendingConns *PendingConns) Add(conn Conn) {
 	pendingConns.mutex.Lock()
 	pendingConns.mutex.Lock()
 	defer pendingConns.mutex.Unlock()
 	defer pendingConns.mutex.Unlock()
 	pendingConns.conns = append(pendingConns.conns, conn)
 	pendingConns.conns = append(pendingConns.conns, conn)
 }
 }
 
 
-func (pendingConns *PendingConns) Remove(conn *Conn) {
+func (pendingConns *PendingConns) Remove(conn Conn) {
 	pendingConns.mutex.Lock()
 	pendingConns.mutex.Lock()
 	defer pendingConns.mutex.Unlock()
 	defer pendingConns.mutex.Unlock()
 	for index, pendingConn := range pendingConns.conns {
 	for index, pendingConn := range pendingConns.conns {

+ 155 - 0
psiphon/directConn.go

@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"errors"
+	"net"
+	"sync"
+	"time"
+)
+
+// DirectConn is a customized network connection that:
+// - can be interrupted while connecting;
+// - implements idle read/write timeouts;
+// - can be bound to a specific system device (for Android VpnService
+//   routing compatibility, for example).
+// - implements the psiphon.Conn interface
+type DirectConn struct {
+	net.Conn
+	mutex         sync.Mutex
+	isClosed      bool
+	closedSignal  chan struct{}
+	interruptible interruptibleConn
+	readTimeout   time.Duration
+	writeTimeout  time.Duration
+}
+
+// NewDirectDialer creates a DirectDialer.
+func NewDirectDialer(
+	connectTimeout, readTimeout, writeTimeout time.Duration,
+	pendingConns *PendingConns) Dialer {
+
+	return func(network, addr string) (net.Conn, error) {
+		if network != "tcp" {
+			Fatal("unsupported network type in NewDirectDialer")
+		}
+		return DirectDial(
+			addr,
+			connectTimeout, readTimeout, writeTimeout,
+			pendingConns)
+	}
+}
+
+// DirectDial creates a new, connected DirectConn. The connection may be
+// interrupted using pendingConns.interrupt(): on platforms that support this,
+// the new DirectConn is added to pendingConns before the socket connect begins
+// and removed from pendingConns once the connect succeeds or fails.
+func DirectDial(
+	addr string,
+	connectTimeout, readTimeout, writeTimeout time.Duration,
+	pendingConns *PendingConns) (conn *DirectConn, err error) {
+
+	conn, err = interruptibleDial(addr, connectTimeout, readTimeout, writeTimeout, pendingConns)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return conn, nil
+}
+
+// SetClosedSignal implements psiphon.Conn.SetClosedSignal
+func (conn *DirectConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+	if conn.isClosed {
+		return ContextError(errors.New("connection is already closed"))
+	}
+	conn.closedSignal = closedSignal
+	return nil
+}
+
+// Close terminates a connected (net.Conn) or connecting (socketFd) DirectConn.
+// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+func (conn *DirectConn) Close() (err error) {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+	if !conn.isClosed {
+		if conn.Conn == nil {
+			err = interruptibleClose(conn.interruptible)
+		} else {
+			err = conn.Conn.Close()
+		}
+		conn.isClosed = true
+		select {
+		case conn.closedSignal <- *new(struct{}):
+		default:
+		}
+	}
+	return err
+}
+
+// Read wraps standard Read to add an idle timeout. The connection
+// is explicitly closed on timeout.
+func (conn *DirectConn) Read(buffer []byte) (n int, err error) {
+	// Note: no mutex on the conn.readTimeout access
+	if conn.readTimeout != 0 {
+		err = conn.Conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
+		if err != nil {
+			return 0, ContextError(err)
+		}
+	}
+	n, err = conn.Conn.Read(buffer)
+	if err != nil {
+		conn.Close()
+	}
+	return
+}
+
+// Write wraps standard Write to add an idle timeout The connection
+// is explicitly closed on timeout.
+func (conn *DirectConn) Write(buffer []byte) (n int, err error) {
+	// Note: no mutex on the conn.writeTimeout access
+	if conn.writeTimeout != 0 {
+		err = conn.Conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
+		if err != nil {
+			return 0, ContextError(err)
+		}
+	}
+	n, err = conn.Conn.Write(buffer)
+	if err != nil {
+		conn.Close()
+	}
+	return
+}
+
+// Override implementation of net.Conn.SetDeadline
+func (conn *DirectConn) SetDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}
+
+// Override implementation of net.Conn.SetReadDeadline
+func (conn *DirectConn) SetReadDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}
+
+// Override implementation of net.Conn.SetWriteDeadline
+func (conn *DirectConn) SetWriteDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}

+ 123 - 0
psiphon/directConn_unix.go

@@ -0,0 +1,123 @@
+// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris
+
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"errors"
+	"net"
+	"os"
+	"strconv"
+	"syscall"
+	"time"
+)
+
+type interruptibleConn struct {
+	socketFd int
+}
+
+// interruptibleDial creates a socket connection.
+// To implement device binding and interruptible connecting, the lower-level
+// syscall APIs are used. The sequence of syscalls in this implementation are
+// taken from: https://code.google.com/p/go/issues/detail?id=6966
+func interruptibleDial(
+	addr string,
+	connectTimeout, readTimeout, writeTimeout time.Duration,
+	pendingConns *PendingConns) (conn *DirectConn, err error) {
+	// Create a socket and then, before connecting, add a DirectConn with
+	// the unconnected socket to pendingConns. This allows pendingConns to
+	// abort connections in progress.
+	socketFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	defer func() {
+		// Cleanup on error
+		if err != nil {
+			syscall.Close(socketFd)
+		}
+	}()
+	conn = &DirectConn{
+		interruptible: interruptibleConn{socketFd: socketFd},
+		readTimeout:   readTimeout,
+		writeTimeout:  writeTimeout}
+	pendingConns.Add(conn)
+	// Before connecting, ensure the socket doesn't route through a VPN interface
+	// TODO: this method requires root, which we won't have on Android in VpnService mode
+	// an alternative may be to use http://golang.org/pkg/syscall/#UnixRights to send the
+	// fd to the main Android process which receives the fd with
+	// http://developer.android.com/reference/android/net/LocalSocket.html#getAncillaryFileDescriptors%28%29
+	// and then calls
+	// http://developer.android.com/reference/android/net/VpnService.html#protect%28int%29.
+	// See, for example:
+	// https://code.google.com/p/ics-openvpn/source/browse/main/src/main/java/de/blinkt/openvpn/core/OpenVpnManagementThread.java#164
+	/*
+		const SO_BINDTODEVICE = 0x19 // only defined for Linux
+		err = syscall.SetsockoptString(socketFd, syscall.SOL_SOCKET, SO_BINDTODEVICE, deviceName)
+	*/
+	// Resolve domain name
+	// TODO: ensure DNS UDP traffic doesn't route through a VPN interface
+	// ...use https://golang.org/src/pkg/net/dnsclient.go?
+	host, strPort, err := net.SplitHostPort(addr)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	port, err := strconv.Atoi(strPort)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	ipAddr, err := net.ResolveIPAddr("ip", host)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	var ip [4]byte
+	copy(ip[:], ipAddr.IP)
+	// Connect the socket
+	sockAddr := syscall.SockaddrInet4{Addr: ip, Port: port}
+	if connectTimeout != 0 {
+		errChannel := make(chan error, 2)
+		time.AfterFunc(connectTimeout, func() {
+			errChannel <- errors.New("connect timeout")
+		})
+		go func() {
+			errChannel <- syscall.Connect(conn.interruptible.socketFd, &sockAddr)
+		}()
+		err = <-errChannel
+	} else {
+		err = syscall.Connect(conn.interruptible.socketFd, &sockAddr)
+	}
+	pendingConns.Remove(conn)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	// Convert the syscall socket to a net.Conn
+	file := os.NewFile(uintptr(conn.interruptible.socketFd), "")
+	defer file.Close()
+	conn.Conn, err = net.FileConn(file)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return conn, nil
+}
+
+func interruptibleClose(interruptible interruptibleConn) error {
+	return syscall.Close(interruptible.socketFd)
+}

+ 51 - 0
psiphon/directConn_windows.go

@@ -0,0 +1,51 @@
+// +build windows
+
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"fmt"
+	"net"
+	"time"
+)
+
+type interruptibleConn struct {
+}
+
+func interruptibleDial(
+	addr string,
+	connectTimeout, readTimeout, writeTimeout time.Duration,
+	pendingConns *PendingConns) (conn *DirectConn, err error) {
+	// Note: using net.Dial(); interruptible connections not supported on Windows
+	netConn, err := net.DialTimeout("tcp", addr, connectTimeout)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	conn = &DirectConn{
+		Conn:         netConn,
+		readTimeout:  readTimeout,
+		writeTimeout: writeTimeout}
+	return conn, nil
+}
+
+func interruptibleClose(interruptible interruptibleConn) error {
+	Fatal("interruptibleClose not supported on Windows")
+}

+ 4 - 4
psiphon/httpProxy.go

@@ -32,14 +32,14 @@ import (
 // the tunnel SSH client.
 // the tunnel SSH client.
 type HttpProxy struct {
 type HttpProxy struct {
 	tunnel        *Tunnel
 	tunnel        *Tunnel
-	failureSignal chan bool
+	stoppedSignal chan struct{}
 	listener      net.Listener
 	listener      net.Listener
 	waitGroup     *sync.WaitGroup
 	waitGroup     *sync.WaitGroup
 	httpRelay     *http.Transport
 	httpRelay     *http.Transport
 }
 }
 
 
 // NewHttpProxy initializes and runs a new HTTP proxy server.
 // NewHttpProxy initializes and runs a new HTTP proxy server.
-func NewHttpProxy(listenPort int, tunnel *Tunnel, failureSignal chan bool) (proxy *HttpProxy, err error) {
+func NewHttpProxy(listenPort int, tunnel *Tunnel, stoppedSignal chan struct{}) (proxy *HttpProxy, err error) {
 	listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
 	listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -55,7 +55,7 @@ func NewHttpProxy(listenPort int, tunnel *Tunnel, failureSignal chan bool) (prox
 	}
 	}
 	proxy = &HttpProxy{
 	proxy = &HttpProxy{
 		tunnel:        tunnel,
 		tunnel:        tunnel,
-		failureSignal: failureSignal,
+		stoppedSignal: stoppedSignal,
 		listener:      listener,
 		listener:      listener,
 		waitGroup:     new(sync.WaitGroup),
 		waitGroup:     new(sync.WaitGroup),
 		httpRelay:     transport,
 		httpRelay:     transport,
@@ -184,7 +184,7 @@ func (proxy *HttpProxy) serveHttpRequests() {
 	err := httpServer.Serve(proxy.listener)
 	err := httpServer.Serve(proxy.listener)
 	if err != nil {
 	if err != nil {
 		select {
 		select {
-		case proxy.failureSignal <- true:
+		case proxy.stoppedSignal <- *new(struct{}):
 		default:
 		default:
 		}
 		}
 		Notice(NOTICE_ALERT, "%s", ContextError(err))
 		Notice(NOTICE_ALERT, "%s", ContextError(err))

+ 442 - 0
psiphon/meekConn.go

@@ -0,0 +1,442 @@
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"bytes"
+	"code.google.com/p/go.crypto/nacl/box"
+	"crypto/rand"
+	"encoding/base64"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+	"sync"
+	"time"
+)
+
+// MeekConn is based on meek-client.go from Tor and Psiphon:
+//
+// https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
+// CC0 1.0 Universal
+//
+// https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
+
+const (
+	MEEK_PROTOCOL_VERSION     = 1
+	MEEK_COOKIE_MAX_PADDING   = 32
+	MAX_SEND_PAYLOAD_LENGTH   = 65536
+	READ_PAYLOAD_CHUNK_LENGTH = 65536
+	MIN_POLL_INTERVAL         = 100 * time.Millisecond
+	MAX_POLL_INTERVAL         = 5 * time.Second
+	POLL_INTERNAL_MULTIPLIER  = 1.5
+)
+
+// MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
+// client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies.
+// Polling is used to achieve full duplex TCP.
+//
+// Fronting is an obfuscation technique in which the connection
+// to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic
+// "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination.
+// See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details.
+//
+// MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
+// through a CDN.
+type MeekConn struct {
+	url                 *url.URL
+	cookie              *http.Cookie
+	transport           *http.Transport
+	mutex               sync.Mutex
+	isClosed            bool
+	closedSignal        chan struct{}
+	broadcastClosed     chan struct{}
+	relayWaitGroup      *sync.WaitGroup
+	availableReadBuffer chan *bytes.Buffer
+	emptyReadBuffer     chan *bytes.Buffer
+	writeQueue          chan []byte
+}
+
+// NewMeekConn returns an initialized meek connection. A meek connection is
+// an HTTP session which does not depend on an underlying socket connection (although
+// persistent HTTP connections are used for performance). This function does not
+// wait for the connection to be "established" before returning. A goroutine
+// is spawned which will eventually start HTTP polling.
+// useFronting assumes caller has already checked server entry capabilities.
+func NewMeekConn(
+	serverEntry *ServerEntry, sessionId string, useFronting bool,
+	connectTimeout, readTimeout, writeTimeout time.Duration,
+	pendingConns *PendingConns) (meek *MeekConn, err error) {
+	// Configure transport
+	var host string
+	var dialer Dialer
+	directDialer := NewDirectDialer(connectTimeout, readTimeout, writeTimeout, pendingConns)
+	if useFronting {
+		// In this case, host is not what is dialed but is what ends up in the HTTP Host header
+		host = serverEntry.MeekFrontingHost
+		// Custom TLS dialer:
+		//  - ignores the HTTP request address and uses the fronting domain
+		//  - disables SNI -- SNI breaks fronting when used with CDNs that support SNI on the server side.
+		dialer = NewCustomTLSDialer(
+			&CustomTLSConfig{
+				Dial:           directDialer,
+				Timeout:        connectTimeout,
+				FrontingAddr:   serverEntry.MeekFrontingDomain,
+				SendServerName: false,
+			})
+	} else {
+		// In this case, host is both what is dialed and what ends up in the HTTP Host header
+		host = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
+		dialer = directDialer
+	}
+	// Scheme is always "http". Otherwise http.Transport will try to do another TLS
+	// handshake inside the explicit TLS session (in fronting mode).
+	url := &url.URL{
+		Scheme: "http",
+		Host:   host,
+		Path:   "/",
+	}
+	cookie, err := makeCookie(serverEntry, sessionId)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	transport := &http.Transport{
+		Dial: dialer,
+		ResponseHeaderTimeout: TUNNEL_WRITE_TIMEOUT,
+	}
+	// The main loop of a MeekConn is run in the relay() goroutine.
+	// A MeekConn net.Conn concurrency semantics:
+	// "Multiple goroutines may invoke methods on a Conn simultaneously."
+	//
+	// Write() calls and relay() are synchronized with the writeQueue channel. Write sends
+	// payloads into the writeQueue, blocking when a payload is already in the queue as only
+	// one HTTP request is in flight at a time (the channel size is 1).
+	//
+	// Read() calls and relay() are synchronized by passing control of a single readBuffer
+	// (bytes.Buffer). This single buffer may be in the emptyReadBuffer channel (when it is
+	// available and empty), the availableReadBuffer channel (when it is available and contains
+	// data), or "checked out" by relay or Read when they are are writing to or reading from the
+	// buffer, respectively. relay will obtain the buffer from either channel, but Read will only
+	// obtain the buffer from availableReadBuffer, so it blocks when there is no data available
+	// to read.
+	meek = &MeekConn{
+		url:                 url,
+		cookie:              cookie,
+		transport:           transport,
+		broadcastClosed:     make(chan struct{}),
+		relayWaitGroup:      new(sync.WaitGroup),
+		availableReadBuffer: make(chan *bytes.Buffer, 1),
+		emptyReadBuffer:     make(chan *bytes.Buffer, 1),
+		writeQueue:          make(chan []byte, 1),
+	}
+	// TODO: benchmark bytes.Buffer vs. built-in append with slices?
+	meek.emptyReadBuffer <- new(bytes.Buffer)
+	meek.relayWaitGroup.Add(1)
+	go meek.relay()
+	return meek, nil
+}
+
+// SetClosedSignal implements psiphon.Conn.SetClosedSignal
+func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
+	meek.mutex.Lock()
+	defer meek.mutex.Unlock()
+	if meek.isClosed {
+		return ContextError(errors.New("connection is already closed"))
+	}
+	meek.closedSignal = closedSignal
+	return nil
+}
+
+// Close terminates the meek connection. Close waits for the relay processing goroutine
+// to stop and releases HTTP transport resources.
+// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+// NOTE: currently doesn't interrupt any HTTP request in flight.
+func (meek *MeekConn) Close() (err error) {
+	meek.mutex.Lock()
+	defer meek.mutex.Unlock()
+	if !meek.isClosed {
+		// TODO: meek.transport.CancelRequest() for current request?
+		close(meek.broadcastClosed)
+		meek.relayWaitGroup.Wait()
+		meek.transport.CloseIdleConnections()
+		meek.isClosed = true
+		select {
+		case meek.closedSignal <- *new(struct{}):
+		default:
+		}
+	}
+	return nil
+}
+
+func (meek *MeekConn) closed() bool {
+	meek.mutex.Lock()
+	defer meek.mutex.Unlock()
+	return meek.isClosed
+}
+
+// Read reads data from the connection.
+// net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
+func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
+	if meek.closed() {
+		return 0, ContextError(errors.New("meek connection is closed"))
+	}
+	select {
+	case readBuffer := <-meek.availableReadBuffer:
+		n, err = readBuffer.Read(buffer)
+		if readBuffer.Len() > 0 {
+			meek.availableReadBuffer <- readBuffer
+		} else {
+			meek.emptyReadBuffer <- readBuffer
+		}
+		return n, err
+	case <-meek.broadcastClosed:
+		return 0, ContextError(errors.New("meek connection has closed"))
+	}
+}
+
+// Write writes data to the connection.
+// net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
+func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
+	if meek.closed() {
+		return 0, ContextError(errors.New("meek connection is closed"))
+	}
+	// The data to send is split into MAX_SEND_PAYLOAD_LENGTH chunks as
+	// this is the most that will be sent per HTTP request.
+	for len(buffer) > 0 {
+		nextWrite := MAX_SEND_PAYLOAD_LENGTH
+		if len(buffer) < nextWrite {
+			nextWrite = len(buffer)
+		}
+		// TODO: pool of reusable buffers?
+		queuedWrite := make([]byte, nextWrite)
+		copy(queuedWrite, buffer)
+		buffer = buffer[nextWrite:]
+		select {
+		case meek.writeQueue <- queuedWrite:
+		case <-meek.broadcastClosed:
+			return 0, ContextError(errors.New("meek connection has closed"))
+		}
+	}
+	return len(buffer), nil
+}
+
+// Stub implementation of net.Conn.LocalAddr
+func (meek *MeekConn) LocalAddr() net.Addr {
+	return nil
+}
+
+// Stub implementation of net.Conn.RemoteAddr
+func (meek *MeekConn) RemoteAddr() net.Addr {
+	return nil
+}
+
+// Stub implementation of net.Conn.SetDeadline
+func (meek *MeekConn) SetDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}
+
+// Stub implementation of net.Conn.SetReadDeadline
+func (meek *MeekConn) SetReadDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}
+
+// Stub implementation of net.Conn.SetWriteDeadline
+func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
+	return ContextError(errors.New("not supported"))
+}
+
+// relay sends and receives tunnelled traffic (payload). An HTTP request is
+// triggered when data is in the write queue or at a polling interval.
+// There's a geometric increase, up to a maximum, in the polling interval when
+// no data is exchanged. Only one HTTP request is in flight at a time.
+func (meek *MeekConn) relay() {
+	defer meek.relayWaitGroup.Done()
+	interval := MIN_POLL_INTERVAL
+	var sendPayload []byte
+	for {
+		sendPayload = nil
+		select {
+		case sendPayload = <-meek.writeQueue:
+		case <-time.After(interval):
+		case <-meek.broadcastClosed:
+			return
+		}
+		receivedPayload, err := meek.roundTrip(sendPayload)
+		if err != nil {
+			Notice(NOTICE_ALERT, "%s", ContextError(err))
+			meek.Close()
+			return
+		}
+		receivedPayloadSize, err := meek.readPayload(receivedPayload)
+		if err != nil {
+			Notice(NOTICE_ALERT, "%s", ContextError(err))
+			meek.Close()
+			return
+		}
+		if receivedPayloadSize > 0 || sendPayload != nil {
+			interval = 0
+		} else if interval == 0 {
+			interval = MIN_POLL_INTERVAL
+		} else {
+			interval = time.Duration(float64(interval) * POLL_INTERNAL_MULTIPLIER)
+			if interval >= MAX_POLL_INTERVAL {
+				interval = MIN_POLL_INTERVAL
+			}
+		}
+	}
+}
+
+// readPayload reads the HTTP response  in chunks, making the read buffer available
+// to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
+// flow back to the reader as soon as possible instead of buffering the entire payload.
+func (meek *MeekConn) readPayload(receivedPayload io.ReadCloser) (totalSize int64, err error) {
+	defer receivedPayload.Close()
+	totalSize = 0
+	for {
+		reader := io.LimitReader(receivedPayload, READ_PAYLOAD_CHUNK_LENGTH)
+		var readBuffer *bytes.Buffer
+		select {
+		case readBuffer = <-meek.availableReadBuffer:
+		case readBuffer = <-meek.emptyReadBuffer:
+		}
+		// TODO: block when readBuffer is too large?
+		n, err := readBuffer.ReadFrom(reader)
+		if err != nil {
+			return 0, ContextError(err)
+		}
+		totalSize += n
+		if n > 0 {
+			meek.availableReadBuffer <- readBuffer
+		} else {
+			meek.emptyReadBuffer <- readBuffer
+		}
+	}
+	return totalSize, nil
+}
+
+// roundTrip configures and makes the actual HTTP POST request
+func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadCloser, err error) {
+	request, err := http.NewRequest("POST", meek.url.String(), bytes.NewReader(sendPayload))
+	if err != nil {
+		return nil, err
+	}
+	// Don't use the default user agent ("Go 1.1 package http").
+	// For now, just omit the header (net/http/request.go: "may be blank to not send the header").
+	request.Header.Set("User-Agent", "")
+	request.Header.Set("Content-Type", "application/octet-stream")
+	request.AddCookie(meek.cookie)
+	// This retry mitigates intermittent failures between the client and front/server.
+	// Note: Retry will only be effective if entire request failed (underlying transport protocol
+	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
+	// success followed by retry).
+	var response *http.Response
+	for i := 0; i <= 1; i++ {
+		response, err = meek.transport.RoundTrip(request)
+		if err == nil {
+			break
+		}
+	}
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	if response.StatusCode != http.StatusOK {
+		return nil, ContextError(fmt.Errorf("http request failed %d", response.StatusCode))
+	}
+	return response.Body, nil
+}
+
+type meekCookieData struct {
+	ServerAddress       string `json:"p"`
+	SessionID           string `json:"s"`
+	MeekProtocolVersion int    `json:"v"`
+}
+
+// makeCookie creates the cookie to be sent with all meek HTTP requests.
+// The purpose of the cookie is to send the following to the server:
+//   ServerAddress -- the Psiphon Server address the meek server should relay to
+//   SessionID -- the Psiphon session ID (used by meek server to relay geolocation
+//     information obtained from the CDN through to the Psiphon Server)
+//   MeekProtocolVersion -- tells the meek server that this client understands
+//     the latest protocol.
+// The entire cookie also acts as an meek/HTTP session ID.
+// In unfronted meek mode, the cookie is visible over the adversary network, so the
+// cookie is encrypted and obfuscated.
+func makeCookie(serverEntry *ServerEntry, sessionId string) (cookie *http.Cookie, err error) {
+	// Make the JSON data
+	serverAddress := fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
+	cookieData := &meekCookieData{
+		ServerAddress:       serverAddress,
+		SessionID:           sessionId,
+		MeekProtocolVersion: MEEK_PROTOCOL_VERSION,
+	}
+	serializedCookie, err := json.Marshal(cookieData)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	// Encrypt the JSON data
+	// NaCl box is used for encryption. The peer public key comes from the server entry.
+	// Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce).
+	// http://nacl.cace-project.eu/box.html:
+	// "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are
+	// different. This is true even if the sets overlap. For example, a sender can use the same nonce for two
+	// different messages if the messages are sent to two different public keys."
+	var publicKey [32]byte
+	publicKeyLen, err := base64.StdEncoding.Decode([]byte(serverEntry.MeekCookieEncryptionPublicKey), publicKey[:])
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	if publicKeyLen != len(publicKey) {
+		return nil, ContextError(errors.New("invalid NaCl public key"))
+	}
+	var nonce [24]byte
+	ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	box := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey)
+	encryptedCookie := make([]byte, 32+len(box))
+	copy(encryptedCookie[0:32], ephemeralPublicKey[0:32])
+	copy(encryptedCookie[32:], box)
+	// Obfuscate the encrypted data
+	obfuscator, err := NewObfuscator(
+		&ObfuscatorParams{Keyword: serverEntry.MeekObfuscatedKey, MaxPadding: MEEK_COOKIE_MAX_PADDING})
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	obfuscatedCookie := obfuscator.ConsumeSeedMessage()
+	seedLen := len(obfuscatedCookie)
+	obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...)
+	obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:])
+	// Format the HTTP cookie
+	// The format is <random letter 'A'-'Z'>=<base64 data>, which is intended to match common cookie formats.
+	A := int([]byte("A")[0])
+	Z := int([]byte("Z")[0])
+	letterIndex, err := MakeSecureRandomInt(Z - A)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return &http.Cookie{
+			Name:  string(byte(A + letterIndex)),
+			Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)},
+		nil
+}

+ 6 - 1
psiphon/obfuscatedSshConn.go

@@ -55,6 +55,11 @@ const (
 // transformations, and performs minimal parsing of the SSH protocol to
 // transformations, and performs minimal parsing of the SSH protocol to
 // determine when to stop obfuscation (after the first SSH_MSG_NEWKEYS is
 // determine when to stop obfuscation (after the first SSH_MSG_NEWKEYS is
 // sent by the client and received from the server).
 // sent by the client and received from the server).
+//
+// WARNING: doesn't fully conform to net.Conn concurrency semantics: there's
+// no synchronization of access to the read/writeBuffers, so concurrent
+// calls to one of Read or Write will result in undefined behavior.
+//
 type ObfuscatedSshConn struct {
 type ObfuscatedSshConn struct {
 	net.Conn
 	net.Conn
 	obfuscator  *Obfuscator
 	obfuscator  *Obfuscator
@@ -77,7 +82,7 @@ const (
 // conn must be used for SSH client traffic and must have transferred
 // conn must be used for SSH client traffic and must have transferred
 // no traffic.
 // no traffic.
 func NewObfuscatedSshConn(conn net.Conn, obfuscationKeyword string) (*ObfuscatedSshConn, error) {
 func NewObfuscatedSshConn(conn net.Conn, obfuscationKeyword string) (*ObfuscatedSshConn, error) {
-	obfuscator, err := NewObfuscator(obfuscationKeyword)
+	obfuscator, err := NewObfuscator(&ObfuscatorParams{Keyword: obfuscationKeyword})
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 15 - 6
psiphon/obfuscator.go

@@ -46,19 +46,24 @@ type Obfuscator struct {
 	serverToClientCipher *rc4.Cipher
 	serverToClientCipher *rc4.Cipher
 }
 }
 
 
+type ObfuscatorParams struct {
+	Keyword    string
+	MaxPadding int
+}
+
 // NewObfuscator creates a new Obfuscator, initializes it with
 // NewObfuscator creates a new Obfuscator, initializes it with
 // a seed message, derives client and server keys, and creates
 // a seed message, derives client and server keys, and creates
 // RC4 stream ciphers to obfuscate data.
 // RC4 stream ciphers to obfuscate data.
-func NewObfuscator(keyword string) (obfuscator *Obfuscator, err error) {
+func NewObfuscator(params *ObfuscatorParams) (obfuscator *Obfuscator, err error) {
 	seed, err := MakeSecureRandomBytes(OBFUSCATE_SEED_LENGTH)
 	seed, err := MakeSecureRandomBytes(OBFUSCATE_SEED_LENGTH)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	clientToServerKey, err := deriveKey(seed, []byte(keyword), []byte(OBFUSCATE_CLIENT_TO_SERVER_IV))
+	clientToServerKey, err := deriveKey(seed, []byte(params.Keyword), []byte(OBFUSCATE_CLIENT_TO_SERVER_IV))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	serverToClientKey, err := deriveKey(seed, []byte(keyword), []byte(OBFUSCATE_SERVER_TO_CLIENT_IV))
+	serverToClientKey, err := deriveKey(seed, []byte(params.Keyword), []byte(OBFUSCATE_SERVER_TO_CLIENT_IV))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -70,7 +75,11 @@ func NewObfuscator(keyword string) (obfuscator *Obfuscator, err error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	seedMessage, err := makeSeedMessage(seed, clientToServerCipher)
+	maxPadding := OBFUSCATE_MAX_PADDING
+	if params.MaxPadding > 0 {
+		maxPadding = params.MaxPadding
+	}
+	seedMessage, err := makeSeedMessage(maxPadding, seed, clientToServerCipher)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -115,8 +124,8 @@ func deriveKey(seed, keyword, iv []byte) ([]byte, error) {
 	return digest[0:OBFUSCATE_KEY_LENGTH], nil
 	return digest[0:OBFUSCATE_KEY_LENGTH], nil
 }
 }
 
 
-func makeSeedMessage(seed []byte, clientToServerCipher *rc4.Cipher) ([]byte, error) {
-	paddingLength, err := MakeSecureRandomInt(OBFUSCATE_MAX_PADDING)
+func makeSeedMessage(maxPadding int, seed []byte, clientToServerCipher *rc4.Cipher) ([]byte, error) {
+	paddingLength, err := MakeSecureRandomInt(maxPadding)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 25 - 18
psiphon/runTunnel.go

@@ -37,21 +37,24 @@ import (
 // if there's not already an established tunnel. This function is to be used in a pool
 // if there's not already an established tunnel. This function is to be used in a pool
 // of goroutines.
 // of goroutines.
 func establishTunnelWorker(
 func establishTunnelWorker(
-	waitGroup *sync.WaitGroup,
+	tunnelProtocol, sessionId string,
+	workerWaitGroup *sync.WaitGroup,
 	candidateServerEntries chan *ServerEntry,
 	candidateServerEntries chan *ServerEntry,
-	broadcastStopWorkers chan bool,
+	broadcastStopWorkers chan struct{},
 	pendingConns *PendingConns,
 	pendingConns *PendingConns,
 	establishedTunnels chan *Tunnel) {
 	establishedTunnels chan *Tunnel) {
 
 
-	defer waitGroup.Done()
+	defer workerWaitGroup.Done()
 	for serverEntry := range candidateServerEntries {
 	for serverEntry := range candidateServerEntries {
 		// Note: don't receive from candidateQueue and broadcastStopWorkers in the same
 		// Note: don't receive from candidateQueue and broadcastStopWorkers in the same
 		// select, since we want to prioritize receiving the stop signal
 		// select, since we want to prioritize receiving the stop signal
-		if IsSignalled(broadcastStopWorkers) {
+		select {
+		case <-broadcastStopWorkers:
 			return
 			return
+		default:
 		}
 		}
 		Notice(NOTICE_INFO, "connecting to %s in region %s", serverEntry.IpAddress, serverEntry.Region)
 		Notice(NOTICE_INFO, "connecting to %s in region %s", serverEntry.IpAddress, serverEntry.Region)
-		tunnel, err := EstablishTunnel(serverEntry, pendingConns)
+		tunnel, err := EstablishTunnel(tunnelProtocol, sessionId, serverEntry, pendingConns)
 		if err != nil {
 		if err != nil {
 			// TODO: distingush case where conn is interrupted?
 			// TODO: distingush case where conn is interrupted?
 			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
 			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
@@ -78,17 +81,18 @@ func discardTunnel(tunnel *Tunnel) {
 // establishTunnel coordinates a worker pool of goroutines to attempt several
 // establishTunnel coordinates a worker pool of goroutines to attempt several
 // tunnel connections in parallel, and this process is stopped once the first
 // tunnel connections in parallel, and this process is stopped once the first
 // tunnel is established.
 // tunnel is established.
-func establishTunnel(config *Config) (tunnel *Tunnel, err error) {
-	waitGroup := new(sync.WaitGroup)
+func establishTunnel(config *Config, sessionId string) (tunnel *Tunnel, err error) {
+	workerWaitGroup := new(sync.WaitGroup)
 	candidateServerEntries := make(chan *ServerEntry)
 	candidateServerEntries := make(chan *ServerEntry)
 	pendingConns := new(PendingConns)
 	pendingConns := new(PendingConns)
 	establishedTunnels := make(chan *Tunnel, 1)
 	establishedTunnels := make(chan *Tunnel, 1)
 	timeout := time.After(ESTABLISH_TUNNEL_TIMEOUT)
 	timeout := time.After(ESTABLISH_TUNNEL_TIMEOUT)
-	broadcastStopWorkers := make(chan bool)
+	broadcastStopWorkers := make(chan struct{})
 	for i := 0; i < CONNECTION_WORKER_POOL_SIZE; i++ {
 	for i := 0; i < CONNECTION_WORKER_POOL_SIZE; i++ {
-		waitGroup.Add(1)
+		workerWaitGroup.Add(1)
 		go establishTunnelWorker(
 		go establishTunnelWorker(
-			waitGroup, candidateServerEntries, broadcastStopWorkers,
+			config.TunnelProtocol, sessionId,
+			workerWaitGroup, candidateServerEntries, broadcastStopWorkers,
 			pendingConns, establishedTunnels)
 			pendingConns, establishedTunnels)
 	}
 	}
 	// TODO: add a throttle after each full cycle?
 	// TODO: add a throttle after each full cycle?
@@ -120,7 +124,7 @@ func establishTunnel(config *Config) (tunnel *Tunnel, err error) {
 		// Interrupt any partial connections in progress, so that
 		// Interrupt any partial connections in progress, so that
 		// the worker will terminate immediately
 		// the worker will terminate immediately
 		pendingConns.Interrupt()
 		pendingConns.Interrupt()
-		waitGroup.Wait()
+		workerWaitGroup.Wait()
 		// Drain any excess tunnels
 		// Drain any excess tunnels
 		close(establishedTunnels)
 		close(establishedTunnels)
 		for tunnel := range establishedTunnels {
 		for tunnel := range establishedTunnels {
@@ -144,13 +148,19 @@ func establishTunnel(config *Config) (tunnel *Tunnel, err error) {
 // error when the tunnel unexpectedly disconnects.
 // error when the tunnel unexpectedly disconnects.
 func runTunnel(config *Config) error {
 func runTunnel(config *Config) error {
 	Notice(NOTICE_INFO, "establishing tunnel")
 	Notice(NOTICE_INFO, "establishing tunnel")
-	tunnel, err := establishTunnel(config)
+	sessionId, err := MakeSessionId()
+	if err != nil {
+		return ContextError(err)
+	}
+	tunnel, err := establishTunnel(config, sessionId)
 	if err != nil {
 	if err != nil {
 		return ContextError(err)
 		return ContextError(err)
 	}
 	}
 	defer tunnel.Close()
 	defer tunnel.Close()
-	// TODO: could start local proxies, etc., before synchronizing work group is establishTunnel
-	stopTunnelSignal := make(chan bool)
+	// Tunnel connection and local proxies will send signals to this channel
+	// when they close or stop. Signal senders should not block. Allows at
+	// least one stop signal to be sent before there is a receiver.
+	stopTunnelSignal := make(chan struct{}, 1)
 	err = tunnel.conn.SetClosedSignal(stopTunnelSignal)
 	err = tunnel.conn.SetClosedSignal(stopTunnelSignal)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("failed to set closed signal: %s", err)
 		return fmt.Errorf("failed to set closed signal: %s", err)
@@ -167,7 +177,7 @@ func runTunnel(config *Config) error {
 	defer httpProxy.Close()
 	defer httpProxy.Close()
 	Notice(NOTICE_INFO, "starting session")
 	Notice(NOTICE_INFO, "starting session")
 	localHttpProxyAddress := httpProxy.listener.Addr().String()
 	localHttpProxyAddress := httpProxy.listener.Addr().String()
-	_, err = NewSession(config, tunnel, localHttpProxyAddress)
+	_, err = NewSession(config, tunnel, localHttpProxyAddress, sessionId)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("error starting session: %s", err)
 		return fmt.Errorf("error starting session: %s", err)
 	}
 	}
@@ -190,9 +200,6 @@ func RunTunnelForever(config *Config) {
 		}
 		}
 		defer logFile.Close()
 		defer logFile.Close()
 		log.SetOutput(logFile)
 		log.SetOutput(logFile)
-	} else {
-		// TODO
-		//log.SetOutput(ioutil.Discard)
 	}
 	}
 	Notice(NOTICE_VERSION, VERSION)
 	Notice(NOTICE_VERSION, VERSION)
 	// TODO: unlike existing Psiphon clients, this code
 	// TODO: unlike existing Psiphon clients, this code

+ 14 - 16
psiphon/serverApi.go

@@ -45,11 +45,11 @@ type Session struct {
 // Psiphon server and returns a Session struct, initialized with the
 // Psiphon server and returns a Session struct, initialized with the
 // session ID, for use with subsequent Psiphon server API requests (e.g.,
 // session ID, for use with subsequent Psiphon server API requests (e.g.,
 // periodic status requests).
 // periodic status requests).
-func NewSession(config *Config, tunnel *Tunnel, localHttpProxyAddress string) (session *Session, err error) {
-	sessionId, err := MakeSessionId()
-	if err != nil {
-		return nil, ContextError(err)
-	}
+func NewSession(
+	config *Config,
+	tunnel *Tunnel,
+	localHttpProxyAddress, sessionId string) (session *Session, err error) {
+
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel, localHttpProxyAddress)
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel, localHttpProxyAddress)
 	if err != nil {
 	if err != nil {
 		return nil, ContextError(err)
 		return nil, ContextError(err)
@@ -261,18 +261,16 @@ func makePsiphonHttpsClient(tunnel *Tunnel, localHttpProxyAddress string) (https
 	if err != nil {
 	if err != nil {
 		return nil, ContextError(err)
 		return nil, ContextError(err)
 	}
 	}
-	customDialer := func(network, addr string) (net.Conn, error) {
-		customTLSConfig := &CustomTLSConfig{
-			sendServerName:          false,
-			verifyLegacyCertificate: certificate,
-			httpProxyAddress:        localHttpProxyAddress,
-		}
-		return CustomTLSDialWithDialer(
-			&net.Dialer{Timeout: PSIPHON_API_SERVER_TIMEOUT},
-			network, addr, customTLSConfig)
-	}
+	dialer := NewCustomTLSDialer(
+		&CustomTLSConfig{
+			Dial:                    new(net.Dialer).Dial,
+			Timeout:                 PSIPHON_API_SERVER_TIMEOUT,
+			HttpProxyAddress:        localHttpProxyAddress,
+			SendServerName:          false,
+			VerifyLegacyCertificate: certificate,
+		})
 	transport := &http.Transport{
 	transport := &http.Transport{
-		Dial: customDialer,
+		Dial: dialer,
 		ResponseHeaderTimeout: PSIPHON_API_SERVER_TIMEOUT,
 		ResponseHeaderTimeout: PSIPHON_API_SERVER_TIMEOUT,
 	}
 	}
 	return &http.Client{Transport: transport}, nil
 	return &http.Client{Transport: transport}, nil

+ 4 - 4
psiphon/socksProxy.go

@@ -33,7 +33,7 @@ import (
 // forward.
 // forward.
 type SocksProxy struct {
 type SocksProxy struct {
 	tunnel        *Tunnel
 	tunnel        *Tunnel
-	failureSignal chan bool
+	stoppedSignal chan struct{}
 	listener      *socks.SocksListener
 	listener      *socks.SocksListener
 	waitGroup     *sync.WaitGroup
 	waitGroup     *sync.WaitGroup
 }
 }
@@ -41,14 +41,14 @@ type SocksProxy struct {
 // NewSocksProxy initializes a new SOCKS server. It begins listening for
 // NewSocksProxy initializes a new SOCKS server. It begins listening for
 // connections, starts a goroutine that runs an accept loop, and returns
 // connections, starts a goroutine that runs an accept loop, and returns
 // leaving the accept loop running.
 // leaving the accept loop running.
-func NewSocksProxy(listenPort int, tunnel *Tunnel, failureSignal chan bool) (proxy *SocksProxy, err error) {
+func NewSocksProxy(listenPort int, tunnel *Tunnel, stoppedSignal chan struct{}) (proxy *SocksProxy, err error) {
 	listener, err := socks.ListenSocks("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
 	listener, err := socks.ListenSocks("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 	proxy = &SocksProxy{
 	proxy = &SocksProxy{
 		tunnel:        tunnel,
 		tunnel:        tunnel,
-		failureSignal: failureSignal,
+		stoppedSignal: stoppedSignal,
 		listener:      listener,
 		listener:      listener,
 		waitGroup:     new(sync.WaitGroup)}
 		waitGroup:     new(sync.WaitGroup)}
 	proxy.waitGroup.Add(1)
 	proxy.waitGroup.Add(1)
@@ -109,7 +109,7 @@ func (proxy *SocksProxy) acceptSocksConnections() {
 			Notice(NOTICE_ALERT, "SOCKS proxy accept error: %s", err)
 			Notice(NOTICE_ALERT, "SOCKS proxy accept error: %s", err)
 			if e, ok := err.(net.Error); ok && !e.Temporary() {
 			if e, ok := err.(net.Error); ok && !e.Temporary() {
 				select {
 				select {
-				case proxy.failureSignal <- true:
+				case proxy.stoppedSignal <- *new(struct{}):
 				default:
 				default:
 				}
 				}
 				// Fatal error, stop the proxy
 				// Fatal error, stop the proxy

+ 54 - 55
psiphon/tlsDialer.go

@@ -90,76 +90,75 @@ func (timeoutError) Temporary() bool { return true }
 
 
 // CustomTLSConfig contains parameters to determine the behavior
 // CustomTLSConfig contains parameters to determine the behavior
 // of CustomTLSDial.
 // of CustomTLSDial.
-// httpProxyAddress - use the specified HTTP proxy (HTTP CONNECT) if not blank
-// sendServerName - use SNI (tlsdialer functionality)
-// verifyLegacyCertificate - special case self-signed server certificate
-//   case. Ignores IP SANs and basic constraints. No certificate chain. Just
-//   checks that the server presented the specified certificate.
-// tlsConfig - a tls.Config use in the non-verifyLegacyCertificate case.
 type CustomTLSConfig struct {
 type CustomTLSConfig struct {
-	httpProxyAddress        string
-	sendServerName          bool
-	verifyLegacyCertificate *x509.Certificate
-	tlsConfig               *tls.Config
+	// Dial is the network connection dialer. TLS is layered on
+	// top of a new network connection created with dialer.
+	Dial Dialer
+	// Timeout is and optional timeout for combined network
+	// connection dial and TLS handshake.
+	Timeout time.Duration
+	// FrontingAddr overrides the "addr" input to Dial when specified
+	FrontingAddr string
+	// HttpProxyAddress specifies an HTTP proxy to be used
+	// (with HTTP CONNECT).
+	HttpProxyAddress string
+	// SendServerName specifies whether to use SNI
+	// (tlsdialer functionality)
+	SendServerName bool
+	// VerifyLegacyCertificate is a special case self-signed server
+	// certificate case. Ignores IP SANs and basic constraints. No
+	// certificate chain. Just checks that the server presented the
+	// specified certificate.
+	VerifyLegacyCertificate *x509.Certificate
+	// TlsConfig is a tls.Config to use in the
+	// non-verifyLegacyCertificate case.
+	TlsConfig *tls.Config
 }
 }
 
 
-// tlsdialer:
-// Like crypto/tls.Dial, but with the ability to control whether or not to
-// send the ServerName extension in client handshakes through the sendServerName
-// flag.
-//
-// Note - if sendServerName is false, the VerifiedChains field on the
-// connection's ConnectionState will never get populated.
-func CustomTLSDial(network, addr string, config *CustomTLSConfig) (*tls.Conn, error) {
-	return CustomTLSDialWithDialer(new(net.Dialer), network, addr, config)
+func NewCustomTLSDialer(config *CustomTLSConfig) Dialer {
+	return func(network, addr string) (net.Conn, error) {
+		return CustomTLSDial(network, addr, config)
+	}
 }
 }
 
 
-// tlsdialer:
-// Like crypto/tls.DialWithDialer, but with the ability to control whether or
-// not to send the ServerName extension in client handshakes through the
-// sendServerName flag.
+// CustomTLSDialWithDialer is a customized replacement for tls.Dial.
+// Based on tlsdialer.DialWithDialer which is based on crypto/tls.DialWithDialer.
 //
 //
-// Note - if sendServerName is false, the VerifiedChains field on the
-// connection's ConnectionState will never get populated.
-func CustomTLSDialWithDialer(dialer *net.Dialer, network, addr string, config *CustomTLSConfig) (*tls.Conn, error) {
-	// We want the Timeout and Deadline values from dialer to cover the
-	// whole process: TCP connection and TLS handshake. This means that we
-	// also need to start our own timers now.
-	timeout := dialer.Timeout
-
-	if !dialer.Deadline.IsZero() {
-		deadlineTimeout := dialer.Deadline.Sub(time.Now())
-		if timeout == 0 || deadlineTimeout < timeout {
-			timeout = deadlineTimeout
-		}
-	}
-
+// tlsdialer comment:
+//   Note - if sendServerName is false, the VerifiedChains field on the
+//   connection's ConnectionState will never get populated.
+func CustomTLSDial(network, addr string, config *CustomTLSConfig) (*tls.Conn, error) {
 	var errChannel chan error
 	var errChannel chan error
 
 
-	if timeout != 0 {
+	if config.Timeout != 0 {
 		errChannel = make(chan error, 2)
 		errChannel = make(chan error, 2)
-		time.AfterFunc(timeout, func() {
+		time.AfterFunc(config.Timeout, func() {
 			errChannel <- timeoutError{}
 			errChannel <- timeoutError{}
 		})
 		})
 	}
 	}
 
 
 	dialAddr := addr
 	dialAddr := addr
-	if config.httpProxyAddress != "" {
-		dialAddr = config.httpProxyAddress
+	if config.HttpProxyAddress != "" {
+		dialAddr = config.HttpProxyAddress
 	}
 	}
 
 
-	rawConn, err := dialer.Dial(network, dialAddr)
+	rawConn, err := config.Dial(network, dialAddr)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	colonPos := strings.LastIndex(addr, ":")
+	tlsAddr := addr
+	if config.FrontingAddr != "" {
+		tlsAddr = config.FrontingAddr
+	}
+
+	colonPos := strings.LastIndex(tlsAddr, ":")
 	if colonPos == -1 {
 	if colonPos == -1 {
-		colonPos = len(addr)
+		colonPos = len(tlsAddr)
 	}
 	}
-	hostname := addr[:colonPos]
+	hostname := tlsAddr[:colonPos]
 
 
-	tlsConfig := config.tlsConfig
+	tlsConfig := config.TlsConfig
 	if tlsConfig == nil {
 	if tlsConfig == nil {
 		tlsConfig = &tls.Config{}
 		tlsConfig = &tls.Config{}
 	}
 	}
@@ -172,11 +171,11 @@ func CustomTLSDialWithDialer(dialer *net.Dialer, network, addr string, config *C
 		serverName = hostname
 		serverName = hostname
 	}
 	}
 
 
-	// copy config so we can tweak it
+	// Copy config so we can tweak it
 	tlsConfigCopy := new(tls.Config)
 	tlsConfigCopy := new(tls.Config)
 	*tlsConfigCopy = *tlsConfig
 	*tlsConfigCopy = *tlsConfig
 
 
-	if config.sendServerName {
+	if config.SendServerName {
 		// Set the ServerName and rely on the usual logic in
 		// Set the ServerName and rely on the usual logic in
 		// tls.Conn.Handshake() to do its verification
 		// tls.Conn.Handshake() to do its verification
 		tlsConfigCopy.ServerName = serverName
 		tlsConfigCopy.ServerName = serverName
@@ -190,10 +189,10 @@ func CustomTLSDialWithDialer(dialer *net.Dialer, network, addr string, config *C
 
 
 	establishConnection := func(rawConn net.Conn, conn *tls.Conn) error {
 	establishConnection := func(rawConn net.Conn, conn *tls.Conn) error {
 		// TODO: use the proxy request/response code from net/http/transport.go
 		// TODO: use the proxy request/response code from net/http/transport.go
-		if config.httpProxyAddress != "" {
+		if config.HttpProxyAddress != "" {
 			connectRequest := fmt.Sprintf(
 			connectRequest := fmt.Sprintf(
 				"CONNECT %s HTTP/1.1\r\nHost: %s\r\nConnection: Keep-Alive\r\n\r\n",
 				"CONNECT %s HTTP/1.1\r\nHost: %s\r\nConnection: Keep-Alive\r\n\r\n",
-				addr, hostname)
+				tlsAddr, hostname)
 			_, err := rawConn.Write([]byte(connectRequest))
 			_, err := rawConn.Write([]byte(connectRequest))
 			if err != nil {
 			if err != nil {
 				return err
 				return err
@@ -211,7 +210,7 @@ func CustomTLSDialWithDialer(dialer *net.Dialer, network, addr string, config *C
 		return conn.Handshake()
 		return conn.Handshake()
 	}
 	}
 
 
-	if timeout == 0 {
+	if config.Timeout == 0 {
 		err = establishConnection(rawConn, conn)
 		err = establishConnection(rawConn, conn)
 	} else {
 	} else {
 		go func() {
 		go func() {
@@ -220,9 +219,9 @@ func CustomTLSDialWithDialer(dialer *net.Dialer, network, addr string, config *C
 		err = <-errChannel
 		err = <-errChannel
 	}
 	}
 
 
-	if err == nil && config.verifyLegacyCertificate != nil {
-		err = verifyLegacyCertificate(conn, config.verifyLegacyCertificate)
-	} else if err == nil && !config.sendServerName && !tlsConfig.InsecureSkipVerify {
+	if err == nil && config.VerifyLegacyCertificate != nil {
+		err = verifyLegacyCertificate(conn, config.VerifyLegacyCertificate)
+	} else if err == nil && !config.SendServerName && !tlsConfig.InsecureSkipVerify {
 		// Manually verify certificates
 		// Manually verify certificates
 		err = verifyServerCerts(conn, serverName, tlsConfigCopy)
 		err = verifyServerCerts(conn, serverName, tlsConfigCopy)
 	}
 	}

+ 96 - 33
psiphon/tunnel.go

@@ -32,17 +32,27 @@ import (
 )
 )
 
 
 const (
 const (
-	PROTOCOL_SSH            = "SSH"
-	PROTOCOL_OBFUSCATED_SSH = "OSSH"
+	TUNNEL_PROTOCOL_SSH            = "SSH"
+	TUNNEL_PROTOCOL_OBFUSCATED_SSH = "OSSH"
+	TUNNEL_PROTOCOL_UNFRONTED_MEEK = "UNFRONTED-MEEK"
+	TUNNEL_PROTOCOL_FRONTED_MEEK   = "FRONTED-MEEK"
 )
 )
 
 
+// This is a list of supported tunnel protocols, in default preference order
+var SupportedTunnelProtocols = []string{
+	TUNNEL_PROTOCOL_FRONTED_MEEK,
+	TUNNEL_PROTOCOL_UNFRONTED_MEEK,
+	TUNNEL_PROTOCOL_OBFUSCATED_SSH,
+	TUNNEL_PROTOCOL_SSH,
+}
+
 // Tunnel is a connection to a Psiphon server. An established
 // Tunnel is a connection to a Psiphon server. An established
 // tunnel includes a network connection to the specified server
 // tunnel includes a network connection to the specified server
 // and an SSH session built on top of that transport.
 // and an SSH session built on top of that transport.
 type Tunnel struct {
 type Tunnel struct {
 	serverEntry      *ServerEntry
 	serverEntry      *ServerEntry
 	protocol         string
 	protocol         string
-	conn             *Conn
+	conn             Conn
 	sshClient        *ssh.Client
 	sshClient        *ssh.Client
 	sshKeepAliveQuit chan struct{}
 	sshKeepAliveQuit chan struct{}
 }
 }
@@ -64,50 +74,96 @@ func (tunnel *Tunnel) Close() {
 // Depending on the server's capabilities, the connection may use
 // Depending on the server's capabilities, the connection may use
 // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
 // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
 // HTTP (meek protocol).
 // HTTP (meek protocol).
-func EstablishTunnel(serverEntry *ServerEntry, pendingConns *PendingConns) (tunnel *Tunnel, err error) {
-	// First connect the transport
-	// TODO: meek
-	sshCapable := Contains(serverEntry.Capabilities, PROTOCOL_SSH)
-	obfuscatedSshCapable := Contains(serverEntry.Capabilities, PROTOCOL_OBFUSCATED_SSH)
-	if !sshCapable && !obfuscatedSshCapable {
-		return nil, fmt.Errorf("server does not have sufficient capabilities")
+// When requiredProtocol is not blank, that protocol is used. Otherwise,
+// the first protocol in SupportedTunnelProtocols that's also in the
+// server capabilities is used.
+func EstablishTunnel(
+	requiredProtocol, sessionId string,
+	serverEntry *ServerEntry,
+	pendingConns *PendingConns) (tunnel *Tunnel, err error) {
+	// Select the protocol
+	var selectedProtocol string
+	if requiredProtocol != "" {
+		if !Contains(serverEntry.Capabilities, requiredProtocol) {
+			return nil, ContextError(fmt.Errorf("server does not have required capability"))
+		}
+		selectedProtocol = requiredProtocol
+	} else {
+		// Order of SupportedTunnelProtocols is default preference order
+		for _, protocol := range SupportedTunnelProtocols {
+			if Contains(serverEntry.Capabilities, protocol) {
+				selectedProtocol = protocol
+				break
+			}
+		}
+		if selectedProtocol == "" {
+			return nil, ContextError(fmt.Errorf("server does not have any supported capabilities"))
+		}
 	}
 	}
-	selectedProtocol := PROTOCOL_SSH
-	port := serverEntry.SshPort
-	if obfuscatedSshCapable {
-		selectedProtocol = PROTOCOL_OBFUSCATED_SSH
+	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
+	// So depending on which protocol is used, multiple layers are initialized.
+	port := 0
+	useMeek := false
+	useFronting := false
+	useObfuscatedSsh := false
+	switch selectedProtocol {
+	case TUNNEL_PROTOCOL_FRONTED_MEEK:
+		useMeek = true
+		useFronting = true
+		useObfuscatedSsh = true
+	case TUNNEL_PROTOCOL_UNFRONTED_MEEK:
+		useMeek = true
+		useObfuscatedSsh = true
 		port = serverEntry.SshObfuscatedPort
 		port = serverEntry.SshObfuscatedPort
+	case TUNNEL_PROTOCOL_OBFUSCATED_SSH:
+		useObfuscatedSsh = true
+		port = serverEntry.SshObfuscatedPort
+	case TUNNEL_PROTOCOL_SSH:
+		port = serverEntry.SshPort
 	}
 	}
-	conn, err := Dial(
-		serverEntry.IpAddress, port,
-		TUNNEL_CONNECT_TIMEOUT, TUNNEL_READ_TIMEOUT, TUNNEL_WRITE_TIMEOUT,
-		pendingConns)
-	if err != nil {
-		return nil, err
+	// Create the base transport: meek or direct connection
+	var conn Conn
+	if useMeek {
+		conn, err = NewMeekConn(
+			serverEntry, sessionId, useFronting,
+			TUNNEL_CONNECT_TIMEOUT, TUNNEL_READ_TIMEOUT, TUNNEL_WRITE_TIMEOUT,
+			pendingConns)
+		if err != nil {
+			return nil, ContextError(err)
+		}
+	} else {
+		conn, err = DirectDial(
+			fmt.Sprintf("%s:%d", serverEntry.IpAddress, port),
+			TUNNEL_CONNECT_TIMEOUT, TUNNEL_READ_TIMEOUT, TUNNEL_WRITE_TIMEOUT,
+			pendingConns)
+		if err != nil {
+			return nil, ContextError(err)
+		}
 	}
 	}
 	defer func() {
 	defer func() {
-		pendingConns.Remove(conn)
+		// Cleanup on error
 		if err != nil {
 		if err != nil {
 			conn.Close()
 			conn.Close()
 		}
 		}
 	}()
 	}()
-	var netConn net.Conn
-	netConn = conn
-	if obfuscatedSshCapable {
-		netConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
+	// Add obfuscated SSH layer
+	var sshConn net.Conn
+	sshConn = conn
+	if useObfuscatedSsh {
+		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
 		if err != nil {
-			return nil, err
+			return nil, ContextError(err)
 		}
 		}
 	}
 	}
-	// Now establish the SSH session
+	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return nil, ContextError(err)
 	}
 	}
 	sshCertChecker := &ssh.CertChecker{
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
 			if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
 			if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
-				return errors.New("unexpected host public key")
+				return ContextError(errors.New("unexpected host public key"))
 			}
 			}
 			return nil
 			return nil
 		},
 		},
@@ -121,11 +177,12 @@ func EstablishTunnel(serverEntry *ServerEntry, pendingConns *PendingConns) (tunn
 	}
 	}
 	// The folowing is adapted from ssh.Dial(), here using a custom conn
 	// The folowing is adapted from ssh.Dial(), here using a custom conn
 	sshAddress := strings.Join([]string{serverEntry.IpAddress, ":", strconv.Itoa(serverEntry.SshPort)}, "")
 	sshAddress := strings.Join([]string{serverEntry.IpAddress, ":", strconv.Itoa(serverEntry.SshPort)}, "")
-	sshConn, sshChans, sshReqs, err := ssh.NewClientConn(netConn, sshAddress, sshClientConfig)
+	sshClientConn, sshChans, sshReqs, err := ssh.NewClientConn(sshConn, sshAddress, sshClientConfig)
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return nil, ContextError(err)
 	}
 	}
-	sshClient := ssh.NewClient(sshConn, sshChans, sshReqs)
+	sshClient := ssh.NewClient(sshClientConn, sshChans, sshReqs)
+	// Run a goroutine to periodically execute SSH keepalive
 	sshKeepAliveQuit := make(chan struct{})
 	sshKeepAliveQuit := make(chan struct{})
 	sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
 	sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
 	go func() {
 	go func() {
@@ -145,5 +202,11 @@ func EstablishTunnel(serverEntry *ServerEntry, pendingConns *PendingConns) (tunn
 			}
 			}
 		}
 		}
 	}()
 	}()
-	return &Tunnel{serverEntry, selectedProtocol, conn, sshClient, sshKeepAliveQuit}, nil
+	return &Tunnel{
+			serverEntry:      serverEntry,
+			protocol:         selectedProtocol,
+			conn:             conn,
+			sshClient:        sshClient,
+			sshKeepAliveQuit: sshKeepAliveQuit},
+		nil
 }
 }

+ 0 - 12
psiphon/utils.go

@@ -30,18 +30,6 @@ import (
 	"runtime"
 	"runtime"
 )
 )
 
 
-// IsSignalled returns true when the signal channel yields
-// a value. To be used with the idiom in which a shared
-// channel is closed to broadcast a signal.
-func IsSignalled(signal chan bool) bool {
-	select {
-	case <-signal:
-		return true
-	default:
-	}
-	return false
-}
-
 // Contains is a helper function that returns true
 // Contains is a helper function that returns true
 // if the target string is in the list.
 // if the target string is in the list.
 func Contains(list []string, target string) bool {
 func Contains(list []string, target string) bool {