Przeglądaj źródła

Merge remote-tracking branch 'upstream/master'

Adam Pritchard 11 lat temu
rodzic
commit
a418fded78

BIN
AndroidApp/app/src/main/jniLibs/arm64-v8a/libtun2socks.so


+ 55 - 11
ConsoleClient/psiphonClient.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -33,14 +33,21 @@ import (
 
 func main() {
 
+	// Define command-line parameters
+
 	var configFilename string
 	flag.StringVar(&configFilename, "config", "", "configuration input file")
 
+	var embeddedServerEntryListFilename string
+	flag.StringVar(&embeddedServerEntryListFilename, "serverList", "", "embedded server entry list input file")
+
 	var profileFilename string
 	flag.StringVar(&profileFilename, "profile", "", "CPU profile output file")
 
 	flag.Parse()
 
+	// Handle required config file parameter
+
 	if configFilename == "" {
 		log.Fatalf("configuration file is required")
 	}
@@ -53,6 +60,19 @@ func main() {
 		log.Fatalf("error processing configuration file: %s", err)
 	}
 
+	// Set logfile, if configured
+
+	if config.LogFilename != "" {
+		logFile, err := os.OpenFile(config.LogFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+		if err != nil {
+			log.Fatalf("error opening log file: %s", err)
+		}
+		defer logFile.Close()
+		log.SetOutput(logFile)
+	}
+
+	// Handle optional profiling parameter
+
 	if profileFilename != "" {
 		profileFile, err := os.Create(profileFilename)
 		if err != nil {
@@ -62,34 +82,58 @@ func main() {
 		defer pprof.StopCPUProfile()
 	}
 
+	// Initialize data store
+
 	err = psiphon.InitDataStore(config)
 	if err != nil {
 		log.Fatalf("error initializing datastore: %s", err)
 	}
 
-	if config.LogFilename != "" {
-		logFile, err := os.OpenFile(config.LogFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+	// Handle optional embedded server list file parameter
+	// If specified, the embedded server list is loaded and stored before
+	// running Psiphon.
+
+	if embeddedServerEntryListFilename != "" {
+		serverEntryList, err := ioutil.ReadFile(embeddedServerEntryListFilename)
 		if err != nil {
-			log.Fatalf("error opening log file: %s", err)
+			log.Fatalf("error loading embedded server entry list file: %s", err)
+		}
+		// TODO: stream embedded server list data? also, the cast makes an unnecessary copy of a large buffer?
+		serverEntries, err := psiphon.DecodeServerEntryList(string(serverEntryList))
+		if err != nil {
+			log.Fatalf("error decoding embedded server entry list file: %s", err)
+		}
+		// Since embedded server list entries may become stale, they will not
+		// overwrite existing stored entries for the same server.
+		err = psiphon.StoreServerEntries(serverEntries, false)
+		if err != nil {
+			log.Fatalf("error storing embedded server entry list data: %s", err)
 		}
-		defer logFile.Close()
-		log.SetOutput(logFile)
 	}
 
+	// Run Psiphon
+
 	controller := psiphon.NewController(config)
+	controllerStopSignal := make(chan struct{}, 1)
 	shutdownBroadcast := make(chan struct{})
 	controllerWaitGroup := new(sync.WaitGroup)
 	controllerWaitGroup.Add(1)
 	go func() {
 		defer controllerWaitGroup.Done()
 		controller.Run(shutdownBroadcast)
+		controllerStopSignal <- *new(struct{})
 	}()
 
+	// Wait for an OS signal or a Run stop signal, then stop Psiphon and exit
+
 	systemStopSignal := make(chan os.Signal, 1)
 	signal.Notify(systemStopSignal, os.Interrupt, os.Kill)
-	<-systemStopSignal
-
-	psiphon.Notice(psiphon.NOTICE_INFO, "shutdown by system")
-	close(shutdownBroadcast)
-	controllerWaitGroup.Wait()
+	select {
+	case <-systemStopSignal:
+		psiphon.Notice(psiphon.NOTICE_INFO, "shutdown by system")
+		close(shutdownBroadcast)
+		controllerWaitGroup.Wait()
+	case <-controllerStopSignal:
+		psiphon.Notice(psiphon.NOTICE_INFO, "shutdown by controller")
+	}
 }

+ 5 - 5
psiphon/TCPConn.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -61,15 +61,15 @@ func DialTCP(addr string, config *DialConfig) (conn *TCPConn, err error) {
 	return conn, nil
 }
 
-// SetClosedSignal implements psiphon.Conn.SetClosedSignal
-func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
+// SetClosedSignal implements psiphon.Conn.SetClosedSignal.
+func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) bool {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 	if conn.isClosed {
-		return ContextError(errors.New("connection is already closed"))
+		return false
 	}
 	conn.closedSignal = closedSignal
-	return nil
+	return true
 }
 
 // Close terminates a connected (net.Conn) or connecting (socketFd) TCPConn.

+ 43 - 9
psiphon/TCPConn_unix.go

@@ -1,7 +1,7 @@
 // +build android darwin dragonfly freebsd linux nacl netbsd openbsd solaris
 
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -34,7 +34,12 @@ type interruptibleTCPSocket struct {
 	socketFd int
 }
 
-// interruptibleTCPDial creates a socket connection.
+const _INVALID_FD = -1
+
+// interruptibleTCPDial establishes a TCP network connection. A conn is added
+// to config.PendingConns before blocking on network IO, which enables interruption.
+// The caller is responsible for removing an established conn from PendingConns.
+//
 // To implement socket device binding and interruptible connecting, the lower-level
 // syscall APIs are used. The sequence of syscalls in this implementation are
 // taken from: https://code.google.com/p/go/issues/detail?id=6966
@@ -49,7 +54,8 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	}
 	defer func() {
 		// Cleanup on error
-		if err != nil {
+		// (socketFd is reset to _INVALID_FD once it should no longer be closed)
+		if err != nil && socketFd != _INVALID_FD {
 			syscall.Close(socketFd)
 		}
 	}()
@@ -67,6 +73,7 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	}
 
 	// Get the remote IP and port, resolving a domain name if necessary
+	// TODO: domain name resolution isn't interruptible
 	host, strPort, err := net.SplitHostPort(dialAddr)
 	if err != nil {
 		return nil, ContextError(err)
@@ -91,8 +98,10 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 		interruptible: interruptibleTCPSocket{socketFd: socketFd},
 		readTimeout:   config.ReadTimeout,
 		writeTimeout:  config.WriteTimeout}
-	config.PendingConns.Add(conn)
-	defer config.PendingConns.Remove(conn)
+
+	if !config.PendingConns.Add(conn) {
+		return nil, ContextError(errors.New("pending connections already closed"))
+	}
 
 	// Connect the socket
 	// TODO: adjust the timeout to account for time spent resolving hostname
@@ -109,17 +118,39 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	} else {
 		err = syscall.Connect(socketFd, &sockAddr)
 	}
+
+	// Mutex required for writing to conn, since conn remains in
+	// pendingConns, through which conn.Close() may be called from
+	// another goroutine.
+
+	conn.mutex.Lock()
+
+	// From this point, ensure conn.interruptible.socketFd is reset
+	// since the fd value may be reused for a different file or socket
+	// before Close() -- and interruptibleTCPClose() -- is called for
+	// this conn.
+	conn.interruptible.socketFd = _INVALID_FD // (requires mutex)
+
+	// This is the syscall.Connect result
 	if err != nil {
+		conn.mutex.Unlock()
 		return nil, ContextError(err)
 	}
 
-	// Convert the syscall socket to a net.Conn
-	file := os.NewFile(uintptr(conn.interruptible.socketFd), "")
-	defer file.Close()
-	conn.Conn, err = net.FileConn(file)
+	// Convert the socket fd to a net.Conn
+
+	file := os.NewFile(uintptr(socketFd), "")
+	fileConn, err := net.FileConn(file)
+	file.Close()
+	// No more deferred fd clean up on err
+	socketFd = _INVALID_FD
 	if err != nil {
+		conn.mutex.Unlock()
 		return nil, ContextError(err)
 	}
+	conn.Conn = fileConn // (requires mutex)
+
+	conn.mutex.Unlock()
 
 	// Going through upstream HTTP proxy
 	if config.UpstreamHttpProxyAddress != "" {
@@ -134,5 +165,8 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 }
 
 func interruptibleTCPClose(interruptible interruptibleTCPSocket) error {
+	if interruptible.socketFd == _INVALID_FD {
+		return nil
+	}
 	return syscall.Close(interruptible.socketFd)
 }

+ 9 - 3
psiphon/TCPConn_windows.go

@@ -1,7 +1,7 @@
 // +build windows
 
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -42,16 +42,23 @@ type interruptibleDialResult struct {
 	err     error
 }
 
+// interruptibleTCPDial establishes a TCP network connection. A conn is added
+// to config.PendingConns before blocking on network IO, which enables interruption.
+// The caller is responsible for removing an established conn from PendingConns.
 func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err error) {
 	if config.BindToDeviceProvider != nil {
 		return nil, ContextError(errors.New("psiphon.interruptibleTCPDial with bind not supported on Windows"))
 	}
 
+	// Enable interruption
 	conn = &TCPConn{
 		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)},
 		readTimeout:   config.ReadTimeout,
 		writeTimeout:  config.WriteTimeout}
-	config.PendingConns.Add(conn)
+
+	if !config.PendingConns.Add(conn) {
+		return nil, ContextError(errors.New("pending connections already closed"))
+	}
 
 	// Call the blocking Dial in a goroutine
 	results := conn.interruptible.results
@@ -78,7 +85,6 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 
 	// Block until Dial completes (or times out) or until interrupt
 	result := <-conn.interruptible.results
-	config.PendingConns.Remove(conn)
 	if result.err != nil {
 		return nil, ContextError(result.err)
 	}

+ 1 - 1
psiphon/config.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify

+ 21 - 6
psiphon/conn.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -77,28 +77,42 @@ type Conn interface {
 	net.Conn
 
 	// SetClosedSignal sets the channel which will be signaled
-	// when the connection is closed. This function returns an error
+	// when the connection is closed. This function returns false
 	// if the connection is already closed (and would never send
 	// the signal). SetClosedSignal and Close may be called by
 	// concurrent goroutines.
-	SetClosedSignal(closedSignal chan struct{}) (err error)
+	SetClosedSignal(closedSignal chan struct{}) bool
 }
 
 // Conns is a synchronized list of Conns that is used to coordinate
 // interrupting a set of goroutines establishing connections, or
 // close a set of open connections, etc.
+// Once the list is closed, no more items may be added to the
+// list (unless it is reset).
 type Conns struct {
-	mutex sync.Mutex
-	conns map[net.Conn]bool
+	mutex    sync.Mutex
+	isClosed bool
+	conns    map[net.Conn]bool
 }
 
-func (conns *Conns) Add(conn net.Conn) {
+func (conns *Conns) Reset() {
 	conns.mutex.Lock()
 	defer conns.mutex.Unlock()
+	conns.isClosed = false
+	conns.conns = make(map[net.Conn]bool)
+}
+
+func (conns *Conns) Add(conn net.Conn) bool {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	if conns.isClosed {
+		return false
+	}
 	if conns.conns == nil {
 		conns.conns = make(map[net.Conn]bool)
 	}
 	conns.conns[conn] = true
+	return true
 }
 
 func (conns *Conns) Remove(conn net.Conn) {
@@ -110,6 +124,7 @@ func (conns *Conns) Remove(conn net.Conn) {
 func (conns *Conns) CloseAll() {
 	conns.mutex.Lock()
 	defer conns.mutex.Unlock()
+	conns.isClosed = true
 	for conn, _ := range conns.conns {
 		conn.Close()
 	}

+ 92 - 210
psiphon/controller.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -25,8 +25,6 @@ package psiphon
 
 import (
 	"errors"
-	"fmt"
-	"io"
 	"net"
 	"sync"
 	"time"
@@ -37,7 +35,7 @@ import (
 // route traffic through the tunnels.
 type Controller struct {
 	config                    *Config
-	failureSignal             chan struct{}
+	componentFailureSignal    chan struct{}
 	shutdownBroadcast         chan struct{}
 	runWaitGroup              *sync.WaitGroup
 	establishedTunnels        chan *Tunnel
@@ -45,32 +43,32 @@ type Controller struct {
 	tunnelMutex               sync.Mutex
 	tunnels                   []*Tunnel
 	nextTunnel                int
-	operateWaitGroup          *sync.WaitGroup
 	isEstablishing            bool
 	establishWaitGroup        *sync.WaitGroup
 	stopEstablishingBroadcast chan struct{}
 	candidateServerEntries    chan *ServerEntry
-	pendingConns              *Conns
+	establishPendingConns     *Conns
+	fetchRemotePendingConns   *Conns
 }
 
 // NewController initializes a new controller.
 func NewController(config *Config) (controller *Controller) {
 	return &Controller{
 		config: config,
-		// failureSignal receives a signal from a component (including socks and
+		// componentFailureSignal receives a signal from a component (including socks and
 		// http local proxies) if they unexpectedly fail. Senders should not block.
 		// A buffer allows at least one stop signal to be sent before there is a receiver.
-		failureSignal:     make(chan struct{}, 1),
-		shutdownBroadcast: make(chan struct{}),
-		runWaitGroup:      new(sync.WaitGroup),
+		componentFailureSignal: make(chan struct{}, 1),
+		shutdownBroadcast:      make(chan struct{}),
+		runWaitGroup:           new(sync.WaitGroup),
 		// establishedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
-		establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
-		failedTunnels:      make(chan *Tunnel, config.TunnelPoolSize),
-		tunnels:            make([]*Tunnel, 0),
-		operateWaitGroup:   new(sync.WaitGroup),
-		isEstablishing:     false,
-		pendingConns:       new(Conns),
+		establishedTunnels:      make(chan *Tunnel, config.TunnelPoolSize),
+		failedTunnels:           make(chan *Tunnel, config.TunnelPoolSize),
+		tunnels:                 make([]*Tunnel, 0),
+		isEstablishing:          false,
+		establishPendingConns:   new(Conns),
+		fetchRemotePendingConns: new(Conns),
 	}
 }
 
@@ -91,9 +89,10 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 		return
 	}
 	defer socksProxy.Close()
+
 	httpProxy, err := NewHttpProxy(controller.config, controller)
 	if err != nil {
-		Notice(NOTICE_ALERT, "error initializing local SOCKS proxy: %s", err)
+		Notice(NOTICE_ALERT, "error initializing local HTTP proxy: %s", err)
 		return
 	}
 	defer httpProxy.Close()
@@ -105,24 +104,23 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 	select {
 	case <-shutdownBroadcast:
 		Notice(NOTICE_INFO, "controller shutdown by request")
-	case <-controller.failureSignal:
-		Notice(NOTICE_ALERT, "controller shutdown due to failure")
+	case <-controller.componentFailureSignal:
+		Notice(NOTICE_ALERT, "controller shutdown due to component failure")
 	}
 
-	// Note: in addition to establish(), this pendingConns will interrupt
-	// FetchRemoteServerList
-	controller.pendingConns.CloseAll()
 	close(controller.shutdownBroadcast)
+	controller.establishPendingConns.CloseAll()
+	controller.fetchRemotePendingConns.CloseAll()
 	controller.runWaitGroup.Wait()
 
 	Notice(NOTICE_INFO, "exiting controller")
 }
 
-// SignalFailure notifies the controller that an associated component has failed.
+// SignalComponentFailure notifies the controller that an associated component has failed.
 // This will terminate the controller.
-func (controller *Controller) SignalFailure() {
+func (controller *Controller) SignalComponentFailure() {
 	select {
-	case controller.failureSignal <- *new(struct{}):
+	case controller.componentFailureSignal <- *new(struct{}):
 	default:
 	}
 }
@@ -133,13 +131,13 @@ func (controller *Controller) SignalFailure() {
 func (controller *Controller) remoteServerListFetcher() {
 	defer controller.runWaitGroup.Done()
 
-	// Note: unlike existing Psiphon clients, this code
+	// Note: unlike legacy Psiphon clients, this code
 	// always makes the fetch remote server list request
 loop:
 	for {
-		// TODO: FetchRemoteServerList should have its own pendingConns,
-		// otherwise it may needlessly abort when establish is stopped.
-		err := FetchRemoteServerList(controller.config, controller.pendingConns)
+		err := FetchRemoteServerList(
+			controller.config, controller.fetchRemotePendingConns)
+
 		var duration time.Duration
 		if err != nil {
 			Notice(NOTICE_ALERT, "failed to fetch remote server list: %s", err)
@@ -162,9 +160,8 @@ loop:
 // runTunnels is the controller tunnel management main loop. It starts and stops
 // establishing tunnels based on the target tunnel pool size and the current size
 // of the pool. Tunnels are established asynchronously using worker goroutines.
-// When a tunnel is established, it's added to the active pool and a corresponding
-// operateTunnel goroutine is launched which starts a session in the tunnel and
-// monitors the tunnel for failures.
+// When a tunnel is established, it's added to the active pool. The tunnel's
+// operateTunnel goroutine monitors the tunnel.
 // When a tunnel fails, it's removed from the pool and the establish process is
 // restarted to fill the pool.
 func (controller *Controller) runTunnels() {
@@ -205,8 +202,6 @@ loop:
 			Notice(NOTICE_INFO, "established tunnel: %s", establishedTunnel.serverEntry.IpAddress)
 			if controller.registerTunnel(establishedTunnel) {
 				Notice(NOTICE_INFO, "active tunnel: %s", establishedTunnel.serverEntry.IpAddress)
-				controller.operateWaitGroup.Add(1)
-				go controller.operateTunnel(establishedTunnel)
 			} else {
 				controller.discardTunnel(establishedTunnel)
 			}
@@ -220,7 +215,6 @@ loop:
 	}
 	controller.stopEstablishing()
 	controller.terminateAllTunnels()
-	controller.operateWaitGroup.Wait()
 
 	// Drain tunnel channels
 	close(controller.establishedTunnels)
@@ -235,6 +229,22 @@ loop:
 	Notice(NOTICE_INFO, "exiting run tunnels")
 }
 
+// SignalTunnelFailure implements the TunnelOwner interface. This function
+// is called by Tunnel.operateTunnel when the tunnel has detected that it
+// has failed. The Controller will signal runTunnels to create a new
+// tunnel and/or remove the tunnel from the list of active tunnels.
+func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
+	// Don't block. Assumes the receiver has a buffer large enough for
+	// the typical number of operated tunnels. In case there's no room,
+	// terminate the tunnel (runTunnels won't get a signal in this case,
+	// but the tunnel will be removed from the list of active tunnels).
+	select {
+	case controller.failedTunnels <- tunnel:
+	default:
+		controller.terminateTunnel(tunnel)
+	}
+}
+
 // discardTunnel disposes of a successful connection that is no longer required.
 func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 	Notice(NOTICE_INFO, "discard tunnel: %s", tunnel.serverEntry.IpAddress)
@@ -319,12 +329,7 @@ func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
 		tunnel = controller.tunnels[controller.nextTunnel]
 		controller.nextTunnel =
 			(controller.nextTunnel + 1) % len(controller.tunnels)
-		// A tunnel must[*] have started its session (performed the server
-		// API handshake sequence) before it may be used for tunneling traffic
-		// [*]currently not enforced by the server, but may be in the future.
-		if tunnel.IsSessionStarted() {
-			return tunnel
-		}
+		return tunnel
 	}
 	return nil
 }
@@ -342,142 +347,6 @@ func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry
 	return false
 }
 
-// operateTunnel starts a Psiphon session (handshake, etc.) on a newly
-// connected tunnel, and then monitors the tunnel for failures:
-//
-// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
-// channel on keep-alive failure and other transport I/O errors. In case
-// of such a failure, the tunnel is marked as failed.
-//
-// 2. Tunnel port forward failures: the tunnel connection may stay up but
-// the client may still fail to establish port forwards due to server load
-// and other conditions. After a threshold number of such failures, the
-// overall tunnel is marked as failed.
-//
-// TODO: currently, any connect (dial), read, or write error associated with
-// a port forward is counted as a failure. It may be important to differentiate
-// between failures due to Psiphon server conditions and failures due to the
-// origin/target server (in the latter case, the tunnel is healthy). Here are
-// some typical error messages to consider matching against (or ignoring):
-//
-// - "ssh: rejected: administratively prohibited (open failed)"
-// - "ssh: rejected: connect failed (Connection timed out)"
-// - "write tcp ... broken pipe"
-// - "read tcp ... connection reset by peer"
-// - "ssh: unexpected packet in response to channel open: <nil>"
-//
-func (controller *Controller) operateTunnel(tunnel *Tunnel) {
-	defer controller.operateWaitGroup.Done()
-
-	tunnelClosedSignal := make(chan struct{}, 1)
-	err := tunnel.conn.SetClosedSignal(tunnelClosedSignal)
-	if err != nil {
-		err = fmt.Errorf("failed to set closed signal: %s", err)
-	}
-
-	Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
-	// TODO: NewSession server API calls may block shutdown
-	session, err := NewSession(controller.config, tunnel)
-	if err != nil {
-		err = fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err)
-	}
-
-	// Tunnel may now be used for port forwarding
-	tunnel.SetSessionStarted()
-
-	// Promote this successful tunnel to first rank so it's one
-	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
-
-	statsTimer := time.NewTimer(NextSendPeriod())
-
-	for err == nil {
-		select {
-		case failures := <-tunnel.portForwardFailures:
-			tunnel.portForwardFailureTotal += failures
-			Notice(
-				NOTICE_INFO, "port forward failures for %s: %d",
-				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
-			if tunnel.portForwardFailureTotal > controller.config.PortForwardFailureThreshold {
-				err = errors.New("tunnel exceeded port forward failure threshold")
-			}
-
-		case <-tunnelClosedSignal:
-			// TODO: this signal can be received during a commanded shutdown due to
-			// how tunnels are closed; should rework this to avoid log noise.
-			err = errors.New("tunnel closed unexpectedly")
-
-		case <-controller.shutdownBroadcast:
-			// Send final stats
-			sendStats(tunnel, session, true)
-			Notice(NOTICE_INFO, "shutdown operate tunnel")
-			return
-
-		case <-statsTimer.C:
-			sendStats(tunnel, session, false)
-			statsTimer.Reset(NextSendPeriod())
-		}
-	}
-
-	if err != nil {
-		Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
-		// Don't block. Assumes the receiver has a buffer large enough for
-		// the typical number of operated tunnels. In case there's no room,
-		// terminate the tunnel (runTunnels won't get a signal in this case).
-		select {
-		case controller.failedTunnels <- tunnel:
-		default:
-			controller.terminateTunnel(tunnel)
-		}
-	}
-}
-
-// sendStats is a helper for sending session stats to the server.
-func sendStats(tunnel *Tunnel, session *Session, final bool) {
-	payload := GetForServer(tunnel.serverEntry.IpAddress)
-	if payload != nil {
-		err := session.DoStatusRequest(payload, final)
-		if err != nil {
-			Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-			PutBack(tunnel.serverEntry.IpAddress, payload)
-		}
-	}
-}
-
-// TunneledConn implements net.Conn and wraps a port foward connection.
-// It is used to hook into Read and Write to observe I/O errors and
-// report these errors back to the tunnel monitor as port forward failures.
-type TunneledConn struct {
-	net.Conn
-	tunnel *Tunnel
-}
-
-func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
-	n, err = conn.Conn.Read(buffer)
-	if err != nil && err != io.EOF {
-		// Report 1 new failure. Won't block; assumes the receiver
-		// has a sufficient buffer for the threshold number of reports.
-		// TODO: conditional on type of error or error message?
-		select {
-		case conn.tunnel.portForwardFailures <- 1:
-		default:
-		}
-	}
-	return
-}
-
-func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
-	n, err = conn.Conn.Write(buffer)
-	if err != nil && err != io.EOF {
-		// Same as TunneledConn.Read()
-		select {
-		case conn.tunnel.portForwardFailures <- 1:
-		default:
-		}
-	}
-	return
-}
-
 // Dial selects an active tunnel and establishes a port forward
 // connection through the selected tunnel. Failure to connect is considered
 // a port forward failure, for the purpose of monitoring tunnel health.
@@ -486,23 +355,16 @@ func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error)
 	if tunnel == nil {
 		return nil, ContextError(errors.New("no active tunnels"))
 	}
-	tunnelConn, err := tunnel.Dial(remoteAddr)
+
+	tunneledConn, err := tunnel.Dial(remoteAddr)
 	if err != nil {
-		// TODO: conditional on type of error or error message?
-		select {
-		case tunnel.portForwardFailures <- 1:
-		default:
-		}
 		return nil, ContextError(err)
 	}
 
-	statsConn := NewStatsConn(tunnelConn, tunnel.ServerID(), tunnel.StatsRegexps())
-
-	conn = &TunneledConn{
-		Conn:   statsConn,
-		tunnel: tunnel}
+	statsConn := NewStatsConn(
+		tunneledConn, tunnel.session.StatsServerID(), tunnel.session.StatsRegexps())
 
-	return
+	return statsConn, nil
 }
 
 // startEstablishing creates a pool of worker goroutines which will
@@ -517,6 +379,7 @@ func (controller *Controller) startEstablishing() {
 	controller.establishWaitGroup = new(sync.WaitGroup)
 	controller.stopEstablishingBroadcast = make(chan struct{})
 	controller.candidateServerEntries = make(chan *ServerEntry)
+	controller.establishPendingConns.Reset()
 
 	for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
 		controller.establishWaitGroup.Add(1)
@@ -535,10 +398,10 @@ func (controller *Controller) stopEstablishing() {
 		return
 	}
 	Notice(NOTICE_INFO, "stop establishing")
+	close(controller.stopEstablishingBroadcast)
 	// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
 	// and may leave goroutines running for a time after the Wait call.
-	controller.pendingConns.CloseAll()
-	close(controller.stopEstablishingBroadcast)
+	controller.establishPendingConns.CloseAll()
 	// Note: establishCandidateGenerator closes controller.candidateServerEntries
 	// (as it may be sending to that channel).
 	controller.establishWaitGroup.Wait()
@@ -559,7 +422,7 @@ func (controller *Controller) establishCandidateGenerator() {
 		controller.config.EgressRegion, controller.config.TunnelProtocol)
 	if err != nil {
 		Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
-		controller.SignalFailure()
+		controller.SignalComponentFailure()
 		return
 	}
 	defer iterator.Close()
@@ -570,7 +433,7 @@ loop:
 			serverEntry, err := iterator.Next()
 			if err != nil {
 				Notice(NOTICE_ALERT, "failed to get next candidate: %s", err)
-				controller.SignalFailure()
+				controller.SignalComponentFailure()
 				break loop
 			}
 			if serverEntry == nil {
@@ -608,33 +471,52 @@ loop:
 // a connection to the tunnel server, and delivers the established tunnel to a channel.
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
+loop:
 	for serverEntry := range controller.candidateServerEntries {
-		// Note: don't receive from candidateQueue and broadcastStopWorkers in the same
-		// select, since we want to prioritize receiving the stop signal
-		select {
-		case <-controller.stopEstablishingBroadcast:
-			return
-		default:
+		// Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
+		// in the same select, since we want to prioritize receiving the stop signal
+		if controller.isStopEstablishingBroadcast() {
+			break loop
 		}
+
 		// There may already be a tunnel to this candidate. If so, skip it.
 		if controller.isActiveTunnelServerEntry(serverEntry) {
 			continue
 		}
+
 		tunnel, err := EstablishTunnel(
-			controller.config, controller.pendingConns, serverEntry)
+			controller.config,
+			controller.establishPendingConns,
+			serverEntry,
+			controller) // TunnelOwner
 		if err != nil {
-			// TODO: distingush case where conn is interrupted?
-			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
-		} else {
-			// Don't block. Assumes the receiver has a buffer large enough for
-			// the number of desired tunnels. If there's no room, the tunnel must
-			// not be required so it's discarded.
-			select {
-			case controller.establishedTunnels <- tunnel:
-			default:
-				controller.discardTunnel(tunnel)
+			// Before emitting error, check if establish interrupted, in which
+			// case the error is noise.
+			if controller.isStopEstablishingBroadcast() {
+				break loop
 			}
+			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
+			continue
+		}
+
+		// Deliver established tunnel.
+		// Don't block. Assumes the receiver has a buffer large enough for
+		// the number of desired tunnels. If there's no room, the tunnel must
+		// not be required so it's discarded.
+		select {
+		case controller.establishedTunnels <- tunnel:
+		default:
+			controller.discardTunnel(tunnel)
 		}
 	}
 	Notice(NOTICE_INFO, "stopped establish worker")
 }
+
+func (controller *Controller) isStopEstablishingBroadcast() bool {
+	select {
+	case <-controller.stopEstablishingBroadcast:
+		return true
+	default:
+	}
+	return false
+}

+ 46 - 5
psiphon/dataStore.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -24,6 +24,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math/rand"
 	"path/filepath"
 	"strings"
 	"sync"
@@ -171,7 +172,7 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 			return ContextError(err)
 		}
 		if serverEntryExists && !replaceIfExists {
-			// Nothing more to do
+			Notice(NOTICE_INFO, "ignored update for server %s", serverEntry.IpAddress)
 			return nil
 		}
 		_, err = transaction.Exec(`
@@ -215,12 +216,32 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 		}
 		// TODO: post notice after commit
 		if !serverEntryExists {
-			Notice(NOTICE_INFO, "stored server %s", serverEntry.IpAddress)
+			Notice(NOTICE_INFO, "updated server %s", serverEntry.IpAddress)
 		}
 		return nil
 	})
 }
 
+// StoreServerEntries shuffles and stores a list of server entries.
+// Shuffling is performed on imported server entries as part of client-side
+// load balancing.
+// There is an independent transaction for each entry insert/update.
+func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
+
+	for index := len(serverEntries) - 1; index > 0; index-- {
+		swapIndex := rand.Intn(index + 1)
+		serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
+	}
+
+	for _, serverEntry := range serverEntries {
+		err := StoreServerEntry(serverEntry, replaceIfExists)
+		if err != nil {
+			return ContextError(err)
+		}
+	}
+	return nil
+}
+
 // PromoteServerEntry assigns the top rank (one more than current
 // max rank) to the specified server entry. Server candidates are
 // iterated in descending rank order, so this server entry will be
@@ -273,10 +294,30 @@ func (iterator *ServerEntryIterator) Reset() error {
 		return ContextError(err)
 	}
 	var cursor *sql.Rows
+
+	// This query implements the Psiphon server candidate selection
+	// algorithm: the first set of server candidates are in rank (priority)
+	// order, to favor previously successful servers; then the remaining
+	// long tail is shuffled to raise up less recent candidates.
+
 	whereClause, whereParams := makeServerEntryWhereClause(
 		iterator.region, iterator.protocol, nil)
-	query := "select data from serverEntry" + whereClause + " order by rank desc;"
-	cursor, err = transaction.Query(query, whereParams...)
+	headLength := CONNECTION_WORKER_POOL_SIZE
+	queryFormat := `
+		select data from serverEntry %s
+		order by case
+		when rank > coalesce((select rank from serverEntry %s order by rank desc limit ?, 1), -1) then rank
+		else abs(random())%%((select rank from serverEntry %s order by rank desc limit ?, 1))
+		end desc;`
+	query := fmt.Sprintf(queryFormat, whereClause, whereClause, whereClause)
+	params := make([]interface{}, 0)
+	params = append(params, whereParams...)
+	params = append(params, whereParams...)
+	params = append(params, headLength)
+	params = append(params, whereParams...)
+	params = append(params, headLength)
+
+	cursor, err = transaction.Query(query, params...)
 	if err != nil {
 		transaction.Rollback()
 		return ContextError(err)

+ 2 - 2
psiphon/httpProxy.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -236,7 +236,7 @@ func (proxy *HttpProxy) serve() {
 	case <-proxy.stopListeningBroadcast:
 	default:
 		if err != nil {
-			proxy.tunneler.SignalFailure()
+			proxy.tunneler.SignalComponentFailure()
 			Notice(NOTICE_ALERT, "%s", ContextError(err))
 		}
 	}

+ 16 - 10
psiphon/meekConn.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -99,10 +99,12 @@ func DialMeek(
 	// pendingConns here, but that was a lifecycle mismatch: we don't want to abort HTTP transport
 	// connections while MeekConn is still in use
 	pendingConns := new(Conns)
+
 	// Use a copy of DialConfig with the meek pendingConns
-	configCopy := new(DialConfig)
-	*configCopy = *config
-	configCopy.PendingConns = pendingConns
+	meekConfig := new(DialConfig)
+	*meekConfig = *config
+	meekConfig.PendingConns = pendingConns
+
 	var host string
 	var dialer Dialer
 	if useFronting {
@@ -113,15 +115,15 @@ func DialMeek(
 		//  - disables SNI -- SNI breaks fronting when used with CDNs that support SNI on the server side.
 		dialer = NewCustomTLSDialer(
 			&CustomTLSConfig{
-				Dial:           NewTCPDialer(configCopy),
-				Timeout:        configCopy.ConnectTimeout,
+				Dial:           NewTCPDialer(meekConfig),
+				Timeout:        meekConfig.ConnectTimeout,
 				FrontingAddr:   fmt.Sprintf("%s:%d", serverEntry.MeekFrontingDomain, 443),
 				SendServerName: false,
 			})
 	} else {
 		// In this case, host is both what is dialed and what ends up in the HTTP Host header
 		host = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
-		dialer = NewTCPDialer(configCopy)
+		dialer = NewTCPDialer(meekConfig)
 	}
 
 	// Scheme is always "http". Otherwise http.Transport will try to do another TLS
@@ -177,18 +179,22 @@ func DialMeek(
 	meek.emptySendBuffer <- new(bytes.Buffer)
 	meek.relayWaitGroup.Add(1)
 	go meek.relay()
+
+	// Enable interruption
+	config.PendingConns.Add(meek)
+
 	return meek, nil
 }
 
 // SetClosedSignal implements psiphon.Conn.SetClosedSignal
-func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
+func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) bool {
 	meek.mutex.Lock()
 	defer meek.mutex.Unlock()
 	if meek.isClosed {
-		return ContextError(errors.New("connection is already closed"))
+		return false
 	}
 	meek.closedSignal = closedSignal
-	return nil
+	return true
 }
 
 // Close terminates the meek connection. Close waits for the relay processing goroutine

+ 1 - 1
psiphon/notice.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify

+ 10 - 11
psiphon/remoteServerList.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -29,7 +29,6 @@ import (
 	"errors"
 	"io/ioutil"
 	"net/http"
-	"strings"
 )
 
 // RemoteServerList is a JSON record containing a list of Psiphon server
@@ -84,16 +83,16 @@ func FetchRemoteServerList(config *Config, pendingConns *Conns) (err error) {
 		return ContextError(err)
 	}
 
-	for _, encodedServerEntry := range strings.Split(remoteServerList.Data, "\n") {
-		serverEntry, err := DecodeServerEntry(encodedServerEntry)
-		if err != nil {
-			return ContextError(err)
-		}
-		err = StoreServerEntry(serverEntry, true)
-		if err != nil {
-			return ContextError(err)
-		}
+	serverEntries, err := DecodeServerEntryList(remoteServerList.Data)
+	if err != nil {
+		return ContextError(err)
 	}
+
+	err = StoreServerEntries(serverEntries, true)
+	if err != nil {
+		return ContextError(err)
+	}
+
 	return nil
 }
 

+ 92 - 51
psiphon/serverApi.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -21,6 +21,7 @@ package psiphon
 
 import (
 	"bytes"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -36,25 +37,40 @@ import (
 // includes the session ID (used for Psiphon API requests) and a http
 // client configured to make tunneled Psiphon API requests.
 type Session struct {
-	config             *Config
-	tunnel             *Tunnel
+	sessionId          string
+	baseRequestUrl     string
 	psiphonHttpsClient *http.Client
+	statsRegexps       *Regexps
+	statsServerId      string
+}
+
+// MakeSessionId creates a new session ID. Making the session ID is not done
+// in NewSession as the transport needs to send the ID in the SSH credentials
+// before the tunnel is established; and NewSession performs a handshake on
+// an established tunnel.
+func MakeSessionId() (sessionId string, err error) {
+	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return hex.EncodeToString(randomId), nil
 }
 
 // NewSession makes tunnelled handshake and connected requests to the
 // Psiphon server and returns a Session struct, initialized with the
 // session ID, for use with subsequent Psiphon server API requests (e.g.,
 // periodic status requests).
-func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
+func NewSession(config *Config, tunnel *Tunnel, sessionId string) (session *Session, err error) {
 
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel)
 	if err != nil {
 		return nil, ContextError(err)
 	}
 	session = &Session{
-		config:             config,
-		tunnel:             tunnel,
+		sessionId:          sessionId,
+		baseRequestUrl:     makeBaseRequestUrl(config, tunnel, sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
+		statsServerId:      tunnel.serverEntry.IpAddress,
 	}
 	// Sending two separate requests is a legacy from when the handshake was
 	// performed before a tunnel was established and the connect was performed
@@ -72,6 +88,17 @@ func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
 	return session, nil
 }
 
+// StatsServerID provides a unique identifier for the server the session connects to.
+// This ID is consistent between multiple sessions/tunnels connected to that server.
+func (session *Session) StatsServerID() string {
+	return session.statsServerId
+}
+
+// StatsRegexps gets the Regexps used for the statistics for this tunnel.
+func (session *Session) StatsRegexps() *Regexps {
+	return session.statsRegexps
+}
+
 // DoStatusRequest makes a /status request to the server, sending session stats.
 // final should be true if this is the last such request before disconnecting.
 func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool) error {
@@ -87,7 +114,7 @@ func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool)
 
 	url := session.buildRequestUrl(
 		"status",
-		&ExtraParam{"session_id", session.tunnel.sessionId},
+		&ExtraParam{"session_id", session.sessionId},
 		&ExtraParam{"connected", connected})
 
 	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
@@ -129,6 +156,7 @@ func (session *Session) doHandshakeRequest() error {
 	if len(configLine) == 0 {
 		return ContextError(errors.New("no config line found"))
 	}
+
 	// Note:
 	// - 'preemptive_reconnect_lifetime_milliseconds' is currently unused
 	// - 'ssh_session_id' is ignored; client session ID is used instead
@@ -143,6 +171,8 @@ func (session *Session) doHandshakeRequest() error {
 	if err != nil {
 		return ContextError(err)
 	}
+
+	// Store discovered server entries
 	for _, encodedServerEntry := range handshakeConfig.EncodedServerList {
 		serverEntry, err := DecodeServerEntry(encodedServerEntry)
 		if err != nil {
@@ -153,6 +183,7 @@ func (session *Session) doHandshakeRequest() error {
 			return ContextError(err)
 		}
 	}
+
 	// TODO: formally communicate the sponsor and upgrade info to an
 	// outer client via some control interface.
 	for _, homepage := range handshakeConfig.Homepages {
@@ -161,9 +192,10 @@ func (session *Session) doHandshakeRequest() error {
 	if handshakeConfig.UpgradeClientVersion != "" {
 		Notice(NOTICE_UPGRADE, "%s", handshakeConfig.UpgradeClientVersion)
 	}
-	session.tunnel.SetStatsRegexps(MakeRegexps(
+
+	session.statsRegexps = MakeRegexps(
 		handshakeConfig.PageViewRegexes,
-		handshakeConfig.HttpsRequestRegexes))
+		handshakeConfig.HttpsRequestRegexes)
 	return nil
 }
 
@@ -184,7 +216,7 @@ func (session *Session) doConnectedRequest() error {
 	}
 	url := session.buildRequestUrl(
 		"connected",
-		&ExtraParam{"session_id", session.tunnel.sessionId},
+		&ExtraParam{"session_id", session.sessionId},
 		&ExtraParam{"last_connected", lastConnected})
 	responseBody, err := session.doGetRequest(url)
 	if err != nil {
@@ -204,47 +236,6 @@ func (session *Session) doConnectedRequest() error {
 	return nil
 }
 
-type ExtraParam struct{ name, value string }
-
-// buildRequestUrl makes a URL containing all the common parameters
-// that are included with Psiphon API requests. These common parameters
-// are used for statistics.
-func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
-	var requestUrl bytes.Buffer
-	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
-	// e.g., don't do this: requestUrl.WriteString("https://")
-	requestUrl.WriteString("http://")
-	requestUrl.WriteString(session.tunnel.serverEntry.IpAddress)
-	requestUrl.WriteString(":")
-	requestUrl.WriteString(session.tunnel.serverEntry.WebServerPort)
-	requestUrl.WriteString("/")
-	requestUrl.WriteString(path)
-	requestUrl.WriteString("?client_session_id=")
-	requestUrl.WriteString(session.tunnel.sessionId)
-	requestUrl.WriteString("&server_secret=")
-	requestUrl.WriteString(session.tunnel.serverEntry.WebServerSecret)
-	requestUrl.WriteString("&propagation_channel_id=")
-	requestUrl.WriteString(session.config.PropagationChannelId)
-	requestUrl.WriteString("&sponsor_id=")
-	requestUrl.WriteString(session.config.SponsorId)
-	requestUrl.WriteString("&client_version=")
-	requestUrl.WriteString(session.config.ClientVersion)
-	// TODO: client_tunnel_core_version
-	requestUrl.WriteString("&relay_protocol=")
-	requestUrl.WriteString(session.tunnel.protocol)
-	requestUrl.WriteString("&client_platform=")
-	requestUrl.WriteString(session.config.ClientPlatform)
-	requestUrl.WriteString("&tunnel_whole_device=")
-	requestUrl.WriteString(strconv.Itoa(session.config.TunnelWholeDevice))
-	for _, extraParam := range extraParams {
-		requestUrl.WriteString("&")
-		requestUrl.WriteString(extraParam.name)
-		requestUrl.WriteString("=")
-		requestUrl.WriteString(extraParam.value)
-	}
-	return requestUrl.String()
-}
-
 // doGetRequest makes a tunneled HTTPS request and returns the response body.
 func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, err error) {
 	response, err := session.psiphonHttpsClient.Get(requestUrl)
@@ -277,6 +268,56 @@ func (session *Session) doPostRequest(requestUrl string, bodyType string, body i
 	return
 }
 
+// makeBaseRequestUrl makes a URL containing all the common parameters
+// that are included with Psiphon API requests. These common parameters
+// are used for statistics.
+func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string {
+	var requestUrl bytes.Buffer
+	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
+	// e.g., don't do this: requestUrl.WriteString("https://")
+	requestUrl.WriteString("http://")
+	requestUrl.WriteString(tunnel.serverEntry.IpAddress)
+	requestUrl.WriteString(":")
+	requestUrl.WriteString(tunnel.serverEntry.WebServerPort)
+	requestUrl.WriteString("/")
+	// Placeholder for the path component of a request
+	requestUrl.WriteString("%s")
+	requestUrl.WriteString("?client_session_id=")
+	requestUrl.WriteString(sessionId)
+	requestUrl.WriteString("&server_secret=")
+	requestUrl.WriteString(tunnel.serverEntry.WebServerSecret)
+	requestUrl.WriteString("&propagation_channel_id=")
+	requestUrl.WriteString(config.PropagationChannelId)
+	requestUrl.WriteString("&sponsor_id=")
+	requestUrl.WriteString(config.SponsorId)
+	requestUrl.WriteString("&client_version=")
+	requestUrl.WriteString(config.ClientVersion)
+	// TODO: client_tunnel_core_version
+	requestUrl.WriteString("&relay_protocol=")
+	requestUrl.WriteString(tunnel.protocol)
+	requestUrl.WriteString("&client_platform=")
+	requestUrl.WriteString(config.ClientPlatform)
+	requestUrl.WriteString("&tunnel_whole_device=")
+	requestUrl.WriteString(strconv.Itoa(config.TunnelWholeDevice))
+	return requestUrl.String()
+}
+
+type ExtraParam struct{ name, value string }
+
+// buildRequestUrl makes a URL for an API request. The URL includes the
+// base request URL and any extra parameters for the specific request.
+func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
+	var requestUrl bytes.Buffer
+	requestUrl.WriteString(fmt.Sprintf(session.baseRequestUrl, path))
+	for _, extraParam := range extraParams {
+		requestUrl.WriteString("&")
+		requestUrl.WriteString(extraParam.name)
+		requestUrl.WriteString("=")
+		requestUrl.WriteString(extraParam.value)
+	}
+	return requestUrl.String()
+}
+
 // makeHttpsClient creates a Psiphon HTTPS client that tunnels requests and which validates
 // the web server using the Psiphon server entry web server certificate.
 // This is not a general purpose HTTPS client.

+ 16 - 1
psiphon/serverEntry.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -24,6 +24,7 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"errors"
+	"strings"
 )
 
 // ServerEntry represents a Psiphon server. It contains information
@@ -69,3 +70,17 @@ func DecodeServerEntry(encodedServerEntry string) (serverEntry *ServerEntry, err
 	}
 	return serverEntry, nil
 }
+
+// DecodeServerEntryList extracts server entries from the list encoding
+// used by remote server lists and Psiphon server handshake requests.
+func DecodeServerEntryList(encodedServerEntryList string) (serverEntries []*ServerEntry, err error) {
+	serverEntries = make([]*ServerEntry, 0)
+	for _, encodedServerEntry := range strings.Split(encodedServerEntryList, "\n") {
+		serverEntry, err := DecodeServerEntry(encodedServerEntry)
+		if err != nil {
+			return nil, ContextError(err)
+		}
+		serverEntries = append(serverEntries, serverEntry)
+	}
+	return serverEntries, nil
+}

+ 9 - 8
psiphon/socksProxy.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -21,9 +21,10 @@ package psiphon
 
 import (
 	"fmt"
-	socks "github.com/Psiphon-Inc/goptlib"
 	"net"
 	"sync"
+
+	socks "github.com/Psiphon-Inc/goptlib"
 )
 
 // SocksProxy is a SOCKS server that accepts local host connections
@@ -103,13 +104,13 @@ loop:
 		}
 		if err != nil {
 			Notice(NOTICE_ALERT, "SOCKS proxy accept error: %s", err)
-			if e, ok := err.(net.Error); ok && !e.Temporary() {
-				proxy.tunneler.SignalFailure()
-				// Fatal error, stop the proxy
-				break loop
+			if e, ok := err.(net.Error); ok && e.Temporary() {
+				// Temporary error, keep running
+				continue
 			}
-			// Temporary error, keep running
-			continue
+			// Fatal error, stop the proxy
+			proxy.tunneler.SignalComponentFailure()
+			break loop
 		}
 		go func() {
 			err := proxy.socksConnectionHandler(socksConnection)

+ 25 - 13
psiphon/stats_conn.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -30,16 +30,20 @@ Assumption: Enough of the first HTTP will be present in the first Write() call
 		- If this turns out to not be generally true we will need to add buffering.
 */
 
-import "net"
+import (
+	"net"
+	"sync/atomic"
+)
 
 // StatsConn is to be used as an intermediate link in a chain of net.Conn objects.
 // It inspects requests and responses and derives stats from them.
 type StatsConn struct {
 	net.Conn
-	serverID   string
-	hostname   string
-	firstWrite bool
-	regexps    *Regexps
+	serverID       string
+	firstWrite     int32
+	hostnameParsed int32
+	hostname       string
+	regexps        *Regexps
 }
 
 // NewStatsConn creates a StatsConn. serverID can be anything that uniquely
@@ -47,10 +51,11 @@ type StatsConn struct {
 // the accumulated stats.
 func NewStatsConn(nextConn net.Conn, serverID string, regexps *Regexps) *StatsConn {
 	return &StatsConn{
-		Conn:       nextConn,
-		serverID:   serverID,
-		firstWrite: true,
-		regexps:    regexps,
+		Conn:           nextConn,
+		serverID:       serverID,
+		firstWrite:     1,
+		hostnameParsed: 0,
+		regexps:        regexps,
 	}
 }
 
@@ -65,14 +70,14 @@ func (conn *StatsConn) Write(buffer []byte) (n int, err error) {
 	if n > 0 {
 		// If this is the first request, try to determine the hostname to associate
 		// with this connection.
-		if conn.firstWrite {
-			conn.firstWrite = false
+		if atomic.CompareAndSwapInt32(&conn.firstWrite, 1, 0) {
 
 			hostname, ok := getHostname(buffer)
 			if ok {
 				// Get the hostname value that will be stored in stats by
 				// regexing the real hostname.
 				conn.hostname = regexHostname(hostname, conn.regexps)
+				atomic.StoreInt32(&conn.hostnameParsed, 1)
 			}
 		}
 
@@ -90,11 +95,18 @@ func (conn *StatsConn) Write(buffer []byte) (n int, err error) {
 func (conn *StatsConn) Read(buffer []byte) (n int, err error) {
 	n, err = conn.Conn.Read(buffer)
 
+	var hostname string
+	if 1 == atomic.LoadInt32(&conn.hostnameParsed) {
+		hostname = conn.hostname
+	} else {
+		hostname = ""
+	}
+
 	// Count bytes without checking the error condition. It could happen that the
 	// buffer was partially read and then an error occurred.
 	recordStat(&statsUpdate{
 		conn.serverID,
-		conn.hostname,
+		hostname,
 		0,
 		int64(n)})
 

+ 284 - 110
psiphon/tunnel.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -25,9 +25,10 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"strings"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	"golang.org/x/crypto/ssh"
@@ -39,7 +40,14 @@ import (
 // implements Tunneler.
 type Tunneler interface {
 	Dial(remoteAddr string) (conn net.Conn, err error)
-	SignalFailure()
+	SignalComponentFailure()
+}
+
+// TunnelOwner specifies the interface required by Tunnel to notify its
+// owner when it has failed. The owner may, as in the case of the Controller,
+// remove the tunnel from its list of active tunnels.
+type TunnelOwner interface {
+	SignalTunnelFailure(tunnel *Tunnel)
 }
 
 const (
@@ -61,16 +69,18 @@ var SupportedTunnelProtocols = []string{
 // tunnel includes a network connection to the specified server
 // and an SSH session built on top of that transport.
 type Tunnel struct {
-	serverEntry             *ServerEntry
-	sessionId               string
-	sessionStarted          int32
-	protocol                string
-	conn                    Conn
-	sshClient               *ssh.Client
-	sshKeepAliveQuit        chan struct{}
-	portForwardFailures     chan int
-	portForwardFailureTotal int
-	regexps                 *Regexps
+	mutex                    *sync.Mutex
+	isClosed                 bool
+	serverEntry              *ServerEntry
+	session                  *Session
+	protocol                 string
+	conn                     Conn
+	closedSignal             chan struct{}
+	sshClient                *ssh.Client
+	operateWaitGroup         *sync.WaitGroup
+	shutdownOperateBroadcast chan struct{}
+	portForwardFailures      chan int
+	portForwardFailureTotal  int
 }
 
 // EstablishTunnel first makes a network transport connection to the
@@ -84,16 +94,163 @@ type Tunnel struct {
 // the first protocol in SupportedTunnelProtocols that's also in the
 // server capabilities is used.
 func EstablishTunnel(
-	config *Config, pendingConns *Conns, serverEntry *ServerEntry) (tunnel *Tunnel, err error) {
+	config *Config,
+	pendingConns *Conns,
+	serverEntry *ServerEntry,
+	tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
+
+	selectedProtocol, err := selectProtocol(config, serverEntry)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	Notice(NOTICE_INFO, "connecting to %s in region %s using %s",
+		serverEntry.IpAddress, serverEntry.Region, selectedProtocol)
 
-	// Select the protocol
-	var selectedProtocol string
+	// Generate a session Id for the Psiphon server API. This is generated now so
+	// that it can be sent with the SSH password payload, which helps the server
+	// associate client geo location, used in server API stats, with the session ID.
+	sessionId, err := MakeSessionId()
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	// Build transport layers and establish SSH connection
+	conn, closedSignal, sshClient, err := dialSsh(
+		config, pendingConns, serverEntry, selectedProtocol, sessionId)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	// Cleanup on error
+	defer func() {
+		if err != nil {
+			conn.Close()
+		}
+	}()
+
+	// The tunnel is now connected
+	tunnel = &Tunnel{
+		mutex:                    new(sync.Mutex),
+		isClosed:                 false,
+		serverEntry:              serverEntry,
+		protocol:                 selectedProtocol,
+		conn:                     conn,
+		closedSignal:             closedSignal,
+		sshClient:                sshClient,
+		operateWaitGroup:         new(sync.WaitGroup),
+		shutdownOperateBroadcast: make(chan struct{}),
+		// portForwardFailures buffer size is large enough to receive the threshold number
+		// of failure reports without blocking. Senders can drop failures without blocking.
+		portForwardFailures: make(chan int, config.PortForwardFailureThreshold)}
+
+	// Create a new Psiphon API session for this tunnel
+	Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
+	tunnel.session, err = NewSession(config, tunnel, sessionId)
+	if err != nil {
+		return nil, ContextError(fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err))
+	}
+
+	// Now that network operations are complete, cancel interruptibility
+	pendingConns.Remove(conn)
+
+	// Promote this successful tunnel to first rank so it's one
+	// of the first candidates next time establish runs.
+	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+
+	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
+	tunnel.operateWaitGroup.Add(1)
+	go tunnel.operateTunnel(config, tunnelOwner)
+
+	return tunnel, nil
+}
+
+// Close stops operating the tunnel and closes the underlying connection.
+// Supports multiple and/or concurrent calls to Close().
+func (tunnel *Tunnel) Close() {
+	tunnel.mutex.Lock()
+	if !tunnel.isClosed {
+		close(tunnel.shutdownOperateBroadcast)
+		tunnel.operateWaitGroup.Wait()
+		tunnel.conn.Close()
+	}
+	tunnel.isClosed = true
+	tunnel.mutex.Unlock()
+}
+
+// Dial establishes a port forward connection through the tunnel
+func (tunnel *Tunnel) Dial(remoteAddr string) (conn net.Conn, err error) {
+	tunnel.mutex.Lock()
+	isClosed := tunnel.isClosed
+	tunnel.mutex.Unlock()
+	if isClosed {
+		return nil, errors.New("tunnel is closed")
+	}
+
+	sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
+	if err != nil {
+		// TODO: conditional on type of error or error message?
+		select {
+		case tunnel.portForwardFailures <- 1:
+		default:
+		}
+		return nil, ContextError(err)
+	}
+
+	return &TunneledConn{
+			Conn:   sshPortForwardConn,
+			tunnel: tunnel},
+		nil
+}
+
+// TunneledConn implements net.Conn and wraps a port forward connection.
+// It is used to hook into Read and Write to observe I/O errors and
+// report these errors back to the tunnel monitor as port forward failures.
+type TunneledConn struct {
+	net.Conn
+	tunnel *Tunnel
+}
+
+func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
+	n, err = conn.Conn.Read(buffer)
+	if err != nil && err != io.EOF {
+		// Report 1 new failure. Won't block; assumes the receiver
+		// has a sufficient buffer for the threshold number of reports.
+		// TODO: conditional on type of error or error message?
+		select {
+		case conn.tunnel.portForwardFailures <- 1:
+		default:
+		}
+	}
+	return
+}
+
+func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
+	n, err = conn.Conn.Write(buffer)
+	if err != nil && err != io.EOF {
+		// Same as TunneledConn.Read()
+		select {
+		case conn.tunnel.portForwardFailures <- 1:
+		default:
+		}
+	}
+	return
+}
+
+// SignalComponentFailure notifies the tunnel that an associated component has failed.
+// This will terminate the tunnel.
+func (tunnel *Tunnel) SignalComponentFailure() {
+	Notice(NOTICE_ALERT, "tunnel received component failure signal")
+	tunnel.Close()
+}
+
+// selectProtocol is a helper that picks the tunnel protocol
+func selectProtocol(config *Config, serverEntry *ServerEntry) (selectedProtocol string, err error) {
 	// TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})
 	// for now, the code is simply assuming that MEEK capabilities imply OSSH capability.
 	if config.TunnelProtocol != "" {
 		requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
 		if !Contains(serverEntry.Capabilities, requiredCapability) {
-			return nil, ContextError(fmt.Errorf("server does not have required capability"))
+			return "", ContextError(fmt.Errorf("server does not have required capability"))
 		}
 		selectedProtocol = config.TunnelProtocol
 	} else {
@@ -106,11 +263,19 @@ func EstablishTunnel(
 			}
 		}
 		if selectedProtocol == "" {
-			return nil, ContextError(fmt.Errorf("server does not have any supported capabilities"))
+			return "", ContextError(fmt.Errorf("server does not have any supported capabilities"))
 		}
 	}
-	Notice(NOTICE_INFO, "connecting to %s in region %s using %s",
-		serverEntry.IpAddress, serverEntry.Region, selectedProtocol)
+	return selectedProtocol, nil
+}
+
+// dialSsh is a helper that builds the transport layers and establishes the SSH connection
+func dialSsh(
+	config *Config,
+	pendingConns *Conns,
+	serverEntry *ServerEntry,
+	selectedProtocol,
+	sessionId string) (conn Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -134,14 +299,6 @@ func EstablishTunnel(
 		port = serverEntry.SshPort
 	}
 
-	// Generate a session Id for the Psiphon server API. This is generated now so
-	// that it can be sent with the SSH password payload, which helps the server
-	// associate client geo location, used in server API stats, with the session ID.
-	sessionId, err := MakeSessionId()
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
 	// Create the base transport: meek or direct connection
 	dialConfig := &DialConfig{
 		ConnectTimeout:        TUNNEL_CONNECT_TIMEOUT,
@@ -151,42 +308,54 @@ func EstablishTunnel(
 		BindToDeviceProvider:  config.BindToDeviceProvider,
 		BindToDeviceDnsServer: config.BindToDeviceDnsServer,
 	}
-	var conn Conn
 	if useMeek {
 		conn, err = DialMeek(serverEntry, sessionId, useFronting, dialConfig)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
-		// TODO: MeekConn doesn't go into pendingConns since there's no direct connection to
-		// interrupt; underlying HTTP connections may be candidates for interruption, but only
-		// after relay starts polling...
 	} else {
 		conn, err = DialTCP(fmt.Sprintf("%s:%d", serverEntry.IpAddress, port), dialConfig)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
 	}
+
+	cleanupConn := conn
 	defer func() {
 		// Cleanup on error
 		if err != nil {
-			conn.Close()
+			cleanupConn.Close()
 		}
 	}()
 
+	// Create signal which is triggered when the underlying network connection is closed,
+	// this is used in operateTunnel to detect an unexpected disconnect. SetClosedSignal
+	// is called here, well before operateTunnel, so that we don't need to handle the
+	// "already closed" with a tunnelOwner.SignalTunnelFailure() in operateTunnel (this
+	// was previously the order of events, which caused the establish process to sometimes
+	// run briefly when not needed).
+	closedSignal = make(chan struct{}, 1)
+	if !conn.SetClosedSignal(closedSignal) {
+		// Conn is already closed. This is not unexpected -- for example,
+		// when establish is interrupted.
+		// TODO: make this not log an error when called from establishTunnelWorker?
+		return nil, nil, nil, ContextError(errors.New("conn already closed"))
+	}
+
 	// Add obfuscated SSH layer
 	var sshConn net.Conn
 	sshConn = conn
 	if useObfuscatedSsh {
 		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, nil, ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -202,7 +371,7 @@ func EstablishTunnel(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -216,88 +385,93 @@ func EstablishTunnel(
 	sshAddress := ""
 	sshClientConn, sshChans, sshReqs, err := ssh.NewClientConn(sshConn, sshAddress, sshClientConfig)
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, nil, ContextError(err)
 	}
-	sshClient := ssh.NewClient(sshClientConn, sshChans, sshReqs)
-
-	// Run a goroutine to periodically execute SSH keepalive
-	sshKeepAliveQuit := make(chan struct{})
-	go func() {
-		sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
-		for {
-			select {
-			case <-sshKeepAliveTicker.C:
-				_, _, err := sshClient.SendRequest("keepalive@openssh.com", true, nil)
-				if err != nil {
-					Notice(NOTICE_ALERT, "ssh keep alive failed: %s", err)
-					// TODO: call Tunnel.Close()?
-					sshKeepAliveTicker.Stop()
-					conn.Close()
-				}
-			case <-sshKeepAliveQuit:
-				sshKeepAliveTicker.Stop()
-				return
-			}
-		}
-	}()
+	sshClient = ssh.NewClient(sshClientConn, sshChans, sshReqs)
 
-	return &Tunnel{
-			serverEntry:      serverEntry,
-			sessionId:        sessionId,
-			protocol:         selectedProtocol,
-			conn:             conn,
-			sshClient:        sshClient,
-			sshKeepAliveQuit: sshKeepAliveQuit,
-			// portForwardFailures buffer size is large enough to receive the thresold number
-			// of failure reports without blocking. Senders can drop failures without blocking.
-			portForwardFailures: make(chan int, config.PortForwardFailureThreshold)},
-		nil
+	return conn, closedSignal, sshClient, nil
 }
 
-// Close terminates the tunnel.
-func (tunnel *Tunnel) Close() {
-	if tunnel.sshKeepAliveQuit != nil {
-		close(tunnel.sshKeepAliveQuit)
-		tunnel.sshKeepAliveQuit = nil
-	}
-	if tunnel.conn != nil {
-		tunnel.conn.Close()
-	}
-}
+// operateTunnel periodically sends stats updates to the Psiphon API and
+// monitors the tunnel for failures:
+//
+// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
+// channel on keep-alive failure and other transport I/O errors. In case
+// of such a failure, the tunnel is marked as failed.
+//
+// 2. Tunnel port forward failures: the tunnel connection may stay up but
+// the client may still fail to establish port forwards due to server load
+// and other conditions. After a threshold number of such failures, the
+// overall tunnel is marked as failed.
+//
+// The loop runs until one of the monitored conditions produces an error, in
+// which case tunnelOwner.SignalTunnelFailure is called, or until
+// shutdownOperateBroadcast is received, in which case final stats are sent
+// and the function returns without signalling a failure.
+//
+// TODO: currently, any connect (dial), read, or write error associated with
+// a port forward is counted as a failure. It may be important to differentiate
+// between failures due to Psiphon server conditions and failures due to the
+// origin/target server (in the latter case, the tunnel is healthy). Here are
+// some typical error messages to consider matching against (or ignoring):
+//
+// - "ssh: rejected: administratively prohibited (open failed)"
+// - "ssh: rejected: connect failed (Connection timed out)"
+// - "write tcp ... broken pipe"
+// - "read tcp ... connection reset by peer"
+// - "ssh: unexpected packet in response to channel open: <nil>"
+//
+func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
+	defer tunnel.operateWaitGroup.Done()
 
-func (tunnel *Tunnel) IsSessionStarted() bool {
-	return atomic.LoadInt32(&tunnel.sessionStarted) == 1
-}
+	// Note: not using a Ticker since NextSendPeriod() is not a fixed time period
+	statsTimer := time.NewTimer(NextSendPeriod())
+	defer statsTimer.Stop()
 
-func (tunnel *Tunnel) SetSessionStarted() {
-	atomic.StoreInt32(&tunnel.sessionStarted, 1)
-}
+	sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
+	defer sshKeepAliveTicker.Stop()
 
-// Dial establishes a port forward connection through the tunnel
-func (tunnel *Tunnel) Dial(remoteAddr string) (conn net.Conn, err error) {
-	// TODO: should this track port forward failures as in Controller.DialWithTunnel?
-	return tunnel.sshClient.Dial("tcp", remoteAddr)
-}
+	// err is the loop condition: the first case that sets it ends monitoring.
+	var err error
+	for err == nil {
+		select {
+		case <-statsTimer.C:
+			sendStats(tunnel, false)
+			statsTimer.Reset(NextSendPeriod())
 
-// SignalFailure notifies the tunnel that an associated component has failed.
-// This will terminate the tunnel.
-func (tunnel *Tunnel) SignalFailure() {
-	Notice(NOTICE_ALERT, "tunnel received failure signal")
-	tunnel.Close()
-}
+		case <-sshKeepAliveTicker.C:
+			// Use a separate variable for the request result: declaring err
+			// with := here would shadow the loop's err, so a keep-alive
+			// failure would never terminate the loop. Only a failed request
+			// marks the tunnel as failed.
+			_, _, keepAliveErr := tunnel.sshClient.SendRequest("keepalive@openssh.com", true, nil)
+			if keepAliveErr != nil {
+				err = fmt.Errorf("ssh keep alive failed: %s", keepAliveErr)
+			}
 
-// ServerID provides a unique identifier for the server the tunnel connects to.
-// This ID is consistent between multiple tunnels connected to that server.
-func (tunnel *Tunnel) ServerID() string {
-	return tunnel.serverEntry.IpAddress
-}
+		case failures := <-tunnel.portForwardFailures:
+			// Note: no mutex on portForwardFailureTotal; only referenced here
+			tunnel.portForwardFailureTotal += failures
+			Notice(
+				NOTICE_INFO, "port forward failures for %s: %d",
+				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
+			if tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
+				err = errors.New("tunnel exceeded port forward failure threshold")
+			}
+
+		case <-tunnel.closedSignal:
+			err = errors.New("tunnel closed unexpectedly")
+
+		case <-tunnel.shutdownOperateBroadcast:
+			// Send final stats
+			sendStats(tunnel, true)
+			Notice(NOTICE_INFO, "shutdown operate tunnel")
+			return
+		}
+	}
 
-// StatsRegexps gets the Regexps used for the statistics for this tunnel.
-func (tunnel *Tunnel) StatsRegexps() *Regexps {
-	return tunnel.regexps
+	if err != nil {
+		Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
+		tunnelOwner.SignalTunnelFailure(tunnel)
+	}
 }
 
-// SetStatsRegexps sets the Regexps used for the statistics for this tunnel.
-func (tunnel *Tunnel) SetStatsRegexps(regexps *Regexps) {
-	tunnel.regexps = regexps
+// sendStats is a helper for sending session stats to the server.
+//
+// It fetches the pending stats payload for the tunnel's server IP address
+// via GetForServer and, when there is one, submits it with
+// tunnel.session.DoStatusRequest. The final flag is passed through to
+// DoStatusRequest unchanged. If the request fails, an alert notice is
+// emitted and the payload is handed back via PutBack.
+func sendStats(tunnel *Tunnel, final bool) {
+	payload := GetForServer(tunnel.serverEntry.IpAddress)
+	// A nil payload means there is nothing to send for this server.
+	if payload != nil {
+		err := tunnel.session.DoStatusRequest(payload, final)
+		if err != nil {
+			Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
+			// Return the unsent payload to the stats store.
+			// NOTE(review): presumably so a later send can retry it; confirm against PutBack.
+			PutBack(tunnel.serverEntry.IpAddress, payload)
+		}
+	}
 }

+ 1 - 10
psiphon/utils.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -23,7 +23,6 @@ import (
 	"crypto/rand"
 	"crypto/x509"
 	"encoding/base64"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"math/big"
@@ -85,14 +84,6 @@ func ContextError(err error) error {
 	return fmt.Errorf("%s#%d: %s", funcName, line, err)
 }
 
-func MakeSessionId() (id string, err error) {
-	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
-	if err != nil {
-		return "", ContextError(err)
-	}
-	return hex.EncodeToString(randomId), nil
-}
-
 func DecodeCertificate(encodedCertificate string) (certificate *x509.Certificate, err error) {
 	derEncodedCertificate, err := base64.StdEncoding.DecodeString(encodedCertificate)
 	if err != nil {