瀏覽代碼

Merge remote-tracking branch 'upstream/master'

Eugene Fryntov 10 年之前
父節點
當前提交
47b77ab59a

+ 0 - 1
.travis.yml

@@ -1,7 +1,6 @@
 language: go
 language: go
 go:
 go:
 - 1.4
 - 1.4
-- tip
 addons:
 addons:
   apt_packages:
   apt_packages:
     - libx11-dev
     - libx11-dev

+ 36 - 20
ConsoleClient/psiphonClient.go

@@ -113,27 +113,43 @@ func main() {
 	}
 	}
 
 
 	// Handle optional embedded server list file parameter
 	// Handle optional embedded server list file parameter
-	// If specified, the embedded server list is loaded and stored before
-	// running Psiphon.
-
+	// If specified, the embedded server list is loaded and stored. When there
+	// are no server candidates at all, we wait for this import to complete
+	// before starting the Psiphon controller. Otherwise, we import while
+	// concurrently starting the controller to minimize delay before attempting
+	// to connect to existing candidate servers.
+	// If the import fails, an error notice is emitted, but the controller is
+	// still started: either existing candidate servers may suffice, or the
+	// remote server list fetch may obtain candidate servers.
 	if embeddedServerEntryListFilename != "" {
 	if embeddedServerEntryListFilename != "" {
-		serverEntryList, err := ioutil.ReadFile(embeddedServerEntryListFilename)
-		if err != nil {
-			psiphon.NoticeError("error loading embedded server entry list file: %s", err)
-			os.Exit(1)
-		}
-		// TODO: stream embedded server list data? also, the cast makaes an unnecessary copy of a large buffer?
-		serverEntries, err := psiphon.DecodeAndValidateServerEntryList(string(serverEntryList))
-		if err != nil {
-			psiphon.NoticeError("error decoding embedded server entry list file: %s", err)
-			os.Exit(1)
-		}
-		// Since embedded server list entries may become stale, they will not
-		// overwrite existing stored entries for the same server.
-		err = psiphon.StoreServerEntries(serverEntries, false)
-		if err != nil {
-			psiphon.NoticeError("error storing embedded server entry list data: %s", err)
-			os.Exit(1)
+		embeddedServerListWaitGroup := new(sync.WaitGroup)
+		embeddedServerListWaitGroup.Add(1)
+		go func() {
+			defer embeddedServerListWaitGroup.Done()
+			serverEntryList, err := ioutil.ReadFile(embeddedServerEntryListFilename)
+			if err != nil {
+				psiphon.NoticeError("error loading embedded server entry list file: %s", err)
+				return
+			}
+			// TODO: stream embedded server list data? also, the cast makes an unnecessary copy of a large buffer?
+			serverEntries, err := psiphon.DecodeAndValidateServerEntryList(string(serverEntryList))
+			if err != nil {
+				psiphon.NoticeError("error decoding embedded server entry list file: %s", err)
+				return
+			}
+			// Since embedded server list entries may become stale, they will not
+			// overwrite existing stored entries for the same server.
+			err = psiphon.StoreServerEntries(serverEntries, false)
+			if err != nil {
+				psiphon.NoticeError("error storing embedded server entry list data: %s", err)
+				return
+			}
+		}()
+
+		if psiphon.CountServerEntries(config.EgressRegion, config.TunnelProtocol) == 0 {
+			embeddedServerListWaitGroup.Wait()
+		} else {
+			defer embeddedServerListWaitGroup.Wait()
 		}
 		}
 	}
 	}
 
 

+ 2 - 1
psiphon/config.go

@@ -41,7 +41,8 @@ const (
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN             = 60 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN             = 60 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX             = 120 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX             = 120 * time.Second
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS             = 300
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS             = 300
-	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 10 * time.Second
+	ESTABLISH_TUNNEL_WORK_TIME_SECONDS           = 60 * time.Second
+	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 5 * time.Second
 	PORT_FORWARD_FAILURE_THRESHOLD               = 10
 	PORT_FORWARD_FAILURE_THRESHOLD               = 10
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT             = 15 * time.Second
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT             = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST     = 50
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST     = 50

+ 92 - 44
psiphon/controller.go

@@ -34,26 +34,27 @@ import (
 // connect to; establishes and monitors tunnels; and runs local proxies which
 // connect to; establishes and monitors tunnels; and runs local proxies which
 // route traffic through the tunnels.
 // route traffic through the tunnels.
 type Controller struct {
 type Controller struct {
-	config                    *Config
-	sessionId                 string
-	componentFailureSignal    chan struct{}
-	shutdownBroadcast         chan struct{}
-	runWaitGroup              *sync.WaitGroup
-	establishedTunnels        chan *Tunnel
-	failedTunnels             chan *Tunnel
-	tunnelMutex               sync.Mutex
-	establishedOnce           bool
-	tunnels                   []*Tunnel
-	nextTunnel                int
-	startedConnectedReporter  bool
-	isEstablishing            bool
-	establishWaitGroup        *sync.WaitGroup
-	stopEstablishingBroadcast chan struct{}
-	candidateServerEntries    chan *ServerEntry
-	establishPendingConns     *Conns
-	untunneledPendingConns    *Conns
-	untunneledDialConfig      *DialConfig
-	splitTunnelClassifier     *SplitTunnelClassifier
+	config                      *Config
+	sessionId                   string
+	componentFailureSignal      chan struct{}
+	shutdownBroadcast           chan struct{}
+	runWaitGroup                *sync.WaitGroup
+	establishedTunnels          chan *Tunnel
+	failedTunnels               chan *Tunnel
+	tunnelMutex                 sync.Mutex
+	establishedOnce             bool
+	tunnels                     []*Tunnel
+	nextTunnel                  int
+	startedConnectedReporter    bool
+	isEstablishing              bool
+	establishWaitGroup          *sync.WaitGroup
+	stopEstablishingBroadcast   chan struct{}
+	candidateServerEntries      chan *ServerEntry
+	establishPendingConns       *Conns
+	untunneledPendingConns      *Conns
+	untunneledDialConfig        *DialConfig
+	splitTunnelClassifier       *SplitTunnelClassifier
+	signalFetchRemoteServerList chan struct{}
 }
 }
 
 
 // NewController initializes a new controller.
 // NewController initializes a new controller.
@@ -97,6 +98,9 @@ func NewController(config *Config) (controller *Controller, err error) {
 		establishPendingConns:    new(Conns),
 		establishPendingConns:    new(Conns),
 		untunneledPendingConns:   untunneledPendingConns,
 		untunneledPendingConns:   untunneledPendingConns,
 		untunneledDialConfig:     untunneledDialConfig,
 		untunneledDialConfig:     untunneledDialConfig,
+		// A buffer allows at least one signal to be sent even when the receiver is
+		// not listening. Senders should not block.
+		signalFetchRemoteServerList: make(chan struct{}, 1),
 	}
 	}
 
 
 	controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
 	controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
@@ -180,35 +184,54 @@ func (controller *Controller) SignalComponentFailure() {
 }
 }
 
 
 // remoteServerListFetcher fetches an out-of-band list of server entries
 // remoteServerListFetcher fetches an out-of-band list of server entries
-// for more tunnel candidates. It fetches immediately, retries after failure
-// with a wait period, and refetches after success with a longer wait period.
+// for more tunnel candidates. It fetches when signalled, with retries
+// on failure.
 func (controller *Controller) remoteServerListFetcher() {
 func (controller *Controller) remoteServerListFetcher() {
 	defer controller.runWaitGroup.Done()
 	defer controller.runWaitGroup.Done()
 
 
-loop:
+	var lastFetchTime time.Time
+
+fetcherLoop:
 	for {
 	for {
-		if !WaitForNetworkConnectivity(
-			controller.config.NetworkConnectivityChecker,
-			controller.shutdownBroadcast) {
-			break
+		// Wait for a signal before fetching
+		select {
+		case <-controller.signalFetchRemoteServerList:
+		case <-controller.shutdownBroadcast:
+			break fetcherLoop
 		}
 		}
 
 
-		err := FetchRemoteServerList(
-			controller.config, controller.untunneledDialConfig)
+		// Skip fetch entirely (i.e., send no request at all, even when ETag would save
+		// on response size) when a recent fetch was successful
+		if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) {
+			continue
+		}
+
+	retryLoop:
+		for {
+			// Don't attempt to fetch while there is no network connectivity,
+			// to avoid alert notice noise.
+			if !WaitForNetworkConnectivity(
+				controller.config.NetworkConnectivityChecker,
+				controller.shutdownBroadcast) {
+				break fetcherLoop
+			}
+
+			err := FetchRemoteServerList(
+				controller.config, controller.untunneledDialConfig)
+
+			if err == nil {
+				lastFetchTime = time.Now()
+				break retryLoop
+			}
 
 
-		var duration time.Duration
-		if err != nil {
 			NoticeAlert("failed to fetch remote server list: %s", err)
 			NoticeAlert("failed to fetch remote server list: %s", err)
-			duration = FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD
-		} else {
-			duration = FETCH_REMOTE_SERVER_LIST_STALE_PERIOD
-		}
-		timeout := time.After(duration)
-		select {
-		case <-timeout:
-			// Fetch again
-		case <-controller.shutdownBroadcast:
-			break loop
+
+			timeout := time.After(FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD)
+			select {
+			case <-timeout:
+			case <-controller.shutdownBroadcast:
+				break fetcherLoop
+			}
 		}
 		}
 	}
 	}
 
 
@@ -619,7 +642,9 @@ func (controller *Controller) establishCandidateGenerator() {
 loop:
 loop:
 	// Repeat until stopped
 	// Repeat until stopped
 	for {
 	for {
-		// Yield each server entry returned by the iterator
+
+		// Send each iterator server entry to the establish workers
+		startTime := time.Now()
 		for {
 		for {
 			serverEntry, err := iterator.Next()
 			serverEntry, err := iterator.Next()
 			if err != nil {
 			if err != nil {
@@ -642,12 +667,33 @@ loop:
 			case <-controller.shutdownBroadcast:
 			case <-controller.shutdownBroadcast:
 				break loop
 				break loop
 			}
 			}
+
+			if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME_SECONDS)) {
+				// Start over, after a brief pause, with a new shuffle of the server
+				// entries, and potentially some newly fetched server entries.
+				break
+			}
+		}
+		// Free up resources now, but don't reset until after the pause.
+		iterator.Close()
+
+		// Trigger a fetch remote server list, since we may have failed to
+		// connect with all known servers. Don't block sending signal, since
+		// this signal may have already been sent.
+		// Don't wait for fetch remote to succeed, since it may fail and
+		// enter a retry loop and we're better off trying more known servers.
+		// TODO: synchronize the fetch response, so it can be incorporated
+		// into the server entry iterator as soon as available.
+		select {
+		case controller.signalFetchRemoteServerList <- *new(struct{}):
+		default:
 		}
 		}
-		iterator.Reset()
 
 
 		// After a complete iteration of candidate servers, pause before iterating again.
 		// After a complete iteration of candidate servers, pause before iterating again.
 		// This helps avoid some busy wait loop conditions, and also allows some time for
 		// This helps avoid some busy wait loop conditions, and also allows some time for
-		// network conditions to change.
+		// network conditions to change. Also allows for fetch remote to complete,
+		// in typical conditions (it isn't strictly necessary to wait for this, there will
+		// be more rounds if required).
 		timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
 		timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
 		select {
 		select {
 		case <-timeout:
 		case <-timeout:
@@ -657,6 +703,8 @@ loop:
 		case <-controller.shutdownBroadcast:
 		case <-controller.shutdownBroadcast:
 			break loop
 			break loop
 		}
 		}
+
+		iterator.Reset()
 	}
 	}
 
 
 	NoticeInfo("stopped candidate generator")
 	NoticeInfo("stopped candidate generator")

+ 10 - 0
psiphon/controller_test.go

@@ -99,6 +99,12 @@ func controllerRun(t *testing.T, protocol string) {
 				}
 				}
 			case "ListeningHttpProxyPort":
 			case "ListeningHttpProxyPort":
 				httpProxyPort = int(payload["port"].(float64))
 				httpProxyPort = int(payload["port"].(float64))
+			case "ConnectingServer":
+				serverProtocol := payload["protocol"]
+				if serverProtocol != protocol {
+					t.Errorf("wrong protocol selected: %s", serverProtocol)
+					t.FailNow()
+				}
 			}
 			}
 		}))
 		}))
 
 
@@ -118,6 +124,10 @@ func controllerRun(t *testing.T, protocol string) {
 
 
 	select {
 	select {
 	case <-tunnelEstablished:
 	case <-tunnelEstablished:
+
+		// Allow for known race condition described in NewHttpProxy():
+		time.Sleep(1 * time.Second)
+
 		// Test: fetch website through tunnel
 		// Test: fetch website through tunnel
 		fetchWebsite(t, httpProxyPort)
 		fetchWebsite(t, httpProxyPort)
 
 

+ 39 - 2
psiphon/dataStore.go

@@ -1,3 +1,5 @@
+// +build windows
+
 /*
 /*
  * Copyright (c) 2015, Psiphon Inc.
  * Copyright (c) 2015, Psiphon Inc.
  * All rights reserved.
  * All rights reserved.
@@ -71,7 +73,7 @@ func InitDataStore(config *Config) (err error) {
 			// temporary directory: https://www.sqlite.org/pragma.html#pragma_temp_store_directory.
 			// temporary directory: https://www.sqlite.org/pragma.html#pragma_temp_store_directory.
 			// TODO: is there another way to restrict writing of temporary files? E.g. temp_store=3?
 			// TODO: is there another way to restrict writing of temporary files? E.g. temp_store=3?
 			initialization += fmt.Sprintf(
 			initialization += fmt.Sprintf(
-				"pragma temp_store_directory=\"%s\";\n", config.DataStoreDirectory)
+				"pragma temp_store_directory=\"%s\";\n", config.DataStoreTempDirectory)
 		}
 		}
 		initialization += `
 		initialization += `
         create table if not exists serverEntry
         create table if not exists serverEntry
@@ -88,6 +90,9 @@ func InitDataStore(config *Config) (err error) {
             (region text not null primary key,
             (region text not null primary key,
              etag text not null,
              etag text not null,
              data blob not null);
              data blob not null);
+        create table if not exists urlETags
+            (url text not null primary key,
+             etag text not null);
         create table if not exists keyValue
         create table if not exists keyValue
             (key text not null primary key,
             (key text not null primary key,
              value text not null);
              value text not null);
@@ -548,7 +553,7 @@ func ReportAvailableRegions() {
 
 
 		// Some server entries do not have a region, but it makes no sense to return
 		// Some server entries do not have a region, but it makes no sense to return
 		// an empty string as an "available region".
 		// an empty string as an "available region".
-		if (region != "") {
+		if region != "" {
 			regions = append(regions, region)
 			regions = append(regions, region)
 		}
 		}
 	}
 	}
@@ -627,6 +632,38 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 	return data, nil
 	return data, nil
 }
 }
 
 
+// SetUrlETag stores an ETag for the specfied URL.
+// Note: input URL is treated as a string, and is not
+// encoded or decoded or otherwise canonicalized.
+func SetUrlETag(url, etag string) error {
+	return transactionWithRetry(func(transaction *sql.Tx) error {
+		_, err := transaction.Exec(`
+            insert or replace into urlETags (url, etag)
+            values (?, ?);
+            `, url, etag)
+		if err != nil {
+			// Note: ContextError() would break canRetry()
+			return err
+		}
+		return nil
+	})
+}
+
+// GetUrlETag retrieves a previously stored an ETag for the
+// specfied URL. If not found, it returns an empty string value.
+func GetUrlETag(url string) (etag string, err error) {
+	checkInitDataStore()
+	rows := singleton.db.QueryRow("select etag from urlETags where url = ?;", url)
+	err = rows.Scan(&etag)
+	if err == sql.ErrNoRows {
+		return "", nil
+	}
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return etag, nil
+}
+
 // SetKeyValue stores a key/value pair.
 // SetKeyValue stores a key/value pair.
 func SetKeyValue(key, value string) error {
 func SetKeyValue(key, value string) error {
 	return transactionWithRetry(func(transaction *sql.Tx) error {
 	return transactionWithRetry(func(transaction *sql.Tx) error {

+ 720 - 0
psiphon/dataStore_alt.go

@@ -0,0 +1,720 @@
+// +build !windows
+
+/*
+ * Copyright (c) 2015, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"math/rand"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/boltdb/bolt"
+)
+
+// The BoltDB dataStore implementation is an alternative to the sqlite3-based
+// implementation in dataStore.go. Both implementations have the same interface.
+//
+// BoltDB is pure Go, and is intended to be used in cases where we have trouble
+// building sqlite3/CGO (e.g., currently go mobile due to
+// https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
+// the primary dataStore implementation.
+//
+type dataStore struct {
+	init sync.Once
+	db   *bolt.DB
+}
+
+const (
+	serverEntriesBucket         = "serverEntries"
+	rankedServerEntriesBucket   = "rankedServerEntries"
+	rankedServerEntriesKey      = "rankedServerEntries"
+	splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
+	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
+	urlETagsBucket              = "urlETags"
+	keyValueBucket              = "keyValues"
+	rankedServerEntryCount      = 100
+)
+
+var singleton dataStore
+
+// InitDataStore initializes the singleton instance of dataStore. This
+// function uses a sync.Once and is safe for use by concurrent goroutines.
+// The underlying sql.DB connection pool is also safe.
+//
+// Note: the sync.Once was more useful when initDataStore was private and
+// called on-demand by the public functions below. Now we require an explicit
+// InitDataStore() call with the filename passed in. The on-demand calls
+// have been replaced by checkInitDataStore() to assert that Init was called.
+func InitDataStore(config *Config) (err error) {
+	singleton.init.Do(func() {
+		filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
+		var db *bolt.DB
+		db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
+		if err != nil {
+			// Note: intending to set the err return value for InitDataStore
+			err = fmt.Errorf("initDataStore failed to open database: %s", err)
+			return
+		}
+
+		err = db.Update(func(tx *bolt.Tx) error {
+			requiredBuckets := []string{
+				serverEntriesBucket,
+				rankedServerEntriesBucket,
+				splitTunnelRouteETagsBucket,
+				splitTunnelRouteDataBucket,
+				urlETagsBucket,
+				keyValueBucket,
+			}
+			for _, bucket := range requiredBuckets {
+				_, err := tx.CreateBucketIfNotExists([]byte(bucket))
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+		if err != nil {
+			err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
+			return
+		}
+
+		singleton.db = db
+	})
+	return err
+}
+
+func checkInitDataStore() {
+	if singleton.db == nil {
+		panic("checkInitDataStore: datastore not initialized")
+	}
+}
+
+// StoreServerEntry adds the server entry to the data store.
+// A newly stored (or re-stored) server entry is assigned the next-to-top
+// rank for iteration order (the previous top ranked entry is promoted). The
+// purpose of inserting at next-to-top is to keep the last selected server
+// as the top ranked server.
+// When replaceIfExists is true, an existing server entry record is
+// overwritten; otherwise, the existing record is unchanged.
+// If the server entry data is malformed, an alert notice is issued and
+// the entry is skipped; no error is returned.
+func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
+	checkInitDataStore()
+
+	// Server entries should already be validated before this point,
+	// so instead of skipping we fail with an error.
+	err := ValidateServerEntry(serverEntry)
+	if err != nil {
+		return ContextError(errors.New("invalid server entry"))
+	}
+
+	// BoltDB implementation note:
+	// For simplicity, we don't maintain indexes on server entry
+	// region or supported protocols. Instead, we perform full-bucket
+	// scans with a filter. With a small enough database (thousands or
+	// even tens of thousand of server entries) and common enough
+	// values (e.g., many servers support all protocols), performance
+	// is expected to be acceptable.
+
+	serverEntryExists := false
+	err = singleton.db.Update(func(tx *bolt.Tx) error {
+
+		serverEntries := tx.Bucket([]byte(serverEntriesBucket))
+		serverEntryExists = (serverEntries.Get([]byte(serverEntry.IpAddress)) != nil)
+
+		if serverEntryExists && !replaceIfExists {
+			// Disabling this notice, for now, as it generates too much noise
+			// in diagnostics with clients that always submit embedded servers
+			// to the core on each run.
+			// NoticeInfo("ignored update for server %s", serverEntry.IpAddress)
+			return nil
+		}
+
+		data, err := json.Marshal(serverEntry)
+		if err != nil {
+			return ContextError(err)
+		}
+		err = serverEntries.Put([]byte(serverEntry.IpAddress), data)
+		if err != nil {
+			return ContextError(err)
+		}
+
+		err = insertRankedServerEntry(tx, serverEntry.IpAddress, 1)
+		if err != nil {
+			return ContextError(err)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return ContextError(err)
+	}
+
+	if !serverEntryExists {
+		NoticeInfo("updated server %s", serverEntry.IpAddress)
+	}
+	return nil
+}
+
+// StoreServerEntries shuffles and stores a list of server entries.
+// Shuffling is performed on imported server entrues as part of client-side
+// load balancing.
+// There is an independent transaction for each entry insert/update.
+func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
+	checkInitDataStore()
+
+	for index := len(serverEntries) - 1; index > 0; index-- {
+		swapIndex := rand.Intn(index + 1)
+		serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
+	}
+
+	for _, serverEntry := range serverEntries {
+		err := StoreServerEntry(serverEntry, replaceIfExists)
+		if err != nil {
+			return ContextError(err)
+		}
+	}
+
+	// Since there has possibly been a significant change in the server entries,
+	// take this opportunity to update the available egress regions.
+	ReportAvailableRegions()
+
+	return nil
+}
+
+// PromoteServerEntry assigns the top rank (one more than current
+// max rank) to the specified server entry. Server candidates are
+// iterated in decending rank order, so this server entry will be
+// the first candidate in a subsequent tunnel establishment.
+func PromoteServerEntry(ipAddress string) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		return insertRankedServerEntry(tx, ipAddress, 0)
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
+	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
+	data := bucket.Get([]byte(rankedServerEntriesKey))
+
+	if data == nil {
+		return []string{}, nil
+	}
+
+	rankedServerEntries := make([]string, 0)
+	err := json.Unmarshal(data, &rankedServerEntries)
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return rankedServerEntries, nil
+}
+
+func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
+	data, err := json.Marshal(rankedServerEntries)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
+	err = bucket.Put([]byte(rankedServerEntriesKey), data)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return nil
+}
+
+func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
+	rankedServerEntries, err := getRankedServerEntries(tx)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	// BoltDB implementation note:
+	// For simplicity, we store the ranked server ids in an array serialized to
+	// a single key value. To ensure this value doesn't grow without bound,
+	// it's capped at rankedServerEntryCount. For now, this cap should be large
+	// enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
+	// any reasonable configuration of config.TunnelPoolSize.
+
+	if position >= len(rankedServerEntries) {
+		rankedServerEntries = append(rankedServerEntries, serverEntryId)
+	} else {
+		end := len(rankedServerEntries)
+		if end+1 > rankedServerEntryCount {
+			end = rankedServerEntryCount
+		}
+		// insert: https://github.com/golang/go/wiki/SliceTricks
+		rankedServerEntries = append(
+			rankedServerEntries[:position],
+			append([]string{serverEntryId},
+				rankedServerEntries[position:end]...)...)
+	}
+
+	err = setRankedServerEntries(tx, rankedServerEntries)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return nil
+}
+
+func serverEntrySupportsProtocol(serverEntry *ServerEntry, protocol string) bool {
+	// Note: for meek, the capabilities are FRONTED-MEEK and UNFRONTED-MEEK
+	// and the additonal OSSH service is assumed to be available internally.
+	requiredCapability := strings.TrimSuffix(protocol, "-OSSH")
+	return Contains(serverEntry.Capabilities, requiredCapability)
+}
+
+// ServerEntryIterator is used to iterate over
+// stored server entries in rank order.
+type ServerEntryIterator struct {
+	region                      string
+	protocol                    string
+	shuffleHeadLength           int
+	serverEntryIds              []string
+	serverEntryIndex            int
+	isTargetServerEntryIterator bool
+	hasNextTargetServerEntry    bool
+	targetServerEntry           *ServerEntry
+}
+
+// NewServerEntryIterator creates a new ServerEntryIterator
+func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
+
+	// When configured, this target server entry is the only candidate
+	if config.TargetServerEntry != "" {
+		return newTargetServerEntryIterator(config)
+	}
+
+	checkInitDataStore()
+	iterator = &ServerEntryIterator{
+		region:                      config.EgressRegion,
+		protocol:                    config.TunnelProtocol,
+		shuffleHeadLength:           config.TunnelPoolSize,
+		isTargetServerEntryIterator: false,
+	}
+	err = iterator.Reset()
+	if err != nil {
+		return nil, err
+	}
+	return iterator, nil
+}
+
+// newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
+func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
+	serverEntry, err := DecodeServerEntry(config.TargetServerEntry)
+	if err != nil {
+		return nil, err
+	}
+	if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
+		return nil, errors.New("TargetServerEntry does not support EgressRegion")
+	}
+	if config.TunnelProtocol != "" {
+		// Note: same capability/protocol mapping as in StoreServerEntry
+		requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
+		if !Contains(serverEntry.Capabilities, requiredCapability) {
+			return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
+		}
+	}
+	iterator = &ServerEntryIterator{
+		isTargetServerEntryIterator: true,
+		hasNextTargetServerEntry:    true,
+		targetServerEntry:           serverEntry,
+	}
+	NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
+	return iterator, nil
+}
+
+// Reset a NewServerEntryIterator to the start of its cycle. The next
+// call to Next will return the first server entry.
+func (iterator *ServerEntryIterator) Reset() error {
+	iterator.Close()
+
+	if iterator.isTargetServerEntryIterator {
+		iterator.hasNextTargetServerEntry = true
+		return nil
+	}
+
+	count := CountServerEntries(iterator.region, iterator.protocol)
+	NoticeCandidateServers(iterator.region, iterator.protocol, count)
+
+	// This query implements the Psiphon server candidate selection
+	// algorithm: the first TunnelPoolSize server candidates are in rank
+	// (priority) order, to favor previously successful servers; then the
+	// remaining long tail is shuffled to raise up less recent candidates.
+
+	// BoltDB implementation note:
+	// We don't keep a transaction open for the duration of the iterator
+	// because this would expose the following semantics to consumer code:
+	//
+	//     Read-only transactions and read-write transactions ... generally
+	//     shouldn't be opened simultaneously in the same goroutine. This can
+	//     cause a deadlock as the read-write transaction needs to periodically
+	//     re-map the data file but it cannot do so while a read-only
+	//     transaction is open.
+	//     (https://github.com/boltdb/bolt)
+	//
+	// So the uderlying serverEntriesBucket could change after the serverEntryIds
+	// list is built.
+
+	var serverEntryIds []string
+
+	err := singleton.db.View(func(tx *bolt.Tx) error {
+		var err error
+		serverEntryIds, err = getRankedServerEntries(tx)
+		if err != nil {
+			return err
+		}
+
+		skipServerEntryIds := make(map[string]bool)
+		for _, serverEntryId := range serverEntryIds {
+			skipServerEntryIds[serverEntryId] = true
+		}
+
+		bucket := tx.Bucket([]byte(serverEntriesBucket))
+		cursor := bucket.Cursor()
+		for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
+			serverEntryId := string(key)
+			if _, ok := skipServerEntryIds[serverEntryId]; ok {
+				continue
+			}
+			serverEntryIds = append(serverEntryIds, serverEntryId)
+		}
+		return nil
+	})
+	if err != nil {
+		return ContextError(err)
+	}
+
+	rand.Seed(int64(time.Now().Nanosecond()))
+	for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
+		j := rand.Intn(i)
+		serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
+	}
+
+	iterator.serverEntryIds = serverEntryIds
+	iterator.serverEntryIndex = 0
+
+	return nil
+}
+
+// Close cleans up resources associated with a ServerEntryIterator.
+func (iterator *ServerEntryIterator) Close() {
+	iterator.serverEntryIds = nil
+	iterator.serverEntryIndex = 0
+}
+
+// Next returns the next server entry, by rank, for a ServerEntryIterator.
+// Returns nil with no error when there is no next item.
+func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error) {
+	defer func() {
+		if err != nil {
+			iterator.Close()
+		}
+	}()
+
+	if iterator.isTargetServerEntryIterator {
+		if iterator.hasNextTargetServerEntry {
+			iterator.hasNextTargetServerEntry = false
+			return MakeCompatibleServerEntry(iterator.targetServerEntry), nil
+		}
+		return nil, nil
+	}
+
+	// There are no region/protocol indexes for the server entries bucket.
+	// Loop until we have the next server entry that matches the iterator
+	// filter requirements.
+	for {
+		if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
+			// There is no next item
+			return nil, nil
+		}
+
+		serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
+		iterator.serverEntryIndex += 1
+
+		var data []byte
+		err = singleton.db.View(func(tx *bolt.Tx) error {
+			bucket := tx.Bucket([]byte(serverEntriesBucket))
+			data = bucket.Get([]byte(serverEntryId))
+			return nil
+		})
+		if err != nil {
+			return nil, ContextError(err)
+		}
+
+		if data == nil {
+			return nil, ContextError(
+				fmt.Errorf("Unexpected missing server entry: %s", serverEntryId))
+		}
+
+		serverEntry = new(ServerEntry)
+		err = json.Unmarshal(data, serverEntry)
+		if err != nil {
+			return nil, ContextError(err)
+		}
+
+		if (iterator.region == "" || serverEntry.Region == iterator.region) &&
+			(iterator.protocol == "" || serverEntrySupportsProtocol(serverEntry, iterator.protocol)) {
+
+			break
+		}
+	}
+
+	return MakeCompatibleServerEntry(serverEntry), nil
+}
+
+// MakeCompatibleServerEntry provides backwards compatibility with old server entries
+// which have a single meekFrontingDomain and not a meekFrontingAddresses array.
+// By copying this one meekFrontingDomain into meekFrontingAddresses, this client effectively
+// uses that single value as legacy clients do.
+func MakeCompatibleServerEntry(serverEntry *ServerEntry) *ServerEntry {
+	if len(serverEntry.MeekFrontingAddresses) == 0 && serverEntry.MeekFrontingDomain != "" {
+		serverEntry.MeekFrontingAddresses =
+			append(serverEntry.MeekFrontingAddresses, serverEntry.MeekFrontingDomain)
+	}
+
+	return serverEntry
+}
+
+func scanServerEntries(scanner func(*ServerEntry)) error {
+	err := singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(serverEntriesBucket))
+		cursor := bucket.Cursor()
+
+		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			serverEntry := new(ServerEntry)
+			err := json.Unmarshal(value, serverEntry)
+			if err != nil {
+				return err
+			}
+			scanner(serverEntry)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+
+	return nil
+}
+
+// CountServerEntries returns a count of stored servers for the
+// specified region and protocol.
+func CountServerEntries(region, protocol string) int {
+	checkInitDataStore()
+
+	count := 0
+	err := scanServerEntries(func(serverEntry *ServerEntry) {
+		if (region == "" || serverEntry.Region == region) &&
+			(protocol == "" || serverEntrySupportsProtocol(serverEntry, protocol)) {
+			count += 1
+		}
+	})
+
+	if err != nil {
+		NoticeAlert("CountServerEntries failed: %s", err)
+		return 0
+	}
+
+	return count
+}
+
+// ReportAvailableRegions prints a notice with the available egress regions.
+// Note that this report ignores config.TunnelProtocol.
+func ReportAvailableRegions() {
+	checkInitDataStore()
+
+	regions := make(map[string]bool)
+	err := scanServerEntries(func(serverEntry *ServerEntry) {
+		regions[serverEntry.Region] = true
+	})
+
+	if err != nil {
+		NoticeAlert("ReportAvailableRegions failed: %s", err)
+		return
+	}
+
+	regionList := make([]string, 0, len(regions))
+	for region, _ := range regions {
+		// Some server entries do not have a region, but it makes no sense to return
+		// an empty string as an "available region".
+		if region != "" {
+			regionList = append(regionList, region)
+		}
+	}
+
+	NoticeAvailableEgressRegions(regionList)
+}
+
+// GetServerEntryIpAddresses returns an array containing
+// all stored server IP addresses.
+func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
+	checkInitDataStore()
+
+	ipAddresses = make([]string, 0)
+	err = scanServerEntries(func(serverEntry *ServerEntry) {
+		ipAddresses = append(ipAddresses, serverEntry.IpAddress)
+	})
+
+	if err != nil {
+		return nil, ContextError(err)
+	}
+
+	return ipAddresses, nil
+}
+
+// SetSplitTunnelRoutes updates the cached routes data for
+// the given region. The associated etag is also stored and
+// used to make efficient web requests for updates to the data.
+func SetSplitTunnelRoutes(region, etag string, data []byte) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
+		err := bucket.Put([]byte(region), []byte(etag))
+
+		bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
+		err = bucket.Put([]byte(region), data)
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// GetSplitTunnelRoutesETag retrieves the etag for cached routes
+// data for the specified region. If not found, it returns an empty string value.
+func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
+	checkInitDataStore()
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
+		etag = string(bucket.Get([]byte(region)))
+		return nil
+	})
+
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return etag, nil
+}
+
+// GetSplitTunnelRoutesData retrieves the cached routes data
+// for the specified region. If not found, it returns a nil value.
+func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
+	checkInitDataStore()
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
+		data = bucket.Get([]byte(region))
+		return nil
+	})
+
+	if err != nil {
+		return nil, ContextError(err)
+	}
+	return data, nil
+}
+
+// SetUrlETag stores an ETag for the specfied URL.
+// Note: input URL is treated as a string, and is not
+// encoded or decoded or otherwise canonicalized.
+func SetUrlETag(url, etag string) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		err := bucket.Put([]byte(url), []byte(etag))
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// GetUrlETag retrieves a previously stored an ETag for the
+// specfied URL. If not found, it returns an empty string value.
+func GetUrlETag(url string) (etag string, err error) {
+	checkInitDataStore()
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		etag = string(bucket.Get([]byte(url)))
+		return nil
+	})
+
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return etag, nil
+}
+
+// SetKeyValue stores a key/value pair.
+func SetKeyValue(key, value string) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(keyValueBucket))
+		err := bucket.Put([]byte(key), []byte(value))
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// GetKeyValue retrieves the value for a given key. If not found,
+// it returns an empty string value.
+func GetKeyValue(key string) (value string, err error) {
+	checkInitDataStore()
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(keyValueBucket))
+		value = string(bucket.Get([]byte(key)))
+		return nil
+	})
+
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return value, nil
+}

+ 77 - 34
psiphon/httpProxy.go

@@ -56,10 +56,11 @@ type HttpProxy struct {
 	tunneler               Tunneler
 	tunneler               Tunneler
 	listener               net.Listener
 	listener               net.Listener
 	serveWaitGroup         *sync.WaitGroup
 	serveWaitGroup         *sync.WaitGroup
-	httpTunneledRelay      *http.Transport
-	httpTunneledClient     *http.Client
-	httpDirectRelay        *http.Transport
-	httpDirectClient       *http.Client
+	httpProxyTunneledRelay *http.Transport
+	urlProxyTunneledRelay  *http.Transport
+	urlProxyTunneledClient *http.Client
+	urlProxyDirectRelay    *http.Transport
+	urlProxyDirectClient   *http.Client
 	openConns              *Conns
 	openConns              *Conns
 	stopListeningBroadcast chan struct{}
 	stopListeningBroadcast chan struct{}
 }
 }
@@ -86,44 +87,73 @@ func NewHttpProxy(
 		// TODO: connect timeout?
 		// TODO: connect timeout?
 		return tunneler.Dial(addr, false, nil)
 		return tunneler.Dial(addr, false, nil)
 	}
 	}
-	httpTunneledRelay := &http.Transport{
-		Dial:                tunneledDialer,
-		MaxIdleConnsPerHost: HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST,
+	directDialer := func(_, addr string) (conn net.Conn, err error) {
+		return DialTCP(addr, untunneledDialConfig)
 	}
 	}
-	httpTunneledClient := &http.Client{
-		Transport: httpTunneledRelay,
-		Jar:       nil, // TODO: cookie support for URL proxy?
-		Timeout:   HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
+
+	// TODO: could HTTP proxy share a tunneled transport with URL proxy?
+	// For now, keeping them distinct just to be conservative.
+	httpProxyTunneledRelay := &http.Transport{
+		Dial:                  tunneledDialer,
+		MaxIdleConnsPerHost:   HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST,
+		ResponseHeaderTimeout: HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
 	}
 	}
 
 
-	directDialer := func(_, addr string) (conn net.Conn, err error) {
-		return DialTCP(addr, untunneledDialConfig)
+	// Note: URL proxy relays use http.Client for upstream requests, so
+	// redirects will be followed. HTTP proxy should not follow redirects
+	// and simply uses http.Transport directly.
+
+	urlProxyTunneledRelay := &http.Transport{
+		Dial:                  tunneledDialer,
+		MaxIdleConnsPerHost:   HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST,
+		ResponseHeaderTimeout: HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
+	}
+	urlProxyTunneledClient := &http.Client{
+		Transport: urlProxyTunneledRelay,
+		Jar:       nil, // TODO: cookie support for URL proxy?
+
+		// Note: don't use this timeout -- it interrupts downloads of large response bodies
+		//Timeout:   HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
 	}
 	}
-	httpDirectRelay := &http.Transport{
+
+	urlProxyDirectRelay := &http.Transport{
 		Dial:                  directDialer,
 		Dial:                  directDialer,
 		MaxIdleConnsPerHost:   HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST,
 		MaxIdleConnsPerHost:   HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST,
 		ResponseHeaderTimeout: HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
 		ResponseHeaderTimeout: HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
 	}
 	}
-	httpDirectClient := &http.Client{
-		Transport: httpDirectRelay,
+	urlProxyDirectClient := &http.Client{
+		Transport: urlProxyDirectRelay,
 		Jar:       nil,
 		Jar:       nil,
-		Timeout:   HTTP_PROXY_ORIGIN_SERVER_TIMEOUT,
 	}
 	}
 
 
 	proxy = &HttpProxy{
 	proxy = &HttpProxy{
 		tunneler:               tunneler,
 		tunneler:               tunneler,
 		listener:               listener,
 		listener:               listener,
 		serveWaitGroup:         new(sync.WaitGroup),
 		serveWaitGroup:         new(sync.WaitGroup),
-		httpTunneledRelay:      httpTunneledRelay,
-		httpTunneledClient:     httpTunneledClient,
-		httpDirectRelay:        httpDirectRelay,
-		httpDirectClient:       httpDirectClient,
+		httpProxyTunneledRelay: httpProxyTunneledRelay,
+		urlProxyTunneledRelay:  urlProxyTunneledRelay,
+		urlProxyTunneledClient: urlProxyTunneledClient,
+		urlProxyDirectRelay:    urlProxyDirectRelay,
+		urlProxyDirectClient:   urlProxyDirectClient,
 		openConns:              new(Conns),
 		openConns:              new(Conns),
 		stopListeningBroadcast: make(chan struct{}),
 		stopListeningBroadcast: make(chan struct{}),
 	}
 	}
 	proxy.serveWaitGroup.Add(1)
 	proxy.serveWaitGroup.Add(1)
 	go proxy.serve()
 	go proxy.serve()
+
+	// TODO: NoticeListeningHttpProxyPort is emitted after net.Listen
+	// but before go proxy.server() and httpServer.Serve(), and this
+	// appears to cause client connections to the HTTP proxy to fail
+	// (in controller_test.go, only when a tunnel is established very quickly
+	// and NoticeTunnels is emitted and the client makes a request -- all
+	// before the proxy.server() goroutine runs).
+	// This condition doesn't arise in Go 1.4, just in Go tip (pre-1.5).
+	// Note that httpServer.Serve() blocks so the fix can't be to emit
+	// NoticeListeningHttpProxyPort after that call.
+	// Also, check the listen backlog queue length -- shouldn't it be possible
+	// to enqueue pending connections between net.Listen() and httpServer.Serve()?
 	NoticeListeningHttpProxyPort(proxy.listener.Addr().(*net.TCPAddr).Port)
 	NoticeListeningHttpProxyPort(proxy.listener.Addr().(*net.TCPAddr).Port)
+
 	return proxy, nil
 	return proxy, nil
 }
 }
 
 
@@ -136,8 +166,9 @@ func (proxy *HttpProxy) Close() {
 	proxy.openConns.CloseAll()
 	proxy.openConns.CloseAll()
 	// Close idle proxy->origin persistent connections
 	// Close idle proxy->origin persistent connections
 	// TODO: also close active connections
 	// TODO: also close active connections
-	proxy.httpTunneledRelay.CloseIdleConnections()
-	proxy.httpDirectRelay.CloseIdleConnections()
+	proxy.httpProxyTunneledRelay.CloseIdleConnections()
+	proxy.urlProxyTunneledRelay.CloseIdleConnections()
+	proxy.urlProxyDirectRelay.CloseIdleConnections()
 }
 }
 
 
 // ServeHTTP receives HTTP requests and proxies them. CONNECT requests
 // ServeHTTP receives HTTP requests and proxies them. CONNECT requests
@@ -199,7 +230,7 @@ func (proxy *HttpProxy) httpConnectHandler(localConn net.Conn, target string) (e
 }
 }
 
 
 func (proxy *HttpProxy) httpProxyHandler(responseWriter http.ResponseWriter, request *http.Request) {
 func (proxy *HttpProxy) httpProxyHandler(responseWriter http.ResponseWriter, request *http.Request) {
-	relayHttpRequest(proxy.httpTunneledClient, request, responseWriter)
+	relayHttpRequest(nil, proxy.httpProxyTunneledRelay, request, responseWriter)
 }
 }
 
 
 const (
 const (
@@ -218,15 +249,15 @@ func (proxy *HttpProxy) urlProxyHandler(responseWriter http.ResponseWriter, requ
 	switch {
 	switch {
 	case strings.HasPrefix(request.URL.Path, URL_PROXY_TUNNELED_REQUEST_PATH):
 	case strings.HasPrefix(request.URL.Path, URL_PROXY_TUNNELED_REQUEST_PATH):
 		originUrl, err = url.QueryUnescape(request.URL.Path[len(URL_PROXY_TUNNELED_REQUEST_PATH):])
 		originUrl, err = url.QueryUnescape(request.URL.Path[len(URL_PROXY_TUNNELED_REQUEST_PATH):])
-		client = proxy.httpTunneledClient
+		client = proxy.urlProxyTunneledClient
 	case strings.HasPrefix(request.URL.Path, URL_PROXY_DIRECT_REQUEST_PATH):
 	case strings.HasPrefix(request.URL.Path, URL_PROXY_DIRECT_REQUEST_PATH):
 		originUrl, err = url.QueryUnescape(request.URL.Path[len(URL_PROXY_DIRECT_REQUEST_PATH):])
 		originUrl, err = url.QueryUnescape(request.URL.Path[len(URL_PROXY_DIRECT_REQUEST_PATH):])
-		client = proxy.httpDirectClient
+		client = proxy.urlProxyDirectClient
 	default:
 	default:
 		err = errors.New("missing origin URL")
 		err = errors.New("missing origin URL")
 	}
 	}
 	if err != nil {
 	if err != nil {
-		NoticeAlert("%s", ContextError(err))
+		NoticeAlert("%s", ContextError(FilterUrlError(err)))
 		forceClose(responseWriter)
 		forceClose(responseWriter)
 		return
 		return
 	}
 	}
@@ -234,7 +265,7 @@ func (proxy *HttpProxy) urlProxyHandler(responseWriter http.ResponseWriter, requ
 	// Origin URL must be well-formed, absolute, and have a scheme of  "http" or "https"
 	// Origin URL must be well-formed, absolute, and have a scheme of  "http" or "https"
 	url, err := url.ParseRequestURI(originUrl)
 	url, err := url.ParseRequestURI(originUrl)
 	if err != nil {
 	if err != nil {
-		NoticeAlert("%s", ContextError(err))
+		NoticeAlert("%s", ContextError(FilterUrlError(err)))
 		forceClose(responseWriter)
 		forceClose(responseWriter)
 		return
 		return
 	}
 	}
@@ -248,10 +279,14 @@ func (proxy *HttpProxy) urlProxyHandler(responseWriter http.ResponseWriter, requ
 	request.Host = url.Host
 	request.Host = url.Host
 	request.URL = url
 	request.URL = url
 
 
-	relayHttpRequest(client, request, responseWriter)
+	relayHttpRequest(client, nil, request, responseWriter)
 }
 }
 
 
-func relayHttpRequest(client *http.Client, request *http.Request, responseWriter http.ResponseWriter) {
+func relayHttpRequest(
+	client *http.Client,
+	transport *http.Transport,
+	request *http.Request,
+	responseWriter http.ResponseWriter) {
 
 
 	// Transform received request struct before using as input to relayed request
 	// Transform received request struct before using as input to relayed request
 	request.Close = false
 	request.Close = false
@@ -260,11 +295,19 @@ func relayHttpRequest(client *http.Client, request *http.Request, responseWriter
 		request.Header.Del(key)
 		request.Header.Del(key)
 	}
 	}
 
 
-	// Relay the HTTP request and get the response
-	//response, err := relay.RoundTrip(request)
-	response, err := client.Do(request)
+	// Relay the HTTP request and get the response. Use a client when supplied,
+	// otherwise a transport. A client handles cookies and redirects, and a
+	// transport does not.
+	var response *http.Response
+	var err error
+	if client != nil {
+		response, err = client.Do(request)
+	} else {
+		response, err = transport.RoundTrip(request)
+	}
+
 	if err != nil {
 	if err != nil {
-		NoticeAlert("%s", ContextError(err))
+		NoticeAlert("%s", ContextError(FilterUrlError(err)))
 		forceClose(responseWriter)
 		forceClose(responseWriter)
 		return
 		return
 	}
 	}

+ 7 - 4
psiphon/meekConn.go

@@ -166,7 +166,7 @@ func DialMeek(
 			// For unfronted meek, we let the http.Transport handle proxying, as the
 			// For unfronted meek, we let the http.Transport handle proxying, as the
 			// target server hostname has to be in the HTTP request line. Also, in this
 			// target server hostname has to be in the HTTP request line. Also, in this
 			// case, we don't require the proxy to support CONNECT and so we can work
 			// case, we don't require the proxy to support CONNECT and so we can work
-			// throigh HTTP proxies that don't support it.
+			// through HTTP proxies that don't support it.
 			url, err := url.Parse(fmt.Sprintf("http://%s", meekConfig.UpstreamHttpProxyAddress))
 			url, err := url.Parse(fmt.Sprintf("http://%s", meekConfig.UpstreamHttpProxyAddress))
 			if err != nil {
 			if err != nil {
 				return nil, ContextError(err)
 				return nil, ContextError(err)
@@ -189,7 +189,6 @@ func DialMeek(
 	if err != nil {
 	if err != nil {
 		return nil, ContextError(err)
 		return nil, ContextError(err)
 	}
 	}
-	// TODO: also use http.Client, with its Timeout field?
 	transport := &http.Transport{
 	transport := &http.Transport{
 		Proxy: proxyUrl,
 		Proxy: proxyUrl,
 		Dial:  dialer,
 		Dial:  dialer,
@@ -485,13 +484,15 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	// Note: Retry will only be effective if entire request failed (underlying transport protocol
 	// Note: Retry will only be effective if entire request failed (underlying transport protocol
 	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
 	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
 	// success followed by retry).
 	// success followed by retry).
-	// We retry when still within a brief deadline and wait for a short time before re-dialing.
+	// At least one retry is always attempted. We retry when still within a brief deadline and wait
+	// for a short time before re-dialing.
 	//
 	//
 	// TODO: in principle, we could retry for min(TUNNEL_WRITE_TIMEOUT, meek-server.MAX_SESSION_STALENESS),
 	// TODO: in principle, we could retry for min(TUNNEL_WRITE_TIMEOUT, meek-server.MAX_SESSION_STALENESS),
 	// i.e., as long as the underlying tunnel has not timed out and as long as the server has not
 	// i.e., as long as the underlying tunnel has not timed out and as long as the server has not
 	// expired the current meek session. Presently not doing this to avoid excessive connection attempts
 	// expired the current meek session. Presently not doing this to avoid excessive connection attempts
 	// through the first hop. In addition, this will require additional support for timely shutdown.
 	// through the first hop. In addition, this will require additional support for timely shutdown.
 
 
+	retries := uint(0)
 	retryDeadline := time.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
 	retryDeadline := time.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
 
 
 	var response *http.Response
 	var response *http.Response
@@ -524,9 +525,11 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 			break
 			break
 		}
 		}
 
 
-		if time.Now().After(retryDeadline) {
+		if retries >= 1 && time.Now().After(retryDeadline) {
 			break
 			break
 		}
 		}
+		retries += 1
+
 		time.Sleep(MEEK_ROUND_TRIP_RETRY_DELAY)
 		time.Sleep(MEEK_ROUND_TRIP_RETRY_DELAY)
 	}
 	}
 	if err != nil {
 	if err != nil {

+ 1 - 2
psiphon/net.go

@@ -26,7 +26,6 @@ import (
 	"io"
 	"io"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
-	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
@@ -194,7 +193,7 @@ func HttpProxyConnect(rawConn net.Conn, addr string) (err error) {
 		return ContextError(err)
 		return ContextError(err)
 	}
 	}
 	if response.StatusCode != 200 {
 	if response.StatusCode != 200 {
-		return ContextError(errors.New(strings.SplitN(response.Status, " ", 2)[1]))
+		return ContextError(errors.New(response.Status))
 	}
 	}
 
 
 	return nil
 	return nil

+ 23 - 1
psiphon/remoteServerList.go

@@ -47,7 +47,20 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		Transport: transport,
 		Transport: transport,
 	}
 	}
 
 
-	response, err := httpClient.Get(config.RemoteServerListUrl)
+	request, err := http.NewRequest("GET", config.RemoteServerListUrl, nil)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	etag, err := GetUrlETag(config.RemoteServerListUrl)
+	if err != nil {
+		return ContextError(err)
+	}
+	if etag != "" {
+		request.Header.Add("If-None-Match", etag)
+	}
+
+	response, err := httpClient.Do(request)
 	if err != nil {
 	if err != nil {
 		return ContextError(err)
 		return ContextError(err)
 	}
 	}
@@ -74,5 +87,14 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		return ContextError(err)
 		return ContextError(err)
 	}
 	}
 
 
+	etag = response.Header.Get("ETag")
+	if etag != "" {
+		err := SetUrlETag(config.RemoteServerListUrl, etag)
+		if err != nil {
+			NoticeAlert("failed to set remote server list etag: %s", ContextError(err))
+			// This fetch is still reported as a success, even if we can't store the etag
+		}
+	}
+
 	return nil
 	return nil
 }
 }

+ 21 - 0
psiphon/utils.go

@@ -27,6 +27,7 @@ import (
 	"fmt"
 	"fmt"
 	"math/big"
 	"math/big"
 	"net"
 	"net"
+	"net/url"
 	"os"
 	"os"
 	"runtime"
 	"runtime"
 	"strings"
 	"strings"
@@ -120,6 +121,26 @@ func DecodeCertificate(encodedCertificate string) (certificate *x509.Certificate
 	return certificate, nil
 	return certificate, nil
 }
 }
 
 
+// FilterUrlError transforms an error, when it is a url.Error, removing
+// the URL value. This is to avoid logging private user data in cases
+// where the URL may be a user input value.
+// This function is used with errors returned by net/http and net/url,
+// which are (currently) of type url.Error. In particular, the round trip
+// function used by our HttpProxy, http.Client.Do, returns errors of type
+// url.Error, with the URL being the url sent from the user's tunneled
+// applications:
+// https://github.com/golang/go/blob/release-branch.go1.4/src/net/http/client.go#L394
+func FilterUrlError(err error) error {
+	if urlErr, ok := err.(*url.Error); ok {
+		err = &url.Error{
+			Op:  urlErr.Op,
+			URL: "",
+			Err: urlErr.Err,
+		}
+	}
+	return err
+}
+
 // TrimError removes the middle of over-long error message strings
 // TrimError removes the middle of over-long error message strings
 func TrimError(err error) error {
 func TrimError(err error) error {
 	const MAX_LEN = 100
 	const MAX_LEN = 100