Просмотр исходного кода

Implement client-side tunnel duration reporting

* Client records the precise tunnel duration and
reports it in the next "status" request for the next
tunnel.
* To retain duration data, unreported tunnel durations
are persisted to the data store until reported.
* This new scheme supersedes the server-side duration
estimation based on pairing "connected" and final
"status" requests. The "connected" parameter in the
status request no longer follows the convention of that
old scheme.
* Rename "Session" to "ServerContext": a session is now
considered to be the entire lifetime of a Psiphon app
between user-commanded "Start" and "Stop". Renamed the
per-tunnel server state to avoid confusion.
* Heavily reworked serverApi code.
* Changed transferstats API, replacing json.Marshaler
interface with explicit ServerStats struct. Then moved
the "status" request request body formatting to serverApi
in the "psiphon" package.
Rod Hynes 10 лет назад
Родитель
Сommit
f92103fd79
7 измененных файлов с 636 добавлено и 286 удалено
  1. 1 0
      psiphon/config.go
  2. 13 10
      psiphon/controller.go
  3. 173 0
      psiphon/dataStore_alt.go
  4. 365 192
      psiphon/serverApi.go
  5. 8 8
      psiphon/splitTunnel.go
  6. 16 26
      psiphon/transferstats/collector.go
  7. 60 50
      psiphon/tunnel.go

+ 1 - 0
psiphon/config.go

@@ -59,6 +59,7 @@ const (
 	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES   = 256
 	PSIPHON_API_CONNECTED_REQUEST_PERIOD           = 24 * time.Hour
 	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD     = 5 * time.Second
+	PSIPHON_API_TUNNEL_DURATIONS_MAX_COUNT         = 1000
 	FETCH_ROUTES_TIMEOUT                           = 1 * time.Minute
 	DOWNLOAD_UPGRADE_TIMEOUT                       = 15 * time.Minute
 	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD            = 5 * time.Second

+ 13 - 10
psiphon/controller.go

@@ -312,7 +312,7 @@ loop:
 		reported := false
 		tunnel := controller.getNextActiveTunnel()
 		if tunnel != nil {
-			err := tunnel.session.DoConnectedRequest()
+			err := tunnel.serverContext.DoConnectedRequest()
 			if err == nil {
 				reported = true
 			} else {
@@ -396,8 +396,10 @@ loop:
 	NoticeInfo("exiting upgrade downloader")
 }
 
-func (controller *Controller) startClientUpgradeDownloader(session *Session) {
-	// session is nil when DisableApi is set
+func (controller *Controller) startClientUpgradeDownloader(
+	serverContext *ServerContext) {
+
+	// serverContext is nil when DisableApi is set
 	if controller.config.DisableApi {
 		return
 	}
@@ -408,7 +410,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 		return
 	}
 
-	if session.clientUpgradeVersion == "" {
+	if serverContext.clientUpgradeVersion == "" {
 		// No upgrade is offered
 		return
 	}
@@ -418,7 +420,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 	if !controller.startedUpgradeDownloader {
 		controller.startedUpgradeDownloader = true
 		controller.runWaitGroup.Add(1)
-		go controller.upgradeDownloader(session.clientUpgradeVersion)
+		go controller.upgradeDownloader(serverContext.clientUpgradeVersion)
 	}
 }
 
@@ -455,7 +457,7 @@ loop:
 			// establishPendingConns; this causes the pendingConns.Add() within
 			// interruptibleTCPDial to succeed instead of aborting, and the result
 			// is that it's possible for establish goroutines to run all the way through
-			// NewSession before being discarded... delaying shutdown.
+			// NewServerContext before being discarded... delaying shutdown.
 			select {
 			case <-controller.shutdownBroadcast:
 				break loop
@@ -497,7 +499,8 @@ loop:
 					// tunnel is established.
 					controller.startOrSignalConnectedReporter()
 
-					controller.startClientUpgradeDownloader(establishedTunnel.session)
+					controller.startClientUpgradeDownloader(
+						establishedTunnel.serverContext)
 				}
 
 			} else {
@@ -532,7 +535,7 @@ loop:
 
 // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
 // tunnels. A protocol is classified as impaired if a tunnel using that protocol
-// fails, repeatedly, shortly after the start of the session. During tunnel
+// fails, repeatedly, shortly after the start of the connection. During tunnel
 // establishment, impaired protocols are briefly skipped.
 //
 // One purpose of this measure is to defend against an attack where the adversary,
@@ -543,7 +546,7 @@ loop:
 //
 // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
 func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
-	if failedTunnel.sessionStartTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
+	if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
 		controller.impairedProtocolClassification[failedTunnel.protocol] += 1
 	} else {
 		controller.impairedProtocolClassification[failedTunnel.protocol] = 0
@@ -879,7 +882,7 @@ loop:
 			// first iteration of the ESTABLISH_TUNNEL_WORK_TIME
 			// loop since (a) one iteration should be sufficient to
 			// evade the attack; (b) there's a good chance of false
-			// positives (such as short session durations due to network
+			// positives (such as short tunnel durations due to network
 			// hopping on a mobile device).
 			// Impaired protocols logic is not applied when
 			// config.TunnelProtocol is specified.

+ 173 - 0
psiphon/dataStore_alt.go

@@ -22,6 +22,7 @@
 package psiphon
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -55,6 +56,7 @@ const (
 	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
 	urlETagsBucket              = "urlETags"
 	keyValueBucket              = "keyValues"
+	tunnelDurationsBucket       = "tunnelDurations"
 	rankedServerEntryCount      = 100
 )
 
@@ -87,6 +89,7 @@ func InitDataStore(config *Config) (err error) {
 				splitTunnelRouteDataBucket,
 				urlETagsBucket,
 				keyValueBucket,
+				tunnelDurationsBucket,
 			}
 			for _, bucket := range requiredBuckets {
 				_, err := tx.CreateBucketIfNotExists([]byte(bucket))
@@ -102,6 +105,8 @@ func InitDataStore(config *Config) (err error) {
 		}
 
 		singleton.db = db
+
+		resetAllTunnelDurationsToUnreported()
 	})
 	return err
 }
@@ -717,3 +722,171 @@ func GetKeyValue(key string) (value string, err error) {
 	}
 	return value, nil
 }
+
+// Tunnel duration records in the tunnelDurationStateUnreported
+// state are available for take out.
+// Records in the tunnelDurationStateReporting have been
+// taken out and are pending either deleting (for a
+// successful request) or change to StateUnreported (for
+// a failed request).
+// All tunnel durations are reverted to StateUnreported when
+// the datastore is initialized at start up.
+
+var tunnelDurationStateUnreported = []byte("0")
+var tunnelDurationStateReporting = []byte("1")
+
+// StoreTunnelDuration adds a new tunnel duration, which is
+// set to StateUnreported and is an immediate candidate for
+// reporting.
+// tunnelDuration is a JSON byte array containing fields as
+// required by the Psiphon server API (see RecordTunnelDuration).
+// It's assumed that the JSON value contains enough unique
+// information for the value to function as a key in the
+// key/value datastore. This assumption is currently satisfied
+// by the fields sessionId + tunnelNumber.
+func StoreTunnelDuration(tunnelDuration []byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		err := bucket.Put(tunnelDuration, tunnelDurationStateUnreported)
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// CountUnreportedTunnelDurations returns the number of tunnel
+// duration records in StateUnreported.
+func CountUnreportedTunnelDurations() int {
+	checkInitDataStore()
+
+	unreported := 0
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if 0 == bytes.Compare(value, tunnelDurationStateUnreported) {
+				unreported++
+				break
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		NoticeAlert("CountUnreportedTunnelDurations failed: %s", err)
+		return 0
+	}
+
+	return unreported
+}
+
+// TakeOutUnreportedTunnelDurations returns up to maxCount tunnel
+// durations that are in StateUnreported. The records are set to
+// StateReporting. If the records are successfully reported,
+// clear them with ClearReportedTunnelDurations. If the records are
+// not successfully reported, restore them with
+// PutBackUnreportedTunnelDurations.
+func TakeOutUnreportedTunnelDurations(maxCount int) ([][]byte, error) {
+	checkInitDataStore()
+
+	tunnelDurations := make([][]byte, 0)
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if 0 == bytes.Compare(value, tunnelDurationStateUnreported) {
+				err := bucket.Put(key, tunnelDurationStateReporting)
+				if err != nil {
+					return err
+				}
+				tunnelDurations = append(tunnelDurations, key)
+				if len(tunnelDurations) >= maxCount {
+					break
+				}
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return tunnelDurations, nil
+}
+
+// PutBackUnreportedTunnelDurations restores a list of tunnel
+// durations to StateUnreported.
+func PutBackUnreportedTunnelDurations(tunnelDurations [][]byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		for _, tunnelDuration := range tunnelDurations {
+			err := bucket.Put(tunnelDuration, tunnelDurationStateUnreported)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// ClearReportedTunnelDurations deletes a list of tunnel
+// durations that were succesdfully reported.
+func ClearReportedTunnelDurations(tunnelDurations [][]byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		for _, tunnelDuration := range tunnelDurations {
+			err := bucket.Delete(tunnelDuration)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// resetAllTunnelDurationsToUnreported sets all tunnel
+// duration records to StateUnreported. This reset is called
+// when the datastore is initialized at start up, as we do
+// not know if tunnel records in StateReporting were reported
+// or not.
+func resetAllTunnelDurationsToUnreported() error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelDurationsBucket))
+		cursor := bucket.Cursor()
+		for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
+			err := bucket.Put(key, tunnelDurationStateUnreported)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}

+ 365 - 192
psiphon/serverApi.go

@@ -31,29 +31,39 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"sync/atomic"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
 )
 
-// Session is a utility struct which holds all of the data associated
-// with a Psiphon session. In addition to the established tunnel, this
-// includes the session ID (used for Psiphon API requests) and a http
+// ServerContext is a utility struct which holds all of the data associated
+// with a Psiphon server connection. In addition to the established tunnel, this
+// includes data associated with Psiphon API requests and a persistent http
 // client configured to make tunneled Psiphon API requests.
-type Session struct {
-	sessionId            string
-	baseRequestUrl       string
-	psiphonHttpsClient   *http.Client
-	statsRegexps         *transferstats.Regexps
-	clientRegion         string
-	clientUpgradeVersion string
+type ServerContext struct {
+	sessionId                string
+	tunnelNumber             int64
+	baseRequestUrl           string
+	psiphonHttpsClient       *http.Client
+	statsRegexps             *transferstats.Regexps
+	clientRegion             string
+	clientUpgradeVersion     string
+	serverHandshakeTimestamp string
 }
 
-// MakeSessionId creates a new session ID. Making the session ID is not done
-// in NewSession because:
-// (1) the transport needs to send the ID in the SSH credentials before the tunnel
-//     is established and NewSession performs a handshake on an established tunnel.
-// (2) the same session ID is used across multi-tunnel controller runs, where each
-//     tunnel has its own Session instance.
+// nextTunnelNumber is a monotonically increasing number assigned to each
+// successive tunnel connection. The sessionId and tunnelNumber together
+// form a globally unique identifier for tunnels, which is used for
+// stats. Note that the number is increasing but not necessarily
+// consecutive for each active tunnel in session.
+var nextTunnelNumber int64
+
+// MakeSessionId creates a new session ID. The same session ID is used across
+// multi-tunnel controller runs, where each tunnel has its own ServerContext
+// instance.
+// In server-side stats, we now consider a "session" to be the lifetime of the
+// Controller (e.g., the user's commanded start and stop) and we measure this
+// duration as well as the duration of each tunnel within the session.
 func MakeSessionId() (sessionId string, err error) {
 	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
 	if err != nil {
@@ -62,28 +72,131 @@ func MakeSessionId() (sessionId string, err error) {
 	return hex.EncodeToString(randomId), nil
 }
 
-// NewSession makes the tunnelled handshake request to the
-// Psiphon server and returns a Session struct, initialized with the
-// session ID, for use with subsequent Psiphon server API requests (e.g.,
-// periodic connected and status requests).
-func NewSession(tunnel *Tunnel, sessionId string) (session *Session, err error) {
+// NewServerContext makes the tunnelled handshake request to the Psiphon server
+// and returns a ServerContext struct for use with subsequent Psiphon server API
+// requests (e.g., periodic connected and status requests).
+func NewServerContext(tunnel *Tunnel, sessionId string) (*ServerContext, error) {
 
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel)
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	session = &Session{
+
+	serverContext := &ServerContext{
 		sessionId:          sessionId,
+		tunnelNumber:       atomic.AddInt64(&nextTunnelNumber, 1),
 		baseRequestUrl:     makeBaseRequestUrl(tunnel, "", sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
 	}
 
-	err = session.doHandshakeRequest()
+	err = serverContext.doHandshakeRequest()
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	return session, nil
+	return serverContext, nil
+}
+
+// doHandshakeRequest performs the handshake API request. The handshake
+// returns upgrade info, newly discovered server entries -- which are
+// stored -- and sponsor info (home pages, stat regexes).
+func (serverContext *ServerContext) doHandshakeRequest() error {
+	extraParams := make([]*ExtraParam, 0)
+	serverEntryIpAddresses, err := GetServerEntryIpAddresses()
+	if err != nil {
+		return ContextError(err)
+	}
+	// Submit a list of known servers -- this will be used for
+	// discovery statistics.
+	for _, ipAddress := range serverEntryIpAddresses {
+		extraParams = append(extraParams, &ExtraParam{"known_server", ipAddress})
+	}
+	url := buildRequestUrl(serverContext.baseRequestUrl, "handshake", extraParams...)
+	responseBody, err := serverContext.doGetRequest(url)
+	if err != nil {
+		return ContextError(err)
+	}
+	// Skip legacy format lines and just parse the JSON config line
+	configLinePrefix := []byte("Config: ")
+	var configLine []byte
+	for _, line := range bytes.Split(responseBody, []byte("\n")) {
+		if bytes.HasPrefix(line, configLinePrefix) {
+			configLine = line[len(configLinePrefix):]
+			break
+		}
+	}
+	if len(configLine) == 0 {
+		return ContextError(errors.New("no config line found"))
+	}
+
+	// Note:
+	// - 'preemptive_reconnect_lifetime_milliseconds' is currently unused
+	// - 'ssh_session_id' is ignored; client session ID is used instead
+	var handshakeConfig struct {
+		Homepages            []string            `json:"homepages"`
+		UpgradeClientVersion string              `json:"upgrade_client_version"`
+		PageViewRegexes      []map[string]string `json:"page_view_regexes"`
+		HttpsRequestRegexes  []map[string]string `json:"https_request_regexes"`
+		EncodedServerList    []string            `json:"encoded_server_list"`
+		ClientRegion         string              `json:"client_region"`
+		ServerTimestamp      string              `json:"server_timestamp"`
+	}
+	err = json.Unmarshal(configLine, &handshakeConfig)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	serverContext.clientRegion = handshakeConfig.ClientRegion
+	NoticeClientRegion(serverContext.clientRegion)
+
+	var decodedServerEntries []*ServerEntry
+
+	// Store discovered server entries
+	for _, encodedServerEntry := range handshakeConfig.EncodedServerList {
+		serverEntry, err := DecodeServerEntry(encodedServerEntry)
+		if err != nil {
+			return ContextError(err)
+		}
+		err = ValidateServerEntry(serverEntry)
+		if err != nil {
+			// Skip this entry and continue with the next one
+			continue
+		}
+
+		decodedServerEntries = append(decodedServerEntries, serverEntry)
+	}
+
+	// The reason we are storing the entire array of server entries at once rather
+	// than one at a time is that some desirable side-effects get triggered by
+	// StoreServerEntries that don't get triggered by StoreServerEntry.
+	err = StoreServerEntries(decodedServerEntries, true)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	// TODO: formally communicate the sponsor and upgrade info to an
+	// outer client via some control interface.
+	for _, homepage := range handshakeConfig.Homepages {
+		NoticeHomepage(homepage)
+	}
+
+	serverContext.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
+	if handshakeConfig.UpgradeClientVersion != "" {
+		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
+	}
+
+	var regexpsNotices []string
+	serverContext.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
+		handshakeConfig.PageViewRegexes,
+		handshakeConfig.HttpsRequestRegexes)
+
+	for _, notice := range regexpsNotices {
+		NoticeAlert(notice)
+	}
+
+	serverContext.serverHandshakeTimestamp = handshakeConfig.ServerTimestamp
+
+	return nil
 }
 
 // DoConnectedRequest performs the connected API request. This request is
@@ -92,7 +205,7 @@ func NewSession(tunnel *Tunnel, sessionId string) (session *Session, err error)
 // a timestamp (using the server clock, and should be rounded to the
 // nearest hour) which is used to determine when a connection represents
 // a unique user for a time period.
-func (session *Session) DoConnectedRequest() error {
+func (serverContext *ServerContext) DoConnectedRequest() error {
 	const DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
 	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
 	if err != nil {
@@ -102,11 +215,11 @@ func (session *Session) DoConnectedRequest() error {
 		lastConnected = "None"
 	}
 	url := buildRequestUrl(
-		session.baseRequestUrl,
+		serverContext.baseRequestUrl,
 		"connected",
-		&ExtraParam{"session_id", session.sessionId},
+		&ExtraParam{"session_id", serverContext.sessionId},
 		&ExtraParam{"last_connected", lastConnected})
-	responseBody, err := session.doGetRequest(url)
+	responseBody, err := serverContext.doGetRequest(url)
 	if err != nil {
 		return ContextError(err)
 	}
@@ -127,40 +240,51 @@ func (session *Session) DoConnectedRequest() error {
 }
 
 // StatsRegexps gets the Regexps used for the statistics for this tunnel.
-func (session *Session) StatsRegexps() *transferstats.Regexps {
-	return session.statsRegexps
+func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
+	return serverContext.statsRegexps
 }
 
 // DoStatusRequest makes a /status request to the server, sending session stats.
-func (session *Session) DoStatusRequest(
-	statsPayload json.Marshaler,
-	isConnected bool) error {
+func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
+
+	url := makeStatusRequestUrl(serverContext.sessionId, serverContext.baseRequestUrl, true)
 
-	statsPayloadJSON, err := json.Marshal(statsPayload)
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
 	if err != nil {
 		return ContextError(err)
 	}
 
-	url := makeStatusRequestUrl(session.sessionId, session.baseRequestUrl, isConnected)
-
-	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
+	err = serverContext.doPostRequest(url, "application/json", bytes.NewReader(payload))
 	if err != nil {
+
+		// Resend the transfer stats and tunnel durations later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
+
 		return ContextError(err)
 	}
+	confirmStatusRequestPayload(payloadInfo)
 
 	return nil
 }
 
-// makeStatusRequestUrl is a helper shared by DoStatusRequest
-// and doUntunneledStatusRequest.
-func makeStatusRequestUrl(sessionId, baseRequestUrl string, isConnected bool) string {
+func makeStatusRequestUrl(sessionId, baseRequestUrl string, isTunneled bool) string {
 
 	// Add a random amount of padding to help prevent stats updates from being
 	// a predictable size (which often happens when the connection is quiet).
 	padding := MakeSecureRandomPadding(0, PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES)
 
+	// Legacy clients set "connected" to "0" when disconnecting, and this value
+	// is used to calculate session duration estimates. This is now superseded
+	// by explicit tunnel duration reporting.
+	// The legacy method of reconstructing session durations is not compatible
+	// with this client's connected request retries and asynchronous final
+	// status request attempts. So we simply set this "connected" flag to reflect
+	// whether the request is sent tunneled or not.
+
 	connected := "1"
-	if !isConnected {
+	if !isTunneled {
 		connected = "0"
 	}
 
@@ -174,108 +298,231 @@ func makeStatusRequestUrl(sessionId, baseRequestUrl string, isConnected bool) st
 		&ExtraParam{"padding", base64.StdEncoding.EncodeToString(padding)})
 }
 
-// doHandshakeRequest performs the handshake API request. The handshake
-// returns upgrade info, newly discovered server entries -- which are
-// stored -- and sponsor info (home pages, stat regexes).
-func (session *Session) doHandshakeRequest() error {
-	extraParams := make([]*ExtraParam, 0)
-	serverEntryIpAddresses, err := GetServerEntryIpAddresses()
+// statusRequestPayloadInfo is a temporary structure for data used to
+// either "clear" or "put back" status request payload data depending
+// on whether or not the request succeeded.
+type statusRequestPayloadInfo struct {
+	serverId        string
+	transferStats   *transferstats.ServerStats
+	tunnelDurations [][]byte
+}
+
+func makeStatusRequestPayload(
+	serverId string) ([]byte, *statusRequestPayloadInfo, error) {
+
+	transferStats := transferstats.GetForServer(serverId)
+	tunnelDurations, err := TakeOutUnreportedTunnelDurations(
+		PSIPHON_API_TUNNEL_DURATIONS_MAX_COUNT)
 	if err != nil {
-		return ContextError(err)
+		NoticeAlert(
+			"TakeOutUnreportedTunnelDurations failed: %s", ContextError(err))
+		tunnelDurations = nil
+		// Proceed with transferStats only
 	}
-	// Submit a list of known servers -- this will be used for
-	// discovery statistics.
-	for _, ipAddress := range serverEntryIpAddresses {
-		extraParams = append(extraParams, &ExtraParam{"known_server", ipAddress})
-	}
-	url := buildRequestUrl(session.baseRequestUrl, "handshake", extraParams...)
-	responseBody, err := session.doGetRequest(url)
+	payloadInfo := &statusRequestPayloadInfo{
+		serverId, transferStats, tunnelDurations}
+
+	payload := make(map[string]interface{})
+
+	hostBytes, bytesTransferred := transferStats.GetStatsForReporting()
+	payload["host_bytes"] = hostBytes
+	payload["bytes_transferred"] = bytesTransferred
+
+	// We're not recording these fields, but the server requires them.
+	payload["page_views"] = make([]string, 0)
+	payload["https_requests"] = make([]string, 0)
+
+	payload["tunnel_durations"] = tunnelDurations
+
+	jsonPayload, err := json.Marshal(payload)
 	if err != nil {
-		return ContextError(err)
-	}
-	// Skip legacy format lines and just parse the JSON config line
-	configLinePrefix := []byte("Config: ")
-	var configLine []byte
-	for _, line := range bytes.Split(responseBody, []byte("\n")) {
-		if bytes.HasPrefix(line, configLinePrefix) {
-			configLine = line[len(configLinePrefix):]
-			break
-		}
-	}
-	if len(configLine) == 0 {
-		return ContextError(errors.New("no config line found"))
-	}
 
-	// Note:
-	// - 'preemptive_reconnect_lifetime_milliseconds' is currently unused
-	// - 'ssh_session_id' is ignored; client session ID is used instead
-	var handshakeConfig struct {
-		Homepages            []string            `json:"homepages"`
-		UpgradeClientVersion string              `json:"upgrade_client_version"`
-		PageViewRegexes      []map[string]string `json:"page_view_regexes"`
-		HttpsRequestRegexes  []map[string]string `json:"https_request_regexes"`
-		EncodedServerList    []string            `json:"encoded_server_list"`
-		ClientRegion         string              `json:"client_region"`
+		// Send the transfer stats and tunnel durations later
+		putBackStatusRequestPayload(payloadInfo)
+
+		return nil, nil, ContextError(err)
 	}
-	err = json.Unmarshal(configLine, &handshakeConfig)
+
+	return jsonPayload, payloadInfo, nil
+}
+
+func putBackStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	transferstats.PutBack(payloadInfo.serverId, payloadInfo.transferStats)
+	err := PutBackUnreportedTunnelDurations(payloadInfo.tunnelDurations)
 	if err != nil {
-		return ContextError(err)
+		// These tunnel duration records won't be resent under after a
+		// datastore re-initialization.
+		NoticeAlert(
+			"PutBackUnreportedTunnelDurations failed: %s", ContextError(err))
 	}
+}
 
-	session.clientRegion = handshakeConfig.ClientRegion
-	NoticeClientRegion(session.clientRegion)
+func confirmStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	err := ClearReportedTunnelDurations(payloadInfo.tunnelDurations)
+	if err != nil {
+		// These tunnel duration records may be resent.
+		NoticeAlert(
+			"ClearReportedTunnelDurations failed: %s", ContextError(err))
+	}
+}
 
-	var decodedServerEntries []*ServerEntry
+// TryUntunneledStatusRequest makes direct connections to the specified
+// server (if supported) in an attempt to send useful bytes transferred
+// and tunnel duration stats after a tunnel has alreay failed.
+// The tunnel is assumed to be closed, but its config, protocol, and
+// context values must still be valid.
+// TryUntunneledStatusRequest emits notices detailing failed attempts.
+func TryUntunneledStatusRequest(tunnel *Tunnel, isShutdown bool) error {
 
-	// Store discovered server entries
-	for _, encodedServerEntry := range handshakeConfig.EncodedServerList {
-		serverEntry, err := DecodeServerEntry(encodedServerEntry)
-		if err != nil {
-			return ContextError(err)
-		}
-		err = ValidateServerEntry(serverEntry)
-		if err != nil {
-			// Skip this entry and continue with the next one
-			continue
+	for _, port := range tunnel.serverEntry.GetDirectWebRequestPorts() {
+		err := doUntunneledStatusRequest(tunnel, port, isShutdown)
+		if err == nil {
+			return nil
 		}
+		NoticeAlert("doUntunneledStatusRequest failed for %s:%s: %s",
+			tunnel.serverEntry.IpAddress, port, err)
+	}
 
-		decodedServerEntries = append(decodedServerEntries, serverEntry)
+	return errors.New("all attempts failed")
+}
+
+// doUntunneledStatusRequest attempts an untunneled stratus request.
+func doUntunneledStatusRequest(
+	tunnel *Tunnel, port string, isShutdown bool) error {
+
+	url := makeStatusRequestUrl(
+		tunnel.serverContext.sessionId,
+		makeBaseRequestUrl(tunnel, port, tunnel.serverContext.sessionId),
+		false)
+
+	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
+	if err != nil {
+		return ContextError(err)
 	}
 
-	// The reason we are storing the entire array of server entries at once rather
-	// than one at a time is that some desirable side-effects get triggered by
-	// StoreServerEntries that don't get triggered by StoreServerEntry.
-	err = StoreServerEntries(decodedServerEntries, true)
+	timeout := PSIPHON_API_SERVER_TIMEOUT
+	if isShutdown {
+		timeout = PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT
+	}
+
+	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
+		tunnel.untunneledDialConfig,
+		certificate,
+		url,
+		timeout)
 	if err != nil {
 		return ContextError(err)
 	}
 
-	// TODO: formally communicate the sponsor and upgrade info to an
-	// outer client via some control interface.
-	for _, homepage := range handshakeConfig.Homepages {
-		NoticeHomepage(homepage)
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
+	if err != nil {
+		return ContextError(err)
 	}
 
-	session.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
-	if handshakeConfig.UpgradeClientVersion != "" {
-		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
+	bodyType := "application/json"
+	body := bytes.NewReader(payload)
+
+	response, err := httpClient.Post(requestUrl, bodyType, body)
+	if err == nil && response.StatusCode != http.StatusOK {
+		response.Body.Close()
+		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
 	}
+	if err != nil {
 
-	var regexpsNotices []string
-	session.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
-		handshakeConfig.PageViewRegexes,
-		handshakeConfig.HttpsRequestRegexes)
+		// Resend the transfer stats and tunnel durations later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
 
-	for _, notice := range regexpsNotices {
-		NoticeAlert(notice)
+		// Trim this error since it may include long URLs
+		return ContextError(TrimError(err))
 	}
+	confirmStatusRequestPayload(payloadInfo)
+	response.Body.Close()
 
 	return nil
 }
 
+// RecordTunnelDuration records a tunnel duration for
+// subsequent reporting.
+//
+// Tunnel durations are precisely measured client-side
+// and reported in status requests. As the duration is
+// not determined until the tunnel is closed, tunnel
+// duration records are stored in the persistent datastore
+// and reported via subsequent status requests sent to any
+// Psiphon server.
+//
+// Since the status request that reports a tunnel duration
+// is not necessarily handled by the same server, the
+// tunnel duration records include the original server ID.
+//
+// Other fields that may change between duration recording and
+// duration reporting include client geo data, propagation channel,
+// sponsor ID, client version. These are not stored in the
+// datastore (client region, in particular, since that would
+// create an on-disk record of user location).
+// TODO: the server could encrypt, with a nonce and key unknown to
+// the client, a blob containing this data; return it in the
+// handshake response; and the client could store and later report
+// this blob with its tunnel duration records.
+//
+// Multiple "status" requests may be in flight at once (due
+// to multi-tunnel, asynchronous final status retry, and
+// aggressive status requests for pre-registered tunnels),
+// To avoid duplicate reporting, tunnel duration records are
+// "taken-out" by a status request and then "put back" in
+// case the request fails.
+//
+// Note: since tunnel duration records have a globally unique
+// identifier (sessionId + tunnelNumber), we could permit
+// duplicate reporting and filter our duplicates on the
+// server-side. Permitting duplicate reporting could increase
+// the velocity of reporting (for example, both the asynchronous
+// untunneled final status requests and the post-connected
+// immediate startus requests could try to report the same tunnel
+// durations).
+// Duplicate reporting may also occur when a server receives and
+// processes a status request but the client fails to receive
+// the response.
+func RecordTunnelDuration(
+	sessionId string,
+	tunnelNumber int64,
+	serverId string,
+	serverHandshakeTimestamp, duration string,
+	totalBytesSent, totalBytesReceived int64) error {
+
+	tunnelDuration := struct {
+		sessionId                string `json:"sessionId"`
+		tunnelNumber             int64  `json:"tunnelNumber"`
+		serverId                 string `json:"serverId"`
+		serverHandshakeTimestamp string `json:"serverHandshakeTimestamp"`
+		duration                 string `json:"duration"`
+		totalBytesSent           int64  `json:"totalBytesSent"`
+		totalBytesReceived       int64  `json:"totalBytesReceived"`
+	}{
+		sessionId,
+		tunnelNumber,
+		serverId,
+		serverHandshakeTimestamp,
+		duration,
+		totalBytesSent,
+		totalBytesReceived,
+	}
+
+	tunnelDurationJson, err := json.Marshal(tunnelDuration)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return StoreTunnelDuration(tunnelDurationJson)
+}
+
 // doGetRequest makes a tunneled HTTPS request and returns the response body.
-func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, err error) {
-	response, err := session.psiphonHttpsClient.Get(requestUrl)
+func (serverContext *ServerContext) doGetRequest(
+	requestUrl string) (responseBody []byte, err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Get(requestUrl)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
 		err = fmt.Errorf("HTTP GET request failed with response code: %d", response.StatusCode)
@@ -293,8 +540,10 @@ func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, er
 }
 
 // doPostRequest makes a tunneled HTTPS POST request.
-func (session *Session) doPostRequest(requestUrl string, bodyType string, body io.Reader) (err error) {
-	response, err := session.psiphonHttpsClient.Post(requestUrl, bodyType, body)
+func (serverContext *ServerContext) doPostRequest(
+	requestUrl string, bodyType string, body io.Reader) (err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Post(requestUrl, bodyType, body)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
 		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
@@ -391,79 +640,3 @@ func makePsiphonHttpsClient(tunnel *Tunnel) (httpsClient *http.Client, err error
 		Timeout:   PSIPHON_API_SERVER_TIMEOUT,
 	}, nil
 }
-
-// TryUntunneledStatusRequest makes direct connections to the specified
-// server (if supported) in an attempt to send useful bytes transferred
-// and session duration stats after a tunnel has alreay failed.
-// The tunnel is assumed to be closed, but its config, protocol, and
-// session values must still be valid.
-// TryUntunneledStatusRequest emits notices detailing failed attempts.
-func TryUntunneledStatusRequest(
-	tunnel *Tunnel,
-	statsPayload json.Marshaler,
-	isShutdown bool) error {
-
-	for _, port := range tunnel.serverEntry.GetDirectWebRequestPorts() {
-		err := doUntunneledStatusRequest(tunnel, port, statsPayload, isShutdown)
-		if err == nil {
-			return nil
-		}
-		NoticeAlert("doUntunneledStatusRequest failed for %s:%s: %s",
-			tunnel.serverEntry.IpAddress, port, err)
-	}
-
-	return errors.New("all attempts failed")
-}
-
-// doUntunneledStatusRequest attempts an untunneled stratus request.
-func doUntunneledStatusRequest(
-	tunnel *Tunnel,
-	port string,
-	statsPayload json.Marshaler,
-	isShutdown bool) error {
-
-	url := makeStatusRequestUrl(
-		tunnel.session.sessionId,
-		makeBaseRequestUrl(tunnel, port, tunnel.session.sessionId),
-		false)
-
-	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	timeout := PSIPHON_API_SERVER_TIMEOUT
-	if isShutdown {
-		timeout = PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT
-	}
-
-	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
-		tunnel.untunneledDialConfig,
-		certificate,
-		url,
-		timeout)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	statsPayloadJSON, err := json.Marshal(statsPayload)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	bodyType := "application/json"
-	body := bytes.NewReader(statsPayloadJSON)
-
-	response, err := httpClient.Post(requestUrl, bodyType, body)
-	if err == nil && response.StatusCode != http.StatusOK {
-		response.Body.Close()
-		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
-	}
-	if err != nil {
-		// Trim this error since it may include long URLs
-		return ContextError(TrimError(err))
-	}
-	response.Body.Close()
-
-	return nil
-}

+ 8 - 8
psiphon/splitTunnel.go

@@ -114,12 +114,12 @@ func (classifier *SplitTunnelClassifier) Start(fetchRoutesTunnel *Tunnel) {
 		return
 	}
 
-	if fetchRoutesTunnel.session == nil {
-		// Tunnel has no session
+	if fetchRoutesTunnel.serverContext == nil {
+		// Tunnel has no serverContext
 		return
 	}
 
-	if fetchRoutesTunnel.session.clientRegion == "" {
+	if fetchRoutesTunnel.serverContext.clientRegion == "" {
 		// Split tunnel region is unknown
 		return
 	}
@@ -207,7 +207,7 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 		return
 	}
 
-	NoticeSplitTunnelRegion(tunnel.session.clientRegion)
+	NoticeSplitTunnelRegion(tunnel.serverContext.clientRegion)
 }
 
 // getRoutes makes a web request to download fresh routes data for the
@@ -216,13 +216,13 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 // fails and cached routes data is present, that cached data is returned.
 func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData []byte, err error) {
 
-	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.session.clientRegion)
+	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.serverContext.clientRegion)
 	request, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	etag, err := GetSplitTunnelRoutesETag(tunnel.session.clientRegion)
+	etag, err := GetSplitTunnelRoutesETag(tunnel.serverContext.clientRegion)
 	if err != nil {
 		return nil, ContextError(err)
 	}
@@ -310,7 +310,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	if !useCachedRoutes {
 		etag := response.Header.Get("ETag")
 		if etag != "" {
-			err := SetSplitTunnelRoutes(tunnel.session.clientRegion, etag, routesData)
+			err := SetSplitTunnelRoutes(tunnel.serverContext.clientRegion, etag, routesData)
 			if err != nil {
 				NoticeAlert("failed to cache split tunnel routes: %s", ContextError(err))
 				// Proceed with fetched data, even when we can't cache it
@@ -319,7 +319,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	}
 
 	if useCachedRoutes {
-		routesData, err = GetSplitTunnelRoutesData(tunnel.session.clientRegion)
+		routesData, err = GetSplitTunnelRoutesData(tunnel.serverContext.clientRegion)
 		if err != nil {
 			return nil, ContextError(err)
 		}

+ 16 - 26
psiphon/transferstats/collector.go

@@ -20,7 +20,6 @@
 package transferstats
 
 import (
-	"encoding/json"
 	"sync"
 )
 
@@ -44,15 +43,15 @@ func newHostStats() *hostStats {
 	return &hostStats{}
 }
 
-// serverStats holds per-server stats.
-type serverStats struct {
+// ServerStats holds per-server stats.
+type ServerStats struct {
 	hostnameToStats    map[string]*hostStats
 	totalBytesSent     int64
 	totalBytesReceived int64
 }
 
-func newServerStats() *serverStats {
-	return &serverStats{
+func newServerStats() *ServerStats {
+	return &ServerStats{
 		hostnameToStats: make(map[string]*hostStats),
 	}
 }
@@ -61,8 +60,8 @@ func newServerStats() *serverStats {
 // as well as the mutex to access them.
 var allStats = struct {
 	statsMutex      sync.RWMutex
-	serverIDtoStats map[string]*serverStats
-}{serverIDtoStats: make(map[string]*serverStats)}
+	serverIDtoStats map[string]*ServerStats
+}{serverIDtoStats: make(map[string]*ServerStats)}
 
 // statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
@@ -105,27 +104,18 @@ func recordStat(stat *statsUpdate) {
 	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
 }
 
-// Implement the json.Marshaler interface
-func (ss serverStats) MarshalJSON() ([]byte, error) {
-	out := make(map[string]interface{})
+func (serverStats ServerStats) GetStatsForReporting() (map[string]int64, int64) {
 
 	hostBytes := make(map[string]int64)
 	bytesTransferred := int64(0)
 
-	for hostname, hostStats := range ss.hostnameToStats {
+	for hostname, hostStats := range serverStats.hostnameToStats {
 		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
 		bytesTransferred += totalBytes
 		hostBytes[hostname] = totalBytes
 	}
 
-	out["bytes_transferred"] = bytesTransferred
-	out["host_bytes"] = hostBytes
-
-	// We're not using these fields, but the server requires them
-	out["page_views"] = make([]string, 0)
-	out["https_requests"] = make([]string, 0)
-
-	return json.Marshal(out)
+	return hostBytes, bytesTransferred
 }
 
 // GetBytesTransferredForServer returns total bytes sent and received since
@@ -149,22 +139,22 @@ func GetBytesTransferredForServer(serverID string) (sent, received int64) {
 	return
 }
 
-// GetForServer returns the json-able stats package for the given server.
-func GetForServer(serverID string) (payload *serverStats) {
+// GetForServer returns the server stats for the given server.
+func GetForServer(serverID string) (serverStats *ServerStats) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
-	payload = allStats.serverIDtoStats[serverID]
-	if payload == nil {
-		payload = newServerStats()
+	serverStats = allStats.serverIDtoStats[serverID]
+	if serverStats == nil {
+		serverStats = newServerStats()
 	}
 	delete(allStats.serverIDtoStats, serverID)
 	return
 }
 
 // PutBack re-adds a set of server stats to the collection.
-func PutBack(serverID string, ss *serverStats) {
-	for hostname, hoststats := range ss.hostnameToStats {
+func PutBack(serverID string, serverStats *ServerStats) {
+	for hostname, hoststats := range serverStats.hostnameToStats {
 		recordStat(
 			&statsUpdate{
 				serverID:         serverID,

+ 60 - 50
psiphon/tunnel.go

@@ -68,7 +68,7 @@ type Tunnel struct {
 	isDiscarded              bool
 	isClosed                 bool
 	serverEntry              *ServerEntry
-	session                  *Session
+	serverContext            *ServerContext
 	protocol                 string
 	conn                     net.Conn
 	sshClient                *ssh.Client
@@ -76,7 +76,7 @@ type Tunnel struct {
 	shutdownOperateBroadcast chan struct{}
 	signalPortForwardFailure chan struct{}
 	totalPortForwardFailures int
-	sessionStartTime         time.Time
+	startTime                time.Time
 }
 
 // EstablishTunnel first makes a network transport connection to the
@@ -134,21 +134,20 @@ func EstablishTunnel(
 		signalPortForwardFailure: make(chan struct{}, 1),
 	}
 
-	// Create a new Psiphon API session for this tunnel. This includes performing
-	// a handshake request. If the handshake fails, this establishment fails.
-	//
-	// TODO: as long as the servers are not enforcing that a client perform a handshake,
-	// proceed with this tunnel as long as at least one previous handhake succeeded?
-	//
+	// Create a new Psiphon API server context for this tunnel. This includes
+	// performing a handshake request. If the handshake fails, this establishment
+	// fails.
 	if !config.DisableApi {
-		NoticeInfo("starting session for %s", tunnel.serverEntry.IpAddress)
-		tunnel.session, err = NewSession(tunnel, sessionId)
+		NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
+		tunnel.serverContext, err = NewServerContext(tunnel, sessionId)
 		if err != nil {
-			return nil, ContextError(fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err))
+			return nil, ContextError(
+				fmt.Errorf("error starting server context for %s: %s",
+					tunnel.serverEntry.IpAddress, err))
 		}
 	}
 
-	tunnel.sessionStartTime = time.Now()
+	tunnel.startTime = time.Now()
 
 	// Now that network operations are complete, cancel interruptibility
 	pendingConns.Remove(conn)
@@ -239,12 +238,12 @@ func (tunnel *Tunnel) Dial(
 		tunnel:         tunnel,
 		downstreamConn: downstreamConn}
 
-	// Tunnel does not have a session when DisableApi is set. We still use
+	// Tunnel does not have a serverContext when DisableApi is set. We still use
 	// transferstats.Conn to count bytes transferred for monitoring tunnel
 	// quality.
 	var regexps *transferstats.Regexps
-	if tunnel.session != nil {
-		regexps = tunnel.session.StatsRegexps()
+	if tunnel.serverContext != nil {
+		regexps = tunnel.serverContext.StatsRegexps()
 	}
 	conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
 
@@ -577,6 +576,17 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	statsTimer := time.NewTimer(nextStatusRequestPeriod())
 	defer statsTimer.Stop()
 
+	// Schedule an immediate status request to deliver any unreported
+	// tunnelDurations.
+	// Note: this may not be effective when there's an outstanding
+	// asynchronous untunneled final status request is holding the
+	// tunnel duration records.
+	unreported := CountUnreportedTunnelDurations()
+	if unreported > 0 {
+		NoticeInfo("Unreported tunnel durations: %d", unreported)
+		statsTimer.Reset(0)
+	}
+
 	nextSshKeepAlivePeriod := func() time.Duration {
 		return MakeRandomPeriod(
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
@@ -600,7 +610,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	go func() {
 		defer requestsWaitGroup.Done()
 		for _ = range signalStatusRequest {
-			sendStats(tunnel, true)
+			sendStats(tunnel)
 		}
 	}()
 
@@ -684,19 +694,36 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 		}
 	}
 
-	// Note: ensure sendStats goroutine is stopped before
-	// sending sendStats(tunnel, false) -- we don't want
-	// to send isConnected=true after isConnected=false.
 	close(signalSshKeepAlive)
 	close(signalStatusRequest)
 	requestsWaitGroup.Wait()
 
+	// This tunnel duration will be reported via the next successful status
+	// request.
+	// Note: Since client clocks are unreliable, we use the server's reported
+	// timestamp in the handshake response as the tunnel start time. This time
+	// will be slightly earlier than the actual tunnel activation time, as the
+	// client has to receive and parse the response and activate the tunnel.
+	if !tunnel.IsDiscarded() {
+		err := RecordTunnelDuration(
+			tunnel.serverContext.sessionId,
+			tunnel.serverContext.tunnelNumber,
+			tunnel.serverEntry.IpAddress,
+			tunnel.serverContext.serverHandshakeTimestamp,
+			fmt.Sprintf("%d", tunnel.startTime.Sub(time.Now())),
+			totalSent,
+			totalReceived)
+		if err != nil {
+			NoticeAlert("RecordTunnelDuration failed: %s", ContextError(err))
+		}
+	}
+
 	// Final status request notes:
 	//
-	// For session duration calculation, it's highly desirable to record
-	// a final status request with "connected=0". For this reason, we attempt
-	// untunneled requests when the tunneled request isn't possible or has
-	// failed.
+	// It's highly desirable to send a final status request in order to report
+	// domain bytes transferred stats as well as to report tunnel duration as
+	// soon as possible. For this reason, we attempt untunneled requests when
+	// the tunneled request isn't possible or has failed.
 	//
 	// In an orderly shutdown (err == nil), the Controller is stopping and
 	// everything must be wrapped up quickly. Also, we still have a working
@@ -718,7 +745,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 
 	if err == nil {
 		NoticeInfo("shutdown operate tunnel")
-		if !sendStats(tunnel, false) {
+		if !sendStats(tunnel) {
 			sendUntunneledStats(tunnel, true)
 		}
 	} else {
@@ -757,10 +784,10 @@ func sendSshKeepAlive(
 }
 
 // sendStats is a helper for sending session stats to the server.
-func sendStats(tunnel *Tunnel, isConnected bool) bool {
+func sendStats(tunnel *Tunnel) bool {
 
-	// Tunnel does not have a session when DisableApi is set
-	if tunnel.session == nil {
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
 		return true
 	}
 
@@ -769,28 +796,21 @@ func sendStats(tunnel *Tunnel, isConnected bool) bool {
 		return true
 	}
 
-	// TODO: reconcile session duration scheme with multi-tunnel mode
-	if tunnel.config.TunnelPoolSize > 1 && !isConnected {
-		return true
-	}
-
-	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
-	err := tunnel.session.DoStatusRequest(payload, isConnected)
+	err := tunnel.serverContext.DoStatusRequest(tunnel)
 	if err != nil {
 		NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-		transferstats.PutBack(tunnel.serverEntry.IpAddress, payload)
 	}
 
 	return err == nil
 }
 
 // sendUntunnelStats sends final status requests directly to Psiphon
-// servers after the tunnel has already failed. This is an to attempt
-// to retain useful bytes transferred and session duration information.
+// servers after the tunnel has already failed. This is an attempt
+// to retain useful bytes transferred stats.
 func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
 
-	// Tunnel does not have a session when DisableApi is set
-	if tunnel.session == nil {
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
 		return
 	}
 
@@ -799,18 +819,8 @@ func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
 		return
 	}
 
-	// TODO: reconcile session duration scheme with multi-tunnel mode
-	if tunnel.config.TunnelPoolSize > 1 {
-		return
-	}
-
-	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
-	err := TryUntunneledStatusRequest(tunnel, payload, isShutdown)
+	err := TryUntunneledStatusRequest(tunnel, isShutdown)
 	if err != nil {
 		NoticeAlert("TryUntunneledStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-
-		// By putting back now, we may send the bytes transferred data for this
-		// session in the next session for the same server.
-		transferstats.PutBack(tunnel.serverEntry.IpAddress, payload)
 	}
 }