Quellcode durchsuchen

Changes to tunnel establishment algorithm

* To conserve data transfer, no longer unconditionally
fetch remote server lists. Fetch now triggered after a
period of failing to establish sufficient tunnels.

* Persist remote server list ETags to avoid redundant
download on subsequent process runs.

* Abort current server candidate iterator after some
period of time in order to reshuffle and incorporate
new servers, after a brief pause.

* More consistent with legacy client behavior.
Rod Hynes vor 10 Jahren
Ursprung
Commit
34b092eacf
5 geänderte Dateien mit 193 neuen und 50 gelöschten Zeilen
  1. 2 1
      psiphon/config.go
  2. 92 44
      psiphon/controller.go
  3. 35 0
      psiphon/dataStore.go
  4. 41 4
      psiphon/dataStore_alt.go
  5. 23 1
      psiphon/remoteServerList.go

+ 2 - 1
psiphon/config.go

@@ -41,7 +41,8 @@ const (
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN             = 60 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX             = 120 * time.Second
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS             = 300
-	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 10 * time.Second
+	ESTABLISH_TUNNEL_WORK_TIME_SECONDS           = 60 * time.Second
+	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 5 * time.Second
 	PORT_FORWARD_FAILURE_THRESHOLD               = 10
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT             = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST     = 50

+ 92 - 44
psiphon/controller.go

@@ -34,26 +34,27 @@ import (
 // connect to; establishes and monitors tunnels; and runs local proxies which
 // route traffic through the tunnels.
 type Controller struct {
-	config                    *Config
-	sessionId                 string
-	componentFailureSignal    chan struct{}
-	shutdownBroadcast         chan struct{}
-	runWaitGroup              *sync.WaitGroup
-	establishedTunnels        chan *Tunnel
-	failedTunnels             chan *Tunnel
-	tunnelMutex               sync.Mutex
-	establishedOnce           bool
-	tunnels                   []*Tunnel
-	nextTunnel                int
-	startedConnectedReporter  bool
-	isEstablishing            bool
-	establishWaitGroup        *sync.WaitGroup
-	stopEstablishingBroadcast chan struct{}
-	candidateServerEntries    chan *ServerEntry
-	establishPendingConns     *Conns
-	untunneledPendingConns    *Conns
-	untunneledDialConfig      *DialConfig
-	splitTunnelClassifier     *SplitTunnelClassifier
+	config                      *Config
+	sessionId                   string
+	componentFailureSignal      chan struct{}
+	shutdownBroadcast           chan struct{}
+	runWaitGroup                *sync.WaitGroup
+	establishedTunnels          chan *Tunnel
+	failedTunnels               chan *Tunnel
+	tunnelMutex                 sync.Mutex
+	establishedOnce             bool
+	tunnels                     []*Tunnel
+	nextTunnel                  int
+	startedConnectedReporter    bool
+	isEstablishing              bool
+	establishWaitGroup          *sync.WaitGroup
+	stopEstablishingBroadcast   chan struct{}
+	candidateServerEntries      chan *ServerEntry
+	establishPendingConns       *Conns
+	untunneledPendingConns      *Conns
+	untunneledDialConfig        *DialConfig
+	splitTunnelClassifier       *SplitTunnelClassifier
+	signalFetchRemoteServerList chan struct{}
 }
 
 // NewController initializes a new controller.
@@ -97,6 +98,9 @@ func NewController(config *Config) (controller *Controller, err error) {
 		establishPendingConns:    new(Conns),
 		untunneledPendingConns:   untunneledPendingConns,
 		untunneledDialConfig:     untunneledDialConfig,
+		// A buffer allows at least one signal to be sent even when the receiver is
+		// not listening. Senders should not block.
+		signalFetchRemoteServerList: make(chan struct{}, 1),
 	}
 
 	controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
@@ -180,35 +184,54 @@ func (controller *Controller) SignalComponentFailure() {
 }
 
 // remoteServerListFetcher fetches an out-of-band list of server entries
-// for more tunnel candidates. It fetches immediately, retries after failure
-// with a wait period, and refetches after success with a longer wait period.
+// for more tunnel candidates. It fetches when signalled, with retries
+// on failure.
 func (controller *Controller) remoteServerListFetcher() {
 	defer controller.runWaitGroup.Done()
 
-loop:
+	var lastFetchTime time.Time
+
+fetcherLoop:
 	for {
-		if !WaitForNetworkConnectivity(
-			controller.config.NetworkConnectivityChecker,
-			controller.shutdownBroadcast) {
-			break
+		// Wait for a signal before fetching
+		select {
+		case <-controller.signalFetchRemoteServerList:
+		case <-controller.shutdownBroadcast:
+			break fetcherLoop
 		}
 
-		err := FetchRemoteServerList(
-			controller.config, controller.untunneledDialConfig)
+		// Skip fetch entirely (i.e., send no request at all, even when ETag would save
+		// on response size) when a recent fetch was successful
+		if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) {
+			continue
+		}
+
+	retryLoop:
+		for {
+			// Don't attempt to fetch while there is no network connectivity,
+			// to avoid alert notice noise.
+			if !WaitForNetworkConnectivity(
+				controller.config.NetworkConnectivityChecker,
+				controller.shutdownBroadcast) {
+				break fetcherLoop
+			}
+
+			err := FetchRemoteServerList(
+				controller.config, controller.untunneledDialConfig)
+
+			if err == nil {
+				lastFetchTime = time.Now()
+				break retryLoop
+			}
 
-		var duration time.Duration
-		if err != nil {
 			NoticeAlert("failed to fetch remote server list: %s", err)
-			duration = FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD
-		} else {
-			duration = FETCH_REMOTE_SERVER_LIST_STALE_PERIOD
-		}
-		timeout := time.After(duration)
-		select {
-		case <-timeout:
-			// Fetch again
-		case <-controller.shutdownBroadcast:
-			break loop
+
+			timeout := time.After(FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD)
+			select {
+			case <-timeout:
+			case <-controller.shutdownBroadcast:
+				break fetcherLoop
+			}
 		}
 	}
 
@@ -619,7 +642,9 @@ func (controller *Controller) establishCandidateGenerator() {
 loop:
 	// Repeat until stopped
 	for {
-		// Yield each server entry returned by the iterator
+
+		// Send each iterator server entry to the establish workers
+		startTime := time.Now()
 		for {
 			serverEntry, err := iterator.Next()
 			if err != nil {
@@ -642,12 +667,33 @@ loop:
 			case <-controller.shutdownBroadcast:
 				break loop
 			}
+
+			if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME_SECONDS)) {
+				// Start over, after a brief pause, with a new shuffle of the server
+				// entries, and potentially some newly fetched server entries.
+				break
+			}
+		}
+		// Free up resources now, but don't reset until after the pause.
+		iterator.Close()
+
+		// Trigger a fetch remote server list, since we may have failed to
+		// connect with all known servers. Don't block sending signal, since
+		// this signal may have already been sent.
+		// Don't wait for fetch remote to succeed, since it may fail and
+		// enter a retry loop and we're better off trying more known servers.
+		// TODO: synchronize the fetch response, so it can be incorporated
+		// into the server entry iterator as soon as available.
+		select {
+		case controller.signalFetchRemoteServerList <- *new(struct{}):
+		default:
 		}
-		iterator.Reset()
 
 		// After a complete iteration of candidate servers, pause before iterating again.
 		// This helps avoid some busy wait loop conditions, and also allows some time for
-		// network conditions to change.
+		// network conditions to change. Also allows for fetch remote to complete,
+		// in typical conditions (it isn't strictly necessary to wait for this, there will
+		// be more rounds if required).
 		timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
 		select {
 		case <-timeout:
@@ -657,6 +703,8 @@ loop:
 		case <-controller.shutdownBroadcast:
 			break loop
 		}
+
+		iterator.Reset()
 	}
 
 	NoticeInfo("stopped candidate generator")

+ 35 - 0
psiphon/dataStore.go

@@ -90,6 +90,9 @@ func InitDataStore(config *Config) (err error) {
             (region text not null primary key,
              etag text not null,
              data blob not null);
+        create table if not exists urlETags
+            (url text not null primary key,
+             etag text not null);
         create table if not exists keyValue
             (key text not null primary key,
              value text not null);
@@ -629,6 +632,38 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 	return data, nil
 }
 
+// SetUrlETag stores an ETag for the specfied URL.
+// Note: input URL is treated as a string, and is not
+// encoded or decoded or otherwise canonicalized.
+func SetUrlETag(url, etag string) error {
+	return transactionWithRetry(func(transaction *sql.Tx) error {
+		_, err := transaction.Exec(`
+            insert or replace into urlETags (url, etag)
+            values (?, ?);
+            `, url, etag)
+		if err != nil {
+			// Note: ContextError() would break canRetry()
+			return err
+		}
+		return nil
+	})
+}
+
+// GetUrlETag retrieves a previously stored an ETag for the
+// specfied URL. If not found, it returns an empty string value.
+func GetUrlETag(url string) (etag string, err error) {
+	checkInitDataStore()
+	rows := singleton.db.QueryRow("select etag from urlETags where url = ?;", url)
+	err = rows.Scan(&etag)
+	if err == sql.ErrNoRows {
+		return "", nil
+	}
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return etag, nil
+}
+
 // SetKeyValue stores a key/value pair.
 func SetKeyValue(key, value string) error {
 	return transactionWithRetry(func(transaction *sql.Tx) error {

+ 41 - 4
psiphon/dataStore_alt.go

@@ -51,8 +51,9 @@ const (
 	serverEntriesBucket         = "serverEntries"
 	rankedServerEntriesBucket   = "rankedServerEntries"
 	rankedServerEntriesKey      = "rankedServerEntries"
-	splitTunnelRouteEtagsBucket = "splitTunnelRouteEtags"
+	splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
 	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
+	urlETagsBucket              = "urlETags"
 	keyValueBucket              = "keyValues"
 	rankedServerEntryCount      = 100
 )
@@ -82,8 +83,9 @@ func InitDataStore(config *Config) (err error) {
 			requiredBuckets := []string{
 				serverEntriesBucket,
 				rankedServerEntriesBucket,
-				splitTunnelRouteEtagsBucket,
+				splitTunnelRouteETagsBucket,
 				splitTunnelRouteDataBucket,
+				urlETagsBucket,
 				keyValueBucket,
 			}
 			for _, bucket := range requiredBuckets {
@@ -601,7 +603,7 @@ func SetSplitTunnelRoutes(region, etag string, data []byte) error {
 	checkInitDataStore()
 
 	err := singleton.db.Update(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteEtagsBucket))
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
 		err := bucket.Put([]byte(region), []byte(etag))
 
 		bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
@@ -621,7 +623,7 @@ func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
 	checkInitDataStore()
 
 	err = singleton.db.View(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteEtagsBucket))
+		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
 		etag = string(bucket.Get([]byte(region)))
 		return nil
 	})
@@ -649,6 +651,41 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 	return data, nil
 }
 
+// SetUrlETag stores an ETag for the specfied URL.
+// Note: input URL is treated as a string, and is not
+// encoded or decoded or otherwise canonicalized.
+func SetUrlETag(url, etag string) error {
+	checkInitDataStore()
+
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		err := bucket.Put([]byte(url), []byte(etag))
+		return err
+	})
+
+	if err != nil {
+		return ContextError(err)
+	}
+	return nil
+}
+
+// GetUrlETag retrieves a previously stored an ETag for the
+// specfied URL. If not found, it returns an empty string value.
+func GetUrlETag(url string) (etag string, err error) {
+	checkInitDataStore()
+
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte(urlETagsBucket))
+		etag = string(bucket.Get([]byte(url)))
+		return nil
+	})
+
+	if err != nil {
+		return "", ContextError(err)
+	}
+	return etag, nil
+}
+
 // SetKeyValue stores a key/value pair.
 func SetKeyValue(key, value string) error {
 	checkInitDataStore()

+ 23 - 1
psiphon/remoteServerList.go

@@ -47,7 +47,20 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		Transport: transport,
 	}
 
-	response, err := httpClient.Get(config.RemoteServerListUrl)
+	request, err := http.NewRequest("GET", config.RemoteServerListUrl, nil)
+	if err != nil {
+		return ContextError(err)
+	}
+
+	etag, err := GetUrlETag(config.RemoteServerListUrl)
+	if err != nil {
+		return ContextError(err)
+	}
+	if etag != "" {
+		request.Header.Add("If-None-Match", etag)
+	}
+
+	response, err := httpClient.Do(request)
 	if err != nil {
 		return ContextError(err)
 	}
@@ -74,5 +87,14 @@ func FetchRemoteServerList(config *Config, dialConfig *DialConfig) (err error) {
 		return ContextError(err)
 	}
 
+	etag = response.Header.Get("ETag")
+	if etag != "" {
+		err := SetUrlETag(config.RemoteServerListUrl, etag)
+		if err != nil {
+			NoticeAlert("failed to set remote server list etag: %s", ContextError(err))
+			// This fetch is still reported as a success, even if we can't store the etag
+		}
+	}
+
 	return nil
 }