Просмотр исходного кода

Merge pull request #124 from rod-hynes/master

Session duration related-changes
Rod Hynes 10 лет назад
Родитель
Сommit
84b4723b55

+ 6 - 1
psiphon/config.go

@@ -34,7 +34,7 @@ const (
 	CONNECTION_WORKER_POOL_SIZE                    = 10
 	TUNNEL_POOL_SIZE                               = 1
 	TUNNEL_CONNECT_TIMEOUT                         = 20 * time.Second
-	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT                = 500 * time.Millisecond
+	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT                = 1 * time.Second
 	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT               = 10 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES        = 256
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN               = 60 * time.Second
@@ -46,6 +46,7 @@ const (
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS               = 300
 	ESTABLISH_TUNNEL_WORK_TIME                     = 60 * time.Second
 	ESTABLISH_TUNNEL_PAUSE_PERIOD                  = 5 * time.Second
+	ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD  = 1 * time.Second
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT               = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST       = 50
 	FETCH_REMOTE_SERVER_LIST_TIMEOUT               = 30 * time.Second
@@ -53,11 +54,15 @@ const (
 	FETCH_REMOTE_SERVER_LIST_STALE_PERIOD          = 6 * time.Hour
 	PSIPHON_API_CLIENT_SESSION_ID_LENGTH           = 16
 	PSIPHON_API_SERVER_TIMEOUT                     = 20 * time.Second
+	PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT            = 1 * time.Second
 	PSIPHON_API_STATUS_REQUEST_PERIOD_MIN          = 5 * time.Minute
 	PSIPHON_API_STATUS_REQUEST_PERIOD_MAX          = 10 * time.Minute
+	PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN    = 5 * time.Second
+	PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX    = 10 * time.Second
 	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES   = 256
 	PSIPHON_API_CONNECTED_REQUEST_PERIOD           = 24 * time.Hour
 	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD     = 5 * time.Second
+	PSIPHON_API_TUNNEL_STATS_MAX_COUNT             = 1000
 	FETCH_ROUTES_TIMEOUT                           = 1 * time.Minute
 	DOWNLOAD_UPGRADE_TIMEOUT                       = 15 * time.Minute
 	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD            = 5 * time.Second

+ 109 - 21
psiphon/controller.go

@@ -51,7 +51,7 @@ type Controller struct {
 	isEstablishing                 bool
 	establishWaitGroup             *sync.WaitGroup
 	stopEstablishingBroadcast      chan struct{}
-	candidateServerEntries         chan *ServerEntry
+	candidateServerEntries         chan *candidateServerEntry
 	establishPendingConns          *Conns
 	untunneledPendingConns         *Conns
 	untunneledDialConfig           *DialConfig
@@ -59,6 +59,12 @@ type Controller struct {
 	signalFetchRemoteServerList    chan struct{}
 	impairedProtocolClassification map[string]int
 	signalReportConnected          chan struct{}
+	serverAffinityDoneBroadcast    chan struct{}
+}
+
+// candidateServerEntry is the item sent over controller.candidateServerEntries
+// from the establish candidate generator to the establish tunnel workers. It
+// pairs the server entry to attempt with a flag marking the server affinity
+// candidate. The generator sets isServerAffinityCandidate on at most one
+// candidate per establish run, and the worker that receives that candidate
+// closes controller.serverAffinityDoneBroadcast once the affinity connection
+// attempt has failed or its tunnel has been delivered.
+type candidateServerEntry struct {
+	serverEntry               *ServerEntry
+	isServerAffinityCandidate bool
 }
 
 // NewController initializes a new controller.
@@ -184,9 +190,19 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 
 	close(controller.shutdownBroadcast)
 	controller.establishPendingConns.CloseAll()
-	controller.untunneledPendingConns.CloseAll()
 	controller.runWaitGroup.Wait()
 
+	// Stops untunneled connections, including fetch remote server list,
+	// split tunnel port forwards and also untunneled final stats requests.
+	// Note: there's a circular dependency with runWaitGroup.Wait() and
+	// untunneledPendingConns.CloseAll(): runWaitGroup depends on tunnels
+	// stopping which depends, in orderly shutdown, on final status requests
+	// completing. So this pending conns cancel comes too late to interrupt
+	// final status requests in the orderly shutdown case -- which is desired
+	// since we give those a short timeout and would prefer to not interrupt
+	// them.
+	controller.untunneledPendingConns.CloseAll()
+
 	controller.splitTunnelClassifier.Shutdown()
 
 	NoticeInfo("exiting controller")
@@ -296,7 +312,7 @@ loop:
 		reported := false
 		tunnel := controller.getNextActiveTunnel()
 		if tunnel != nil {
-			err := tunnel.session.DoConnectedRequest()
+			err := tunnel.serverContext.DoConnectedRequest()
 			if err == nil {
 				reported = true
 			} else {
@@ -380,8 +396,10 @@ loop:
 	NoticeInfo("exiting upgrade downloader")
 }
 
-func (controller *Controller) startClientUpgradeDownloader(session *Session) {
-	// session is nil when DisableApi is set
+func (controller *Controller) startClientUpgradeDownloader(
+	serverContext *ServerContext) {
+
+	// serverContext is nil when DisableApi is set
 	if controller.config.DisableApi {
 		return
 	}
@@ -392,7 +410,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 		return
 	}
 
-	if session.clientUpgradeVersion == "" {
+	if serverContext.clientUpgradeVersion == "" {
 		// No upgrade is offered
 		return
 	}
@@ -402,7 +420,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 	if !controller.startedUpgradeDownloader {
 		controller.startedUpgradeDownloader = true
 		controller.runWaitGroup.Add(1)
-		go controller.upgradeDownloader(session.clientUpgradeVersion)
+		go controller.upgradeDownloader(serverContext.clientUpgradeVersion)
 	}
 }
 
@@ -439,7 +457,7 @@ loop:
 			// establishPendingConns; this causes the pendingConns.Add() within
 			// interruptibleTCPDial to succeed instead of aborting, and the result
 			// is that it's possible for establish goroutines to run all the way through
-			// NewSession before being discarded... delaying shutdown.
+			// NewServerContext before being discarded... delaying shutdown.
 			select {
 			case <-controller.shutdownBroadcast:
 				break loop
@@ -481,7 +499,8 @@ loop:
 					// tunnel is established.
 					controller.startOrSignalConnectedReporter()
 
-					controller.startClientUpgradeDownloader(establishedTunnel.session)
+					controller.startClientUpgradeDownloader(
+						establishedTunnel.serverContext)
 				}
 
 			} else {
@@ -516,7 +535,7 @@ loop:
 
 // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
 // tunnels. A protocol is classified as impaired if a tunnel using that protocol
-// fails, repeatedly, shortly after the start of the session. During tunnel
+// fails, repeatedly, shortly after the start of the connection. During tunnel
 // establishment, impaired protocols are briefly skipped.
 //
 // One purpose of this measure is to defend against an attack where the adversary,
@@ -527,7 +546,7 @@ loop:
 //
 // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
 func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
-	if failedTunnel.sessionStartTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
+	if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
 		controller.impairedProtocolClassification[failedTunnel.protocol] += 1
 	} else {
 		controller.impairedProtocolClassification[failedTunnel.protocol] = 0
@@ -581,7 +600,7 @@ func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 	// discarded tunnel before fully active tunnels. Can a discarded tunnel
 	// be promoted (since it connects), but with lower rank than all active
 	// tunnels?
-	tunnel.Close()
+	tunnel.Close(true)
 }
 
 // registerTunnel adds the connected tunnel to the pool of active tunnels
@@ -605,6 +624,10 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 	controller.tunnels = append(controller.tunnels, tunnel)
 	NoticeTunnels(len(controller.tunnels))
 
+	// Promote this successful tunnel to first rank so it's one
+	// of the first candidates next time establish runs.
+	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+
 	return len(controller.tunnels), true
 }
 
@@ -640,7 +663,7 @@ func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
 			if controller.nextTunnel >= len(controller.tunnels) {
 				controller.nextTunnel = 0
 			}
-			activeTunnel.Close()
+			activeTunnel.Close(false)
 			NoticeTunnels(len(controller.tunnels))
 			break
 		}
@@ -661,7 +684,7 @@ func (controller *Controller) terminateAllTunnels() {
 		tunnel := activeTunnel
 		go func() {
 			defer closeWaitGroup.Done()
-			tunnel.Close()
+			tunnel.Close(false)
 		}()
 	}
 	closeWaitGroup.Wait()
@@ -748,9 +771,36 @@ func (controller *Controller) startEstablishing() {
 	controller.isEstablishing = true
 	controller.establishWaitGroup = new(sync.WaitGroup)
 	controller.stopEstablishingBroadcast = make(chan struct{})
-	controller.candidateServerEntries = make(chan *ServerEntry)
+	controller.candidateServerEntries = make(chan *candidateServerEntry)
 	controller.establishPendingConns.Reset()
 
+	// The server affinity mechanism attempts to favor the previously
+	// used server when reconnecting. This is beneficial for user
+	// applications which expect consistency in user IP address (for
+	// example, a web site which prompts for additional user
+	// authentication when the IP address changes).
+	//
+	// Only the very first server, as determined by
+	// datastore.PromoteServerEntry(), is the server affinity candidate.
+	// Concurrent connections attempts to many servers are launched
+	// without delay, in case the affinity server connection fails.
+	// While the affinity server connection is outstanding, when any
+	// other connection is established, there is a short grace period
+	// delay before delivering the established tunnel; this allows some
+	// time for the affinity server connection to succeed first.
+	// When the affinity server connection fails, any other established
+	// tunnel is registered without delay.
+	//
+	// Note: the establishTunnelWorker that receives the affinity
+	// candidate is solely responsible for closing
+	// controller.serverAffinityDoneBroadcast.
+	//
+	// Note: if config.EgressRegion or config.TunnelProtocol has changed
+	// since the top server was promoted, the first server may not actually
+	// be the last connected server.
+	// TODO: should not favor the first server in this case
+	controller.serverAffinityDoneBroadcast = make(chan struct{})
+
 	for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
 		controller.establishWaitGroup.Add(1)
 		go controller.establishTunnelWorker()
@@ -781,6 +831,7 @@ func (controller *Controller) stopEstablishing() {
 	controller.establishWaitGroup = nil
 	controller.stopEstablishingBroadcast = nil
 	controller.candidateServerEntries = nil
+	controller.serverAffinityDoneBroadcast = nil
 }
 
 // establishCandidateGenerator populates the candidate queue with server entries
@@ -798,6 +849,14 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	}
 	defer iterator.Close()
 
+	isServerAffinityCandidate := true
+
+	// TODO: reconcile server affinity scheme with multi-tunnel mode
+	if controller.config.TunnelPoolSize > 1 {
+		isServerAffinityCandidate = false
+		close(controller.serverAffinityDoneBroadcast)
+	}
+
 loop:
 	// Repeat until stopped
 	for i := 0; ; i++ {
@@ -827,7 +886,7 @@ loop:
 			// first iteration of the ESTABLISH_TUNNEL_WORK_TIME
 			// loop since (a) one iteration should be sufficient to
 			// evade the attack; (b) there's a good chance of false
-			// positives (such as short session durations due to network
+			// positives (such as short tunnel durations due to network
 			// hopping on a mobile device).
 			// Impaired protocols logic is not applied when
 			// config.TunnelProtocol is specified.
@@ -843,11 +902,16 @@ loop:
 				}
 			}
 
+			// Note: there must be only one server affinity candidate, as it
+			// closes the serverAffinityDoneBroadcast channel.
+			candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate}
+			isServerAffinityCandidate = false
+
 			// TODO: here we could generate multiple candidates from the
 			// server entry when there are many MeekFrontingAddresses.
 
 			select {
-			case controller.candidateServerEntries <- serverEntry:
+			case controller.candidateServerEntries <- candidate:
 			case <-controller.stopEstablishingBroadcast:
 				break loop
 			case <-controller.shutdownBroadcast:
@@ -901,7 +965,7 @@ loop:
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
 loop:
-	for serverEntry := range controller.candidateServerEntries {
+	for candidateServerEntry := range controller.candidateServerEntries {
 		// Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
 		// in the same select, since we want to prioritize receiving the stop signal
 		if controller.isStopEstablishingBroadcast() {
@@ -909,26 +973,44 @@ loop:
 		}
 
 		// There may already be a tunnel to this candidate. If so, skip it.
-		if controller.isActiveTunnelServerEntry(serverEntry) {
+		if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
 			continue
 		}
 
 		tunnel, err := EstablishTunnel(
 			controller.config,
+			controller.untunneledDialConfig,
 			controller.sessionId,
 			controller.establishPendingConns,
-			serverEntry,
+			candidateServerEntry.serverEntry,
 			controller) // TunnelOwner
 		if err != nil {
+
+			// Unblock other candidates immediately when
+			// server affinity candidate fails.
+			if candidateServerEntry.isServerAffinityCandidate {
+				close(controller.serverAffinityDoneBroadcast)
+			}
+
 			// Before emitting error, check if establish interrupted, in which
 			// case the error is noise.
 			if controller.isStopEstablishingBroadcast() {
 				break loop
 			}
-			NoticeInfo("failed to connect to %s: %s", serverEntry.IpAddress, err)
+			NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
 			continue
 		}
 
+		// Block for server affinity grace period before delivering.
+		if !candidateServerEntry.isServerAffinityCandidate {
+			timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
+			select {
+			case <-timer.C:
+			case <-controller.serverAffinityDoneBroadcast:
+			case <-controller.stopEstablishingBroadcast:
+			}
+		}
+
 		// Deliver established tunnel.
 		// Don't block. Assumes the receiver has a buffer large enough for
 		// the number of desired tunnels. If there's no room, the tunnel must
@@ -938,6 +1020,12 @@ loop:
 		default:
 			controller.discardTunnel(tunnel)
 		}
+
+		// Unblock other candidates only after delivering when
+		// server affinity candidate succeeds.
+		if candidateServerEntry.isServerAffinityCandidate {
+			close(controller.serverAffinityDoneBroadcast)
+		}
 	}
 	NoticeInfo("stopped establish worker")
 }

+ 196 - 18
psiphon/dataStore_alt.go

@@ -20,6 +20,7 @@
 package psiphon
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -53,6 +54,7 @@ const (
 	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
 	urlETagsBucket              = "urlETags"
 	keyValueBucket              = "keyValues"
+	tunnelStatsBucket           = "tunnelStats"
 	rankedServerEntryCount      = 100
 )
 
@@ -68,11 +70,6 @@ var singleton dataStore
 // have been replaced by checkInitDataStore() to assert that Init was called.
 func InitDataStore(config *Config) (err error) {
 	singleton.init.Do(func() {
-		var migratableServerEntries []*ServerEntry
-		migratableServerEntries, err = prepareMigrationEntries(config)
-		if err != nil {
-			return
-		}
 
 		filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
 		var db *bolt.DB
@@ -91,6 +88,7 @@ func InitDataStore(config *Config) (err error) {
 				splitTunnelRouteDataBucket,
 				urlETagsBucket,
 				keyValueBucket,
+				tunnelStatsBucket,
 			}
 			for _, bucket := range requiredBuckets {
 				_, err := tx.CreateBucketIfNotExists([]byte(bucket))
@@ -109,9 +107,12 @@ func InitDataStore(config *Config) (err error) {
 
 		// The migrateServerEntries function requires the data store is
 		// initialized prior to execution so that migrated entries can be stored
+		migratableServerEntries := prepareMigrationEntries(config)
 		if len(migratableServerEntries) > 0 {
 			migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
 		}
+
+		resetAllTunnelStatsToUnreported()
 	})
 
 	return err
@@ -277,20 +278,29 @@ func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) er
 	// enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
 	// any reasonable configuration of config.TunnelPoolSize.
 
-	if position >= len(rankedServerEntries) {
-		rankedServerEntries = append(rankedServerEntries, serverEntryId)
-	} else {
-		end := len(rankedServerEntries)
-		if end+1 > rankedServerEntryCount {
-			end = rankedServerEntryCount
+	// Using: https://github.com/golang/go/wiki/SliceTricks
+
+	// When serverEntryId is already ranked, remove it first to avoid duplicates
+
+	for i, rankedServerEntryId := range rankedServerEntries {
+		if rankedServerEntryId == serverEntryId {
+			rankedServerEntries = append(
+				rankedServerEntries[:i], rankedServerEntries[i+1:]...)
+			break
 		}
-		// insert: https://github.com/golang/go/wiki/SliceTricks
-		rankedServerEntries = append(
-			rankedServerEntries[:position],
-			append([]string{serverEntryId},
-				rankedServerEntries[position:end]...)...)
 	}
 
+	// SliceTricks insert, with length cap enforced
+
+	if len(rankedServerEntries) < rankedServerEntryCount {
+		rankedServerEntries = append(rankedServerEntries, "")
+	}
+	if position >= len(rankedServerEntries) {
+		position = len(rankedServerEntries) - 1
+	}
+	copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
+	rankedServerEntries[position] = serverEntryId
+
 	err = setRankedServerEntries(tx, rankedServerEntries)
 	if err != nil {
 		return ContextError(err)
@@ -395,7 +405,7 @@ func (iterator *ServerEntryIterator) Reset() error {
 	//     transaction is open.
 	//     (https://github.com/boltdb/bolt)
 	//
-	// So the uderlying serverEntriesBucket could change after the serverEntryIds
+	// So the underlying serverEntriesBucket could change after the serverEntryIds
 	// list is built.
 
 	var serverEntryIds []string
@@ -428,7 +438,7 @@ func (iterator *ServerEntryIterator) Reset() error {
 	}
 
 	for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
-		j := rand.Intn(i)
+		j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
 		serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
 	}
 
@@ -728,3 +738,171 @@ func GetKeyValue(key string) (value string, err error) {
 	}
 	return value, nil
 }
+
+// Tunnel stats records in the tunnelStatsStateUnreported
+// state are available for take out.
+// Records in the tunnelStatsStateReporting have been
+// taken out and are pending either deleting (for a
+// successful request) or change to StateUnreported (for
+// a failed request).
+// All tunnel stats records are reverted to StateUnreported
+// when the datastore is initialized at start up.
+
+var tunnelStatsStateUnreported = []byte("0")
+var tunnelStatsStateReporting = []byte("1")
+
+// StoreTunnelStats persists a new tunnel stats record in the
+// StateUnreported state, making it immediately eligible for
+// reporting.
+// tunnelStats is a JSON byte array with the fields required by
+// the Psiphon server API (see RecordTunnelStats). The JSON value
+// itself is used as the datastore key, so it is assumed to carry
+// enough unique information to act as one; the fields
+// sessionId + tunnelNumber currently satisfy that assumption.
+func StoreTunnelStats(tunnelStats []byte) error {
+	checkInitDataStore()
+
+	// The record is the key; the value holds only the reporting state.
+	storeRecord := func(tx *bolt.Tx) error {
+		return tx.Bucket([]byte(tunnelStatsBucket)).Put(
+			tunnelStats, tunnelStatsStateUnreported)
+	}
+
+	if err := singleton.db.Update(storeRecord); err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// CountUnreportedTunnelStats returns the number of tunnel
+// stats records in StateUnreported. When the datastore scan
+// fails, an alert notice is emitted and 0 is returned.
+func CountUnreportedTunnelStats() int {
+	checkInitDataStore()
+
+	unreported := 0
+
+	// This is a read-only scan, so a View transaction is sufficient and
+	// avoids taking the datastore's single writer lock.
+	err := singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		cursor := bucket.Cursor()
+		// Visit every record: the result is a count, so the scan must not
+		// stop at the first unreported record.
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if bytes.Equal(value, tunnelStatsStateUnreported) {
+				unreported++
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		NoticeAlert("CountUnreportedTunnelStats failed: %s", err)
+		return 0
+	}
+
+	return unreported
+}
+
+// TakeOutUnreportedTunnelStats returns up to maxCount tunnel
+// stats records that are in StateUnreported. The records are set
+// to StateReporting. If the records are successfully reported,
+// clear them with ClearReportedTunnelStats. If the records are
+// not successfully reported, restore them with
+// PutBackUnreportedTunnelStats.
+// The returned records are copies and remain valid after the
+// datastore transaction ends. When maxCount is not positive, no
+// records are taken out. On error, nil is returned and no record
+// changes state.
+func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
+	checkInitDataStore()
+
+	tunnelStats := make([][]byte, 0)
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+
+		// First pass: select records without touching the bucket. Mutating
+		// a bucket while traversing it may invalidate the cursor.
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil && len(tunnelStats) < maxCount; key, value = cursor.Next() {
+			if bytes.Equal(value, tunnelStatsStateUnreported) {
+				// Key slices are only valid for the life of the
+				// transaction, so hand the caller a private copy.
+				record := make([]byte, len(key))
+				copy(record, key)
+				tunnelStats = append(tunnelStats, record)
+			}
+		}
+
+		// Second pass: mark the selected records as being reported.
+		for _, record := range tunnelStats {
+			err := bucket.Put(record, tunnelStatsStateReporting)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return tunnelStats, nil
+}
+
+// PutBackUnreportedTunnelStats returns the given tunnel stats
+// records to StateUnreported, all within a single datastore
+// transaction.
+func PutBackUnreportedTunnelStats(tunnelStats [][]byte) error {
+	checkInitDataStore()
+
+	restoreRecords := func(tx *bolt.Tx) error {
+		statsBucket := tx.Bucket([]byte(tunnelStatsBucket))
+		for i := range tunnelStats {
+			if putErr := statsBucket.Put(tunnelStats[i], tunnelStatsStateUnreported); putErr != nil {
+				return putErr
+			}
+		}
+		return nil
+	}
+
+	if err := singleton.db.Update(restoreRecords); err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// ClearReportedTunnelStats removes the given tunnel stats
+// records, which were successfully reported, from the datastore,
+// all within a single datastore transaction.
+func ClearReportedTunnelStats(tunnelStats [][]byte) error {
+	checkInitDataStore()
+
+	deleteRecords := func(tx *bolt.Tx) error {
+		statsBucket := tx.Bucket([]byte(tunnelStatsBucket))
+		for i := range tunnelStats {
+			if deleteErr := statsBucket.Delete(tunnelStats[i]); deleteErr != nil {
+				return deleteErr
+			}
+		}
+		return nil
+	}
+
+	if err := singleton.db.Update(deleteRecords); err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// resetAllTunnelStatsToUnreported sets all tunnel
+// stats records to StateUnreported. This reset is called
+// when the datastore is initialized at start up, as we do
+// not know if tunnel records in StateReporting were reported
+// or not.
+// Note: the keys of all records needing a reset are held in
+// memory at once while the reset is applied.
+func resetAllTunnelStatsToUnreported() error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+
+		// First pass: collect the keys to reset without touching the
+		// bucket. Mutating a bucket while traversing it may invalidate
+		// the cursor. Keys are copied so they remain usable once the
+		// bucket is modified.
+		resetKeys := make([][]byte, 0)
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if bytes.Equal(value, tunnelStatsStateUnreported) {
+				// Already in the target state; nothing to rewrite.
+				continue
+			}
+			resetKey := make([]byte, len(key))
+			copy(resetKey, key)
+			resetKeys = append(resetKeys, resetKey)
+		}
+
+		// Second pass: apply the reset.
+		for _, resetKey := range resetKeys {
+			err := bucket.Put(resetKey, tunnelStatsStateUnreported)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}

+ 2 - 4
psiphon/migrateDataStore.go

@@ -20,12 +20,10 @@
 package psiphon
 
 // Stub function to return an empty list for non-Windows builds
-func prepareMigrationEntries(config *Config) ([]*ServerEntry, error) {
-	var migratableServerEntries []*ServerEntry
-	return migratableServerEntries, nil
+func prepareMigrationEntries(config *Config) []*ServerEntry {
+	return nil
 }
 
 // Stub function to return immediately for non-Windows builds
 func migrateEntries(serverEntries []*ServerEntry, legacyDataStoreFilename string) {
-	return
 }

+ 14 - 10
psiphon/migrateDataStore_windows.go

@@ -33,36 +33,40 @@ import (
 
 var legacyDb *sql.DB
 
-func prepareMigrationEntries(config *Config) ([]*ServerEntry, error) {
+func prepareMigrationEntries(config *Config) []*ServerEntry {
+	var migratableServerEntries []*ServerEntry
+
 	// If DATA_STORE_FILENAME does not exist on disk
 	if _, err := os.Stat(filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)); os.IsNotExist(err) {
 		// If LEGACY_DATA_STORE_FILENAME exists on disk
 		if _, err := os.Stat(filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME)); err == nil {
-			var migratableServerEntries []*ServerEntry
 
 			legacyDb, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=private&mode=rwc", filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME)))
 			defer legacyDb.Close()
 
 			if err != nil {
-				return migratableServerEntries, err
+				NoticeAlert("prepareMigrationEntries: sql.Open failed: %s", err)
+				return nil
 			}
 
 			initialization := "pragma journal_mode=WAL;\n"
 			_, err = legacyDb.Exec(initialization)
 			if err != nil {
-				return migratableServerEntries, err
+				NoticeAlert("prepareMigrationEntries: sql.DB.Exec failed: %s", err)
+				return nil
 			}
 
 			iterator, err := newlegacyServerEntryIterator(config)
 			if err != nil {
-				return migratableServerEntries, err
+				NoticeAlert("prepareMigrationEntries: newlegacyServerEntryIterator failed: %s", err)
+				return nil
 			}
 			defer iterator.Close()
 
 			for {
 				serverEntry, err := iterator.Next()
 				if err != nil {
-					err = fmt.Errorf("failed to iterate legacy server entries: %s", err)
+					NoticeAlert("prepareMigrationEntries: legacyServerEntryIterator.Next failed: %s", err)
 					break
 				}
 				if serverEntry == nil {
@@ -75,7 +79,7 @@ func prepareMigrationEntries(config *Config) ([]*ServerEntry, error) {
 		}
 	}
 
-	return migratableServerEntries, nil
+	return migratableServerEntries
 }
 
 // migrateEntries calls the BoltDB data store method to shuffle
@@ -86,14 +90,14 @@ func migrateEntries(serverEntries []*ServerEntry, legacyDataStoreFilename string
 
 	err := StoreServerEntries(serverEntries, false)
 	if err != nil {
-		NoticeAlert("failed to store migrated server entries: %s", err)
+		NoticeAlert("migrateEntries: StoreServerEntries failed: %s", err)
 	} else {
 		// Retain server affinity from old datastore by taking the first
 		// array element (previous top ranked server) and promoting it
 		// to the top rank before the server selection process begins
 		err = PromoteServerEntry(serverEntries[0].IpAddress)
 		if err != nil {
-			NoticeAlert("failed to promote server entry: %s", err)
+			NoticeAlert("migrateEntries: PromoteServerEntry failed: %s", err)
 		}
 
 		NoticeAlert("%d server entries successfully migrated to new data store", len(serverEntries))
@@ -101,7 +105,7 @@ func migrateEntries(serverEntries []*ServerEntry, legacyDataStoreFilename string
 
 	err = os.Remove(legacyDataStoreFilename)
 	if err != nil {
-		NoticeAlert("failed to delete legacy data store file '%s': %s", legacyDataStoreFilename, err)
+		NoticeAlert("migrateEntries: failed to delete legacy data store file '%s': %s", legacyDataStoreFilename, err)
 	}
 
 	return

+ 61 - 0
psiphon/net.go

@@ -20,9 +20,12 @@
 package psiphon
 
 import (
+	"crypto/x509"
 	"fmt"
 	"io"
 	"net"
+	"net/http"
+	"net/url"
 	"reflect"
 	"sync"
 	"time"
@@ -241,3 +244,61 @@ func ResolveIP(host string, conn net.Conn) (addrs []net.IP, ttls []time.Duration
 	}
 	return addrs, ttls, nil
 }
+
+// MakeUntunneledHttpsClient returns a net/http.Client which is
+// configured to use custom dialing features -- including BindToDevice,
+// UseIndistinguishableTLS, etc. -- for a specific HTTPS request URL.
+// If verifyLegacyCertificate is not nil, it's used for certificate
+// verification.
+// Because UseIndistinguishableTLS requires a hack to work with
+// net/http, MakeUntunneledHttpsClient may return a modified request URL
+// to be used. Callers should always use this return value to make
+// requests, not the input value.
+//
+// The returned URL has its scheme rewritten to "http" and always
+// carries an explicit port: the port given in requestUrl, or 443
+// when none is given. requestTimeout is set as the client's overall
+// request timeout. An error is returned when requestUrl fails to parse.
+func MakeUntunneledHttpsClient(
+	dialConfig *DialConfig,
+	verifyLegacyCertificate *x509.Certificate,
+	requestUrl string,
+	requestTimeout time.Duration) (*http.Client, string, error) {
+
+	dialer := NewCustomTLSDialer(
+		// Note: when verifyLegacyCertificate is not nil, some
+		// of the other CustomTLSConfig is overridden.
+		&CustomTLSConfig{
+			Dial:                          NewTCPDialer(dialConfig),
+			VerifyLegacyCertificate:       verifyLegacyCertificate,
+			SendServerName:                true,
+			SkipVerify:                    false,
+			UseIndistinguishableTLS:       dialConfig.UseIndistinguishableTLS,
+			TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
+		})
+
+	urlComponents, err := url.Parse(requestUrl)
+	if err != nil {
+		return nil, "", ContextError(err)
+	}
+
+	// Change the scheme to "http"; otherwise http.Transport will try to do
+	// another TLS handshake inside the explicit TLS session. Also need to
+	// force an explicit port, as the default for "http", 80, won't talk TLS.
+	urlComponents.Scheme = "http"
+	host, port, err := net.SplitHostPort(urlComponents.Host)
+	if err != nil {
+		// Assume there's no port
+		host = urlComponents.Host
+		port = ""
+		// A bare IPv6 literal arrives bracketed (e.g. "[::1]"). Strip the
+		// brackets so that JoinHostPort, which adds its own, doesn't
+		// produce a doubly-bracketed host.
+		if n := len(host); n >= 2 && host[0] == '[' && host[n-1] == ']' {
+			host = host[1 : n-1]
+		}
+	}
+	if port == "" {
+		port = "443"
+	}
+	urlComponents.Host = net.JoinHostPort(host, port)
+
+	transport := &http.Transport{
+		Dial: dialer,
+	}
+	httpClient := &http.Client{
+		Timeout:   requestTimeout,
+		Transport: transport,
+	}
+
+	return httpClient, urlComponents.String(), nil
+}

+ 3 - 38
psiphon/remoteServerList.go

@@ -23,9 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
-	"net"
 	"net/http"
-	"net/url"
 )
 
 // FetchRemoteServerList downloads a remote server list JSON record from
@@ -42,46 +40,13 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		return ContextError(errors.New("remote server list signature public key blank"))
 	}
 
-	dialer := NewTCPDialer(dialConfig)
-
-	// When the URL is HTTPS, use the custom TLS dialer with the
-	// UseIndistinguishableTLS option.
-	// TODO: refactor into helper function
-	requestUrl, err := url.Parse(config.RemoteServerListUrl)
+	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
+		dialConfig, nil, config.RemoteServerListUrl, FETCH_REMOTE_SERVER_LIST_TIMEOUT)
 	if err != nil {
 		return ContextError(err)
 	}
-	if requestUrl.Scheme == "https" {
-		dialer = NewCustomTLSDialer(
-			&CustomTLSConfig{
-				Dial:                          dialer,
-				SendServerName:                true,
-				SkipVerify:                    false,
-				UseIndistinguishableTLS:       config.UseIndistinguishableTLS,
-				TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
-			})
-
-		// Change the scheme to "http"; otherwise http.Transport will try to do
-		// another TLS handshake inside the explicit TLS session. Also need to
-		// force the port to 443,as the default for "http", 80, won't talk TLS.
-		requestUrl.Scheme = "http"
-		host, _, err := net.SplitHostPort(requestUrl.Host)
-		if err != nil {
-			// Assume there's no port
-			host = requestUrl.Host
-		}
-		requestUrl.Host = net.JoinHostPort(host, "443")
-	}
-
-	transport := &http.Transport{
-		Dial: dialer,
-	}
-	httpClient := http.Client{
-		Timeout:   FETCH_REMOTE_SERVER_LIST_TIMEOUT,
-		Transport: transport,
-	}
 
-	request, err := http.NewRequest("GET", requestUrl.String(), nil)
+	request, err := http.NewRequest("GET", requestUrl, nil)
 	if err != nil {
 		return ContextError(err)
 	}

+ 396 - 128
psiphon/serverApi.go

@@ -31,29 +31,39 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"sync/atomic"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
 )
 
-// Session is a utility struct which holds all of the data associated
-// with a Psiphon session. In addition to the established tunnel, this
-// includes the session ID (used for Psiphon API requests) and a http
+// ServerContext is a utility struct which holds all of the data associated
+// with a Psiphon server connection. In addition to the established tunnel, this
+// includes data associated with Psiphon API requests and a persistent http
 // client configured to make tunneled Psiphon API requests.
-type Session struct {
-	sessionId            string
-	baseRequestUrl       string
-	psiphonHttpsClient   *http.Client
-	statsRegexps         *transferstats.Regexps
-	clientRegion         string
-	clientUpgradeVersion string
+type ServerContext struct {
+	sessionId                string
+	tunnelNumber             int64
+	baseRequestUrl           string
+	psiphonHttpsClient       *http.Client
+	statsRegexps             *transferstats.Regexps
+	clientRegion             string
+	clientUpgradeVersion     string
+	serverHandshakeTimestamp string
 }
 
-// MakeSessionId creates a new session ID. Making the session ID is not done
-// in NewSession because:
-// (1) the transport needs to send the ID in the SSH credentials before the tunnel
-//     is established and NewSession performs a handshake on an established tunnel.
-// (2) the same session ID is used across multi-tunnel controller runs, where each
-//     tunnel has its own Session instance.
+// nextTunnelNumber is a monotonically increasing number assigned to each
+// successive tunnel connection. The sessionId and tunnelNumber together
+// form a globally unique identifier for tunnels, which is used for
+// stats. Note that the number is increasing but not necessarily
+// consecutive for each active tunnel in session.
+var nextTunnelNumber int64
+
+// MakeSessionId creates a new session ID. The same session ID is used across
+// multi-tunnel controller runs, where each tunnel has its own ServerContext
+// instance.
+// In server-side stats, we now consider a "session" to be the lifetime of the
+// Controller (e.g., the user's commanded start and stop) and we measure this
+// duration as well as the duration of each tunnel within the session.
 func MakeSessionId() (sessionId string, err error) {
 	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
 	if err != nil {
@@ -62,108 +72,35 @@ func MakeSessionId() (sessionId string, err error) {
 	return hex.EncodeToString(randomId), nil
 }
 
-// NewSession makes the tunnelled handshake request to the
-// Psiphon server and returns a Session struct, initialized with the
-// session ID, for use with subsequent Psiphon server API requests (e.g.,
-// periodic connected and status requests).
-func NewSession(config *Config, tunnel *Tunnel, sessionId string) (session *Session, err error) {
+// NewServerContext makes the tunnelled handshake request to the Psiphon server
+// and returns a ServerContext struct for use with subsequent Psiphon server API
+// requests (e.g., periodic connected and status requests).
+func NewServerContext(tunnel *Tunnel, sessionId string) (*ServerContext, error) {
 
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel)
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	session = &Session{
+
+	serverContext := &ServerContext{
 		sessionId:          sessionId,
-		baseRequestUrl:     makeBaseRequestUrl(config, tunnel, sessionId),
+		tunnelNumber:       atomic.AddInt64(&nextTunnelNumber, 1),
+		baseRequestUrl:     makeBaseRequestUrl(tunnel, "", sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
 	}
 
-	err = session.doHandshakeRequest()
+	err = serverContext.doHandshakeRequest()
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	return session, nil
-}
-
-// DoConnectedRequest performs the connected API request. This request is
-// used for statistics. The server returns a last_connected token for
-// the client to store and send next time it connects. This token is
-// a timestamp (using the server clock, and should be rounded to the
-// nearest hour) which is used to determine when a connection represents
-// a unique user for a time period.
-func (session *Session) DoConnectedRequest() error {
-	const DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
-	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
-	if err != nil {
-		return ContextError(err)
-	}
-	if lastConnected == "" {
-		lastConnected = "None"
-	}
-	url := session.buildRequestUrl(
-		"connected",
-		&ExtraParam{"session_id", session.sessionId},
-		&ExtraParam{"last_connected", lastConnected})
-	responseBody, err := session.doGetRequest(url)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	var response struct {
-		ConnectedTimestamp string `json:"connected_timestamp"`
-	}
-	err = json.Unmarshal(responseBody, &response)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	err = SetKeyValue(DATA_STORE_LAST_CONNECTED_KEY, response.ConnectedTimestamp)
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-// StatsRegexps gets the Regexps used for the statistics for this tunnel.
-func (session *Session) StatsRegexps() *transferstats.Regexps {
-	return session.statsRegexps
-}
-
-// DoStatusRequest makes a /status request to the server, sending session stats.
-func (session *Session) DoStatusRequest(statsPayload json.Marshaler) error {
-	statsPayloadJSON, err := json.Marshal(statsPayload)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	// Add a random amount of padding to help prevent stats updates from being
-	// a predictable size (which often happens when the connection is quiet).
-	padding := MakeSecureRandomPadding(0, PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES)
-
-	// "connected" is a legacy parameter. This client does not report when
-	// it has disconnected.
-
-	url := session.buildRequestUrl(
-		"status",
-		&ExtraParam{"session_id", session.sessionId},
-		&ExtraParam{"connected", "1"},
-		// TODO: base64 encoding of padding means the padding
-		// size is not exactly [0, PADDING_MAX_BYTES]
-		&ExtraParam{"padding", base64.StdEncoding.EncodeToString(padding)})
-
-	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
-	if err != nil {
-		return ContextError(err)
-	}
-
-	return nil
+	return serverContext, nil
 }
 
 // doHandshakeRequest performs the handshake API request. The handshake
 // returns upgrade info, newly discovered server entries -- which are
 // stored -- and sponsor info (home pages, stat regexes).
-func (session *Session) doHandshakeRequest() error {
+func (serverContext *ServerContext) doHandshakeRequest() error {
 	extraParams := make([]*ExtraParam, 0)
 	serverEntryIpAddresses, err := GetServerEntryIpAddresses()
 	if err != nil {
@@ -174,8 +111,8 @@ func (session *Session) doHandshakeRequest() error {
 	for _, ipAddress := range serverEntryIpAddresses {
 		extraParams = append(extraParams, &ExtraParam{"known_server", ipAddress})
 	}
-	url := session.buildRequestUrl("handshake", extraParams...)
-	responseBody, err := session.doGetRequest(url)
+	url := buildRequestUrl(serverContext.baseRequestUrl, "handshake", extraParams...)
+	responseBody, err := serverContext.doGetRequest(url)
 	if err != nil {
 		return ContextError(err)
 	}
@@ -202,14 +139,15 @@ func (session *Session) doHandshakeRequest() error {
 		HttpsRequestRegexes  []map[string]string `json:"https_request_regexes"`
 		EncodedServerList    []string            `json:"encoded_server_list"`
 		ClientRegion         string              `json:"client_region"`
+		ServerTimestamp      string              `json:"server_timestamp"`
 	}
 	err = json.Unmarshal(configLine, &handshakeConfig)
 	if err != nil {
 		return ContextError(err)
 	}
 
-	session.clientRegion = handshakeConfig.ClientRegion
-	NoticeClientRegion(session.clientRegion)
+	serverContext.clientRegion = handshakeConfig.ClientRegion
+	NoticeClientRegion(serverContext.clientRegion)
 
 	var decodedServerEntries []*ServerEntry
 
@@ -242,13 +180,13 @@ func (session *Session) doHandshakeRequest() error {
 		NoticeHomepage(homepage)
 	}
 
-	session.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
+	serverContext.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
 	if handshakeConfig.UpgradeClientVersion != "" {
 		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
 	}
 
 	var regexpsNotices []string
-	session.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
+	serverContext.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
 		handshakeConfig.PageViewRegexes,
 		handshakeConfig.HttpsRequestRegexes)
 
@@ -256,15 +194,344 @@ func (session *Session) doHandshakeRequest() error {
 		NoticeAlert(notice)
 	}
 
+	serverContext.serverHandshakeTimestamp = handshakeConfig.ServerTimestamp
+
+	return nil
+}
+
+// DoConnectedRequest performs the connected API request. This request is
+// used for statistics. The server returns a last_connected token for
+// the client to store and send next time it connects. This token is
+// a timestamp (using the server clock, and should be rounded to the
+// nearest hour) which is used to determine when a connection represents
+// a unique user for a time period.
+func (serverContext *ServerContext) DoConnectedRequest() error {
+	const DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
+	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
+	if err != nil {
+		return ContextError(err)
+	}
+	if lastConnected == "" {
+		lastConnected = "None"
+	}
+	url := buildRequestUrl(
+		serverContext.baseRequestUrl,
+		"connected",
+		&ExtraParam{"session_id", serverContext.sessionId},
+		&ExtraParam{"last_connected", lastConnected})
+	responseBody, err := serverContext.doGetRequest(url)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	var response struct {
+		ConnectedTimestamp string `json:"connected_timestamp"`
+	}
+	err = json.Unmarshal(responseBody, &response)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	err = SetKeyValue(DATA_STORE_LAST_CONNECTED_KEY, response.ConnectedTimestamp)
+	if err != nil {
+		return ContextError(err)
+	}
 	return nil
 }
 
+// StatsRegexps gets the Regexps used for the statistics for this tunnel.
+func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
+	return serverContext.statsRegexps
+}
+
+// DoStatusRequest makes a /status request to the server, sending session stats.
+func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
+
+	url := makeStatusRequestUrl(serverContext.sessionId, serverContext.baseRequestUrl, true)
+
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	err = serverContext.doPostRequest(url, "application/json", bytes.NewReader(payload))
+	if err != nil {
+
+		// Resend the transfer stats and tunnel stats later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
+
+		return ContextError(err)
+	}
+	confirmStatusRequestPayload(payloadInfo)
+
+	return nil
+}
+
+func makeStatusRequestUrl(sessionId, baseRequestUrl string, isTunneled bool) string {
+
+	// Add a random amount of padding to help prevent stats updates from being
+	// a predictable size (which often happens when the connection is quiet).
+	padding := MakeSecureRandomPadding(0, PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES)
+
+	// Legacy clients set "connected" to "0" when disconnecting, and this value
+	// is used to calculate session duration estimates. This is now superseded
+	// by explicit tunnel stats duration reporting.
+	// The legacy method of reconstructing session durations is not compatible
+	// with this client's connected request retries and asynchronous final
+	// status request attempts. So we simply set this "connected" flag to reflect
+	// whether the request is sent tunneled or not.
+
+	connected := "1"
+	if !isTunneled {
+		connected = "0"
+	}
+
+	return buildRequestUrl(
+		baseRequestUrl,
+		"status",
+		&ExtraParam{"session_id", sessionId},
+		&ExtraParam{"connected", connected},
+		// TODO: base64 encoding of padding means the padding
+		// size is not exactly [0, PADDING_MAX_BYTES]
+		&ExtraParam{"padding", base64.StdEncoding.EncodeToString(padding)})
+}
+
+// statusRequestPayloadInfo is a temporary structure for data used to
+// either "clear" or "put back" status request payload data depending
+// on whether or not the request succeeded.
+type statusRequestPayloadInfo struct {
+	serverId      string
+	transferStats *transferstats.ServerStats
+	tunnelStats   [][]byte
+}
+
+func makeStatusRequestPayload(
+	serverId string) ([]byte, *statusRequestPayloadInfo, error) {
+
+	transferStats := transferstats.GetForServer(serverId)
+	tunnelStats, err := TakeOutUnreportedTunnelStats(
+		PSIPHON_API_TUNNEL_STATS_MAX_COUNT)
+	if err != nil {
+		NoticeAlert(
+			"TakeOutUnreportedTunnelStats failed: %s", ContextError(err))
+		tunnelStats = nil
+		// Proceed with transferStats only
+	}
+	payloadInfo := &statusRequestPayloadInfo{
+		serverId, transferStats, tunnelStats}
+
+	payload := make(map[string]interface{})
+
+	hostBytes, bytesTransferred := transferStats.GetStatsForReporting()
+	payload["host_bytes"] = hostBytes
+	payload["bytes_transferred"] = bytesTransferred
+
+	// We're not recording these fields, but the server requires them.
+	payload["page_views"] = make([]string, 0)
+	payload["https_requests"] = make([]string, 0)
+
+	// Tunnel stats records are already in JSON format
+	jsonTunnelStats := make([]json.RawMessage, len(tunnelStats))
+	for i, tunnelStatsRecord := range tunnelStats {
+		jsonTunnelStats[i] = json.RawMessage(tunnelStatsRecord)
+	}
+	payload["tunnel_stats"] = jsonTunnelStats
+
+	jsonPayload, err := json.Marshal(payload)
+	if err != nil {
+
+		// Send the transfer stats and tunnel stats later
+		putBackStatusRequestPayload(payloadInfo)
+
+		return nil, nil, ContextError(err)
+	}
+
+	return jsonPayload, payloadInfo, nil
+}
+
+func putBackStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	transferstats.PutBack(payloadInfo.serverId, payloadInfo.transferStats)
+	err := PutBackUnreportedTunnelStats(payloadInfo.tunnelStats)
+	if err != nil {
+		// These tunnel stats records won't be resent until after a
+		// datastore re-initialization.
+		NoticeAlert(
+			"PutBackUnreportedTunnelStats failed: %s", ContextError(err))
+	}
+}
+
+func confirmStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	err := ClearReportedTunnelStats(payloadInfo.tunnelStats)
+	if err != nil {
+		// These tunnel stats records may be resent.
+		NoticeAlert(
+			"ClearReportedTunnelStats failed: %s", ContextError(err))
+	}
+}
+
+// TryUntunneledStatusRequest makes direct connections to the specified
+// server (if supported) in an attempt to send useful bytes transferred
+// and tunnel duration stats after a tunnel has already failed.
+// The tunnel is assumed to be closed, but its config, protocol, and
+// context values must still be valid.
+// TryUntunneledStatusRequest emits notices detailing failed attempts.
+func TryUntunneledStatusRequest(tunnel *Tunnel, isShutdown bool) error {
+
+	for _, port := range tunnel.serverEntry.GetDirectWebRequestPorts() {
+		err := doUntunneledStatusRequest(tunnel, port, isShutdown)
+		if err == nil {
+			return nil
+		}
+		NoticeAlert("doUntunneledStatusRequest failed for %s:%s: %s",
+			tunnel.serverEntry.IpAddress, port, err)
+	}
+
+	return errors.New("all attempts failed")
+}
+
+// doUntunneledStatusRequest attempts an untunneled status request.
+func doUntunneledStatusRequest(
+	tunnel *Tunnel, port string, isShutdown bool) error {
+
+	url := makeStatusRequestUrl(
+		tunnel.serverContext.sessionId,
+		makeBaseRequestUrl(tunnel, port, tunnel.serverContext.sessionId),
+		false)
+
+	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	timeout := PSIPHON_API_SERVER_TIMEOUT
+	if isShutdown {
+		timeout = PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT
+	}
+
+	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
+		tunnel.untunneledDialConfig,
+		certificate,
+		url,
+		timeout)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	bodyType := "application/json"
+	body := bytes.NewReader(payload)
+
+	response, err := httpClient.Post(requestUrl, bodyType, body)
+	if err == nil && response.StatusCode != http.StatusOK {
+		response.Body.Close()
+		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
+	}
+	if err != nil {
+
+		// Resend the transfer stats and tunnel stats later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
+
+		// Trim this error since it may include long URLs
+		return ContextError(TrimError(err))
+	}
+	confirmStatusRequestPayload(payloadInfo)
+	response.Body.Close()
+
+	return nil
+}
+
+// RecordTunnelStats records a tunnel duration and bytes
+// sent and received for subsequent reporting and quality
+// analysis.
+//
+// Tunnel durations are precisely measured client-side
+// and reported in status requests. As the duration is
+// not determined until the tunnel is closed, tunnel
+// stats records are stored in the persistent datastore
+// and reported via subsequent status requests sent to any
+// Psiphon server.
+//
+// Since the status request that reports a tunnel stats
+// record is not necessarily handled by the same server, the
+// tunnel stats records include the original server ID.
+//
+// Other fields that may change between tunnel stats recording
+// and reporting include client geo data, propagation channel,
+// sponsor ID, client version. These are not stored in the
+// datastore (client region, in particular, since that would
+// create an on-disk record of user location).
+// TODO: the server could encrypt, with a nonce and key unknown to
+// the client, a blob containing this data; return it in the
+// handshake response; and the client could store and later report
+// this blob with its tunnel stats records.
+//
+// Multiple "status" requests may be in flight at once (due
+// to multi-tunnel, asynchronous final status retry, and
+// aggressive status requests for pre-registered tunnels).
+// To avoid duplicate reporting, tunnel stats records are
+// "taken-out" by a status request and then "put back" in
+// case the request fails.
+//
+// Note: since tunnel stats records have a globally unique
+// identifier (sessionId + tunnelNumber), we could tolerate
+// duplicate reporting and filter out duplicates on the
+// server-side. Permitting duplicate reporting could increase
+// the velocity of reporting (for example, both the asynchronous
+// untunneled final status requests and the post-connected
+// immediate status requests could try to report the same tunnel
+// stats).
+// Duplicate reporting may also occur when a server receives and
+// processes a status request but the client fails to receive
+// the response.
+func RecordTunnelStats(
+	sessionId string,
+	tunnelNumber int64,
+	tunnelServerIpAddress string,
+	serverHandshakeTimestamp, duration string,
+	totalBytesSent, totalBytesReceived int64) error {
+
+	tunnelStats := struct {
+		SessionId                string `json:"session_id"`
+		TunnelNumber             int64  `json:"tunnel_number"`
+		TunnelServerIpAddress    string `json:"tunnel_server_ip_address"`
+		ServerHandshakeTimestamp string `json:"server_handshake_timestamp"`
+		Duration                 string `json:"duration"`
+		TotalBytesSent           int64  `json:"total_bytes_sent"`
+		TotalBytesReceived       int64  `json:"total_bytes_received"`
+	}{
+		sessionId,
+		tunnelNumber,
+		tunnelServerIpAddress,
+		serverHandshakeTimestamp,
+		duration,
+		totalBytesSent,
+		totalBytesReceived,
+	}
+
+	tunnelStatsJson, err := json.Marshal(tunnelStats)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return StoreTunnelStats(tunnelStatsJson)
+}
+
 // doGetRequest makes a tunneled HTTPS request and returns the response body.
-func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, err error) {
-	response, err := session.psiphonHttpsClient.Get(requestUrl)
+func (serverContext *ServerContext) doGetRequest(
+	requestUrl string) (responseBody []byte, err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Get(requestUrl)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
-		err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+		err = fmt.Errorf("HTTP GET request failed with response code: %d", response.StatusCode)
 	}
 	if err != nil {
 		// Trim this error since it may include long URLs
@@ -275,41 +542,42 @@ func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, er
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	if response.StatusCode != http.StatusOK {
-		return nil, ContextError(fmt.Errorf("HTTP GET request failed with response code: %d", response.StatusCode))
-	}
 	return body, nil
 }
 
 // doPostRequest makes a tunneled HTTPS POST request.
-func (session *Session) doPostRequest(requestUrl string, bodyType string, body io.Reader) (err error) {
-	response, err := session.psiphonHttpsClient.Post(requestUrl, bodyType, body)
+func (serverContext *ServerContext) doPostRequest(
+	requestUrl string, bodyType string, body io.Reader) (err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Post(requestUrl, bodyType, body)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
-		err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
 	}
 	if err != nil {
 		// Trim this error since it may include long URLs
 		return ContextError(TrimError(err))
 	}
 	response.Body.Close()
-	if response.StatusCode != http.StatusOK {
-		return ContextError(fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode))
-	}
-	return
+	return nil
 }
 
 // makeBaseRequestUrl makes a URL containing all the common parameters
 // that are included with Psiphon API requests. These common parameters
 // are used for statistics.
-func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string {
+func makeBaseRequestUrl(tunnel *Tunnel, port, sessionId string) string {
 	var requestUrl bytes.Buffer
+
+	if port == "" {
+		port = tunnel.serverEntry.WebServerPort
+	}
+
 	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
 	// e.g., don't do this: requestUrl.WriteString("https://")
 	requestUrl.WriteString("http://")
 	requestUrl.WriteString(tunnel.serverEntry.IpAddress)
 	requestUrl.WriteString(":")
-	requestUrl.WriteString(tunnel.serverEntry.WebServerPort)
+	requestUrl.WriteString(port)
 	requestUrl.WriteString("/")
 	// Placeholder for the path component of a request
 	requestUrl.WriteString("%s")
@@ -318,18 +586,18 @@ func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string
 	requestUrl.WriteString("&server_secret=")
 	requestUrl.WriteString(tunnel.serverEntry.WebServerSecret)
 	requestUrl.WriteString("&propagation_channel_id=")
-	requestUrl.WriteString(config.PropagationChannelId)
+	requestUrl.WriteString(tunnel.config.PropagationChannelId)
 	requestUrl.WriteString("&sponsor_id=")
-	requestUrl.WriteString(config.SponsorId)
+	requestUrl.WriteString(tunnel.config.SponsorId)
 	requestUrl.WriteString("&client_version=")
-	requestUrl.WriteString(config.ClientVersion)
+	requestUrl.WriteString(tunnel.config.ClientVersion)
 	// TODO: client_tunnel_core_version
 	requestUrl.WriteString("&relay_protocol=")
 	requestUrl.WriteString(tunnel.protocol)
 	requestUrl.WriteString("&client_platform=")
-	requestUrl.WriteString(config.ClientPlatform)
+	requestUrl.WriteString(tunnel.config.ClientPlatform)
 	requestUrl.WriteString("&tunnel_whole_device=")
-	requestUrl.WriteString(strconv.Itoa(config.TunnelWholeDevice))
+	requestUrl.WriteString(strconv.Itoa(tunnel.config.TunnelWholeDevice))
 	return requestUrl.String()
 }
 
@@ -337,9 +605,9 @@ type ExtraParam struct{ name, value string }
 
 // buildRequestUrl makes a URL for an API request. The URL includes the
 // base request URL and any extra parameters for the specific request.
-func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
+func buildRequestUrl(baseRequestUrl, path string, extraParams ...*ExtraParam) string {
 	var requestUrl bytes.Buffer
-	requestUrl.WriteString(fmt.Sprintf(session.baseRequestUrl, path))
+	requestUrl.WriteString(fmt.Sprintf(baseRequestUrl, path))
 	for _, extraParam := range extraParams {
 		requestUrl.WriteString("&")
 		requestUrl.WriteString(extraParam.name)

+ 14 - 0
psiphon/serverEntry.go

@@ -109,6 +109,20 @@ func (serverEntry *ServerEntry) DisableImpairedProtocols(impairedProtocols []str
 	serverEntry.Capabilities = capabilities
 }
 
+// GetDirectWebRequestPorts returns the ports on which a direct
+// (untunneled) web request to this server may be attempted, in the
+// order they should be tried. The result is empty when the server
+// entry lacks the "handshake" capability.
+func (serverEntry *ServerEntry) GetDirectWebRequestPorts() []string {
+	ports := make([]string, 0)
+	if Contains(serverEntry.Capabilities, "handshake") {
+		// Server-side configuration quirk: there's a port forward from
+		// port 443 to the web server, which we can try, except on servers
+		// running FRONTED_MEEK, which listens on port 443.
+		if !serverEntry.SupportsProtocol(TUNNEL_PROTOCOL_FRONTED_MEEK) {
+			ports = append(ports, "443")
+		}
+		ports = append(ports, serverEntry.WebServerPort)
+	}
+	return ports
+}
+
 // DecodeServerEntry extracts server entries from the encoding
 // used by remote server lists and Psiphon server handshake requests.
 func DecodeServerEntry(encodedServerEntry string) (serverEntry *ServerEntry, err error) {

+ 8 - 8
psiphon/splitTunnel.go

@@ -114,12 +114,12 @@ func (classifier *SplitTunnelClassifier) Start(fetchRoutesTunnel *Tunnel) {
 		return
 	}
 
-	if fetchRoutesTunnel.session == nil {
-		// Tunnel has no session
+	if fetchRoutesTunnel.serverContext == nil {
+		// Tunnel has no serverContext
 		return
 	}
 
-	if fetchRoutesTunnel.session.clientRegion == "" {
+	if fetchRoutesTunnel.serverContext.clientRegion == "" {
 		// Split tunnel region is unknown
 		return
 	}
@@ -207,7 +207,7 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 		return
 	}
 
-	NoticeSplitTunnelRegion(tunnel.session.clientRegion)
+	NoticeSplitTunnelRegion(tunnel.serverContext.clientRegion)
 }
 
 // getRoutes makes a web request to download fresh routes data for the
@@ -216,13 +216,13 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 // fails and cached routes data is present, that cached data is returned.
 func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData []byte, err error) {
 
-	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.session.clientRegion)
+	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.serverContext.clientRegion)
 	request, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	etag, err := GetSplitTunnelRoutesETag(tunnel.session.clientRegion)
+	etag, err := GetSplitTunnelRoutesETag(tunnel.serverContext.clientRegion)
 	if err != nil {
 		return nil, ContextError(err)
 	}
@@ -310,7 +310,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	if !useCachedRoutes {
 		etag := response.Header.Get("ETag")
 		if etag != "" {
-			err := SetSplitTunnelRoutes(tunnel.session.clientRegion, etag, routesData)
+			err := SetSplitTunnelRoutes(tunnel.serverContext.clientRegion, etag, routesData)
 			if err != nil {
 				NoticeAlert("failed to cache split tunnel routes: %s", ContextError(err))
 				// Proceed with fetched data, even when we can't cache it
@@ -319,7 +319,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	}
 
 	if useCachedRoutes {
-		routesData, err = GetSplitTunnelRoutesData(tunnel.session.clientRegion)
+		routesData, err = GetSplitTunnelRoutesData(tunnel.serverContext.clientRegion)
 		if err != nil {
 			return nil, ContextError(err)
 		}

+ 16 - 26
psiphon/transferstats/collector.go

@@ -20,7 +20,6 @@
 package transferstats
 
 import (
-	"encoding/json"
 	"sync"
 )
 
@@ -44,15 +43,15 @@ func newHostStats() *hostStats {
 	return &hostStats{}
 }
 
-// serverStats holds per-server stats.
-type serverStats struct {
+// ServerStats holds per-server stats.
+type ServerStats struct {
 	hostnameToStats    map[string]*hostStats
 	totalBytesSent     int64
 	totalBytesReceived int64
 }
 
-func newServerStats() *serverStats {
-	return &serverStats{
+func newServerStats() *ServerStats {
+	return &ServerStats{
 		hostnameToStats: make(map[string]*hostStats),
 	}
 }
@@ -61,8 +60,8 @@ func newServerStats() *serverStats {
 // as well as the mutex to access them.
 var allStats = struct {
 	statsMutex      sync.RWMutex
-	serverIDtoStats map[string]*serverStats
-}{serverIDtoStats: make(map[string]*serverStats)}
+	serverIDtoStats map[string]*ServerStats
+}{serverIDtoStats: make(map[string]*ServerStats)}
 
 // statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
@@ -105,27 +104,18 @@ func recordStat(stat *statsUpdate) {
 	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
 }
 
-// Implement the json.Marshaler interface
-func (ss serverStats) MarshalJSON() ([]byte, error) {
-	out := make(map[string]interface{})
+func (serverStats ServerStats) GetStatsForReporting() (map[string]int64, int64) {
 
 	hostBytes := make(map[string]int64)
 	bytesTransferred := int64(0)
 
-	for hostname, hostStats := range ss.hostnameToStats {
+	for hostname, hostStats := range serverStats.hostnameToStats {
 		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
 		bytesTransferred += totalBytes
 		hostBytes[hostname] = totalBytes
 	}
 
-	out["bytes_transferred"] = bytesTransferred
-	out["host_bytes"] = hostBytes
-
-	// We're not using these fields, but the server requires them
-	out["page_views"] = make([]string, 0)
-	out["https_requests"] = make([]string, 0)
-
-	return json.Marshal(out)
+	return hostBytes, bytesTransferred
 }
 
 // GetBytesTransferredForServer returns total bytes sent and received since
@@ -149,22 +139,22 @@ func GetBytesTransferredForServer(serverID string) (sent, received int64) {
 	return
 }
 
-// GetForServer returns the json-able stats package for the given server.
-func GetForServer(serverID string) (payload *serverStats) {
+// GetForServer returns the server stats for the given server.
+func GetForServer(serverID string) (serverStats *ServerStats) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
-	payload = allStats.serverIDtoStats[serverID]
-	if payload == nil {
-		payload = newServerStats()
+	serverStats = allStats.serverIDtoStats[serverID]
+	if serverStats == nil {
+		serverStats = newServerStats()
 	}
 	delete(allStats.serverIDtoStats, serverID)
 	return
 }
 
 // PutBack re-adds a set of server stats to the collection.
-func PutBack(serverID string, ss *serverStats) {
-	for hostname, hoststats := range ss.hostnameToStats {
+func PutBack(serverID string, serverStats *ServerStats) {
+	for hostname, hoststats := range serverStats.hostnameToStats {
 		recordStat(
 			&statsUpdate{
 				serverID:         serverID,

+ 146 - 43
psiphon/tunnel.go

@@ -63,9 +63,12 @@ type TunnelOwner interface {
 // and an SSH session built on top of that transport.
 type Tunnel struct {
 	mutex                    *sync.Mutex
+	config                   *Config
+	untunneledDialConfig     *DialConfig
+	isDiscarded              bool
 	isClosed                 bool
 	serverEntry              *ServerEntry
-	session                  *Session
+	serverContext            *ServerContext
 	protocol                 string
 	conn                     net.Conn
 	sshClient                *ssh.Client
@@ -73,7 +76,7 @@ type Tunnel struct {
 	shutdownOperateBroadcast chan struct{}
 	signalPortForwardFailure chan struct{}
 	totalPortForwardFailures int
-	sessionStartTime         time.Time
+	startTime                time.Time
 }
 
 // EstablishTunnel first makes a network transport connection to the
@@ -85,8 +88,10 @@ type Tunnel struct {
 // HTTP (meek protocol).
 // When requiredProtocol is not blank, that protocol is used. Otherwise,
 // the a random supported protocol is used.
+// untunneledDialConfig is used for untunneled final status requests.
 func EstablishTunnel(
 	config *Config,
+	untunneledDialConfig *DialConfig,
 	sessionId string,
 	pendingConns *Conns,
 	serverEntry *ServerEntry,
@@ -115,6 +120,8 @@ func EstablishTunnel(
 	// The tunnel is now connected
 	tunnel = &Tunnel{
 		mutex:                    new(sync.Mutex),
+		config:                   config,
+		untunneledDialConfig:     untunneledDialConfig,
 		isClosed:                 false,
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
@@ -127,41 +134,39 @@ func EstablishTunnel(
 		signalPortForwardFailure: make(chan struct{}, 1),
 	}
 
-	// Create a new Psiphon API session for this tunnel. This includes performing
-	// a handshake request. If the handshake fails, this establishment fails.
-	//
-	// TODO: as long as the servers are not enforcing that a client perform a handshake,
-	// proceed with this tunnel as long as at least one previous handhake succeeded?
-	//
+	// Create a new Psiphon API server context for this tunnel. This includes
+	// performing a handshake request. If the handshake fails, this establishment
+	// fails.
 	if !config.DisableApi {
-		NoticeInfo("starting session for %s", tunnel.serverEntry.IpAddress)
-		tunnel.session, err = NewSession(config, tunnel, sessionId)
+		NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
+		tunnel.serverContext, err = NewServerContext(tunnel, sessionId)
 		if err != nil {
-			return nil, ContextError(fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err))
+			return nil, ContextError(
+				fmt.Errorf("error starting server context for %s: %s",
+					tunnel.serverEntry.IpAddress, err))
 		}
 	}
 
-	tunnel.sessionStartTime = time.Now()
+	tunnel.startTime = time.Now()
 
 	// Now that network operations are complete, cancel interruptibility
 	pendingConns.Remove(conn)
 
-	// Promote this successful tunnel to first rank so it's one
-	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
-
 	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
 	tunnel.operateWaitGroup.Add(1)
-	go tunnel.operateTunnel(config, tunnelOwner)
+	go tunnel.operateTunnel(tunnelOwner)
 
 	return tunnel, nil
 }
 
 // Close stops operating the tunnel and closes the underlying connection.
 // Supports multiple and/or concurrent calls to Close().
-func (tunnel *Tunnel) Close() {
+// When isDiscarded is set, operateTunnel will not attempt to send final
+// status requests.
+func (tunnel *Tunnel) Close(isDiscarded bool) {
 
 	tunnel.mutex.Lock()
+	tunnel.isDiscarded = isDiscarded
 	isClosed := tunnel.isClosed
 	tunnel.isClosed = true
 	tunnel.mutex.Unlock()
@@ -185,6 +190,13 @@ func (tunnel *Tunnel) Close() {
 	}
 }
 
+// IsDiscarded returns the tunnel's discarded flag.
+func (tunnel *Tunnel) IsDiscarded() bool {
+	tunnel.mutex.Lock()
+	defer tunnel.mutex.Unlock()
+	return tunnel.isDiscarded
+}
+
 // Dial establishes a port forward connection through the tunnel
 // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
 func (tunnel *Tunnel) Dial(
@@ -226,12 +238,12 @@ func (tunnel *Tunnel) Dial(
 		tunnel:         tunnel,
 		downstreamConn: downstreamConn}
 
-	// Tunnel does not have a session when DisableApi is set. We still use
+	// Tunnel does not have a serverContext when DisableApi is set. We still use
 	// transferstats.Conn to count bytes transferred for monitoring tunnel
 	// quality.
 	var regexps *transferstats.Regexps
-	if tunnel.session != nil {
-		regexps = tunnel.session.StatsRegexps()
+	if tunnel.serverContext != nil {
+		regexps = tunnel.serverContext.StatsRegexps()
 	}
 	conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
 
@@ -242,7 +254,7 @@ func (tunnel *Tunnel) Dial(
 // This will terminate the tunnel.
 func (tunnel *Tunnel) SignalComponentFailure() {
 	NoticeAlert("tunnel received component failure signal")
-	tunnel.Close()
+	tunnel.Close(false)
 }
 
 // TunneledConn implements net.Conn and wraps a port foward connection.
@@ -535,7 +547,7 @@ func dialSsh(
 // TODO: change "recently active" to include having received any
 // SSH protocol messages from the server, not just user payload?
 //
-func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
+func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	defer tunnel.operateWaitGroup.Done()
 
 	lastBytesReceivedTime := time.Now()
@@ -564,6 +576,22 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	statsTimer := time.NewTimer(nextStatusRequestPeriod())
 	defer statsTimer.Stop()
 
+	// Schedule an immediate status request to deliver any unreported
+	// tunnel stats.
+	// Note: this may not be effective when an outstanding
+	// asynchronous untunneled final status request is holding the
+	// tunnel stats records. It may also conflict with other
+	// tunnel candidates which attempt to send an immediate request
+	// before being discarded. For now, we mitigate this with a short,
+	// random delay.
+	unreported := CountUnreportedTunnelStats()
+	if unreported > 0 {
+		NoticeInfo("Unreported tunnel stats: %d", unreported)
+		statsTimer.Reset(MakeRandomPeriod(
+			PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN,
+			PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX))
+	}
+
 	nextSshKeepAlivePeriod := func() time.Duration {
 		return MakeRandomPeriod(
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
@@ -572,7 +600,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 
 	// TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
 	sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
-	if config.DisablePeriodicSshKeepAlive {
+	if tunnel.config.DisablePeriodicSshKeepAlive {
 		sshKeepAliveTimer.Stop()
 	} else {
 		defer sshKeepAliveTimer.Stop()
@@ -580,13 +608,10 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 
 	// Perform network requests in separate goroutines so as not to block
 	// other operations.
-	// Note: defer LIFO dependency: channels to be closed before Wait()
 	requestsWaitGroup := new(sync.WaitGroup)
-	defer requestsWaitGroup.Wait()
 
 	requestsWaitGroup.Add(1)
 	signalStatusRequest := make(chan struct{})
-	defer close(signalStatusRequest)
 	go func() {
 		defer requestsWaitGroup.Done()
 		for _ = range signalStatusRequest {
@@ -597,7 +622,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	requestsWaitGroup.Add(1)
 	signalSshKeepAlive := make(chan time.Duration)
 	sshKeepAliveError := make(chan error, 1)
-	defer close(signalSshKeepAlive)
 	go func() {
 		defer requestsWaitGroup.Done()
 		for timeout := range signalSshKeepAlive {
@@ -611,8 +635,9 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 		}
 	}()
 
+	shutdown := false
 	var err error
-	for err == nil {
+	for !shutdown && err == nil {
 		select {
 		case <-noticeBytesTransferredTicker.C:
 			sent, received := transferstats.GetBytesTransferredForServer(
@@ -631,7 +656,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			}
 
 			// Only emit the frequent BytesTransferred notice when tunnel is not idle.
-			if config.EmitBytesTransferred && (sent > 0 || received > 0) {
+			if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
 				NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
 			}
 
@@ -663,22 +688,74 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				default:
 				}
 			}
-			if !config.DisablePeriodicSshKeepAlive {
+			if !tunnel.config.DisablePeriodicSshKeepAlive {
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
 		case err = <-sshKeepAliveError:
 
 		case <-tunnel.shutdownOperateBroadcast:
-			// Attempt to send any remaining stats
-			sendStats(tunnel)
-			NoticeInfo("shutdown operate tunnel")
-			return
+			shutdown = true
 		}
 	}
 
-	if err != nil {
+	close(signalSshKeepAlive)
+	close(signalStatusRequest)
+	requestsWaitGroup.Wait()
+
+	// Record this tunnel's stats; they are reported via the next successful
+	// status request. There is no serverContext when DisableApi is set, so
+	// skip recording then. Since client clocks are unreliable, the server's
+	// timestamp from the handshake response is used as the tunnel start time;
+	// it is slightly earlier than the actual tunnel activation time, as the
+	// client has to receive and parse the response and activate the tunnel.
+	if tunnel.serverContext != nil && !tunnel.IsDiscarded() {
+		err := RecordTunnelStats(
+			tunnel.serverContext.sessionId,
+			tunnel.serverContext.tunnelNumber,
+			tunnel.serverEntry.IpAddress,
+			tunnel.serverContext.serverHandshakeTimestamp,
+			fmt.Sprintf("%d", time.Now().Sub(tunnel.startTime)),
+			totalSent,
+			totalReceived)
+		if err != nil {
+			NoticeAlert("RecordTunnelStats failed: %s", ContextError(err))
+		}
+	}
+
+	// Final status request notes:
+	//
+	// It's highly desirable to send a final status request in order to report
+	// domain bytes transferred stats as well as to report tunnel stats as
+	// soon as possible. For this reason, we attempt untunneled requests when
+	// the tunneled request isn't possible or has failed.
+	//
+	// In an orderly shutdown (err == nil), the Controller is stopping and
+	// everything must be wrapped up quickly. Also, we still have a working
+	// tunnel. So we first attempt a tunneled status request (with a short
+	// timeout) and then attempt, synchronously -- otherwise the Controller's
+	// untunneledPendingConns.CloseAll() will immediately interrupt untunneled
+	// requests -- untunneled requests (also with short timeouts).
+	// Note that this depends on the order of untunneledPendingConns.CloseAll()
+	// coming after tunnel.Close(): see note in Controller.Run().
+	//
+	// If the tunnel has failed, the Controller may continue working. We want
+	// to re-establish as soon as possible (so don't want to block on status
+	// requests, even for a second). We may have a long time to attempt
+	// untunneled requests in the background. And there is no tunnel through
+	// which to attempt tunneled requests. So we spawn a goroutine to run the
+	// untunneled requests, which are allowed a longer timeout. These requests
+	// will be interrupted by the Controller's untunneledPendingConns.CloseAll()
+	// in the case of a shutdown.
+
+	if err == nil {
+		NoticeInfo("shutdown operate tunnel")
+		if !sendStats(tunnel) {
+			sendUntunneledStats(tunnel, true)
+		}
+	} else {
 		NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
+		go sendUntunneledStats(tunnel, false)
 		tunnelOwner.SignalTunnelFailure(tunnel)
 	}
 }
@@ -712,17 +789,43 @@ func sendSshKeepAlive(
 }
 
 // sendStats is a helper for sending session stats to the server.
-func sendStats(tunnel *Tunnel) {
+func sendStats(tunnel *Tunnel) bool {
 
-	// Tunnel does not have a session when DisableApi is set
-	if tunnel.session == nil {
-		return
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
+		return true
+	}
+
+	// Skip when tunnel is discarded
+	if tunnel.IsDiscarded() {
+		return true
 	}
 
-	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
-	err := tunnel.session.DoStatusRequest(payload)
+	err := tunnel.serverContext.DoStatusRequest(tunnel)
 	if err != nil {
 		NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-		transferstats.PutBack(tunnel.serverEntry.IpAddress, payload)
+	}
+
+	return err == nil
+}
+
+// sendUntunneledStats sends final status requests directly to Psiphon
+// servers after the tunnel has already failed. This is an attempt
+// to retain useful bytes transferred stats.
+func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
+
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
+		return
+	}
+
+	// Skip when tunnel is discarded
+	if tunnel.IsDiscarded() {
+		return
+	}
+
+	err := TryUntunneledStatusRequest(tunnel, isShutdown)
+	if err != nil {
+		NoticeAlert("TryUntunneledStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
 	}
 }