Răsfoiți Sursa

Refactor and fix tunnel/controller/serverApi/pendingConns

* EstablishTunnel now completes NewSession (handshake/connected)
  before returning the tunnel. This simplifies tunnel states in
  controller, as active tunnels already have completed handshake. As a
  side effect, we now get the desired property that handshake Notice
  lines emit before TUNNELS N emits, which the Windows client expects.

* Split EstablishTunnel into subroutines for readability.

* Removed operateTunnel logic from controller and merge into tunnel
  goroutine (which was previously only doing SSH keepalive).
  Tunnel.operateTunnel now handles all tunnel monitoring and periodic
  work (stats) for a single tunnel instance. Tunnel.operateTunnel
  termination is synchronized with Tunnel.Close() -- as a result,
  final stats actually sends.

* Controller channel logic (failedTunnel channel) now better located
  in controller.go (via TunnelOwner interface).

* Session now holds session ID and regexes; serverApi somewhat less
  intermingled with tunnel code.

* Suppress noise errors from EstablishTunnel after interruption.

* Meek/TCPConn now leave their output conns in the pendingConn set,
  allowing the caller (EstablishTunnel) to determine when
  interruptibility is no longer required. This allows EstablishTunnel
  to ensure that handshake/connect Psiphon API requests are
  interrupted.

* MeekConn is now properly interruptible. This became critical now
  that EstablishTunnel completes a handshake unless interrupted.
Rod Hynes 11 ani în urmă
părinte
comite
2cea38f366

+ 6 - 3
psiphon/TCPConn_unix.go

@@ -1,7 +1,7 @@
 // +build android darwin dragonfly freebsd linux nacl netbsd openbsd solaris
 
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -34,7 +34,10 @@ type interruptibleTCPSocket struct {
 	socketFd int
 }
 
-// interruptibleTCPDial creates a socket connection.
+// interruptibleTCPDial establishes a TCP network connection. A conn is added
+// to config.PendingConns before blocking on network IO, which enables interruption.
+// The caller is responsible for removing an established conn from PendingConns.
+//
 // To implement socket device binding and interruptible connecting, the lower-level
 // syscall APIs are used. The sequence of syscalls in this implementation are
 // taken from: https://code.google.com/p/go/issues/detail?id=6966
@@ -67,6 +70,7 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	}
 
 	// Get the remote IP and port, resolving a domain name if necessary
+	// TODO: domain name resolution isn't interruptible
 	host, strPort, err := net.SplitHostPort(dialAddr)
 	if err != nil {
 		return nil, ContextError(err)
@@ -92,7 +96,6 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 		readTimeout:   config.ReadTimeout,
 		writeTimeout:  config.WriteTimeout}
 	config.PendingConns.Add(conn)
-	defer config.PendingConns.Remove(conn)
 
 	// Connect the socket
 	// TODO: adjust the timeout to account for time spent resolving hostname

+ 5 - 2
psiphon/TCPConn_windows.go

@@ -1,7 +1,7 @@
 // +build windows
 
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -42,11 +42,15 @@ type interruptibleDialResult struct {
 	err     error
 }
 
+// interruptibleTCPDial establishes a TCP network connection. A conn is added
+// to config.PendingConns before blocking on network IO, which enables interruption.
+// The caller is responsible for removing an established conn from PendingConns.
 func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err error) {
 	if config.BindToDeviceProvider != nil {
 		return nil, ContextError(errors.New("psiphon.interruptibleTCPDial with bind not supported on Windows"))
 	}
 
+	// Enable interruption
 	conn = &TCPConn{
 		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)},
 		readTimeout:   config.ReadTimeout,
@@ -78,7 +82,6 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 
 	// Block until Dial completes (or times out) or until interrupt
 	result := <-conn.interruptible.results
-	config.PendingConns.Remove(conn)
 	if result.err != nil {
 		return nil, ContextError(result.err)
 	}

+ 1 - 1
psiphon/config.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify

+ 59 - 142
psiphon/controller.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -25,7 +25,6 @@ package psiphon
 
 import (
 	"errors"
-	"fmt"
 	"io"
 	"net"
 	"sync"
@@ -37,7 +36,7 @@ import (
 // route traffic through the tunnels.
 type Controller struct {
 	config                    *Config
-	failureSignal             chan struct{}
+	componentFailureSignal    chan struct{}
 	shutdownBroadcast         chan struct{}
 	runWaitGroup              *sync.WaitGroup
 	establishedTunnels        chan *Tunnel
@@ -45,7 +44,6 @@ type Controller struct {
 	tunnelMutex               sync.Mutex
 	tunnels                   []*Tunnel
 	nextTunnel                int
-	operateWaitGroup          *sync.WaitGroup
 	isEstablishing            bool
 	establishWaitGroup        *sync.WaitGroup
 	stopEstablishingBroadcast chan struct{}
@@ -57,18 +55,17 @@ type Controller struct {
 func NewController(config *Config) (controller *Controller) {
 	return &Controller{
 		config: config,
-		// failureSignal receives a signal from a component (including socks and
+		// componentFailureSignal receives a signal from a component (including socks and
 		// http local proxies) if they unexpectedly fail. Senders should not block.
 		// A buffer allows at least one stop signal to be sent before there is a receiver.
-		failureSignal:     make(chan struct{}, 1),
-		shutdownBroadcast: make(chan struct{}),
-		runWaitGroup:      new(sync.WaitGroup),
+		componentFailureSignal: make(chan struct{}, 1),
+		shutdownBroadcast:      make(chan struct{}),
+		runWaitGroup:           new(sync.WaitGroup),
 		// establishedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
 		establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
 		failedTunnels:      make(chan *Tunnel, config.TunnelPoolSize),
 		tunnels:            make([]*Tunnel, 0),
-		operateWaitGroup:   new(sync.WaitGroup),
 		isEstablishing:     false,
 		pendingConns:       new(Conns),
 	}
@@ -105,8 +102,8 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 	select {
 	case <-shutdownBroadcast:
 		Notice(NOTICE_INFO, "controller shutdown by request")
-	case <-controller.failureSignal:
-		Notice(NOTICE_ALERT, "controller shutdown due to failure")
+	case <-controller.componentFailureSignal:
+		Notice(NOTICE_ALERT, "controller shutdown due to component failure")
 	}
 
 	// Note: in addition to establish(), this pendingConns will interrupt
@@ -118,11 +115,11 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 	Notice(NOTICE_INFO, "exiting controller")
 }
 
-// SignalFailure notifies the controller that an associated component has failed.
+// SignalComponentFailure notifies the controller that an associated component has failed.
 // This will terminate the controller.
-func (controller *Controller) SignalFailure() {
+func (controller *Controller) SignalComponentFailure() {
 	select {
-	case controller.failureSignal <- *new(struct{}):
+	case controller.componentFailureSignal <- *new(struct{}):
 	default:
 	}
 }
@@ -162,9 +159,8 @@ loop:
 // runTunnels is the controller tunnel management main loop. It starts and stops
 // establishing tunnels based on the target tunnel pool size and the current size
 // of the pool. Tunnels are established asynchronously using worker goroutines.
-// When a tunnel is established, it's added to the active pool and a corresponding
-// operateTunnel goroutine is launched which starts a session in the tunnel and
-// monitors the tunnel for failures.
+// When a tunnel is established, it's added to the active pool. The tunnel's
+// operateTunnel goroutine monitors the tunnel.
 // When a tunnel fails, it's removed from the pool and the establish process is
 // restarted to fill the pool.
 func (controller *Controller) runTunnels() {
@@ -205,8 +201,6 @@ loop:
 			Notice(NOTICE_INFO, "established tunnel: %s", establishedTunnel.serverEntry.IpAddress)
 			if controller.registerTunnel(establishedTunnel) {
 				Notice(NOTICE_INFO, "active tunnel: %s", establishedTunnel.serverEntry.IpAddress)
-				controller.operateWaitGroup.Add(1)
-				go controller.operateTunnel(establishedTunnel)
 			} else {
 				controller.discardTunnel(establishedTunnel)
 			}
@@ -220,7 +214,6 @@ loop:
 	}
 	controller.stopEstablishing()
 	controller.terminateAllTunnels()
-	controller.operateWaitGroup.Wait()
 
 	// Drain tunnel channels
 	close(controller.establishedTunnels)
@@ -235,6 +228,22 @@ loop:
 	Notice(NOTICE_INFO, "exiting run tunnels")
 }
 
+// HandleFailedTunnel implements the TunnelOwner interface. This function
+// is called by Tunnel.operateTunnel when the tunnel has detected that it
+// has failed. The Controller will signal runTunnels to create a new
+// tunnel and/or remove the tunnel from the list of active tunnels.
+func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
+	// Don't block. Assumes the receiver has a buffer large enough for
+	// the typical number of operated tunnels. In case there's no room,
+	// terminate the tunnel (runTunnels won't get a signal in this case,
+	// but the tunnel will be removed from the list of active tunnels).
+	select {
+	case controller.failedTunnels <- tunnel:
+	default:
+		controller.terminateTunnel(tunnel)
+	}
+}
+
 // discardTunnel disposes of a successful connection that is no longer required.
 func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 	Notice(NOTICE_INFO, "discard tunnel: %s", tunnel.serverEntry.IpAddress)
@@ -319,12 +328,7 @@ func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
 		tunnel = controller.tunnels[controller.nextTunnel]
 		controller.nextTunnel =
 			(controller.nextTunnel + 1) % len(controller.tunnels)
-		// A tunnel must[*] have started its session (performed the server
-		// API handshake sequence) before it may be used for tunneling traffic
-		// [*]currently not enforced by the server, but may be in the future.
-		if tunnel.IsSessionStarted() {
-			return tunnel
-		}
+		return tunnel
 	}
 	return nil
 }
@@ -342,108 +346,6 @@ func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry
 	return false
 }
 
-// operateTunnel starts a Psiphon session (handshake, etc.) on a newly
-// connected tunnel, and then monitors the tunnel for failures:
-//
-// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
-// channel on keep-alive failure and other transport I/O errors. In case
-// of such a failure, the tunnel is marked as failed.
-//
-// 2. Tunnel port forward failures: the tunnel connection may stay up but
-// the client may still fail to establish port forwards due to server load
-// and other conditions. After a threshold number of such failures, the
-// overall tunnel is marked as failed.
-//
-// TODO: currently, any connect (dial), read, or write error associated with
-// a port forward is counted as a failure. It may be important to differentiate
-// between failures due to Psiphon server conditions and failures due to the
-// origin/target server (in the latter case, the tunnel is healthy). Here are
-// some typical error messages to consider matching against (or ignoring):
-//
-// - "ssh: rejected: administratively prohibited (open failed)"
-// - "ssh: rejected: connect failed (Connection timed out)"
-// - "write tcp ... broken pipe"
-// - "read tcp ... connection reset by peer"
-// - "ssh: unexpected packet in response to channel open: <nil>"
-//
-func (controller *Controller) operateTunnel(tunnel *Tunnel) {
-	defer controller.operateWaitGroup.Done()
-
-	tunnelClosedSignal := make(chan struct{}, 1)
-	err := tunnel.conn.SetClosedSignal(tunnelClosedSignal)
-	if err != nil {
-		err = fmt.Errorf("failed to set closed signal: %s", err)
-	}
-
-	Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
-	// TODO: NewSession server API calls may block shutdown
-	session, err := NewSession(controller.config, tunnel)
-	if err != nil {
-		err = fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err)
-	}
-
-	// Tunnel may now be used for port forwarding
-	tunnel.SetSessionStarted()
-
-	// Promote this successful tunnel to first rank so it's one
-	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
-
-	statsTimer := time.NewTimer(NextSendPeriod())
-
-	for err == nil {
-		select {
-		case failures := <-tunnel.portForwardFailures:
-			tunnel.portForwardFailureTotal += failures
-			Notice(
-				NOTICE_INFO, "port forward failures for %s: %d",
-				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
-			if tunnel.portForwardFailureTotal > controller.config.PortForwardFailureThreshold {
-				err = errors.New("tunnel exceeded port forward failure threshold")
-			}
-
-		case <-tunnelClosedSignal:
-			// TODO: this signal can be received during a commanded shutdown due to
-			// how tunnels are closed; should rework this to avoid log noise.
-			err = errors.New("tunnel closed unexpectedly")
-
-		case <-controller.shutdownBroadcast:
-			// Send final stats
-			sendStats(tunnel, session, true)
-			Notice(NOTICE_INFO, "shutdown operate tunnel")
-			return
-
-		case <-statsTimer.C:
-			sendStats(tunnel, session, false)
-			statsTimer.Reset(NextSendPeriod())
-		}
-	}
-
-	if err != nil {
-		Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
-		// Don't block. Assumes the receiver has a buffer large enough for
-		// the typical number of operated tunnels. In case there's no room,
-		// terminate the tunnel (runTunnels won't get a signal in this case).
-		select {
-		case controller.failedTunnels <- tunnel:
-		default:
-			controller.terminateTunnel(tunnel)
-		}
-	}
-}
-
-// sendStats is a helper for sending session stats to the server.
-func sendStats(tunnel *Tunnel, session *Session, final bool) {
-	payload := GetForServer(tunnel.serverEntry.IpAddress)
-	if payload != nil {
-		err := session.DoStatusRequest(payload, final)
-		if err != nil {
-			Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-			PutBack(tunnel.serverEntry.IpAddress, payload)
-		}
-	}
-}
-
 // TunneledConn implements net.Conn and wraps a port foward connection.
 // It is used to hook into Read and Write to observe I/O errors and
 // report these errors back to the tunnel monitor as port forward failures.
@@ -496,7 +398,8 @@ func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error)
 		return nil, ContextError(err)
 	}
 
-	statsConn := NewStatsConn(tunnelConn, tunnel.ServerID(), tunnel.StatsRegexps())
+	statsConn := NewStatsConn(
+		tunnelConn, tunnel.session.StatsServerID(), tunnel.session.StatsRegexps())
 
 	conn = &TunneledConn{
 		Conn:   statsConn,
@@ -535,10 +438,10 @@ func (controller *Controller) stopEstablishing() {
 		return
 	}
 	Notice(NOTICE_INFO, "stop establishing")
+	close(controller.stopEstablishingBroadcast)
 	// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
 	// and may leave goroutines running for a time after the Wait call.
 	controller.pendingConns.CloseAll()
-	close(controller.stopEstablishingBroadcast)
 	// Note: establishCandidateGenerator closes controller.candidateServerEntries
 	// (as it may be sending to that channel).
 	controller.establishWaitGroup.Wait()
@@ -559,7 +462,7 @@ func (controller *Controller) establishCandidateGenerator() {
 		controller.config.EgressRegion, controller.config.TunnelProtocol)
 	if err != nil {
 		Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
-		controller.SignalFailure()
+		controller.SignalComponentFailure()
 		return
 	}
 	defer iterator.Close()
@@ -570,7 +473,7 @@ loop:
 			serverEntry, err := iterator.Next()
 			if err != nil {
 				Notice(NOTICE_ALERT, "failed to get next candidate: %s", err)
-				controller.SignalFailure()
+				controller.SignalComponentFailure()
 				break loop
 			}
 			if serverEntry == nil {
@@ -608,32 +511,46 @@ loop:
 // a connection to the tunnel server, and delivers the established tunnel to a channel.
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
+loop:
 	for serverEntry := range controller.candidateServerEntries {
 		// Note: don't receive from candidateQueue and broadcastStopWorkers in the same
 		// select, since we want to prioritize receiving the stop signal
 		select {
 		case <-controller.stopEstablishingBroadcast:
-			return
+			break loop
 		default:
 		}
+
 		// There may already be a tunnel to this candidate. If so, skip it.
 		if controller.isActiveTunnelServerEntry(serverEntry) {
 			continue
 		}
+
 		tunnel, err := EstablishTunnel(
-			controller.config, controller.pendingConns, serverEntry)
+			controller.config,
+			controller.pendingConns,
+			serverEntry,
+			controller) // TunnelOwner
 		if err != nil {
-			// TODO: distingush case where conn is interrupted?
-			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
-		} else {
-			// Don't block. Assumes the receiver has a buffer large enough for
-			// the number of desired tunnels. If there's no room, the tunnel must
-			// not be required so it's discarded.
+			// Before emitting error, check if establish interrupted, in which
+			// case the error is noise.
 			select {
-			case controller.establishedTunnels <- tunnel:
+			case <-controller.stopEstablishingBroadcast:
+				break loop
 			default:
-				controller.discardTunnel(tunnel)
 			}
+			Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
+			continue
+		}
+
+		// Deliver established tunnel.
+		// Don't block. Assumes the receiver has a buffer large enough for
+		// the number of desired tunnels. If there's no room, the tunnel must
+		// not be required so it's discarded.
+		select {
+		case controller.establishedTunnels <- tunnel:
+		default:
+			controller.discardTunnel(tunnel)
 		}
 	}
 	Notice(NOTICE_INFO, "stopped establish worker")

+ 2 - 2
psiphon/httpProxy.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -236,7 +236,7 @@ func (proxy *HttpProxy) serve() {
 	case <-proxy.stopListeningBroadcast:
 	default:
 		if err != nil {
-			proxy.tunneler.SignalFailure()
+			proxy.tunneler.SignalComponentFailure()
 			Notice(NOTICE_ALERT, "%s", ContextError(err))
 		}
 	}

+ 13 - 7
psiphon/meekConn.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -99,10 +99,12 @@ func DialMeek(
 	// pendingConns here, but that was a lifecycle mismatch: we don't want to abort HTTP transport
 	// connections while MeekConn is still in use
 	pendingConns := new(Conns)
+
 	// Use a copy of DialConfig with the meek pendingConns
-	configCopy := new(DialConfig)
-	*configCopy = *config
-	configCopy.PendingConns = pendingConns
+	meekConfig := new(DialConfig)
+	*meekConfig = *config
+	meekConfig.PendingConns = pendingConns
+
 	var host string
 	var dialer Dialer
 	if useFronting {
@@ -113,15 +115,15 @@ func DialMeek(
 		//  - disables SNI -- SNI breaks fronting when used with CDNs that support SNI on the server side.
 		dialer = NewCustomTLSDialer(
 			&CustomTLSConfig{
-				Dial:           NewTCPDialer(configCopy),
-				Timeout:        configCopy.ConnectTimeout,
+				Dial:           NewTCPDialer(meekConfig),
+				Timeout:        meekConfig.ConnectTimeout,
 				FrontingAddr:   fmt.Sprintf("%s:%d", serverEntry.MeekFrontingDomain, 443),
 				SendServerName: false,
 			})
 	} else {
 		// In this case, host is both what is dialed and what ends up in the HTTP Host header
 		host = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
-		dialer = NewTCPDialer(configCopy)
+		dialer = NewTCPDialer(meekConfig)
 	}
 
 	// Scheme is always "http". Otherwise http.Transport will try to do another TLS
@@ -177,6 +179,10 @@ func DialMeek(
 	meek.emptySendBuffer <- new(bytes.Buffer)
 	meek.relayWaitGroup.Add(1)
 	go meek.relay()
+
+	// Enable interruption
+	config.PendingConns.Add(meek)
+
 	return meek, nil
 }
 

+ 87 - 51
psiphon/serverApi.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -21,6 +21,7 @@ package psiphon
 
 import (
 	"bytes"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -36,25 +37,40 @@ import (
 // includes the session ID (used for Psiphon API requests) and a http
 // client configured to make tunneled Psiphon API requests.
 type Session struct {
-	config             *Config
-	tunnel             *Tunnel
+	sessionId          string
+	baseRequestUrl     string
 	psiphonHttpsClient *http.Client
+	statsRegexps       *Regexps
+	statsServerId      string
+}
+
+// MakeSessionId creates a new session ID. Making the session ID is not done
+// in NewSession as the transport needs to send the ID in the SSH credentials
+// before the tunnel is established; and NewSession performs a handshake on
+// an established tunnel.
+func MakeSessionId() (sessionId string, err error) {
+	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return hex.EncodeToString(randomId), nil
 }
 
 // NewSession makes tunnelled handshake and connected requests to the
 // Psiphon server and returns a Session struct, initialized with the
 // session ID, for use with subsequent Psiphon server API requests (e.g.,
 // periodic status requests).
-func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
+func NewSession(config *Config, tunnel *Tunnel, sessionId string) (session *Session, err error) {
 
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel)
 	if err != nil {
 		return nil, ContextError(err)
 	}
 	session = &Session{
-		config:             config,
-		tunnel:             tunnel,
+		sessionId:          sessionId,
+		baseRequestUrl:     makeBaseRequestUrl(config, tunnel, sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
+		statsServerId:      tunnel.serverEntry.IpAddress,
 	}
 	// Sending two seperate requests is a legacy from when the handshake was
 	// performed before a tunnel was established and the connect was performed
@@ -72,6 +88,17 @@ func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
 	return session, nil
 }
 
+// ServerID provides a unique identifier for the server the session connects to.
+// This ID is consistent between multiple sessions/tunnels connected to that server.
+func (session *Session) StatsServerID() string {
+	return session.statsServerId
+}
+
+// StatsRegexps gets the Regexps used for the statistics for this tunnel.
+func (session *Session) StatsRegexps() *Regexps {
+	return session.statsRegexps
+}
+
 // DoStatusRequest makes a /status request to the server, sending session stats.
 // final should be true if this is the last such request before disconnecting.
 func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool) error {
@@ -87,7 +114,7 @@ func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool)
 
 	url := session.buildRequestUrl(
 		"status",
-		&ExtraParam{"session_id", session.tunnel.sessionId},
+		&ExtraParam{"session_id", session.sessionId},
 		&ExtraParam{"connected", connected})
 
 	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
@@ -161,9 +188,9 @@ func (session *Session) doHandshakeRequest() error {
 	if handshakeConfig.UpgradeClientVersion != "" {
 		Notice(NOTICE_UPGRADE, "%s", handshakeConfig.UpgradeClientVersion)
 	}
-	session.tunnel.SetStatsRegexps(MakeRegexps(
+	session.statsRegexps = MakeRegexps(
 		handshakeConfig.PageViewRegexes,
-		handshakeConfig.HttpsRequestRegexes))
+		handshakeConfig.HttpsRequestRegexes)
 	return nil
 }
 
@@ -184,7 +211,7 @@ func (session *Session) doConnectedRequest() error {
 	}
 	url := session.buildRequestUrl(
 		"connected",
-		&ExtraParam{"session_id", session.tunnel.sessionId},
+		&ExtraParam{"session_id", session.sessionId},
 		&ExtraParam{"last_connected", lastConnected})
 	responseBody, err := session.doGetRequest(url)
 	if err != nil {
@@ -204,47 +231,6 @@ func (session *Session) doConnectedRequest() error {
 	return nil
 }
 
-type ExtraParam struct{ name, value string }
-
-// buildRequestUrl makes a URL containing all the common parameters
-// that are included with Psiphon API requests. These common parameters
-// are used for statistics.
-func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
-	var requestUrl bytes.Buffer
-	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
-	// e.g., don't do this: requestUrl.WriteString("https://")
-	requestUrl.WriteString("http://")
-	requestUrl.WriteString(session.tunnel.serverEntry.IpAddress)
-	requestUrl.WriteString(":")
-	requestUrl.WriteString(session.tunnel.serverEntry.WebServerPort)
-	requestUrl.WriteString("/")
-	requestUrl.WriteString(path)
-	requestUrl.WriteString("?client_session_id=")
-	requestUrl.WriteString(session.tunnel.sessionId)
-	requestUrl.WriteString("&server_secret=")
-	requestUrl.WriteString(session.tunnel.serverEntry.WebServerSecret)
-	requestUrl.WriteString("&propagation_channel_id=")
-	requestUrl.WriteString(session.config.PropagationChannelId)
-	requestUrl.WriteString("&sponsor_id=")
-	requestUrl.WriteString(session.config.SponsorId)
-	requestUrl.WriteString("&client_version=")
-	requestUrl.WriteString(session.config.ClientVersion)
-	// TODO: client_tunnel_core_version
-	requestUrl.WriteString("&relay_protocol=")
-	requestUrl.WriteString(session.tunnel.protocol)
-	requestUrl.WriteString("&client_platform=")
-	requestUrl.WriteString(session.config.ClientPlatform)
-	requestUrl.WriteString("&tunnel_whole_device=")
-	requestUrl.WriteString(strconv.Itoa(session.config.TunnelWholeDevice))
-	for _, extraParam := range extraParams {
-		requestUrl.WriteString("&")
-		requestUrl.WriteString(extraParam.name)
-		requestUrl.WriteString("=")
-		requestUrl.WriteString(extraParam.value)
-	}
-	return requestUrl.String()
-}
-
 // doGetRequest makes a tunneled HTTPS request and returns the response body.
 func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, err error) {
 	response, err := session.psiphonHttpsClient.Get(requestUrl)
@@ -277,6 +263,56 @@ func (session *Session) doPostRequest(requestUrl string, bodyType string, body i
 	return
 }
 
+// makeBaseRequestUrl makes a URL containing all the common parameters
+// that are included with Psiphon API requests. These common parameters
+// are used for statistics.
+func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string {
+	var requestUrl bytes.Buffer
+	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
+	// e.g., don't do this: requestUrl.WriteString("https://")
+	requestUrl.WriteString("http://")
+	requestUrl.WriteString(tunnel.serverEntry.IpAddress)
+	requestUrl.WriteString(":")
+	requestUrl.WriteString(tunnel.serverEntry.WebServerPort)
+	requestUrl.WriteString("/")
+	// Placeholder for the path component of a request
+	requestUrl.WriteString("%s")
+	requestUrl.WriteString("?client_session_id=")
+	requestUrl.WriteString(sessionId)
+	requestUrl.WriteString("&server_secret=")
+	requestUrl.WriteString(tunnel.serverEntry.WebServerSecret)
+	requestUrl.WriteString("&propagation_channel_id=")
+	requestUrl.WriteString(config.PropagationChannelId)
+	requestUrl.WriteString("&sponsor_id=")
+	requestUrl.WriteString(config.SponsorId)
+	requestUrl.WriteString("&client_version=")
+	requestUrl.WriteString(config.ClientVersion)
+	// TODO: client_tunnel_core_version
+	requestUrl.WriteString("&relay_protocol=")
+	requestUrl.WriteString(tunnel.protocol)
+	requestUrl.WriteString("&client_platform=")
+	requestUrl.WriteString(config.ClientPlatform)
+	requestUrl.WriteString("&tunnel_whole_device=")
+	requestUrl.WriteString(strconv.Itoa(config.TunnelWholeDevice))
+	return requestUrl.String()
+}
+
+type ExtraParam struct{ name, value string }
+
+// buildRequestUrl makes a URL for an API request. The URL includes the
+// base request URL and any extra parameters for the specific request.
+func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
+	var requestUrl bytes.Buffer
+	requestUrl.WriteString(fmt.Sprintf(session.baseRequestUrl, path))
+	for _, extraParam := range extraParams {
+		requestUrl.WriteString("&")
+		requestUrl.WriteString(extraParam.name)
+		requestUrl.WriteString("=")
+		requestUrl.WriteString(extraParam.value)
+	}
+	return requestUrl.String()
+}
+
 // makeHttpsClient creates a Psiphon HTTPS client that tunnels requests and which validates
 // the web server using the Psiphon server entry web server certificate.
 // This is not a general purpose HTTPS client.

+ 2 - 2
psiphon/socksProxy.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -109,7 +109,7 @@ loop:
 				continue
 			}
 			// Fatal error, stop the proxy
-			proxy.tunneler.SignalFailure()
+			proxy.tunneler.SignalComponentFailure()
 			break loop
 		}
 		go func() {

+ 205 - 109
psiphon/tunnel.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -27,7 +27,7 @@ import (
 	"fmt"
 	"net"
 	"strings"
-	"sync/atomic"
+	"sync"
 	"time"
 
 	"golang.org/x/crypto/ssh"
@@ -39,7 +39,14 @@ import (
 // implements Tunneler.
 type Tunneler interface {
 	Dial(remoteAddr string) (conn net.Conn, err error)
-	SignalFailure()
+	SignalComponentFailure()
+}
+
+// TunnerOwner specifies the interface required by Tunnel to notify its
+// owner when it has failed. The owner may, as in the case of the Controller,
+// remove the tunnel from its list of active tunnels.
+type TunnelOwner interface {
+	SignalTunnelFailure(tunnel *Tunnel)
 }
 
 const (
@@ -61,16 +68,15 @@ var SupportedTunnelProtocols = []string{
 // tunnel includes a network connection to the specified server
 // and an SSH session built on top of that transport.
 type Tunnel struct {
-	serverEntry             *ServerEntry
-	sessionId               string
-	sessionStarted          int32
-	protocol                string
-	conn                    Conn
-	sshClient               *ssh.Client
-	sshKeepAliveQuit        chan struct{}
-	portForwardFailures     chan int
-	portForwardFailureTotal int
-	regexps                 *Regexps
+	serverEntry              *ServerEntry
+	session                  *Session
+	protocol                 string
+	conn                     Conn
+	sshClient                *ssh.Client
+	operateWaitGroup         *sync.WaitGroup
+	shutdownOperateBroadcast chan struct{}
+	portForwardFailures      chan int
+	portForwardFailureTotal  int
 }
 
 // EstablishTunnel first makes a network transport connection to the
@@ -84,16 +90,101 @@ type Tunnel struct {
 // the first protocol in SupportedTunnelProtocols that's also in the
 // server capabilities is used.
 func EstablishTunnel(
-	config *Config, pendingConns *Conns, serverEntry *ServerEntry) (tunnel *Tunnel, err error) {
+	config *Config,
+	pendingConns *Conns,
+	serverEntry *ServerEntry,
+	tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
+
+	selectedProtocol, err := selectProtocol(config, serverEntry)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	Notice(NOTICE_INFO, "connecting to %s in region %s using %s",
+		serverEntry.IpAddress, serverEntry.Region, selectedProtocol)
+
+	// Generate a session Id for the Psiphon server API. This is generated now so
+	// that it can be sent with the SSH password payload, which helps the server
+	// associate client geo location, used in server API stats, with the session ID.
+	sessionId, err := MakeSessionId()
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	// Build transport layers and establish SSH connection
+	conn, sshClient, err := dialSsh(config, pendingConns, serverEntry, selectedProtocol, sessionId)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	// Cleanup on error
+	defer func() {
+		if err != nil {
+			conn.Close()
+		}
+	}()
+
+	// The tunnel is now connected
+	tunnel = &Tunnel{
+		serverEntry:              serverEntry,
+		protocol:                 selectedProtocol,
+		conn:                     conn,
+		sshClient:                sshClient,
+		operateWaitGroup:         new(sync.WaitGroup),
+		shutdownOperateBroadcast: make(chan struct{}),
+		// portForwardFailures buffer size is large enough to receive the thresold number
+		// of failure reports without blocking. Senders can drop failures without blocking.
+		portForwardFailures: make(chan int, config.PortForwardFailureThreshold)}
+
+	// Create a new Psiphon API session for this tunnel
+	Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
+	tunnel.session, err = NewSession(config, tunnel, sessionId)
+	if err != nil {
+		return nil, ContextError(fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err))
+	}
+
+	// Now that network operations are complete, cancel interruptibility
+	pendingConns.Remove(conn)
+
+	// Promote this successful tunnel to first rank so it's one
+	// of the first candidates next time establish runs.
+	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+
+	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
+	tunnel.operateWaitGroup.Add(1)
+	go tunnel.operateTunnel(config, tunnelOwner)
+
+	return tunnel, nil
+}
+
+// Close stops operating the tunnel and closes the underlying connection.
+// Note: unlike Conn, this currently only supports a single to Close().
+func (tunnel *Tunnel) Close() {
+	close(tunnel.shutdownOperateBroadcast)
+	tunnel.operateWaitGroup.Wait()
+	tunnel.conn.Close()
+}
+
+// Dial establishes a port forward connection through the tunnel
+func (tunnel *Tunnel) Dial(remoteAddr string) (conn net.Conn, err error) {
+	// TODO: should this track port forward failures as in Controller.DialWithTunnel?
+	return tunnel.sshClient.Dial("tcp", remoteAddr)
+}
+
+// SignalComponentFailure notifies the tunnel that an associated component has failed.
+// This will terminate the tunnel.
+func (tunnel *Tunnel) SignalComponentFailure() {
+	Notice(NOTICE_ALERT, "tunnel received component failure signal")
+	tunnel.Close()
+}
 
-	// Select the protocol
-	var selectedProtocol string
+// selectProtocol is a helper that picks the tunnel protocol
+func selectProtocol(config *Config, serverEntry *ServerEntry) (selectedProtocol string, err error) {
 	// TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})
 	// for now, the code is simply assuming that MEEK capabilities imply OSSH capability.
 	if config.TunnelProtocol != "" {
 		requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
 		if !Contains(serverEntry.Capabilities, requiredCapability) {
-			return nil, ContextError(fmt.Errorf("server does not have required capability"))
+			return "", ContextError(fmt.Errorf("server does not have required capability"))
 		}
 		selectedProtocol = config.TunnelProtocol
 	} else {
@@ -106,11 +197,16 @@ func EstablishTunnel(
 			}
 		}
 		if selectedProtocol == "" {
-			return nil, ContextError(fmt.Errorf("server does not have any supported capabilities"))
+			return "", ContextError(fmt.Errorf("server does not have any supported capabilities"))
 		}
 	}
-	Notice(NOTICE_INFO, "connecting to %s in region %s using %s",
-		serverEntry.IpAddress, serverEntry.Region, selectedProtocol)
+	return selectedProtocol, nil
+}
+
+// dialSsh is a helper that builds the transport layers and establishes the SSH connection
+func dialSsh(
+	config *Config, pendingConns *Conns, serverEntry *ServerEntry,
+	selectedProtocol, sessionId string) (conn Conn, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -134,14 +230,6 @@ func EstablishTunnel(
 		port = serverEntry.SshPort
 	}
 
-	// Generate a session Id for the Psiphon server API. This is generated now so
-	// that it can be sent with the SSH password payload, which helps the server
-	// associate client geo location, used in server API stats, with the session ID.
-	sessionId, err := MakeSessionId()
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
 	// Create the base transport: meek or direct connection
 	dialConfig := &DialConfig{
 		ConnectTimeout:        TUNNEL_CONNECT_TIMEOUT,
@@ -151,25 +239,23 @@ func EstablishTunnel(
 		BindToDeviceProvider:  config.BindToDeviceProvider,
 		BindToDeviceDnsServer: config.BindToDeviceDnsServer,
 	}
-	var conn Conn
 	if useMeek {
 		conn, err = DialMeek(serverEntry, sessionId, useFronting, dialConfig)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
-		// TODO: MeekConn doesn't go into pendingConns since there's no direct connection to
-		// interrupt; underlying HTTP connections may be candidates for interruption, but only
-		// after relay starts polling...
 	} else {
 		conn, err = DialTCP(fmt.Sprintf("%s:%d", serverEntry.IpAddress, port), dialConfig)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
+
+	cleanupConn := conn
 	defer func() {
 		// Cleanup on error
 		if err != nil {
-			conn.Close()
+			cleanupConn.Close()
 		}
 	}()
 
@@ -179,14 +265,14 @@ func EstablishTunnel(
 	if useObfuscatedSsh {
 		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -202,7 +288,7 @@ func EstablishTunnel(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -216,88 +302,98 @@ func EstablishTunnel(
 	sshAddress := ""
 	sshClientConn, sshChans, sshReqs, err := ssh.NewClientConn(sshConn, sshAddress, sshClientConfig)
 	if err != nil {
-		return nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
-	sshClient := ssh.NewClient(sshClientConn, sshChans, sshReqs)
-
-	// Run a goroutine to periodically execute SSH keepalive
-	sshKeepAliveQuit := make(chan struct{})
-	go func() {
-		sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
-		for {
-			select {
-			case <-sshKeepAliveTicker.C:
-				_, _, err := sshClient.SendRequest("keepalive@openssh.com", true, nil)
-				if err != nil {
-					Notice(NOTICE_ALERT, "ssh keep alive failed: %s", err)
-					// TODO: call Tunnel.Close()?
-					sshKeepAliveTicker.Stop()
-					conn.Close()
-				}
-			case <-sshKeepAliveQuit:
-				sshKeepAliveTicker.Stop()
-				return
-			}
-		}
-	}()
+	sshClient = ssh.NewClient(sshClientConn, sshChans, sshReqs)
 
-	return &Tunnel{
-			serverEntry:      serverEntry,
-			sessionId:        sessionId,
-			protocol:         selectedProtocol,
-			conn:             conn,
-			sshClient:        sshClient,
-			sshKeepAliveQuit: sshKeepAliveQuit,
-			// portForwardFailures buffer size is large enough to receive the thresold number
-			// of failure reports without blocking. Senders can drop failures without blocking.
-			portForwardFailures: make(chan int, config.PortForwardFailureThreshold)},
-		nil
+	return conn, sshClient, nil
 }
 
-// Close terminates the tunnel.
-func (tunnel *Tunnel) Close() {
-	if tunnel.sshKeepAliveQuit != nil {
-		close(tunnel.sshKeepAliveQuit)
-		tunnel.sshKeepAliveQuit = nil
-	}
-	if tunnel.conn != nil {
-		tunnel.conn.Close()
+// operateTunnel periodically sends stats updates to the Psiphon API and
+// monitors the tunnel for failures:
+//
+// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
+// channel on keep-alive failure and other transport I/O errors. In case
+// of such a failure, the tunnel is marked as failed.
+//
+// 2. Tunnel port forward failures: the tunnel connection may stay up but
+// the client may still fail to establish port forwards due to server load
+// and other conditions. After a threshold number of such failures, the
+// overall tunnel is marked as failed.
+//
+// TODO: currently, any connect (dial), read, or write error associated with
+// a port forward is counted as a failure. It may be important to differentiate
+// between failures due to Psiphon server conditions and failures due to the
+// origin/target server (in the latter case, the tunnel is healthy). Here are
+// some typical error messages to consider matching against (or ignoring):
+//
+// - "ssh: rejected: administratively prohibited (open failed)"
+// - "ssh: rejected: connect failed (Connection timed out)"
+// - "write tcp ... broken pipe"
+// - "read tcp ... connection reset by peer"
+// - "ssh: unexpected packet in response to channel open: <nil>"
+//
+func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
+	defer tunnel.operateWaitGroup.Done()
+
+	tunnelClosedSignal := make(chan struct{}, 1)
+	err := tunnel.conn.SetClosedSignal(tunnelClosedSignal)
+	if err != nil {
+		err = fmt.Errorf("failed to set closed signal: %s", err)
 	}
-}
 
-func (tunnel *Tunnel) IsSessionStarted() bool {
-	return atomic.LoadInt32(&tunnel.sessionStarted) == 1
-}
+	// Note: not using a Ticker since NextSendPeriod() is not a fixed time period
+	statsTimer := time.NewTimer(NextSendPeriod())
+	defer statsTimer.Stop()
 
-func (tunnel *Tunnel) SetSessionStarted() {
-	atomic.StoreInt32(&tunnel.sessionStarted, 1)
-}
+	sshKeepAliveTicker := time.NewTicker(TUNNEL_SSH_KEEP_ALIVE_PERIOD)
+	defer sshKeepAliveTicker.Stop()
 
-// Dial establishes a port forward connection through the tunnel
-func (tunnel *Tunnel) Dial(remoteAddr string) (conn net.Conn, err error) {
-	// TODO: should this track port forward failures as in Controller.DialWithTunnel?
-	return tunnel.sshClient.Dial("tcp", remoteAddr)
-}
+	for err == nil {
+		select {
+		case <-statsTimer.C:
+			sendStats(tunnel, false)
+			statsTimer.Reset(NextSendPeriod())
 
-// SignalFailure notifies the tunnel that an associated component has failed.
-// This will terminate the tunnel.
-func (tunnel *Tunnel) SignalFailure() {
-	Notice(NOTICE_ALERT, "tunnel received failure signal")
-	tunnel.Close()
-}
+		case <-sshKeepAliveTicker.C:
+			_, _, err := tunnel.sshClient.SendRequest("keepalive@openssh.com", true, nil)
+			err = fmt.Errorf("ssh keep alive failed: %s", err)
 
-// ServerID provides a unique identifier for the server the tunnel connects to.
-// This ID is consistent between multiple tunnels connected to that server.
-func (tunnel *Tunnel) ServerID() string {
-	return tunnel.serverEntry.IpAddress
-}
+		case failures := <-tunnel.portForwardFailures:
+			// Note: no mutex on portForwardFailureTotal; only referenced here
+			tunnel.portForwardFailureTotal += failures
+			Notice(
+				NOTICE_INFO, "port forward failures for %s: %d",
+				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
+			if tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
+				err = errors.New("tunnel exceeded port forward failure threshold")
+			}
+
+		case <-tunnelClosedSignal:
+			err = errors.New("tunnel closed unexpectedly")
+
+		case <-tunnel.shutdownOperateBroadcast:
+			// Send final stats
+			sendStats(tunnel, true)
+			Notice(NOTICE_INFO, "shutdown operate tunnel")
+			return
+		}
+	}
 
-// StatsRegexps gets the Regexps used for the statistics for this tunnel.
-func (tunnel *Tunnel) StatsRegexps() *Regexps {
-	return tunnel.regexps
+	if err != nil {
+		Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
+		tunnelOwner.SignalTunnelFailure(tunnel)
+	}
 }
 
-// SetStatsRegexps sets the Regexps used for the statistics for this tunnel.
-func (tunnel *Tunnel) SetStatsRegexps(regexps *Regexps) {
-	tunnel.regexps = regexps
+// sendStats is a helper for sending session stats to the server.
+func sendStats(tunnel *Tunnel, final bool) {
+	payload := GetForServer(tunnel.serverEntry.IpAddress)
+	if payload != nil {
+		err := tunnel.session.DoStatusRequest(payload, final)
+		if err != nil {
+			Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
+			PutBack(tunnel.serverEntry.IpAddress, payload)
+		}
+	}
 }

+ 1 - 10
psiphon/utils.go

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Psiphon Inc.
+ * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  *
  * This program is free software: you can redistribute it and/or modify
@@ -23,7 +23,6 @@ import (
 	"crypto/rand"
 	"crypto/x509"
 	"encoding/base64"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"math/big"
@@ -85,14 +84,6 @@ func ContextError(err error) error {
 	return fmt.Errorf("%s#%d: %s", funcName, line, err)
 }
 
-func MakeSessionId() (id string, err error) {
-	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
-	if err != nil {
-		return "", ContextError(err)
-	}
-	return hex.EncodeToString(randomId), nil
-}
-
 func DecodeCertificate(encodedCertificate string) (certificate *x509.Certificate, err error) {
 	derEncodedCertificate, err := base64.StdEncoding.DecodeString(encodedCertificate)
 	if err != nil {