Browse Source

Merge branch 'master' of github.com:Psiphon-Labs/psiphon-tunnel-core into console-client-docker

Michael Goldberger 10 years ago
parent
commit
d0cfb25f9f

+ 5 - 0
.gitignore

@@ -3,6 +3,11 @@ psiphon_config
 psiphon.config
 controller_test.config
 psiphon.db*
+psiphon.boltdb
+
+# Exclude compiled tunnel core binaries
+ConsoleClient/ConsoleClient
+ConsoleClient/bin
 
 # Compiled Object files, Static and Dynamic libs (Shared Objects)
 *.o

+ 11 - 2
psiphon/config.go

@@ -29,11 +29,12 @@ import (
 // TODO: allow all params to be configured
 
 const (
-	DATA_STORE_FILENAME                            = "psiphon.db"
+	LEGACY_DATA_STORE_FILENAME                     = "psiphon.db"
+	DATA_STORE_FILENAME                            = "psiphon.boltdb"
 	CONNECTION_WORKER_POOL_SIZE                    = 10
 	TUNNEL_POOL_SIZE                               = 1
 	TUNNEL_CONNECT_TIMEOUT                         = 20 * time.Second
-	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT                = 500 * time.Millisecond
+	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT                = 1 * time.Second
 	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT               = 10 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES        = 256
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN               = 60 * time.Second
@@ -45,6 +46,7 @@ const (
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS               = 300
 	ESTABLISH_TUNNEL_WORK_TIME                     = 60 * time.Second
 	ESTABLISH_TUNNEL_PAUSE_PERIOD                  = 5 * time.Second
+	ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD  = 1 * time.Second
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT               = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST       = 50
 	FETCH_REMOTE_SERVER_LIST_TIMEOUT               = 30 * time.Second
@@ -52,11 +54,15 @@ const (
 	FETCH_REMOTE_SERVER_LIST_STALE_PERIOD          = 6 * time.Hour
 	PSIPHON_API_CLIENT_SESSION_ID_LENGTH           = 16
 	PSIPHON_API_SERVER_TIMEOUT                     = 20 * time.Second
+	PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT            = 1 * time.Second
 	PSIPHON_API_STATUS_REQUEST_PERIOD_MIN          = 5 * time.Minute
 	PSIPHON_API_STATUS_REQUEST_PERIOD_MAX          = 10 * time.Minute
+	PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN    = 5 * time.Second
+	PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX    = 10 * time.Second
 	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES   = 256
 	PSIPHON_API_CONNECTED_REQUEST_PERIOD           = 24 * time.Hour
 	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD     = 5 * time.Second
+	PSIPHON_API_TUNNEL_STATS_MAX_COUNT             = 1000
 	FETCH_ROUTES_TIMEOUT                           = 1 * time.Minute
 	DOWNLOAD_UPGRADE_TIMEOUT                       = 15 * time.Minute
 	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD            = 5 * time.Second
@@ -118,6 +124,9 @@ type Config struct {
 	// automatic updates.
 	// This value is supplied by and depends on the Psiphon Network, and is
 	// typically embedded in the client binary.
+	// Note that sending a ClientPlatform string which includes "windows"
+	// (case insensitive) and a ClientVersion of <= 44 will cause an
+	// error in processing the response to DoConnectedRequest calls.
 	ClientVersion string
 
 	// ClientPlatform is the client platform ("Windows", "Android", etc.) that

+ 109 - 21
psiphon/controller.go

@@ -51,7 +51,7 @@ type Controller struct {
 	isEstablishing                 bool
 	establishWaitGroup             *sync.WaitGroup
 	stopEstablishingBroadcast      chan struct{}
-	candidateServerEntries         chan *ServerEntry
+	candidateServerEntries         chan *candidateServerEntry
 	establishPendingConns          *Conns
 	untunneledPendingConns         *Conns
 	untunneledDialConfig           *DialConfig
@@ -59,6 +59,12 @@ type Controller struct {
 	signalFetchRemoteServerList    chan struct{}
 	impairedProtocolClassification map[string]int
 	signalReportConnected          chan struct{}
+	serverAffinityDoneBroadcast    chan struct{}
+}
+
+type candidateServerEntry struct {
+	serverEntry               *ServerEntry
+	isServerAffinityCandidate bool
 }
 
 // NewController initializes a new controller.
@@ -184,9 +190,19 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 
 	close(controller.shutdownBroadcast)
 	controller.establishPendingConns.CloseAll()
-	controller.untunneledPendingConns.CloseAll()
 	controller.runWaitGroup.Wait()
 
+	// Stops untunneled connections, including fetch remote server list,
+	// split tunnel port forwards and also untunneled final stats requests.
+	// Note: there's a circular dependency with runWaitGroup.Wait() and
+	// untunneledPendingConns.CloseAll(): runWaitGroup depends on tunnels
+	// stopping which depends, in orderly shutdown, on final status requests
+	// completing. So this pending conns cancel comes too late to interrupt
+	// final status requests in the orderly shutdown case -- which is desired
+	// since we give those a short timeout and would prefer to not interrupt
+	// them.
+	controller.untunneledPendingConns.CloseAll()
+
 	controller.splitTunnelClassifier.Shutdown()
 
 	NoticeInfo("exiting controller")
@@ -296,7 +312,7 @@ loop:
 		reported := false
 		tunnel := controller.getNextActiveTunnel()
 		if tunnel != nil {
-			err := tunnel.session.DoConnectedRequest()
+			err := tunnel.serverContext.DoConnectedRequest()
 			if err == nil {
 				reported = true
 			} else {
@@ -380,8 +396,10 @@ loop:
 	NoticeInfo("exiting upgrade downloader")
 }
 
-func (controller *Controller) startClientUpgradeDownloader(session *Session) {
-	// session is nil when DisableApi is set
+func (controller *Controller) startClientUpgradeDownloader(
+	serverContext *ServerContext) {
+
+	// serverContext is nil when DisableApi is set
 	if controller.config.DisableApi {
 		return
 	}
@@ -392,7 +410,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 		return
 	}
 
-	if session.clientUpgradeVersion == "" {
+	if serverContext.clientUpgradeVersion == "" {
 		// No upgrade is offered
 		return
 	}
@@ -402,7 +420,7 @@ func (controller *Controller) startClientUpgradeDownloader(session *Session) {
 	if !controller.startedUpgradeDownloader {
 		controller.startedUpgradeDownloader = true
 		controller.runWaitGroup.Add(1)
-		go controller.upgradeDownloader(session.clientUpgradeVersion)
+		go controller.upgradeDownloader(serverContext.clientUpgradeVersion)
 	}
 }
 
@@ -439,7 +457,7 @@ loop:
 			// establishPendingConns; this causes the pendingConns.Add() within
 			// interruptibleTCPDial to succeed instead of aborting, and the result
 			// is that it's possible for establish goroutines to run all the way through
-			// NewSession before being discarded... delaying shutdown.
+			// NewServerContext before being discarded... delaying shutdown.
 			select {
 			case <-controller.shutdownBroadcast:
 				break loop
@@ -481,7 +499,8 @@ loop:
 					// tunnel is established.
 					controller.startOrSignalConnectedReporter()
 
-					controller.startClientUpgradeDownloader(establishedTunnel.session)
+					controller.startClientUpgradeDownloader(
+						establishedTunnel.serverContext)
 				}
 
 			} else {
@@ -516,7 +535,7 @@ loop:
 
 // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
 // tunnels. A protocol is classified as impaired if a tunnel using that protocol
-// fails, repeatedly, shortly after the start of the session. During tunnel
+// fails, repeatedly, shortly after the start of the connection. During tunnel
 // establishment, impaired protocols are briefly skipped.
 //
 // One purpose of this measure is to defend against an attack where the adversary,
@@ -527,7 +546,7 @@ loop:
 //
 // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
 func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
-	if failedTunnel.sessionStartTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
+	if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
 		controller.impairedProtocolClassification[failedTunnel.protocol] += 1
 	} else {
 		controller.impairedProtocolClassification[failedTunnel.protocol] = 0
@@ -581,7 +600,7 @@ func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 	// discarded tunnel before fully active tunnels. Can a discarded tunnel
 	// be promoted (since it connects), but with lower rank than all active
 	// tunnels?
-	tunnel.Close()
+	tunnel.Close(true)
 }
 
 // registerTunnel adds the connected tunnel to the pool of active tunnels
@@ -605,6 +624,10 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 	controller.tunnels = append(controller.tunnels, tunnel)
 	NoticeTunnels(len(controller.tunnels))
 
+	// Promote this successful tunnel to first rank so it's one
+	// of the first candidates next time establish runs.
+	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+
 	return len(controller.tunnels), true
 }
 
@@ -640,7 +663,7 @@ func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
 			if controller.nextTunnel >= len(controller.tunnels) {
 				controller.nextTunnel = 0
 			}
-			activeTunnel.Close()
+			activeTunnel.Close(false)
 			NoticeTunnels(len(controller.tunnels))
 			break
 		}
@@ -661,7 +684,7 @@ func (controller *Controller) terminateAllTunnels() {
 		tunnel := activeTunnel
 		go func() {
 			defer closeWaitGroup.Done()
-			tunnel.Close()
+			tunnel.Close(false)
 		}()
 	}
 	closeWaitGroup.Wait()
@@ -748,9 +771,36 @@ func (controller *Controller) startEstablishing() {
 	controller.isEstablishing = true
 	controller.establishWaitGroup = new(sync.WaitGroup)
 	controller.stopEstablishingBroadcast = make(chan struct{})
-	controller.candidateServerEntries = make(chan *ServerEntry)
+	controller.candidateServerEntries = make(chan *candidateServerEntry)
 	controller.establishPendingConns.Reset()
 
+	// The server affinity mechanism attempts to favor the previously
+	// used server when reconnecting. This is beneficial for user
+	// applications which expect consistency in user IP address (for
+	// example, a web site which prompts for additional user
+	// authentication when the IP address changes).
+	//
+	// Only the very first server, as determined by
+	// datastore.PromoteServerEntry(), is the server affinity candidate.
+	// Concurrent connection attempts to many servers are launched
+	// without delay, in case the affinity server connection fails.
+	// While the affinity server connection is outstanding, when any
+	// other connection is established, there is a short grace period
+	// delay before delivering the established tunnel; this allows some
+	// time for the affinity server connection to succeed first.
+	// When the affinity server connection fails, any other established
+	// tunnel is registered without delay.
+	//
+	// Note: the establishTunnelWorker that receives the affinity
+	// candidate is solely responsible for closing
+	// controller.serverAffinityDoneBroadcast.
+	//
+	// Note: if config.EgressRegion or config.TunnelProtocol has changed
+	// since the top server was promoted, the first server may not actually
+	// be the last connected server.
+	// TODO: should not favor the first server in this case
+	controller.serverAffinityDoneBroadcast = make(chan struct{})
+
 	for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
 		controller.establishWaitGroup.Add(1)
 		go controller.establishTunnelWorker()
@@ -781,6 +831,7 @@ func (controller *Controller) stopEstablishing() {
 	controller.establishWaitGroup = nil
 	controller.stopEstablishingBroadcast = nil
 	controller.candidateServerEntries = nil
+	controller.serverAffinityDoneBroadcast = nil
 }
 
 // establishCandidateGenerator populates the candidate queue with server entries
@@ -798,6 +849,14 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	}
 	defer iterator.Close()
 
+	isServerAffinityCandidate := true
+
+	// TODO: reconcile server affinity scheme with multi-tunnel mode
+	if controller.config.TunnelPoolSize > 1 {
+		isServerAffinityCandidate = false
+		close(controller.serverAffinityDoneBroadcast)
+	}
+
 loop:
 	// Repeat until stopped
 	for i := 0; ; i++ {
@@ -827,7 +886,7 @@ loop:
 			// first iteration of the ESTABLISH_TUNNEL_WORK_TIME
 			// loop since (a) one iteration should be sufficient to
 			// evade the attack; (b) there's a good chance of false
-			// positives (such as short session durations due to network
+			// positives (such as short tunnel durations due to network
 			// hopping on a mobile device).
 			// Impaired protocols logic is not applied when
 			// config.TunnelProtocol is specified.
@@ -843,11 +902,16 @@ loop:
 				}
 			}
 
+			// Note: there must be only one server affinity candidate, as it
+			// closes the serverAffinityDoneBroadcast channel.
+			candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate}
+			isServerAffinityCandidate = false
+
 			// TODO: here we could generate multiple candidates from the
 			// server entry when there are many MeekFrontingAddresses.
 
 			select {
-			case controller.candidateServerEntries <- serverEntry:
+			case controller.candidateServerEntries <- candidate:
 			case <-controller.stopEstablishingBroadcast:
 				break loop
 			case <-controller.shutdownBroadcast:
@@ -901,7 +965,7 @@ loop:
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
 loop:
-	for serverEntry := range controller.candidateServerEntries {
+	for candidateServerEntry := range controller.candidateServerEntries {
 		// Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
 		// in the same select, since we want to prioritize receiving the stop signal
 		if controller.isStopEstablishingBroadcast() {
@@ -909,26 +973,44 @@ loop:
 		}
 
 		// There may already be a tunnel to this candidate. If so, skip it.
-		if controller.isActiveTunnelServerEntry(serverEntry) {
+		if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
 			continue
 		}
 
 		tunnel, err := EstablishTunnel(
 			controller.config,
+			controller.untunneledDialConfig,
 			controller.sessionId,
 			controller.establishPendingConns,
-			serverEntry,
+			candidateServerEntry.serverEntry,
 			controller) // TunnelOwner
 		if err != nil {
+
+			// Unblock other candidates immediately when
+			// server affinity candidate fails.
+			if candidateServerEntry.isServerAffinityCandidate {
+				close(controller.serverAffinityDoneBroadcast)
+			}
+
 			// Before emitting error, check if establish interrupted, in which
 			// case the error is noise.
 			if controller.isStopEstablishingBroadcast() {
 				break loop
 			}
-			NoticeInfo("failed to connect to %s: %s", serverEntry.IpAddress, err)
+			NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
 			continue
 		}
 
+		// Block for server affinity grace period before delivering.
+		if !candidateServerEntry.isServerAffinityCandidate {
+			timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
+			select {
+			case <-timer.C:
+			case <-controller.serverAffinityDoneBroadcast:
+			case <-controller.stopEstablishingBroadcast:
+			}
+		}
+
 		// Deliver established tunnel.
 		// Don't block. Assumes the receiver has a buffer large enough for
 		// the number of desired tunnels. If there's no room, the tunnel must
@@ -938,6 +1020,12 @@ loop:
 		default:
 			controller.discardTunnel(tunnel)
 		}
+
+		// Unblock other candidates only after delivering when
+		// server affinity candidate succeeds.
+		if candidateServerEntry.isServerAffinityCandidate {
+			close(controller.serverAffinityDoneBroadcast)
+		}
 	}
 	NoticeInfo("stopped establish worker")
 }

+ 542 - 329
psiphon/dataStore.go

@@ -1,5 +1,3 @@
-// +build windows
-
 /*
  * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
@@ -22,7 +20,7 @@
 package psiphon
 
 import (
-	"database/sql"
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -32,14 +30,34 @@ import (
 	"sync"
 	"time"
 
-	sqlite3 "github.com/Psiphon-Inc/go-sqlite3"
+	"github.com/Psiphon-Inc/bolt"
 )
 
+// The BoltDB dataStore implementation is an alternative to the sqlite3-based
+// implementation in dataStore.go. Both implementations have the same interface.
+//
+// BoltDB is pure Go, and is intended to be used in cases where we have trouble
+// building sqlite3/CGO (e.g., currently go mobile due to
+// https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
+// the primary dataStore implementation.
+//
 type dataStore struct {
 	init sync.Once
-	db   *sql.DB
+	db   *bolt.DB
 }
 
+const (
+	serverEntriesBucket         = "serverEntries"
+	rankedServerEntriesBucket   = "rankedServerEntries"
+	rankedServerEntriesKey      = "rankedServerEntries"
+	splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
+	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
+	urlETagsBucket              = "urlETags"
+	keyValueBucket              = "keyValues"
+	tunnelStatsBucket           = "tunnelStats"
+	rankedServerEntryCount      = 100
+)
+
 var singleton dataStore
 
 // InitDataStore initializes the singleton instance of dataStore. This
@@ -52,58 +70,51 @@ var singleton dataStore
 // have been replaced by checkInitDataStore() to assert that Init was called.
 func InitDataStore(config *Config) (err error) {
 	singleton.init.Do(func() {
+
 		filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
-		var db *sql.DB
-		db, err = sql.Open(
-			"sqlite3",
-			fmt.Sprintf("file:%s?cache=private&mode=rwc", filename))
+		var db *bolt.DB
+		db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
 		if err != nil {
 			// Note: intending to set the err return value for InitDataStore
 			err = fmt.Errorf("initDataStore failed to open database: %s", err)
 			return
 		}
-		initialization := "pragma journal_mode=WAL;\n"
-		if config.DataStoreTempDirectory != "" {
-			// On some platforms (e.g., Android), the standard temporary directories expected
-			// by sqlite (see unixGetTempname in aggregate sqlite3.c) may not be present.
-			// In that case, sqlite tries to use the current working directory; but this may
-			// be "/" (again, on Android) which is not writable.
-			// Instead of setting the process current working directory from this library,
-			// use the deprecated temp_store_directory pragma to force use of a specified
-			// temporary directory: https://www.sqlite.org/pragma.html#pragma_temp_store_directory.
-			// TODO: is there another way to restrict writing of temporary files? E.g. temp_store=3?
-			initialization += fmt.Sprintf(
-				"pragma temp_store_directory=\"%s\";\n", config.DataStoreTempDirectory)
-		}
-		initialization += `
-        create table if not exists serverEntry
-            (id text not null primary key,
-             rank integer not null unique,
-             region text not null,
-             data blob not null);
-        create index if not exists idx_serverEntry_region on serverEntry(region);
-        create table if not exists serverEntryProtocol
-            (serverEntryId text not null,
-             protocol text not null,
-             primary key (serverEntryId, protocol));
-        create table if not exists splitTunnelRoutes
-            (region text not null primary key,
-             etag text not null,
-             data blob not null);
-        create table if not exists urlETags
-            (url text not null primary key,
-             etag text not null);
-        create table if not exists keyValue
-            (key text not null primary key,
-             value text not null);
-        `
-		_, err = db.Exec(initialization)
+
+		err = db.Update(func(tx *bolt.Tx) error {
+			requiredBuckets := []string{
+				serverEntriesBucket,
+				rankedServerEntriesBucket,
+				splitTunnelRouteETagsBucket,
+				splitTunnelRouteDataBucket,
+				urlETagsBucket,
+				keyValueBucket,
+				tunnelStatsBucket,
+			}
+			for _, bucket := range requiredBuckets {
+				_, err := tx.CreateBucketIfNotExists([]byte(bucket))
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
 		if err != nil {
-			err = fmt.Errorf("initDataStore failed to initialize: %s", err)
+			err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
 			return
 		}
+
 		singleton.db = db
+
+		// The migrateServerEntries function requires the data store is
+		// initialized prior to execution so that migrated entries can be stored
+		migratableServerEntries := prepareMigrationEntries(config)
+		if len(migratableServerEntries) > 0 {
+			migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
+		}
+
+		resetAllTunnelStatsToUnreported()
 	})
+
 	return err
 }
 
@@ -113,71 +124,17 @@ func checkInitDataStore() {
 	}
 }
 
-func canRetry(err error) bool {
-	sqlError, ok := err.(sqlite3.Error)
-	return ok && (sqlError.Code == sqlite3.ErrBusy ||
-		sqlError.Code == sqlite3.ErrLocked ||
-		sqlError.ExtendedCode == sqlite3.ErrLockedSharedCache ||
-		sqlError.ExtendedCode == sqlite3.ErrBusySnapshot)
-}
-
-// transactionWithRetry will retry a write transaction if sqlite3
-// reports a table is locked by another writer.
-func transactionWithRetry(updater func(*sql.Tx) error) error {
-	checkInitDataStore()
-	for i := 0; i < 10; i++ {
-		if i > 0 {
-			// Delay on retry
-			time.Sleep(100)
-		}
-		transaction, err := singleton.db.Begin()
-		if err != nil {
-			return ContextError(err)
-		}
-		err = updater(transaction)
-		if err != nil {
-			transaction.Rollback()
-			if canRetry(err) {
-				continue
-			}
-			return ContextError(err)
-		}
-		err = transaction.Commit()
-		if err != nil {
-			transaction.Rollback()
-			if canRetry(err) {
-				continue
-			}
-			return ContextError(err)
-		}
-		return nil
-	}
-	return ContextError(errors.New("retries exhausted"))
-}
-
-// serverEntryExists returns true if a serverEntry with the
-// given ipAddress id already exists.
-func serverEntryExists(transaction *sql.Tx, ipAddress string) (bool, error) {
-	query := "select count(*) from serverEntry where id  = ?;"
-	var count int
-	err := singleton.db.QueryRow(query, ipAddress).Scan(&count)
-	if err != nil {
-		return false, ContextError(err)
-	}
-	return count > 0, nil
-}
-
 // StoreServerEntry adds the server entry to the data store.
 // A newly stored (or re-stored) server entry is assigned the next-to-top
 // rank for iteration order (the previous top ranked entry is promoted). The
 // purpose of inserting at next-to-top is to keep the last selected server
-// as the top ranked server. Note, server candidates are iterated in decending
-// rank order, so the largest rank is top rank.
+// as the top ranked server.
 // When replaceIfExists is true, an existing server entry record is
 // overwritten; otherwise, the existing record is unchanged.
 // If the server entry data is malformed, an alert notice is issued and
 // the entry is skipped; no error is returned.
 func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
+	checkInitDataStore()
 
 	// Server entries should already be validated before this point,
 	// so instead of skipping we fail with an error.
@@ -186,11 +143,20 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 		return ContextError(errors.New("invalid server entry"))
 	}
 
-	return transactionWithRetry(func(transaction *sql.Tx) error {
-		serverEntryExists, err := serverEntryExists(transaction, serverEntry.IpAddress)
-		if err != nil {
-			return ContextError(err)
-		}
+	// BoltDB implementation note:
+	// For simplicity, we don't maintain indexes on server entry
+	// region or supported protocols. Instead, we perform full-bucket
+	// scans with a filter. With a small enough database (thousands or
+	// even tens of thousands of server entries) and common enough
+	// values (e.g., many servers support all protocols), performance
+	// is expected to be acceptable.
+
+	serverEntryExists := false
+	err = singleton.db.Update(func(tx *bolt.Tx) error {
+
+		serverEntries := tx.Bucket([]byte(serverEntriesBucket))
+		serverEntryExists = (serverEntries.Get([]byte(serverEntry.IpAddress)) != nil)
+
 		if serverEntryExists && !replaceIfExists {
 			// Disabling this notice, for now, as it generates too much noise
 			// in diagnostics with clients that always submit embedded servers
@@ -198,51 +164,31 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 			// NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
 			return nil
 		}
-		_, err = transaction.Exec(`
-            update serverEntry set rank = rank + 1
-                where id = (select id from serverEntry order by rank desc limit 1);
-            `)
-		if err != nil {
-			// Note: ContextError() would break canRetry()
-			return err
-		}
+
 		data, err := json.Marshal(serverEntry)
 		if err != nil {
 			return ContextError(err)
 		}
-		_, err = transaction.Exec(`
-            insert or replace into serverEntry (id, rank, region, data)
-            values (?, (select coalesce(max(rank)-1, 0) from serverEntry), ?, ?);
-            `, serverEntry.IpAddress, serverEntry.Region, data)
+		err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
 		if err != nil {
-			return err
+			return ContextError(err)
 		}
-		_, err = transaction.Exec(`
-            delete from serverEntryProtocol where serverEntryId = ?;
-            `, serverEntry.IpAddress)
+
+		err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
 		if err != nil {
-			return err
-		}
-		for _, protocol := range SupportedTunnelProtocols {
-			// Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
-			// and the additonal OSSH service is assumed to be available internally.
-			requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
-			if Contains(serverEntry.Capabilities, requiredCapability) {
-				_, err = transaction.Exec(`
-                    insert into serverEntryProtocol (serverEntryId, protocol)
-                    values (?, ?);
-                    `, serverEntry.IpAddress, protocol)
-				if err != nil {
-					return err
-				}
-			}
-		}
-		// TODO: post notice after commit
-		if !serverEntryExists {
-			NoticeInfo("updated server %s", serverEntry.IpAddress)
+			return ContextError(err)
 		}
+
 		return nil
 	})
+	if err != nil {
+		return ContextError(err)
+	}
+
+	if !serverEntryExists {
+		NoticeInfo("updated server %s", serverEntry.IpAddress)
+	}
+	return nil
 }
 
 // StoreServerEntries shuffles and stores a list of server entries.
@@ -250,6 +196,7 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 // load balancing.
 // There is an independent transaction for each entry insert/update.
 func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
+	checkInitDataStore()
 
 	for index := len(serverEntries) - 1; index > 0; index-- {
 		swapIndex := rand.Intn(index + 1)
@@ -275,18 +222,98 @@ func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) erro
 // iterated in decending rank order, so this server entry will be
 // the first candidate in a subsequent tunnel establishment.
 func PromoteServerEntry(ipAddress string) error {
-	return transactionWithRetry(func(transaction *sql.Tx) error {
-		_, err := transaction.Exec(`
-            update serverEntry
-            set rank = (select MAX(rank)+1 from serverEntry)
-            where id = ?;
-            `, ipAddress)
-		if err != nil {
-			// Note: ContextError() would break canRetry()
-			return err
-		}
-		return nil
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		return insertRankedServerEntry(tx, ipAddress, 0)
 	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
+	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
+	data := bucket.Get([]byte(rankedServerEntriesKey))
+
+	if data == nil {
+		return []string{}, nil
+	}
+
+	rankedServerEntries := make([]string, 0)
+	err := json.Unmarshal(data, &rankedServerEntries)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return rankedServerEntries, nil
+}
+
+func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
+	data, err := json.Marshal(rankedServerEntries)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
+	err = bucket.Put([]byte(rankedServerEntriesKey), data)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return nil
+}
+
+func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
+	rankedServerEntries, err := getRankedServerEntries(tx)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	// BoltDB implementation note:
+	// For simplicity, we store the ranked server ids in an array serialized to
+	// a single key value. To ensure this value doesn't grow without bound,
+	// it's capped at rankedServerEntryCount. For now, this cap should be large
+	// enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
+	// any reasonable configuration of config.TunnelPoolSize.
+
+	// Using: https://github.com/golang/go/wiki/SliceTricks
+
+	// When serverEntryId is already ranked, remove it first to avoid duplicates
+
+	for i, rankedServerEntryId := range rankedServerEntries {
+		if rankedServerEntryId == serverEntryId {
+			rankedServerEntries = append(
+				rankedServerEntries[:i], rankedServerEntries[i+1:]...)
+			break
+		}
+	}
+
+	// SliceTricks insert, with length cap enforced
+
+	if len(rankedServerEntries) < rankedServerEntryCount {
+		rankedServerEntries = append(rankedServerEntries, "")
+	}
+	if position >= len(rankedServerEntries) {
+		position = len(rankedServerEntries) - 1
+	}
+	copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
+	rankedServerEntries[position] = serverEntryId
+
+	err = setRankedServerEntries(tx, rankedServerEntries)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return nil
+}
+
+func serverEntrySupportsProtocol(serverEntry *ServerEntry, protocol string) bool {
+	// Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
+	// and the additional OSSH service is assumed to be available internally.
+	requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
+	return Contains(serverEntry.Capabilities, requiredCapability)
 }
 
 // ServerEntryIterator is used to iterate over
@@ -295,14 +322,14 @@ type ServerEntryIterator struct {
 	region                      string
 	protocol                    string
 	shuffleHeadLength           int
-	transaction                 *sql.Tx
-	cursor                      *sql.Rows
+	serverEntryIds              []string
+	serverEntryIndex            int
 	isTargetServerEntryIterator bool
 	hasNextTargetServerEntry    bool
 	targetServerEntry           *ServerEntry
 }
 
-// NewServerEntryIterator creates a new NewServerEntryIterator
+// NewServerEntryIterator creates a new ServerEntryIterator
 func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
 
 	// When configured, this target server entry is the only candidate
@@ -362,54 +389,69 @@ func (iterator *ServerEntryIterator) Reset() error {
 	count := CountServerEntries(iterator.region, iterator.protocol)
 	NoticeCandidateServers(iterator.region, iterator.protocol, count)
 
-	transaction, err := singleton.db.Begin()
-	if err != nil {
-		return ContextError(err)
-	}
-	var cursor *sql.Rows
-
 	// This query implements the Psiphon server candidate selection
 	// algorithm: the first TunnelPoolSize server candidates are in rank
 	// (priority) order, to favor previously successful servers; then the
 	// remaining long tail is shuffled to raise up less recent candidates.
 
-	whereClause, whereParams := makeServerEntryWhereClause(
-		iterator.region, iterator.protocol, nil)
-	headLength := iterator.shuffleHeadLength
-	queryFormat := `
-		select data from serverEntry %s
-		order by case
-		when rank > coalesce((select rank from serverEntry %s order by rank desc limit ?, 1), -1) then rank
-		else abs(random())%%((select rank from serverEntry %s order by rank desc limit ?, 1))
-		end desc;`
-	query := fmt.Sprintf(queryFormat, whereClause, whereClause, whereClause)
-	params := make([]interface{}, 0)
-	params = append(params, whereParams...)
-	params = append(params, whereParams...)
-	params = append(params, headLength)
-	params = append(params, whereParams...)
-	params = append(params, headLength)
-
-	cursor, err = transaction.Query(query, params...)
+	// BoltDB implementation note:
+	// We don't keep a transaction open for the duration of the iterator
+	// because this would expose the following semantics to consumer code:
+	//
+	//     Read-only transactions and read-write transactions ... generally
+	//     shouldn't be opened simultaneously in the same goroutine. This can
+	//     cause a deadlock as the read-write transaction needs to periodically
+	//     re-map the data file but it cannot do so while a read-only
+	//     transaction is open.
+	//     (https://github.com/boltdb/bolt)
+	//
+	// So the underlying serverEntriesBucket could change after the serverEntryIds
+	// list is built.
+
+	var serverEntryIds []string
+
+	err := singleton.db.View(func(tx *bolt.Tx) error {
+		var err error
+		serverEntryIds, err = getRankedServerEntries(tx)
+		if err != nil {
+			return err
+		}
+
+		skipServerEntryIds := make(map[string]bool)
+		for _, serverEntryId := range serverEntryIds {
+			skipServerEntryIds[serverEntryId] = true
+		}
+
+		bucket := tx.Bucket([]byte(serverEntriesBucket))
+		cursor := bucket.Cursor()
+		for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
+			serverEntryId := string(key)
+			if _, ok := skipServerEntryIds[serverEntryId]; ok {
+				continue
+			}
+			serverEntryIds = append(serverEntryIds, serverEntryId)
+		}
+		return nil
+	})
 	if err != nil {
-		transaction.Rollback()
 		return ContextError(err)
 	}
-	iterator.transaction = transaction
-	iterator.cursor = cursor
+
+	for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
+		j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
+		serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
+	}
+
+	iterator.serverEntryIds = serverEntryIds
+	iterator.serverEntryIndex = 0
+
 	return nil
 }
 
 // Close cleans up resources associated with a ServerEntryIterator.
 func (iterator *ServerEntryIterator) Close() {
-	if iterator.cursor != nil {
-		iterator.cursor.Close()
-	}
-	iterator.cursor = nil
-	if iterator.transaction != nil {
-		iterator.transaction.Rollback()
-	}
-	iterator.transaction = nil
+	iterator.serverEntryIds = nil
+	iterator.serverEntryIndex = 0
 }
 
 // Next returns the next server entry, by rank, for a ServerEntryIterator.
@@ -429,24 +471,44 @@ func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error
 		return nil, nil
 	}
 
-	if !iterator.cursor.Next() {
-		err = iterator.cursor.Err()
+	// There are no region/protocol indexes for the server entries bucket.
+	// Loop until we have the next server entry that matches the iterator
+	// filter requirements.
+	for {
+		if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
+			// There is no next item
+			return nil, nil
+		}
+
+		serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
+		iterator.serverEntryIndex += 1
+
+		var data []byte
+		err = singleton.db.View(func(tx *bolt.Tx) error {
+			bucket := tx.Bucket([]byte(serverEntriesBucket))
+			data = bucket.Get([]byte(serverEntryId))
+			return nil
+		})
 		if err != nil {
 			return nil, ContextError(err)
 		}
-		// There is no next item
-		return nil, nil
-	}
 
-	var data []byte
-	err = iterator.cursor.Scan(&data)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	serverEntry = new(ServerEntry)
-	err = json.Unmarshal(data, serverEntry)
-	if err != nil {
-		return nil, ContextError(err)
+		if data == nil {
+			return nil, ContextError(
+				fmt.Errorf("Unexpected missing server entry: %s", serverEntryId))
+		}
+
+		serverEntry = new(ServerEntry)
+		err = json.Unmarshal(data, serverEntry)
+		if err != nil {
+			return nil, ContextError(err)
+		}
+
+		if (iterator.region == "" || serverEntry.Region == iterator.region) &&
+			(iterator.protocol == "" || serverEntrySupportsProtocol(serverEntry, iterator.protocol)) {
+
+			break
+		}
 	}
 
 	return MakeCompatibleServerEntry(serverEntry), nil
@@ -465,123 +527,92 @@ func MakeCompatibleServerEntry(serverEntry *ServerEntry) *ServerEntry {
 	return serverEntry
 }
 
-func makeServerEntryWhereClause(
-	region, protocol string, excludeIds []string) (whereClause string, whereParams []interface{}) {
-	whereClause = ""
-	whereParams = make([]interface{}, 0)
-	if region != "" {
-		whereClause += " where region = ?"
-		whereParams = append(whereParams, region)
-	}
-	if protocol != "" {
-		if len(whereClause) > 0 {
-			whereClause += " and"
-		} else {
-			whereClause += " where"
-		}
-		whereClause +=
-			" exists (select 1 from serverEntryProtocol where protocol = ? and serverEntryId = serverEntry.id)"
-		whereParams = append(whereParams, protocol)
-	}
-	if len(excludeIds) > 0 {
-		if len(whereClause) > 0 {
-			whereClause += " and"
-		} else {
-			whereClause += " where"
-		}
-		whereClause += " id in ("
-		for index, id := range excludeIds {
-			if index > 0 {
-				whereClause += ", "
+func scanServerEntries(scanner func(*ServerEntry)) error {
+	err := singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(serverEntriesBucket))
+		cursor := bucket.Cursor()
+
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			serverEntry := new(ServerEntry)
+			err := json.Unmarshal(value, serverEntry)
+			if err != nil {
+				return err
 			}
-			whereClause += "?"
-			whereParams = append(whereParams, id)
+			scanner(serverEntry)
 		}
-		whereClause += ")"
+
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
 	}
-	return whereClause, whereParams
+
+	return nil
 }
 
 // CountServerEntries returns a count of stored servers for the
 // specified region and protocol.
 func CountServerEntries(region, protocol string) int {
 	checkInitDataStore()
-	var count int
-	whereClause, whereParams := makeServerEntryWhereClause(region, protocol, nil)
-	query := "select count(*) from serverEntry" + whereClause
-	err := singleton.db.QueryRow(query, whereParams...).Scan(&count)
+
+	count := 0
+	err := scanServerEntries(func(serverEntry *ServerEntry) {
+		if (region == "" || serverEntry.Region == region) &&
+			(protocol == "" || serverEntrySupportsProtocol(serverEntry, protocol)) {
+			count += 1
+		}
+	})
 
 	if err != nil {
 		NoticeAlert("CountServerEntries failed: %s", err)
 		return 0
 	}
 
-	if region == "" {
-		region = "(any)"
-	}
-	if protocol == "" {
-		protocol = "(any)"
-	}
-	NoticeInfo("servers for region %s and protocol %s: %d",
-		region, protocol, count)
-
 	return count
 }
 
 // ReportAvailableRegions prints a notice with the available egress regions.
+// Note that this report ignores config.TunnelProtocol.
 func ReportAvailableRegions() {
 	checkInitDataStore()
 
-	// TODO: For consistency, regions-per-protocol should be used
+	regions := make(map[string]bool)
+	err := scanServerEntries(func(serverEntry *ServerEntry) {
+		regions[serverEntry.Region] = true
+	})
 
-	rows, err := singleton.db.Query("select distinct(region) from serverEntry;")
 	if err != nil {
-		NoticeAlert("failed to query data store for available regions: %s", ContextError(err))
+		NoticeAlert("ReportAvailableRegions failed: %s", err)
 		return
 	}
-	defer rows.Close()
-
-	var regions []string
-
-	for rows.Next() {
-		var region string
-		err = rows.Scan(&region)
-		if err != nil {
-			NoticeAlert("failed to retrieve available regions from data store: %s", ContextError(err))
-			return
-		}
 
+	regionList := make([]string, 0, len(regions))
+	for region, _ := range regions {
 		// Some server entries do not have a region, but it makes no sense to return
 		// an empty string as an "available region".
 		if region != "" {
-			regions = append(regions, region)
+			regionList = append(regionList, region)
 		}
 	}
 
-	NoticeAvailableEgressRegions(regions)
+	NoticeAvailableEgressRegions(regionList)
 }
 
 // GetServerEntryIpAddresses returns an array containing
 // all stored server IP addresses.
 func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
 	checkInitDataStore()
+
 	ipAddresses = make([]string, 0)
-	rows, err := singleton.db.Query("select id from serverEntry;")
+	err = scanServerEntries(func(serverEntry *ServerEntry) {
+		ipAddresses = append(ipAddresses, serverEntry.IpAddress)
+	})
+
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	defer rows.Close()
-	for rows.Next() {
-		var ipAddress string
-		err = rows.Scan(&ipAddress)
-		if err != nil {
-			return nil, ContextError(err)
-		}
-		ipAddresses = append(ipAddresses, ipAddress)
-	}
-	if err = rows.Err(); err != nil {
-		return nil, ContextError(err)
-	}
+
 	return ipAddresses, nil
 }
 
@@ -589,28 +620,34 @@ func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
 // the given region. The associated etag is also stored and
 // used to make efficient web requests for updates to the data.
 func SetSplitTunnelRoutes(region, etag string, data []byte) error {
-	return transactionWithRetry(func(transaction *sql.Tx) error {
-		_, err := transaction.Exec(`
-            insert or replace into splitTunnelRoutes (region, etag, data)
-            values (?, ?, ?);
-            `, region, etag, data)
-		if err != nil {
-			// Note: ContextError() would break canRetry()
-			return err
-		}
-		return nil
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
+		err := bucket.Put([]byte(region), []byte(etag))
+
+		bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
+		err = bucket.Put([]byte(region), data)
+		return err
 	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
 }
 
 // GetSplitTunnelRoutesETag retrieves the etag for cached routes
 // data for the specified region. If not found, it returns an empty string value.
 func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
 	checkInitDataStore()
-	rows := singleton.db.QueryRow("select etag from splitTunnelRoutes where region = ?;", region)
-	err = rows.Scan(&etag)
-	if err == sql.ErrNoRows {
-		return "", nil
-	}
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
+		etag = string(bucket.Get([]byte(region)))
+		return nil
+	})
+
 	if err != nil {
 		return "", ContextError(err)
 	}
@@ -621,11 +658,13 @@ func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
 // for the specified region. If not found, it returns a nil value.
 func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 	checkInitDataStore()
-	rows := singleton.db.QueryRow("select data from splitTunnelRoutes where region = ?;", region)
-	err = rows.Scan(&data)
-	if err == sql.ErrNoRows {
-		return nil, nil
-	}
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
+		data = bucket.Get([]byte(region))
+		return nil
+	})
+
 	if err != nil {
 		return nil, ContextError(err)
 	}
@@ -636,28 +675,31 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 // Note: input URL is treated as a string, and is not
 // encoded or decoded or otherwise canonicalized.
 func SetUrlETag(url, etag string) error {
-	return transactionWithRetry(func(transaction *sql.Tx) error {
-		_, err := transaction.Exec(`
-            insert or replace into urlETags (url, etag)
-            values (?, ?);
-            `, url, etag)
-		if err != nil {
-			// Note: ContextError() would break canRetry()
-			return err
-		}
-		return nil
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		err := bucket.Put([]byte(url), []byte(etag))
+		return err
 	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
 }
 
 // GetUrlETag retrieves a previously stored an ETag for the
 // specfied URL. If not found, it returns an empty string value.
 func GetUrlETag(url string) (etag string, err error) {
 	checkInitDataStore()
-	rows := singleton.db.QueryRow("select etag from urlETags where url = ?;", url)
-	err = rows.Scan(&etag)
-	if err == sql.ErrNoRows {
-		return "", nil
-	}
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		etag = string(bucket.Get([]byte(url)))
+		return nil
+	})
+
 	if err != nil {
 		return "", ContextError(err)
 	}
@@ -666,30 +708,201 @@ func GetUrlETag(url string) (etag string, err error) {
 
 // SetKeyValue stores a key/value pair.
 func SetKeyValue(key, value string) error {
-	return transactionWithRetry(func(transaction *sql.Tx) error {
-		_, err := transaction.Exec(`
-            insert or replace into keyValue (key, value)
-            values (?, ?);
-            `, key, value)
-		if err != nil {
-			// Note: ContextError() would break canRetry()
-			return err
-		}
-		return nil
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(keyValueBucket))
+		err := bucket.Put([]byte(key), []byte(value))
+		return err
 	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
 }
 
 // GetKeyValue retrieves the value for a given key. If not found,
 // it returns an empty string value.
 func GetKeyValue(key string) (value string, err error) {
 	checkInitDataStore()
-	rows := singleton.db.QueryRow("select value from keyValue where key = ?;", key)
-	err = rows.Scan(&value)
-	if err == sql.ErrNoRows {
-		return "", nil
-	}
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(keyValueBucket))
+		value = string(bucket.Get([]byte(key)))
+		return nil
+	})
+
 	if err != nil {
 		return "", ContextError(err)
 	}
 	return value, nil
 }
+
+// Tunnel stats records in the tunnelStatsStateUnreported
+// state are available for take out.
+// Records in the tunnelStatsStateReporting state have
+// been taken out and are pending either deletion (for a
+// successful request) or reversion to StateUnreported
+// (for a failed request).
+// All tunnel stats records are reverted to StateUnreported
+// when the datastore is initialized at start up.
+
+var tunnelStatsStateUnreported = []byte("0")
+var tunnelStatsStateReporting = []byte("1")
+
+// StoreTunnelStats adds a new tunnel stats record, which is
+// set to StateUnreported and is an immediate candidate for
+// reporting.
+// tunnelStats is a JSON byte array containing fields as
+// required by the Psiphon server API (see RecordTunnelStats).
+// It's assumed that the JSON value contains enough unique
+// information for the value to function as a key in the
+// key/value datastore. This assumption is currently satisfied
+// by the fields sessionId + tunnelNumber.
+func StoreTunnelStats(tunnelStats []byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		err := bucket.Put(tunnelStats, tunnelStatsStateUnreported)
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// CountUnreportedTunnelStats returns the number of tunnel
+// stats records in StateUnreported.
+// NOTE(review): the loop below breaks after the first matching
+// record, so the returned value is at most 1 — confirm whether a
+// full count or a has-any-unreported check is intended.
+func CountUnreportedTunnelStats() int {
+	checkInitDataStore()
+
+	unreported := 0
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
+				unreported++
+				break
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		NoticeAlert("CountUnreportedTunnelStats failed: %s", err)
+		return 0
+	}
+
+	return unreported
+}
+
+// TakeOutUnreportedTunnelStats returns up to maxCount tunnel
+// stats records that are in StateUnreported. The records are set
+// to StateReporting. If the records are successfully reported,
+// clear them with ClearReportedTunnelStats. If the records are
+// not successfully reported, restore them with
+// PutBackUnreportedTunnelStats.
+func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
+	checkInitDataStore()
+
+	tunnelStats := make([][]byte, 0)
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		cursor := bucket.Cursor()
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
+				err := bucket.Put(key, tunnelStatsStateReporting)
+				if err != nil {
+					return err
+				}
+				tunnelStats = append(tunnelStats, key)
+				if len(tunnelStats) >= maxCount {
+					break
+				}
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return tunnelStats, nil
+}
+
+// PutBackUnreportedTunnelStats restores a list of tunnel
+// stats records to StateUnreported.
+func PutBackUnreportedTunnelStats(tunnelStats [][]byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		for _, key := range tunnelStats {
+			err := bucket.Put(key, tunnelStatsStateUnreported)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// ClearReportedTunnelStats deletes a list of tunnel
+// stats records that were successfully reported.
+func ClearReportedTunnelStats(tunnelStats [][]byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		for _, key := range tunnelStats {
+			err := bucket.Delete(key)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// resetAllTunnelStatsToUnreported sets all tunnel
+// stats records to StateUnreported. This reset is called
+// when the datastore is initialized at start up, as we do
+// not know if tunnel records in StateReporting were reported
+// or not.
+func resetAllTunnelStatsToUnreported() error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		cursor := bucket.Cursor()
+		for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
+			err := bucket.Put(key, tunnelStatsStateUnreported)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}

+ 0 - 719
psiphon/dataStore_alt.go

@@ -1,719 +0,0 @@
-// +build !windows
-
-/*
- * Copyright (c) 2015, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-package psiphon
-
-import (
-	"encoding/json"
-	"errors"
-	"fmt"
-	"math/rand"
-	"path/filepath"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/Psiphon-Inc/bolt"
-)
-
-// The BoltDB dataStore implementation is an alternative to the sqlite3-based
-// implementation in dataStore.go. Both implementations have the same interface.
-//
-// BoltDB is pure Go, and is intended to be used in cases where we have trouble
-// building sqlite3/CGO (e.g., currently go mobile due to
-// https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
-// the primary dataStore implementation.
-//
-type dataStore struct {
-	init sync.Once
-	db   *bolt.DB
-}
-
-const (
-	serverEntriesBucket         = "serverEntries"
-	rankedServerEntriesBucket   = "rankedServerEntries"
-	rankedServerEntriesKey      = "rankedServerEntries"
-	splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
-	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
-	urlETagsBucket              = "urlETags"
-	keyValueBucket              = "keyValues"
-	rankedServerEntryCount      = 100
-)
-
-var singleton dataStore
-
-// InitDataStore initializes the singleton instance of dataStore. This
-// function uses a sync.Once and is safe for use by concurrent goroutines.
-// The underlying sql.DB connection pool is also safe.
-//
-// Note: the sync.Once was more useful when initDataStore was private and
-// called on-demand by the public functions below. Now we require an explicit
-// InitDataStore() call with the filename passed in. The on-demand calls
-// have been replaced by checkInitDataStore() to assert that Init was called.
-func InitDataStore(config *Config) (err error) {
-	singleton.init.Do(func() {
-		filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
-		var db *bolt.DB
-		db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
-		if err != nil {
-			// Note: intending to set the err return value for InitDataStore
-			err = fmt.Errorf("initDataStore failed to open database: %s", err)
-			return
-		}
-
-		err = db.Update(func(tx *bolt.Tx) error {
-			requiredBuckets := []string{
-				serverEntriesBucket,
-				rankedServerEntriesBucket,
-				splitTunnelRouteETagsBucket,
-				splitTunnelRouteDataBucket,
-				urlETagsBucket,
-				keyValueBucket,
-			}
-			for _, bucket := range requiredBuckets {
-				_, err := tx.CreateBucketIfNotExists([]byte(bucket))
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		})
-		if err != nil {
-			err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
-			return
-		}
-
-		singleton.db = db
-	})
-	return err
-}
-
-func checkInitDataStore() {
-	if singleton.db == nil {
-		panic("checkInitDataStore: datastore not initialized")
-	}
-}
-
-// StoreServerEntry adds the server entry to the data store.
-// A newly stored (or re-stored) server entry is assigned the next-to-top
-// rank for iteration order (the previous top ranked entry is promoted). The
-// purpose of inserting at next-to-top is to keep the last selected server
-// as the top ranked server.
-// When replaceIfExists is true, an existing server entry record is
-// overwritten; otherwise, the existing record is unchanged.
-// If the server entry data is malformed, an alert notice is issued and
-// the entry is skipped; no error is returned.
-func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
-	checkInitDataStore()
-
-	// Server entries should already be validated before this point,
-	// so instead of skipping we fail with an error.
-	err := ValidateServerEntry(serverEntry)
-	if err != nil {
-		return ContextError(errors.New("invalid server entry"))
-	}
-
-	// BoltDB implementation note:
-	// For simplicity, we don't maintain indexes on server entry
-	// region or supported protocols. Instead, we perform full-bucket
-	// scans with a filter. With a small enough database (thousands or
-	// even tens of thousand of server entries) and common enough
-	// values (e.g., many servers support all protocols), performance
-	// is expected to be acceptable.
-
-	serverEntryExists := false
-	err = singleton.db.Update(func(tx *bolt.Tx) error {
-
-		serverEntries := tx.Bucket([]byte(serverEntriesBucket))
-		serverEntryExists = (serverEntries.Get([]byte(serverEntry.IpAddress)) != nil)
-
-		if serverEntryExists && !replaceIfExists {
-			// Disabling this notice, for now, as it generates too much noise
-			// in diagnostics with clients that always submit embedded servers
-			// to the core on each run.
-			// NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
-			return nil
-		}
-
-		data, err := json.Marshal(serverEntry)
-		if err != nil {
-			return ContextError(err)
-		}
-		err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
-		if err != nil {
-			return ContextError(err)
-		}
-
-		err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
-		if err != nil {
-			return ContextError(err)
-		}
-
-		return nil
-	})
-	if err != nil {
-		return ContextError(err)
-	}
-
-	if !serverEntryExists {
-		NoticeInfo("updated server %s", serverEntry.IpAddress)
-	}
-	return nil
-}
-
-// StoreServerEntries shuffles and stores a list of server entries.
-// Shuffling is performed on imported server entrues as part of client-side
-// load balancing.
-// There is an independent transaction for each entry insert/update.
-func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
-	checkInitDataStore()
-
-	for index := len(serverEntries) - 1; index > 0; index-- {
-		swapIndex := rand.Intn(index + 1)
-		serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
-	}
-
-	for _, serverEntry := range serverEntries {
-		err := StoreServerEntry(serverEntry, replaceIfExists)
-		if err != nil {
-			return ContextError(err)
-		}
-	}
-
-	// Since there has possibly been a significant change in the server entries,
-	// take this opportunity to update the available egress regions.
-	ReportAvailableRegions()
-
-	return nil
-}
-
-// PromoteServerEntry assigns the top rank (one more than current
-// max rank) to the specified server entry. Server candidates are
-// iterated in decending rank order, so this server entry will be
-// the first candidate in a subsequent tunnel establishment.
-func PromoteServerEntry(ipAddress string) error {
-	checkInitDataStore()
-
-	err := singleton.db.Update(func(tx *bolt.Tx) error {
-		return insertRankedServerEntry(tx, ipAddress, 0)
-	})
-
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
-	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
-	data := bucket.Get([]byte(rankedServerEntriesKey))
-
-	if data == nil {
-		return []string{}, nil
-	}
-
-	rankedServerEntries := make([]string, 0)
-	err := json.Unmarshal(data, &rankedServerEntries)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	return rankedServerEntries, nil
-}
-
-func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
-	data, err := json.Marshal(rankedServerEntries)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
-	err = bucket.Put([]byte(rankedServerEntriesKey), data)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	return nil
-}
-
-func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
-	rankedServerEntries, err := getRankedServerEntries(tx)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	// BoltDB implementation note:
-	// For simplicity, we store the ranked server ids in an array serialized to
-	// a single key value. To ensure this value doesn't grow without bound,
-	// it's capped at rankedServerEntryCount. For now, this cap should be large
-	// enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
-	// any reasonable configuration of config.TunnelPoolSize.
-
-	if position >= len(rankedServerEntries) {
-		rankedServerEntries = append(rankedServerEntries, serverEntryId)
-	} else {
-		end := len(rankedServerEntries)
-		if end+1 > rankedServerEntryCount {
-			end = rankedServerEntryCount
-		}
-		// insert: https://github.com/golang/go/wiki/SliceTricks
-		rankedServerEntries = append(
-			rankedServerEntries[:position],
-			append([]string{serverEntryId},
-				rankedServerEntries[position:end]...)...)
-	}
-
-	err = setRankedServerEntries(tx, rankedServerEntries)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	return nil
-}
-
-func serverEntrySupportsProtocol(serverEntry *ServerEntry, protocol string) bool {
-	// Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
-	// and the additonal OSSH service is assumed to be available internally.
-	requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
-	return Contains(serverEntry.Capabilities, requiredCapability)
-}
-
-// ServerEntryIterator is used to iterate over
-// stored server entries in rank order.
-type ServerEntryIterator struct {
-	region                      string
-	protocol                    string
-	shuffleHeadLength           int
-	serverEntryIds              []string
-	serverEntryIndex            int
-	isTargetServerEntryIterator bool
-	hasNextTargetServerEntry    bool
-	targetServerEntry           *ServerEntry
-}
-
-// NewServerEntryIterator creates a new ServerEntryIterator
-func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
-
-	// When configured, this target server entry is the only candidate
-	if config.TargetServerEntry != "" {
-		return newTargetServerEntryIterator(config)
-	}
-
-	checkInitDataStore()
-	iterator = &ServerEntryIterator{
-		region:                      config.EgressRegion,
-		protocol:                    config.TunnelProtocol,
-		shuffleHeadLength:           config.TunnelPoolSize,
-		isTargetServerEntryIterator: false,
-	}
-	err = iterator.Reset()
-	if err != nil {
-		return nil, err
-	}
-	return iterator, nil
-}
-
-// newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
-func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
-	serverEntry, err := DecodeServerEntry(config.TargetServerEntry)
-	if err != nil {
-		return nil, err
-	}
-	if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
-		return nil, errors.New("TargetServerEntry does not support EgressRegion")
-	}
-	if config.TunnelProtocol != "" {
-		// Note: same capability/protocol mapping as in StoreServerEntry
-		requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
-		if !Contains(serverEntry.Capabilities, requiredCapability) {
-			return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
-		}
-	}
-	iterator = &ServerEntryIterator{
-		isTargetServerEntryIterator: true,
-		hasNextTargetServerEntry:    true,
-		targetServerEntry:           serverEntry,
-	}
-	NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
-	return iterator, nil
-}
-
-// Reset a NewServerEntryIterator to the start of its cycle. The next
-// call to Next will return the first server entry.
-func (iterator *ServerEntryIterator) Reset() error {
-	iterator.Close()
-
-	if iterator.isTargetServerEntryIterator {
-		iterator.hasNextTargetServerEntry = true
-		return nil
-	}
-
-	count := CountServerEntries(iterator.region, iterator.protocol)
-	NoticeCandidateServers(iterator.region, iterator.protocol, count)
-
-	// This query implements the Psiphon server candidate selection
-	// algorithm: the first TunnelPoolSize server candidates are in rank
-	// (priority) order, to favor previously successful servers; then the
-	// remaining long tail is shuffled to raise up less recent candidates.
-
-	// BoltDB implementation note:
-	// We don't keep a transaction open for the duration of the iterator
-	// because this would expose the following semantics to consumer code:
-	//
-	//     Read-only transactions and read-write transactions ... generally
-	//     shouldn't be opened simultaneously in the same goroutine. This can
-	//     cause a deadlock as the read-write transaction needs to periodically
-	//     re-map the data file but it cannot do so while a read-only
-	//     transaction is open.
-	//     (https://github.com/boltdb/bolt)
-	//
-	// So the uderlying serverEntriesBucket could change after the serverEntryIds
-	// list is built.
-
-	var serverEntryIds []string
-
-	err := singleton.db.View(func(tx *bolt.Tx) error {
-		var err error
-		serverEntryIds, err = getRankedServerEntries(tx)
-		if err != nil {
-			return err
-		}
-
-		skipServerEntryIds := make(map[string]bool)
-		for _, serverEntryId := range serverEntryIds {
-			skipServerEntryIds[serverEntryId] = true
-		}
-
-		bucket := tx.Bucket([]byte(serverEntriesBucket))
-		cursor := bucket.Cursor()
-		for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
-			serverEntryId := string(key)
-			if _, ok := skipServerEntryIds[serverEntryId]; ok {
-				continue
-			}
-			serverEntryIds = append(serverEntryIds, serverEntryId)
-		}
-		return nil
-	})
-	if err != nil {
-		return ContextError(err)
-	}
-
-	for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
-		j := rand.Intn(i)
-		serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
-	}
-
-	iterator.serverEntryIds = serverEntryIds
-	iterator.serverEntryIndex = 0
-
-	return nil
-}
-
-// Close cleans up resources associated with a ServerEntryIterator.
-func (iterator *ServerEntryIterator) Close() {
-	iterator.serverEntryIds = nil
-	iterator.serverEntryIndex = 0
-}
-
-// Next returns the next server entry, by rank, for a ServerEntryIterator.
-// Returns nil with no error when there is no next item.
-func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error) {
-	defer func() {
-		if err != nil {
-			iterator.Close()
-		}
-	}()
-
-	if iterator.isTargetServerEntryIterator {
-		if iterator.hasNextTargetServerEntry {
-			iterator.hasNextTargetServerEntry = false
-			return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
-		}
-		return nil, nil
-	}
-
-	// There are no region/protocol indexes for the server entries bucket.
-	// Loop until we have the next server entry that matches the iterator
-	// filter requirements.
-	for {
-		if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
-			// There is no next item
-			return nil, nil
-		}
-
-		serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
-		iterator.serverEntryIndex += 1
-
-		var data []byte
-		err = singleton.db.View(func(tx *bolt.Tx) error {
-			bucket := tx.Bucket([]byte(serverEntriesBucket))
-			data = bucket.Get([]byte(serverEntryId))
-			return nil
-		})
-		if err != nil {
-			return nil, ContextError(err)
-		}
-
-		if data == nil {
-			return nil, ContextError(
-				fmt.Errorf("Unexpected missing server entry: %s", serverEntryId))
-		}
-
-		serverEntry = new(ServerEntry)
-		err = json.Unmarshal(data, serverEntry)
-		if err != nil {
-			return nil, ContextError(err)
-		}
-
-		if (iterator.region == "" || serverEntry.Region == iterator.region) &&
-			(iterator.protocol == "" || serverEntrySupportsProtocol(serverEntry, iterator.protocol)) {
-
-			break
-		}
-	}
-
-	return MakeCompatibleServerEntry(serverEntry), nil
-}
-
-// MakeCompatibleServerEntry provides backwards compatibility with old server entries
-// which have a single meekFrontingDomain and not a meekFrontingAddresses array.
-// By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
-// uses that single value as legacy clients do.
-func MakeCompatibleServerEntry(serverEntry *ServerEntry) *ServerEntry {
-	if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
-		serverEntry.MeekFrontingAddresses =
-			append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
-	}
-
-	return serverEntry
-}
-
-func scanServerEntries(scanner func(*ServerEntry)) error {
-	err := singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(serverEntriesBucket))
-		cursor := bucket.Cursor()
-
-		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
-			serverEntry := new(ServerEntry)
-			err := json.Unmarshal(value, serverEntry)
-			if err != nil {
-				return err
-			}
-			scanner(serverEntry)
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return ContextError(err)
-	}
-
-	return nil
-}
-
-// CountServerEntries returns a count of stored servers for the
-// specified region and protocol.
-func CountServerEntries(region, protocol string) int {
-	checkInitDataStore()
-
-	count := 0
-	err := scanServerEntries(func(serverEntry *ServerEntry) {
-		if (region == "" || serverEntry.Region == region) &&
-			(protocol == "" || serverEntrySupportsProtocol(serverEntry, protocol)) {
-			count += 1
-		}
-	})
-
-	if err != nil {
-		NoticeAlert("CountServerEntries failed: %s", err)
-		return 0
-	}
-
-	return count
-}
-
-// ReportAvailableRegions prints a notice with the available egress regions.
-// Note that this report ignores config.TunnelProtocol.
-func ReportAvailableRegions() {
-	checkInitDataStore()
-
-	regions := make(map[string]bool)
-	err := scanServerEntries(func(serverEntry *ServerEntry) {
-		regions[serverEntry.Region] = true
-	})
-
-	if err != nil {
-		NoticeAlert("ReportAvailableRegions failed: %s", err)
-		return
-	}
-
-	regionList := make([]string, 0, len(regions))
-	for region, _ := range regions {
-		// Some server entries do not have a region, but it makes no sense to return
-		// an empty string as an "available region".
-		if region != "" {
-			regionList = append(regionList, region)
-		}
-	}
-
-	NoticeAvailableEgressRegions(regionList)
-}
-
-// GetServerEntryIpAddresses returns an array containing
-// all stored server IP addresses.
-func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
-	checkInitDataStore()
-
-	ipAddresses = make([]string, 0)
-	err = scanServerEntries(func(serverEntry *ServerEntry) {
-		ipAddresses = append(ipAddresses, serverEntry.IpAddress)
-	})
-
-	if err != nil {
-		return nil, ContextError(err)
-	}
-
-	return ipAddresses, nil
-}
-
-// SetSplitTunnelRoutes updates the cached routes data for
-// the given region. The associated etag is also stored and
-// used to make efficient web requests for updates to the data.
-func SetSplitTunnelRoutes(region, etag string, data []byte) error {
-	checkInitDataStore()
-
-	err := singleton.db.Update(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
-		err := bucket.Put([]byte(region), []byte(etag))
-
-		bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
-		err = bucket.Put([]byte(region), data)
-		return err
-	})
-
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-// GetSplitTunnelRoutesETag retrieves the etag for cached routes
-// data for the specified region. If not found, it returns an empty string value.
-func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
-	checkInitDataStore()
-
-	err = singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
-		etag = string(bucket.Get([]byte(region)))
-		return nil
-	})
-
-	if err != nil {
-		return "", ContextError(err)
-	}
-	return etag, nil
-}
-
-// GetSplitTunnelRoutesData retrieves the cached routes data
-// for the specified region. If not found, it returns a nil value.
-func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
-	checkInitDataStore()
-
-	err = singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
-		data = bucket.Get([]byte(region))
-		return nil
-	})
-
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	return data, nil
-}
-
-// SetUrlETag stores an ETag for the specfied URL.
-// Note: input URL is treated as a string, and is not
-// encoded or decoded or otherwise canonicalized.
-func SetUrlETag(url, etag string) error {
-	checkInitDataStore()
-
-	err := singleton.db.Update(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(urlETagsBucket))
-		err := bucket.Put([]byte(url), []byte(etag))
-		return err
-	})
-
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-// GetUrlETag retrieves a previously stored an ETag for the
-// specfied URL. If not found, it returns an empty string value.
-func GetUrlETag(url string) (etag string, err error) {
-	checkInitDataStore()
-
-	err = singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(urlETagsBucket))
-		etag = string(bucket.Get([]byte(url)))
-		return nil
-	})
-
-	if err != nil {
-		return "", ContextError(err)
-	}
-	return etag, nil
-}
-
-// SetKeyValue stores a key/value pair.
-func SetKeyValue(key, value string) error {
-	checkInitDataStore()
-
-	err := singleton.db.Update(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(keyValueBucket))
-		err := bucket.Put([]byte(key), []byte(value))
-		return err
-	})
-
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-// GetKeyValue retrieves the value for a given key. If not found,
-// it returns an empty string value.
-func GetKeyValue(key string) (value string, err error) {
-	checkInitDataStore()
-
-	err = singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(keyValueBucket))
-		value = string(bucket.Get([]byte(key)))
-		return nil
-	})
-
-	if err != nil {
-		return "", ContextError(err)
-	}
-	return value, nil
-}

+ 29 - 0
psiphon/migrateDataStore.go

@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+// Stub function to return an empty list for non-Windows builds
+func prepareMigrationEntries(config *Config) []*ServerEntry {
+	return nil
+}
+
+// Stub function to return immediately for non-Windows builds
+func migrateEntries(serverEntries []*ServerEntry, legacyDataStoreFilename string) {
+}

+ 292 - 0
psiphon/migrateDataStore_windows.go

@@ -0,0 +1,292 @@
+// TODO: Windows only build flag + runtime.GOOS check in datastore.go?
+
+/*
+ * Copyright (c) 2015, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	_ "github.com/Psiphon-Inc/go-sqlite3"
+)
+
+var legacyDb *sql.DB
+
+func prepareMigrationEntries(config *Config) []*ServerEntry {
+	var migratableServerEntries []*ServerEntry
+
+	// If DATA_STORE_FILENAME does not exist on disk
+	if _, err := os.Stat(filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)); os.IsNotExist(err) {
+		// If LEGACY_DATA_STORE_FILENAME exists on disk
+		if _, err := os.Stat(filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME)); err == nil {
+
+			legacyDb, err = sql.Open("sqlite3", fmt.Sprintf("file:%s?cache=private&mode=rwc", filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME)))
+			defer legacyDb.Close()
+
+			if err != nil {
+				NoticeAlert("prepareMigrationEntries: sql.Open failed: %s", err)
+				return nil
+			}
+
+			initialization := "pragma journal_mode=WAL;\n"
+			_, err = legacyDb.Exec(initialization)
+			if err != nil {
+				NoticeAlert("prepareMigrationEntries: sql.DB.Exec failed: %s", err)
+				return nil
+			}
+
+			iterator, err := newlegacyServerEntryIterator(config)
+			if err != nil {
+				NoticeAlert("prepareMigrationEntries: newlegacyServerEntryIterator failed: %s", err)
+				return nil
+			}
+			defer iterator.Close()
+
+			for {
+				serverEntry, err := iterator.Next()
+				if err != nil {
+					NoticeAlert("prepareMigrationEntries: legacyServerEntryIterator.Next failed: %s", err)
+					break
+				}
+				if serverEntry == nil {
+					break
+				}
+
+				migratableServerEntries = append(migratableServerEntries, serverEntry)
+			}
+			NoticeInfo("%d server entries prepared for data store migration", len(migratableServerEntries))
+		}
+	}
+
+	return migratableServerEntries
+}
+
+// migrateEntries calls the BoltDB data store method to shuffle
+// and store an array of server entries (StoreServerEntries)
+// Failing to migrate entries, or delete the legacy file is never fatal
+func migrateEntries(serverEntries []*ServerEntry, legacyDataStoreFilename string) {
+	checkInitDataStore()
+
+	err := StoreServerEntries(serverEntries, false)
+	if err != nil {
+		NoticeAlert("migrateEntries: StoreServerEntries failed: %s", err)
+	} else {
+		// Retain server affinity from old datastore by taking the first
+		// array element (previous top ranked server) and promoting it
+		// to the top rank before the server selection process begins
+		err = PromoteServerEntry(serverEntries[0].IpAddress)
+		if err != nil {
+			NoticeAlert("migrateEntries: PromoteServerEntry failed: %s", err)
+		}
+
+		NoticeAlert("%d server entries successfully migrated to new data store", len(serverEntries))
+	}
+
+	err = os.Remove(legacyDataStoreFilename)
+	if err != nil {
+		NoticeAlert("migrateEntries: failed to delete legacy data store file '%s': %s", legacyDataStoreFilename, err)
+	}
+
+	return
+}
+
+// This code is copied from the dataStore.go code used to operate the legacy
+// SQLite datastore. The word "legacy" was added to all of the method names to avoid
+// namespace conflicts with the methods used to operate the BoltDB datastore
+
// legacyServerEntryIterator is used to iterate over server entries stored
// in the legacy SQLite data store, in rank order.
type legacyServerEntryIterator struct {
	// Filter criteria, copied from the Config at creation time.
	region            string
	protocol          string
	shuffleHeadLength int

	// Open read transaction and result cursor; nil until Reset succeeds.
	transaction *sql.Tx
	cursor      *sql.Rows
}
+
+// newLegacyServerEntryIterator creates a new legacyServerEntryIterator
+func newlegacyServerEntryIterator(config *Config) (iterator *legacyServerEntryIterator, err error) {
+
+	iterator = &legacyServerEntryIterator{
+		region:            config.EgressRegion,
+		protocol:          config.TunnelProtocol,
+		shuffleHeadLength: config.TunnelPoolSize,
+	}
+	err = iterator.Reset()
+	if err != nil {
+		return nil, err
+	}
+	return iterator, nil
+}
+
+// Close cleans up resources associated with a legacyServerEntryIterator.
+func (iterator *legacyServerEntryIterator) Close() {
+	if iterator.cursor != nil {
+		iterator.cursor.Close()
+	}
+	iterator.cursor = nil
+	if iterator.transaction != nil {
+		iterator.transaction.Rollback()
+	}
+	iterator.transaction = nil
+}
+
+// Next returns the next server entry, by rank, for a legacyServerEntryIterator.
+// Returns nil with no error when there is no next item.
+func (iterator *legacyServerEntryIterator) Next() (serverEntry *ServerEntry, err error) {
+	defer func() {
+		if err != nil {
+			iterator.Close()
+		}
+	}()
+
+	if !iterator.cursor.Next() {
+		err = iterator.cursor.Err()
+		if err != nil {
+			return nil, ContextError(err)
+		}
+		// There is no next item
+		return nil, nil
+	}
+
+	var data []byte
+	err = iterator.cursor.Scan(&data)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	serverEntry = new(ServerEntry)
+	err = json.Unmarshal(data, serverEntry)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	return MakeCompatibleServerEntry(serverEntry), nil
+}
+
+// Reset a NewlegacyServerEntryIterator to the start of its cycle. The next
+// call to Next will return the first server entry.
+func (iterator *legacyServerEntryIterator) Reset() error {
+	iterator.Close()
+
+	count := countLegacyServerEntries(iterator.region, iterator.protocol)
+	NoticeCandidateServers(iterator.region, iterator.protocol, count)
+
+	transaction, err := legacyDb.Begin()
+	if err != nil {
+		return ContextError(err)
+	}
+	var cursor *sql.Rows
+
+	// This query implements the Psiphon server candidate selection
+	// algorithm: the first TunnelPoolSize server candidates are in rank
+	// (priority) order, to favor previously successful servers; then the
+	// remaining long tail is shuffled to raise up less recent candidates.
+
+	whereClause, whereParams := makeServerEntryWhereClause(
+		iterator.region, iterator.protocol, nil)
+	headLength := iterator.shuffleHeadLength
+	queryFormat := `
+		select data from serverEntry %s
+		order by case
+		when rank > coalesce((select rank from serverEntry %s order by rank desc limit ?, 1), -1) then rank
+		else abs(random())%%((select rank from serverEntry %s order by rank desc limit ?, 1))
+		end desc;`
+	query := fmt.Sprintf(queryFormat, whereClause, whereClause, whereClause)
+	params := make([]interface{}, 0)
+	params = append(params, whereParams...)
+	params = append(params, whereParams...)
+	params = append(params, headLength)
+	params = append(params, whereParams...)
+	params = append(params, headLength)
+
+	cursor, err = transaction.Query(query, params...)
+	if err != nil {
+		transaction.Rollback()
+		return ContextError(err)
+	}
+	iterator.transaction = transaction
+	iterator.cursor = cursor
+	return nil
+}
+
// makeServerEntryWhereClause builds a SQL WHERE clause, and the matching
// positional parameters, for filtering the legacy serverEntry table by
// region, protocol, and an optional id list. Empty filters are omitted.
// NOTE(review): despite the name, excludeIds produces "id in (...)", which
// selects (not excludes) the listed ids; every caller in this file passes
// nil, so the branch is effectively unused here — confirm before reusing.
func makeServerEntryWhereClause(
	region, protocol string, excludeIds []string) (whereClause string, whereParams []interface{}) {

	whereParams = make([]interface{}, 0)

	// appendConnector opens the clause with "where" or joins a further
	// condition with "and".
	appendConnector := func() {
		if whereClause == "" {
			whereClause += " where"
		} else {
			whereClause += " and"
		}
	}

	if region != "" {
		whereClause = " where region = ?"
		whereParams = append(whereParams, region)
	}

	if protocol != "" {
		appendConnector()
		whereClause +=
			" exists (select 1 from serverEntryProtocol where protocol = ? and serverEntryId = serverEntry.id)"
		whereParams = append(whereParams, protocol)
	}

	if len(excludeIds) > 0 {
		appendConnector()
		whereClause += " id in ("
		for i, id := range excludeIds {
			if i > 0 {
				whereClause += ", "
			}
			whereClause += "?"
			whereParams = append(whereParams, id)
		}
		whereClause += ")"
	}

	return whereClause, whereParams
}
+
+// countLegacyServerEntries returns a count of stored servers for the specified region and protocol.
+func countLegacyServerEntries(region, protocol string) int {
+	var count int
+	whereClause, whereParams := makeServerEntryWhereClause(region, protocol, nil)
+	query := "select count(*) from serverEntry" + whereClause
+	err := legacyDb.QueryRow(query, whereParams...).Scan(&count)
+
+	if err != nil {
+		NoticeAlert("countLegacyServerEntries failed: %s", err)
+		return 0
+	}
+
+	if region == "" {
+		region = "(any)"
+	}
+	if protocol == "" {
+		protocol = "(any)"
+	}
+	NoticeInfo("servers for region %s and protocol %s: %d",
+		region, protocol, count)
+
+	return count
+}

+ 61 - 0
psiphon/net.go

@@ -20,9 +20,12 @@
 package psiphon
 
 import (
+	"crypto/x509"
 	"fmt"
 	"io"
 	"net"
+	"net/http"
+	"net/url"
 	"reflect"
 	"sync"
 	"time"
@@ -241,3 +244,61 @@ func ResolveIP(host string, conn net.Conn) (addrs []net.IP, ttls []time.Duration
 	}
 	return addrs, ttls, nil
 }
+
+// MakeUntunneledHttpsClient returns a net/http.Client which is
+// configured to use custom dialing features -- including BindToDevice,
+// UseIndistinguishableTLS, etc. -- for a specific HTTPS request URL.
+// If verifyLegacyCertificate is not nil, it's used for certificate
+// verification.
+// Because UseIndistinguishableTLS requires a hack to work with
+// net/http, MakeUntunneledHttpClient may return a modified request URL
+// to be used. Callers should always use this return value to make
+// requests, not the input value.
+func MakeUntunneledHttpsClient(
+	dialConfig *DialConfig,
+	verifyLegacyCertificate *x509.Certificate,
+	requestUrl string,
+	requestTimeout time.Duration) (*http.Client, string, error) {
+
+	dialer := NewCustomTLSDialer(
+		// Note: when verifyLegacyCertificate is not nil, some
+		// of the other CustomTLSConfig is overridden.
+		&CustomTLSConfig{
+			Dial: NewTCPDialer(dialConfig),
+			VerifyLegacyCertificate:       verifyLegacyCertificate,
+			SendServerName:                true,
+			SkipVerify:                    false,
+			UseIndistinguishableTLS:       dialConfig.UseIndistinguishableTLS,
+			TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
+		})
+
+	urlComponents, err := url.Parse(requestUrl)
+	if err != nil {
+		return nil, "", ContextError(err)
+	}
+
+	// Change the scheme to "http"; otherwise http.Transport will try to do
+	// another TLS handshake inside the explicit TLS session. Also need to
+	// force an explicit port, as the default for "http", 80, won't talk TLS.
+	urlComponents.Scheme = "http"
+	host, port, err := net.SplitHostPort(urlComponents.Host)
+	if err != nil {
+		// Assume there's no port
+		host = urlComponents.Host
+		port = ""
+	}
+	if port == "" {
+		port = "443"
+	}
+	urlComponents.Host = net.JoinHostPort(host, port)
+
+	transport := &http.Transport{
+		Dial: dialer,
+	}
+	httpClient := &http.Client{
+		Timeout:   requestTimeout,
+		Transport: transport,
+	}
+
+	return httpClient, urlComponents.String(), nil
+}

+ 2 - 2
psiphon/notice.go

@@ -91,12 +91,12 @@ func NoticeInfo(format string, args ...interface{}) {
 	outputNotice("Info", false, "message", fmt.Sprintf(format, args...))
 }
 
-// NoticeInfo is an alert message; typically a recoverable error condition
+// NoticeAlert is an alert message; typically a recoverable error condition
 func NoticeAlert(format string, args ...interface{}) {
 	outputNotice("Alert", false, "message", fmt.Sprintf(format, args...))
 }
 
-// NoticeInfo is an error message; typically an unrecoverable error condition
+// NoticeError is an error message; typically an unrecoverable error condition
 func NoticeError(format string, args ...interface{}) {
 	outputNotice("Error", true, "message", fmt.Sprintf(format, args...))
 }

+ 3 - 38
psiphon/remoteServerList.go

@@ -23,9 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
-	"net"
 	"net/http"
-	"net/url"
 )
 
 // FetchRemoteServerList downloads a remote server list JSON record from
@@ -42,46 +40,13 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		return ContextError(errors.New("remote server list signature public key blank"))
 	}
 
-	dialer := NewTCPDialer(dialConfig)
-
-	// When the URL is HTTPS, use the custom TLS dialer with the
-	// UseIndistinguishableTLS option.
-	// TODO: refactor into helper function
-	requestUrl, err := url.Parse(config.RemoteServerListUrl)
+	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
+		dialConfig, nil, config.RemoteServerListUrl, FETCH_REMOTE_SERVER_LIST_TIMEOUT)
 	if err != nil {
 		return ContextError(err)
 	}
-	if requestUrl.Scheme == "https" {
-		dialer = NewCustomTLSDialer(
-			&CustomTLSConfig{
-				Dial:                          dialer,
-				SendServerName:                true,
-				SkipVerify:                    false,
-				UseIndistinguishableTLS:       config.UseIndistinguishableTLS,
-				TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
-			})
-
-		// Change the scheme to "http"; otherwise http.Transport will try to do
-		// another TLS handshake inside the explicit TLS session. Also need to
-		// force the port to 443,as the default for "http", 80, won't talk TLS.
-		requestUrl.Scheme = "http"
-		host, _, err := net.SplitHostPort(requestUrl.Host)
-		if err != nil {
-			// Assume there's no port
-			host = requestUrl.Host
-		}
-		requestUrl.Host = net.JoinHostPort(host, "443")
-	}
-
-	transport := &http.Transport{
-		Dial: dialer,
-	}
-	httpClient := http.Client{
-		Timeout:   FETCH_REMOTE_SERVER_LIST_TIMEOUT,
-		Transport: transport,
-	}
 
-	request, err := http.NewRequest("GET", requestUrl.String(), nil)
+	request, err := http.NewRequest("GET", requestUrl, nil)
 	if err != nil {
 		return ContextError(err)
 	}

+ 396 - 128
psiphon/serverApi.go

@@ -31,29 +31,39 @@ import (
 	"net"
 	"net/http"
 	"strconv"
+	"sync/atomic"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
 )
 
-// Session is a utility struct which holds all of the data associated
-// with a Psiphon session. In addition to the established tunnel, this
-// includes the session ID (used for Psiphon API requests) and a http
+// ServerContext is a utility struct which holds all of the data associated
+// with a Psiphon server connection. In addition to the established tunnel, this
+// includes data associated with Psiphon API requests and a persistent http
 // client configured to make tunneled Psiphon API requests.
-type Session struct {
-	sessionId            string
-	baseRequestUrl       string
-	psiphonHttpsClient   *http.Client
-	statsRegexps         *transferstats.Regexps
-	clientRegion         string
-	clientUpgradeVersion string
+type ServerContext struct {
+	sessionId                string
+	tunnelNumber             int64
+	baseRequestUrl           string
+	psiphonHttpsClient       *http.Client
+	statsRegexps             *transferstats.Regexps
+	clientRegion             string
+	clientUpgradeVersion     string
+	serverHandshakeTimestamp string
 }
 
-// MakeSessionId creates a new session ID. Making the session ID is not done
-// in NewSession because:
-// (1) the transport needs to send the ID in the SSH credentials before the tunnel
-//     is established and NewSession performs a handshake on an established tunnel.
-// (2) the same session ID is used across multi-tunnel controller runs, where each
-//     tunnel has its own Session instance.
+// nextTunnelNumber is a monotonically increasing number assigned to each
+// successive tunnel connection. The sessionId and tunnelNumber together
+// form a globally unique identifier for tunnels, which is used for
+// stats. Note that the number is increasing but not necessarily
+// consecutive for each active tunnel in a session.
+var nextTunnelNumber int64
+
+// MakeSessionId creates a new session ID. The same session ID is used across
+// multi-tunnel controller runs, where each tunnel has its own ServerContext
+// instance.
+// In server-side stats, we now consider a "session" to be the lifetime of the
+// Controller (e.g., the user's commanded start and stop) and we measure this
+// duration as well as the duration of each tunnel within the session.
 func MakeSessionId() (sessionId string, err error) {
 	randomId, err := MakeSecureRandomBytes(PSIPHON_API_CLIENT_SESSION_ID_LENGTH)
 	if err != nil {
@@ -62,108 +72,35 @@ func MakeSessionId() (sessionId string, err error) {
 	return hex.EncodeToString(randomId), nil
 }
 
-// NewSession makes the tunnelled handshake request to the
-// Psiphon server and returns a Session struct, initialized with the
-// session ID, for use with subsequent Psiphon server API requests (e.g.,
-// periodic connected and status requests).
-func NewSession(config *Config, tunnel *Tunnel, sessionId string) (session *Session, err error) {
+// NewServerContext makes the tunnelled handshake request to the Psiphon server
+// and returns a ServerContext struct for use with subsequent Psiphon server API
+// requests (e.g., periodic connected and status requests).
+func NewServerContext(tunnel *Tunnel, sessionId string) (*ServerContext, error) {
 
 	psiphonHttpsClient, err := makePsiphonHttpsClient(tunnel)
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	session = &Session{
+
+	serverContext := &ServerContext{
 		sessionId:          sessionId,
-		baseRequestUrl:     makeBaseRequestUrl(config, tunnel, sessionId),
+		tunnelNumber:       atomic.AddInt64(&nextTunnelNumber, 1),
+		baseRequestUrl:     makeBaseRequestUrl(tunnel, "", sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
 	}
 
-	err = session.doHandshakeRequest()
+	err = serverContext.doHandshakeRequest()
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	return session, nil
-}
-
-// DoConnectedRequest performs the connected API request. This request is
-// used for statistics. The server returns a last_connected token for
-// the client to store and send next time it connects. This token is
-// a timestamp (using the server clock, and should be rounded to the
-// nearest hour) which is used to determine when a connection represents
-// a unique user for a time period.
-func (session *Session) DoConnectedRequest() error {
-	const DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
-	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
-	if err != nil {
-		return ContextError(err)
-	}
-	if lastConnected == "" {
-		lastConnected = "None"
-	}
-	url := session.buildRequestUrl(
-		"connected",
-		&ExtraParam{"session_id", session.sessionId},
-		&ExtraParam{"last_connected", lastConnected})
-	responseBody, err := session.doGetRequest(url)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	var response struct {
-		ConnectedTimestamp string `json:"connected_timestamp"`
-	}
-	err = json.Unmarshal(responseBody, &response)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	err = SetKeyValue(DATA_STORE_LAST_CONNECTED_KEY, response.ConnectedTimestamp)
-	if err != nil {
-		return ContextError(err)
-	}
-	return nil
-}
-
-// StatsRegexps gets the Regexps used for the statistics for this tunnel.
-func (session *Session) StatsRegexps() *transferstats.Regexps {
-	return session.statsRegexps
-}
-
-// DoStatusRequest makes a /status request to the server, sending session stats.
-func (session *Session) DoStatusRequest(statsPayload json.Marshaler) error {
-	statsPayloadJSON, err := json.Marshal(statsPayload)
-	if err != nil {
-		return ContextError(err)
-	}
-
-	// Add a random amount of padding to help prevent stats updates from being
-	// a predictable size (which often happens when the connection is quiet).
-	padding := MakeSecureRandomPadding(0, PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES)
-
-	// "connected" is a legacy parameter. This client does not report when
-	// it has disconnected.
-
-	url := session.buildRequestUrl(
-		"status",
-		&ExtraParam{"session_id", session.sessionId},
-		&ExtraParam{"connected", "1"},
-		// TODO: base64 encoding of padding means the padding
-		// size is not exactly [0, PADDING_MAX_BYTES]
-		&ExtraParam{"padding", base64.StdEncoding.EncodeToString(padding)})
-
-	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
-	if err != nil {
-		return ContextError(err)
-	}
-
-	return nil
+	return serverContext, nil
 }
 
 // doHandshakeRequest performs the handshake API request. The handshake
 // returns upgrade info, newly discovered server entries -- which are
 // stored -- and sponsor info (home pages, stat regexes).
-func (session *Session) doHandshakeRequest() error {
+func (serverContext *ServerContext) doHandshakeRequest() error {
 	extraParams := make([]*ExtraParam, 0)
 	serverEntryIpAddresses, err := GetServerEntryIpAddresses()
 	if err != nil {
@@ -174,8 +111,8 @@ func (session *Session) doHandshakeRequest() error {
 	for _, ipAddress := range serverEntryIpAddresses {
 		extraParams = append(extraParams, &ExtraParam{"known_server", ipAddress})
 	}
-	url := session.buildRequestUrl("handshake", extraParams...)
-	responseBody, err := session.doGetRequest(url)
+	url := buildRequestUrl(serverContext.baseRequestUrl, "handshake", extraParams...)
+	responseBody, err := serverContext.doGetRequest(url)
 	if err != nil {
 		return ContextError(err)
 	}
@@ -202,14 +139,15 @@ func (session *Session) doHandshakeRequest() error {
 		HttpsRequestRegexes  []map[string]string `json:"https_request_regexes"`
 		EncodedServerList    []string            `json:"encoded_server_list"`
 		ClientRegion         string              `json:"client_region"`
+		ServerTimestamp      string              `json:"server_timestamp"`
 	}
 	err = json.Unmarshal(configLine, &handshakeConfig)
 	if err != nil {
 		return ContextError(err)
 	}
 
-	session.clientRegion = handshakeConfig.ClientRegion
-	NoticeClientRegion(session.clientRegion)
+	serverContext.clientRegion = handshakeConfig.ClientRegion
+	NoticeClientRegion(serverContext.clientRegion)
 
 	var decodedServerEntries []*ServerEntry
 
@@ -242,13 +180,13 @@ func (session *Session) doHandshakeRequest() error {
 		NoticeHomepage(homepage)
 	}
 
-	session.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
+	serverContext.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
 	if handshakeConfig.UpgradeClientVersion != "" {
 		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
 	}
 
 	var regexpsNotices []string
-	session.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
+	serverContext.statsRegexps, regexpsNotices = transferstats.MakeRegexps(
 		handshakeConfig.PageViewRegexes,
 		handshakeConfig.HttpsRequestRegexes)
 
@@ -256,15 +194,344 @@ func (session *Session) doHandshakeRequest() error {
 		NoticeAlert(notice)
 	}
 
+	serverContext.serverHandshakeTimestamp = handshakeConfig.ServerTimestamp
+
+	return nil
+}
+
+// DoConnectedRequest performs the connected API request. This request is
+// used for statistics. The server returns a last_connected token for
+// the client to store and send next time it connects. This token is
+// a timestamp (using the server clock, and should be rounded to the
+// nearest hour) which is used to determine when a connection represents
+// a unique user for a time period.
+func (serverContext *ServerContext) DoConnectedRequest() error {
+	const DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
+	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
+	if err != nil {
+		return ContextError(err)
+	}
+	if lastConnected == "" {
+		lastConnected = "None"
+	}
+	url := buildRequestUrl(
+		serverContext.baseRequestUrl,
+		"connected",
+		&ExtraParam{"session_id", serverContext.sessionId},
+		&ExtraParam{"last_connected", lastConnected})
+	responseBody, err := serverContext.doGetRequest(url)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	var response struct {
+		ConnectedTimestamp string `json:"connected_timestamp"`
+	}
+	err = json.Unmarshal(responseBody, &response)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	err = SetKeyValue(DATA_STORE_LAST_CONNECTED_KEY, response.ConnectedTimestamp)
+	if err != nil {
+		return ContextError(err)
+	}
 	return nil
 }
 
+// StatsRegexps gets the Regexps used for the statistics for this tunnel.
+func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
+	return serverContext.statsRegexps
+}
+
+// DoStatusRequest makes a /status request to the server, sending session stats.
+func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
+
+	url := makeStatusRequestUrl(serverContext.sessionId, serverContext.baseRequestUrl, true)
+
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	err = serverContext.doPostRequest(url, "application/json", bytes.NewReader(payload))
+	if err != nil {
+
+		// Resend the transfer stats and tunnel stats later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
+
+		return ContextError(err)
+	}
+	confirmStatusRequestPayload(payloadInfo)
+
+	return nil
+}
+
+func makeStatusRequestUrl(sessionId, baseRequestUrl string, isTunneled bool) string {
+
+	// Add a random amount of padding to help prevent stats updates from being
+	// a predictable size (which often happens when the connection is quiet).
+	padding := MakeSecureRandomPadding(0, PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES)
+
+	// Legacy clients set "connected" to "0" when disconnecting, and this value
+	// is used to calculate session duration estimates. This is now superseded
+	// by explicit tunnel stats duration reporting.
+	// The legacy method of reconstructing session durations is not compatible
+	// with this client's connected request retries and asynchronous final
+	// status request attempts. So we simply set this "connected" flag to reflect
+	// whether the request is sent tunneled or not.
+
+	connected := "1"
+	if !isTunneled {
+		connected = "0"
+	}
+
+	return buildRequestUrl(
+		baseRequestUrl,
+		"status",
+		&ExtraParam{"session_id", sessionId},
+		&ExtraParam{"connected", connected},
+		// TODO: base64 encoding of padding means the padding
+		// size is not exactly [0, PADDING_MAX_BYTES]
+		&ExtraParam{"padding", base64.StdEncoding.EncodeToString(padding)})
+}
+
+// statusRequestPayloadInfo is a temporary structure for data used to
+// either "clear" or "put back" status request payload data depending
+// on whether or not the request succeeded.
+type statusRequestPayloadInfo struct {
+	serverId      string
+	transferStats *transferstats.ServerStats
+	tunnelStats   [][]byte
+}
+
+func makeStatusRequestPayload(
+	serverId string) ([]byte, *statusRequestPayloadInfo, error) {
+
+	transferStats := transferstats.GetForServer(serverId)
+	tunnelStats, err := TakeOutUnreportedTunnelStats(
+		PSIPHON_API_TUNNEL_STATS_MAX_COUNT)
+	if err != nil {
+		NoticeAlert(
+			"TakeOutUnreportedTunnelStats failed: %s", ContextError(err))
+		tunnelStats = nil
+		// Proceed with transferStats only
+	}
+	payloadInfo := &statusRequestPayloadInfo{
+		serverId, transferStats, tunnelStats}
+
+	payload := make(map[string]interface{})
+
+	hostBytes, bytesTransferred := transferStats.GetStatsForReporting()
+	payload["host_bytes"] = hostBytes
+	payload["bytes_transferred"] = bytesTransferred
+
+	// We're not recording these fields, but the server requires them.
+	payload["page_views"] = make([]string, 0)
+	payload["https_requests"] = make([]string, 0)
+
+	// Tunnel stats records are already in JSON format
+	jsonTunnelStats := make([]json.RawMessage, len(tunnelStats))
+	for i, tunnelStatsRecord := range tunnelStats {
+		jsonTunnelStats[i] = json.RawMessage(tunnelStatsRecord)
+	}
+	payload["tunnel_stats"] = jsonTunnelStats
+
+	jsonPayload, err := json.Marshal(payload)
+	if err != nil {
+
+		// Send the transfer stats and tunnel stats later
+		putBackStatusRequestPayload(payloadInfo)
+
+		return nil, nil, ContextError(err)
+	}
+
+	return jsonPayload, payloadInfo, nil
+}
+
+func putBackStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	transferstats.PutBack(payloadInfo.serverId, payloadInfo.transferStats)
+	err := PutBackUnreportedTunnelStats(payloadInfo.tunnelStats)
+	if err != nil {
+		// These tunnel stats records won't be resent until after a
+		// datastore re-initialization.
+		NoticeAlert(
+			"PutBackUnreportedTunnelStats failed: %s", ContextError(err))
+	}
+}
+
+func confirmStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
+	err := ClearReportedTunnelStats(payloadInfo.tunnelStats)
+	if err != nil {
+		// These tunnel stats records may be resent.
+		NoticeAlert(
+			"ClearReportedTunnelStats failed: %s", ContextError(err))
+	}
+}
+
+// TryUntunneledStatusRequest makes direct connections to the specified
+// server (if supported) in an attempt to send useful bytes transferred
+// and tunnel duration stats after a tunnel has already failed.
+// The tunnel is assumed to be closed, but its config, protocol, and
+// context values must still be valid.
+// TryUntunneledStatusRequest emits notices detailing failed attempts.
+func TryUntunneledStatusRequest(tunnel *Tunnel, isShutdown bool) error {
+
+	for _, port := range tunnel.serverEntry.GetDirectWebRequestPorts() {
+		err := doUntunneledStatusRequest(tunnel, port, isShutdown)
+		if err == nil {
+			return nil
+		}
+		NoticeAlert("doUntunneledStatusRequest failed for %s:%s: %s",
+			tunnel.serverEntry.IpAddress, port, err)
+	}
+
+	return errors.New("all attempts failed")
+}
+
+// doUntunneledStatusRequest attempts an untunneled status request.
+func doUntunneledStatusRequest(
+	tunnel *Tunnel, port string, isShutdown bool) error {
+
+	url := makeStatusRequestUrl(
+		tunnel.serverContext.sessionId,
+		makeBaseRequestUrl(tunnel, port, tunnel.serverContext.sessionId),
+		false)
+
+	certificate, err := DecodeCertificate(tunnel.serverEntry.WebServerCertificate)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	timeout := PSIPHON_API_SERVER_TIMEOUT
+	if isShutdown {
+		timeout = PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT
+	}
+
+	httpClient, requestUrl, err := MakeUntunneledHttpsClient(
+		tunnel.untunneledDialConfig,
+		certificate,
+		url,
+		timeout)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	payload, payloadInfo, err := makeStatusRequestPayload(tunnel.serverEntry.IpAddress)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	bodyType := "application/json"
+	body := bytes.NewReader(payload)
+
+	response, err := httpClient.Post(requestUrl, bodyType, body)
+	if err == nil && response.StatusCode != http.StatusOK {
+		response.Body.Close()
+		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
+	}
+	if err != nil {
+
+		// Resend the transfer stats and tunnel stats later
+		// Note: potential duplicate reports if the server received and processed
+		// the request but the client failed to receive the response.
+		putBackStatusRequestPayload(payloadInfo)
+
+		// Trim this error since it may include long URLs
+		return ContextError(TrimError(err))
+	}
+	confirmStatusRequestPayload(payloadInfo)
+	response.Body.Close()
+
+	return nil
+}
+
+// RecordTunnelStats records a tunnel duration and bytes
+// sent and received for subsequent reporting and quality
+// analysis.
+//
+// Tunnel durations are precisely measured client-side
+// and reported in status requests. As the duration is
+// not determined until the tunnel is closed, tunnel
+// stats records are stored in the persistent datastore
+// and reported via subsequent status requests sent to any
+// Psiphon server.
+//
+// Since the status request that reports a tunnel stats
+// record is not necessarily handled by the same server, the
+// tunnel stats records include the original server ID.
+//
+// Other fields that may change between tunnel stats recording
+// and reporting include client geo data, propagation channel,
+// sponsor ID, client version. These are not stored in the
+// datastore (client region, in particular, since that would
+// create an on-disk record of user location).
+// TODO: the server could encrypt, with a nonce and key unknown to
+// the client, a blob containing this data; return it in the
+// handshake response; and the client could store and later report
+// this blob with its tunnel stats records.
+//
+// Multiple "status" requests may be in flight at once (due
+// to multi-tunnel, asynchronous final status retry, and
+// aggressive status requests for pre-registered tunnels).
+// To avoid duplicate reporting, tunnel stats records are
+// "taken-out" by a status request and then "put back" in
+// case the request fails.
+//
+// Note: since tunnel stats records have a globally unique
+// identifier (sessionId + tunnelNumber), we could tolerate
+// duplicate reporting and filter out duplicates on the
+// server-side. Permitting duplicate reporting could increase
+// the velocity of reporting (for example, both the asynchronous
+// untunneled final status requests and the post-connected
+// immediate status requests could try to report the same tunnel
+// stats).
+// Duplicate reporting may also occur when a server receives and
+// processes a status request but the client fails to receive
+// the response.
+func RecordTunnelStats(
+	sessionId string,
+	tunnelNumber int64,
+	tunnelServerIpAddress string,
+	serverHandshakeTimestamp, duration string,
+	totalBytesSent, totalBytesReceived int64) error {
+
+	tunnelStats := struct {
+		SessionId                string `json:"session_id"`
+		TunnelNumber             int64  `json:"tunnel_number"`
+		TunnelServerIpAddress    string `json:"tunnel_server_ip_address"`
+		ServerHandshakeTimestamp string `json:"server_handshake_timestamp"`
+		Duration                 string `json:"duration"`
+		TotalBytesSent           int64  `json:"total_bytes_sent"`
+		TotalBytesReceived       int64  `json:"total_bytes_received"`
+	}{
+		sessionId,
+		tunnelNumber,
+		tunnelServerIpAddress,
+		serverHandshakeTimestamp,
+		duration,
+		totalBytesSent,
+		totalBytesReceived,
+	}
+
+	tunnelStatsJson, err := json.Marshal(tunnelStats)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return StoreTunnelStats(tunnelStatsJson)
+}
+
 // doGetRequest makes a tunneled HTTPS request and returns the response body.
-func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, err error) {
-	response, err := session.psiphonHttpsClient.Get(requestUrl)
+func (serverContext *ServerContext) doGetRequest(
+	requestUrl string) (responseBody []byte, err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Get(requestUrl)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
-		err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+		err = fmt.Errorf("HTTP GET request failed with response code: %d", response.StatusCode)
 	}
 	if err != nil {
 		// Trim this error since it may include long URLs
@@ -275,41 +542,42 @@ func (session *Session) doGetRequest(requestUrl string) (responseBody []byte, er
 	if err != nil {
 		return nil, ContextError(err)
 	}
-	if response.StatusCode != http.StatusOK {
-		return nil, ContextError(fmt.Errorf("HTTP GET request failed with response code: %d", response.StatusCode))
-	}
 	return body, nil
 }
 
 // doPostRequest makes a tunneled HTTPS POST request.
-func (session *Session) doPostRequest(requestUrl string, bodyType string, body io.Reader) (err error) {
-	response, err := session.psiphonHttpsClient.Post(requestUrl, bodyType, body)
+func (serverContext *ServerContext) doPostRequest(
+	requestUrl string, bodyType string, body io.Reader) (err error) {
+
+	response, err := serverContext.psiphonHttpsClient.Post(requestUrl, bodyType, body)
 	if err == nil && response.StatusCode != http.StatusOK {
 		response.Body.Close()
-		err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+		err = fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode)
 	}
 	if err != nil {
 		// Trim this error since it may include long URLs
 		return ContextError(TrimError(err))
 	}
 	response.Body.Close()
-	if response.StatusCode != http.StatusOK {
-		return ContextError(fmt.Errorf("HTTP POST request failed with response code: %d", response.StatusCode))
-	}
-	return
+	return nil
 }
 
 // makeBaseRequestUrl makes a URL containing all the common parameters
 // that are included with Psiphon API requests. These common parameters
 // are used for statistics.
-func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string {
+func makeBaseRequestUrl(tunnel *Tunnel, port, sessionId string) string {
 	var requestUrl bytes.Buffer
+
+	if port == "" {
+		port = tunnel.serverEntry.WebServerPort
+	}
+
 	// Note: don't prefix with HTTPS scheme, see comment in doGetRequest.
 	// e.g., don't do this: requestUrl.WriteString("https://")
 	requestUrl.WriteString("http://")
 	requestUrl.WriteString(tunnel.serverEntry.IpAddress)
 	requestUrl.WriteString(":")
-	requestUrl.WriteString(tunnel.serverEntry.WebServerPort)
+	requestUrl.WriteString(port)
 	requestUrl.WriteString("/")
 	// Placeholder for the path component of a request
 	requestUrl.WriteString("%s")
@@ -318,18 +586,18 @@ func makeBaseRequestUrl(config *Config, tunnel *Tunnel, sessionId string) string
 	requestUrl.WriteString("&server_secret=")
 	requestUrl.WriteString(tunnel.serverEntry.WebServerSecret)
 	requestUrl.WriteString("&propagation_channel_id=")
-	requestUrl.WriteString(config.PropagationChannelId)
+	requestUrl.WriteString(tunnel.config.PropagationChannelId)
 	requestUrl.WriteString("&sponsor_id=")
-	requestUrl.WriteString(config.SponsorId)
+	requestUrl.WriteString(tunnel.config.SponsorId)
 	requestUrl.WriteString("&client_version=")
-	requestUrl.WriteString(config.ClientVersion)
+	requestUrl.WriteString(tunnel.config.ClientVersion)
 	// TODO: client_tunnel_core_version
 	requestUrl.WriteString("&relay_protocol=")
 	requestUrl.WriteString(tunnel.protocol)
 	requestUrl.WriteString("&client_platform=")
-	requestUrl.WriteString(config.ClientPlatform)
+	requestUrl.WriteString(tunnel.config.ClientPlatform)
 	requestUrl.WriteString("&tunnel_whole_device=")
-	requestUrl.WriteString(strconv.Itoa(config.TunnelWholeDevice))
+	requestUrl.WriteString(strconv.Itoa(tunnel.config.TunnelWholeDevice))
 	return requestUrl.String()
 }
 
@@ -337,9 +605,9 @@ type ExtraParam struct{ name, value string }
 
 // buildRequestUrl makes a URL for an API request. The URL includes the
 // base request URL and any extra parameters for the specific request.
-func (session *Session) buildRequestUrl(path string, extraParams ...*ExtraParam) string {
+func buildRequestUrl(baseRequestUrl, path string, extraParams ...*ExtraParam) string {
 	var requestUrl bytes.Buffer
-	requestUrl.WriteString(fmt.Sprintf(session.baseRequestUrl, path))
+	requestUrl.WriteString(fmt.Sprintf(baseRequestUrl, path))
 	for _, extraParam := range extraParams {
 		requestUrl.WriteString("&")
 		requestUrl.WriteString(extraParam.name)

+ 14 - 0
psiphon/serverEntry.go

@@ -109,6 +109,20 @@ func (serverEntry *ServerEntry) DisableImpairedProtocols(impairedProtocols []str
 	serverEntry.Capabilities = capabilities
 }
 
+func (serverEntry *ServerEntry) GetDirectWebRequestPorts() []string {
+	ports := make([]string, 0)
+	if Contains(serverEntry.Capabilities, "handshake") {
+		// Server-side configuration quirk: there's a port forward from
+		// port 443 to the web server, which we can try, except on servers
+		// running FRONTED_MEEK, which listens on port 443.
+		if serverEntry.SupportsProtocol(TUNNEL_PROTOCOL_FRONTED_MEEK) {
+			ports = append(ports, "443")
+		}
+		ports = append(ports, serverEntry.WebServerPort)
+	}
+	return ports
+}
+
 // DecodeServerEntry extracts server entries from the encoding
 // used by remote server lists and Psiphon server handshake requests.
 func DecodeServerEntry(encodedServerEntry string) (serverEntry *ServerEntry, err error) {

+ 8 - 8
psiphon/splitTunnel.go

@@ -114,12 +114,12 @@ func (classifier *SplitTunnelClassifier) Start(fetchRoutesTunnel *Tunnel) {
 		return
 	}
 
-	if fetchRoutesTunnel.session == nil {
-		// Tunnel has no session
+	if fetchRoutesTunnel.serverContext == nil {
+		// Tunnel has no serverContext
 		return
 	}
 
-	if fetchRoutesTunnel.session.clientRegion == "" {
+	if fetchRoutesTunnel.serverContext.clientRegion == "" {
 		// Split tunnel region is unknown
 		return
 	}
@@ -207,7 +207,7 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 		return
 	}
 
-	NoticeSplitTunnelRegion(tunnel.session.clientRegion)
+	NoticeSplitTunnelRegion(tunnel.serverContext.clientRegion)
 }
 
 // getRoutes makes a web request to download fresh routes data for the
@@ -216,13 +216,13 @@ func (classifier *SplitTunnelClassifier) setRoutes(tunnel *Tunnel) {
 // fails and cached routes data is present, that cached data is returned.
 func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData []byte, err error) {
 
-	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.session.clientRegion)
+	url := fmt.Sprintf(classifier.fetchRoutesUrlFormat, tunnel.serverContext.clientRegion)
 	request, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, ContextError(err)
 	}
 
-	etag, err := GetSplitTunnelRoutesETag(tunnel.session.clientRegion)
+	etag, err := GetSplitTunnelRoutesETag(tunnel.serverContext.clientRegion)
 	if err != nil {
 		return nil, ContextError(err)
 	}
@@ -310,7 +310,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	if !useCachedRoutes {
 		etag := response.Header.Get("ETag")
 		if etag != "" {
-			err := SetSplitTunnelRoutes(tunnel.session.clientRegion, etag, routesData)
+			err := SetSplitTunnelRoutes(tunnel.serverContext.clientRegion, etag, routesData)
 			if err != nil {
 				NoticeAlert("failed to cache split tunnel routes: %s", ContextError(err))
 				// Proceed with fetched data, even when we can't cache it
@@ -319,7 +319,7 @@ func (classifier *SplitTunnelClassifier) getRoutes(tunnel *Tunnel) (routesData [
 	}
 
 	if useCachedRoutes {
-		routesData, err = GetSplitTunnelRoutesData(tunnel.session.clientRegion)
+		routesData, err = GetSplitTunnelRoutesData(tunnel.serverContext.clientRegion)
 		if err != nil {
 			return nil, ContextError(err)
 		}

+ 16 - 26
psiphon/transferstats/collector.go

@@ -20,7 +20,6 @@
 package transferstats
 
 import (
-	"encoding/json"
 	"sync"
 )
 
@@ -44,15 +43,15 @@ func newHostStats() *hostStats {
 	return &hostStats{}
 }
 
-// serverStats holds per-server stats.
-type serverStats struct {
+// ServerStats holds per-server stats.
+type ServerStats struct {
 	hostnameToStats    map[string]*hostStats
 	totalBytesSent     int64
 	totalBytesReceived int64
 }
 
-func newServerStats() *serverStats {
-	return &serverStats{
+func newServerStats() *ServerStats {
+	return &ServerStats{
 		hostnameToStats: make(map[string]*hostStats),
 	}
 }
@@ -61,8 +60,8 @@ func newServerStats() *serverStats {
 // as well as the mutex to access them.
 var allStats = struct {
 	statsMutex      sync.RWMutex
-	serverIDtoStats map[string]*serverStats
-}{serverIDtoStats: make(map[string]*serverStats)}
+	serverIDtoStats map[string]*ServerStats
+}{serverIDtoStats: make(map[string]*ServerStats)}
 
 // statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
@@ -105,27 +104,18 @@ func recordStat(stat *statsUpdate) {
 	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
 }
 
-// Implement the json.Marshaler interface
-func (ss serverStats) MarshalJSON() ([]byte, error) {
-	out := make(map[string]interface{})
+func (serverStats ServerStats) GetStatsForReporting() (map[string]int64, int64) {
 
 	hostBytes := make(map[string]int64)
 	bytesTransferred := int64(0)
 
-	for hostname, hostStats := range ss.hostnameToStats {
+	for hostname, hostStats := range serverStats.hostnameToStats {
 		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
 		bytesTransferred += totalBytes
 		hostBytes[hostname] = totalBytes
 	}
 
-	out["bytes_transferred"] = bytesTransferred
-	out["host_bytes"] = hostBytes
-
-	// We're not using these fields, but the server requires them
-	out["page_views"] = make([]string, 0)
-	out["https_requests"] = make([]string, 0)
-
-	return json.Marshal(out)
+	return hostBytes, bytesTransferred
 }
 
 // GetBytesTransferredForServer returns total bytes sent and received since
@@ -149,22 +139,22 @@ func GetBytesTransferredForServer(serverID string) (sent, received int64) {
 	return
 }
 
-// GetForServer returns the json-able stats package for the given server.
-func GetForServer(serverID string) (payload *serverStats) {
+// GetForServer returns the server stats for the given server.
+func GetForServer(serverID string) (serverStats *ServerStats) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
-	payload = allStats.serverIDtoStats[serverID]
-	if payload == nil {
-		payload = newServerStats()
+	serverStats = allStats.serverIDtoStats[serverID]
+	if serverStats == nil {
+		serverStats = newServerStats()
 	}
 	delete(allStats.serverIDtoStats, serverID)
 	return
 }
 
 // PutBack re-adds a set of server stats to the collection.
-func PutBack(serverID string, ss *serverStats) {
-	for hostname, hoststats := range ss.hostnameToStats {
+func PutBack(serverID string, serverStats *ServerStats) {
+	for hostname, hoststats := range serverStats.hostnameToStats {
 		recordStat(
 			&statsUpdate{
 				serverID:         serverID,

+ 146 - 43
psiphon/tunnel.go

@@ -63,9 +63,12 @@ type TunnelOwner interface {
 // and an SSH session built on top of that transport.
 type Tunnel struct {
 	mutex                    *sync.Mutex
+	config                   *Config
+	untunneledDialConfig     *DialConfig
+	isDiscarded              bool
 	isClosed                 bool
 	serverEntry              *ServerEntry
-	session                  *Session
+	serverContext            *ServerContext
 	protocol                 string
 	conn                     net.Conn
 	sshClient                *ssh.Client
@@ -73,7 +76,7 @@ type Tunnel struct {
 	shutdownOperateBroadcast chan struct{}
 	signalPortForwardFailure chan struct{}
 	totalPortForwardFailures int
-	sessionStartTime         time.Time
+	startTime                time.Time
 }
 
 // EstablishTunnel first makes a network transport connection to the
@@ -85,8 +88,10 @@ type Tunnel struct {
 // HTTP (meek protocol).
 // When requiredProtocol is not blank, that protocol is used. Otherwise,
 // the a random supported protocol is used.
+// untunneledDialConfig is used for untunneled final status requests.
 func EstablishTunnel(
 	config *Config,
+	untunneledDialConfig *DialConfig,
 	sessionId string,
 	pendingConns *Conns,
 	serverEntry *ServerEntry,
@@ -115,6 +120,8 @@ func EstablishTunnel(
 	// The tunnel is now connected
 	tunnel = &Tunnel{
 		mutex:                    new(sync.Mutex),
+		config:                   config,
+		untunneledDialConfig:     untunneledDialConfig,
 		isClosed:                 false,
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
@@ -127,41 +134,39 @@ func EstablishTunnel(
 		signalPortForwardFailure: make(chan struct{}, 1),
 	}
 
-	// Create a new Psiphon API session for this tunnel. This includes performing
-	// a handshake request. If the handshake fails, this establishment fails.
-	//
-	// TODO: as long as the servers are not enforcing that a client perform a handshake,
-	// proceed with this tunnel as long as at least one previous handhake succeeded?
-	//
+	// Create a new Psiphon API server context for this tunnel. This includes
+	// performing a handshake request. If the handshake fails, this establishment
+	// fails.
 	if !config.DisableApi {
-		NoticeInfo("starting session for %s", tunnel.serverEntry.IpAddress)
-		tunnel.session, err = NewSession(config, tunnel, sessionId)
+		NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
+		tunnel.serverContext, err = NewServerContext(tunnel, sessionId)
 		if err != nil {
-			return nil, ContextError(fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err))
+			return nil, ContextError(
+				fmt.Errorf("error starting server context for %s: %s",
+					tunnel.serverEntry.IpAddress, err))
 		}
 	}
 
-	tunnel.sessionStartTime = time.Now()
+	tunnel.startTime = time.Now()
 
 	// Now that network operations are complete, cancel interruptibility
 	pendingConns.Remove(conn)
 
-	// Promote this successful tunnel to first rank so it's one
-	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
-
 	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
 	tunnel.operateWaitGroup.Add(1)
-	go tunnel.operateTunnel(config, tunnelOwner)
+	go tunnel.operateTunnel(tunnelOwner)
 
 	return tunnel, nil
 }
 
 // Close stops operating the tunnel and closes the underlying connection.
 // Supports multiple and/or concurrent calls to Close().
-func (tunnel *Tunnel) Close() {
+// When isDiscarded is set, operateTunnel will not attempt to send final
+// status requests.
+func (tunnel *Tunnel) Close(isDiscarded bool) {
 
 	tunnel.mutex.Lock()
+	tunnel.isDiscarded = isDiscarded
 	isClosed := tunnel.isClosed
 	tunnel.isClosed = true
 	tunnel.mutex.Unlock()
@@ -185,6 +190,13 @@ func (tunnel *Tunnel) Close() {
 	}
 }
 
+// IsDiscarded returns the tunnel's discarded flag.
+func (tunnel *Tunnel) IsDiscarded() bool {
+	tunnel.mutex.Lock()
+	defer tunnel.mutex.Unlock()
+	return tunnel.isDiscarded
+}
+
 // Dial establishes a port forward connection through the tunnel
 // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
 func (tunnel *Tunnel) Dial(
@@ -226,12 +238,12 @@ func (tunnel *Tunnel) Dial(
 		tunnel:         tunnel,
 		downstreamConn: downstreamConn}
 
-	// Tunnel does not have a session when DisableApi is set. We still use
+	// Tunnel does not have a serverContext when DisableApi is set. We still use
 	// transferstats.Conn to count bytes transferred for monitoring tunnel
 	// quality.
 	var regexps *transferstats.Regexps
-	if tunnel.session != nil {
-		regexps = tunnel.session.StatsRegexps()
+	if tunnel.serverContext != nil {
+		regexps = tunnel.serverContext.StatsRegexps()
 	}
 	conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
 
@@ -242,7 +254,7 @@ func (tunnel *Tunnel) Dial(
 // This will terminate the tunnel.
 func (tunnel *Tunnel) SignalComponentFailure() {
 	NoticeAlert("tunnel received component failure signal")
-	tunnel.Close()
+	tunnel.Close(false)
 }
 
 // TunneledConn implements net.Conn and wraps a port foward connection.
@@ -535,7 +547,7 @@ func dialSsh(
 // TODO: change "recently active" to include having received any
 // SSH protocol messages from the server, not just user payload?
 //
-func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
+func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	defer tunnel.operateWaitGroup.Done()
 
 	lastBytesReceivedTime := time.Now()
@@ -564,6 +576,22 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	statsTimer := time.NewTimer(nextStatusRequestPeriod())
 	defer statsTimer.Stop()
 
+	// Schedule an immediate status request to deliver any unreported
+	// tunnel stats.
+	// Note: this may not be effective when there's an outstanding
+	// asynchronous untunneled final status request holding the
+	// tunnel stats records. It may also conflict with other
+	// tunnel candidates which attempt to send an immediate request
+	// before being discarded. For now, we mitigate this with a short,
+	// random delay.
+	unreported := CountUnreportedTunnelStats()
+	if unreported > 0 {
+		NoticeInfo("Unreported tunnel stats: %d", unreported)
+		statsTimer.Reset(MakeRandomPeriod(
+			PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN,
+			PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX))
+	}
+
 	nextSshKeepAlivePeriod := func() time.Duration {
 		return MakeRandomPeriod(
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
@@ -572,7 +600,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 
 	// TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
 	sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
-	if config.DisablePeriodicSshKeepAlive {
+	if tunnel.config.DisablePeriodicSshKeepAlive {
 		sshKeepAliveTimer.Stop()
 	} else {
 		defer sshKeepAliveTimer.Stop()
@@ -580,13 +608,10 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 
 	// Perform network requests in separate goroutines so as not to block
 	// other operations.
-	// Note: defer LIFO dependency: channels to be closed before Wait()
 	requestsWaitGroup := new(sync.WaitGroup)
-	defer requestsWaitGroup.Wait()
 
 	requestsWaitGroup.Add(1)
 	signalStatusRequest := make(chan struct{})
-	defer close(signalStatusRequest)
 	go func() {
 		defer requestsWaitGroup.Done()
 		for _ = range signalStatusRequest {
@@ -597,7 +622,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	requestsWaitGroup.Add(1)
 	signalSshKeepAlive := make(chan time.Duration)
 	sshKeepAliveError := make(chan error, 1)
-	defer close(signalSshKeepAlive)
 	go func() {
 		defer requestsWaitGroup.Done()
 		for timeout := range signalSshKeepAlive {
@@ -611,8 +635,9 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 		}
 	}()
 
+	shutdown := false
 	var err error
-	for err == nil {
+	for !shutdown && err == nil {
 		select {
 		case <-noticeBytesTransferredTicker.C:
 			sent, received := transferstats.GetBytesTransferredForServer(
@@ -631,7 +656,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			}
 
 			// Only emit the frequent BytesTransferred notice when tunnel is not idle.
-			if config.EmitBytesTransferred && (sent > 0 || received > 0) {
+			if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
 				NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
 			}
 
@@ -663,22 +688,74 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				default:
 				}
 			}
-			if !config.DisablePeriodicSshKeepAlive {
+			if !tunnel.config.DisablePeriodicSshKeepAlive {
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
 		case err = <-sshKeepAliveError:
 
 		case <-tunnel.shutdownOperateBroadcast:
-			// Attempt to send any remaining stats
-			sendStats(tunnel)
-			NoticeInfo("shutdown operate tunnel")
-			return
+			shutdown = true
 		}
 	}
 
-	if err != nil {
+	close(signalSshKeepAlive)
+	close(signalStatusRequest)
+	requestsWaitGroup.Wait()
+
+	// The stats for this tunnel will be reported via the next successful
+	// status request.
+	// Note: Since client clocks are unreliable, we use the server's reported
+	// timestamp in the handshake response as the tunnel start time. This time
+	// will be slightly earlier than the actual tunnel activation time, as the
+	// client has to receive and parse the response and activate the tunnel.
+	if !tunnel.IsDiscarded() {
+		err := RecordTunnelStats(
+			tunnel.serverContext.sessionId,
+			tunnel.serverContext.tunnelNumber,
+			tunnel.serverEntry.IpAddress,
+			tunnel.serverContext.serverHandshakeTimestamp,
+			fmt.Sprintf("%d", time.Now().Sub(tunnel.startTime)),
+			totalSent,
+			totalReceived)
+		if err != nil {
+			NoticeAlert("RecordTunnelStats failed: %s", ContextError(err))
+		}
+	}
+
+	// Final status request notes:
+	//
+	// It's highly desirable to send a final status request in order to report
+	// domain bytes transferred stats as well as to report tunnel stats as
+	// soon as possible. For this reason, we attempt untunneled requests when
+	// the tunneled request isn't possible or has failed.
+	//
+	// In an orderly shutdown (err == nil), the Controller is stopping and
+	// everything must be wrapped up quickly. Also, we still have a working
+	// tunnel. So we first attempt a tunneled status request (with a short
+	// timeout) and then attempt, synchronously -- otherwise the Controller's
+	// untunneledPendingConns.CloseAll() will immediately interrupt untunneled
+	// requests -- untunneled requests (also with short timeouts).
+	// Note that this depends on the order of untunneledPendingConns.CloseAll()
+	// coming after tunnel.Close(): see note in Controller.Run().
+	//
+	// If the tunnel has failed, the Controller may continue working. We want
+	// to re-establish as soon as possible (so don't want to block on status
+	// requests, even for a second). We may have a long time to attempt
+	// untunneled requests in the background. And there is no tunnel through
+	// which to attempt tunneled requests. So we spawn a goroutine to run the
+	// untunneled requests, which are allowed a longer timeout. These requests
+	// will be interrupted by the Controller's untunneledPendingConns.CloseAll()
+	// in the case of a shutdown.
+
+	if err == nil {
+		NoticeInfo("shutdown operate tunnel")
+		if !sendStats(tunnel) {
+			sendUntunneledStats(tunnel, true)
+		}
+	} else {
 		NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
+		go sendUntunneledStats(tunnel, false)
 		tunnelOwner.SignalTunnelFailure(tunnel)
 	}
 }
@@ -712,17 +789,43 @@ func sendSshKeepAlive(
 }
 
 // sendStats is a helper for sending session stats to the server.
-func sendStats(tunnel *Tunnel) {
+func sendStats(tunnel *Tunnel) bool {
 
-	// Tunnel does not have a session when DisableApi is set
-	if tunnel.session == nil {
-		return
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
+		return true
+	}
+
+	// Skip when tunnel is discarded
+	if tunnel.IsDiscarded() {
+		return true
 	}
 
-	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
-	err := tunnel.session.DoStatusRequest(payload)
+	err := tunnel.serverContext.DoStatusRequest(tunnel)
 	if err != nil {
 		NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-		transferstats.PutBack(tunnel.serverEntry.IpAddress, payload)
+	}
+
+	return err == nil
+}
+
+// sendUntunneledStats sends final status requests directly to Psiphon
+// servers after the tunnel has already failed. This is an attempt
+// to retain useful bytes transferred stats.
+func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
+
+	// Tunnel does not have a serverContext when DisableApi is set
+	if tunnel.serverContext == nil {
+		return
+	}
+
+	// Skip when tunnel is discarded
+	if tunnel.IsDiscarded() {
+		return
+	}
+
+	err := TryUntunneledStatusRequest(tunnel, isShutdown)
+	if err != nil {
+		NoticeAlert("TryUntunneledStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
 	}
 }