Просмотр исходного кода

Changes to server affinity

- Skip server affinity phase when switching from a specific
  egress region to any region, since the affinity server from
  the specific region may be too distant.

- Skip server affinity when no server has previously been
  promoted, since, in this case, any existing server will be
  an arbitrary candidate.
Rod Hynes 8 лет назад
Родитель
Сommit
b1a93d8d43
2 измененных файлов с 97 добавлено и 18 удалено
  1. 7 4
      psiphon/controller.go
  2. 90 14
      psiphon/dataStore.go

+ 7 - 4
psiphon/controller.go

@@ -911,7 +911,7 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
 	// Connecting to a TargetServerEntry does not change the
 	// ranking.
 	if controller.config.TargetServerEntry == "" {
-		PromoteServerEntry(tunnel.serverEntry.IpAddress)
+		PromoteServerEntry(controller.config, tunnel.serverEntry.IpAddress)
 	}
 
 	return true
@@ -1191,7 +1191,7 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	establishStartTime := monotime.Now()
 	var networkWaitDuration time.Duration
 
-	iterator, err := NewServerEntryIterator(controller.config)
+	applyServerAffinity, iterator, err := NewServerEntryIterator(controller.config)
 	if err != nil {
 		NoticeAlert("failed to iterate over candidates: %s", err)
 		controller.SignalComponentFailure()
@@ -1199,10 +1199,13 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	}
 	defer iterator.Close()
 
-	isServerAffinityCandidate := true
-
 	// TODO: reconcile server affinity scheme with multi-tunnel mode
 	if controller.config.TunnelPoolSize > 1 {
+		applyServerAffinity = false
+	}
+
+	isServerAffinityCandidate := true
+	if !applyServerAffinity {
 		isServerAffinityCandidate = false
 		close(controller.serverAffinityDoneBroadcast)
 	}

+ 90 - 14
psiphon/dataStore.go

@@ -65,6 +65,7 @@ const (
 
 const (
 	DATA_STORE_LAST_CONNECTED_KEY           = "lastConnected"
+	DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
 	PERSISTENT_STAT_TYPE_TUNNEL             = tunnelStatsBucket
 	PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
 )
@@ -307,7 +308,7 @@ func StreamingStoreServerEntries(
 // max rank) to the specified server entry. Server candidates are
 // iterated in decending rank order, so this server entry will be
 // the first candidate in a subsequent tunnel establishment.
-func PromoteServerEntry(ipAddress string) error {
+func PromoteServerEntry(config *Config, ipAddress string) error {
 	checkInitDataStore()
 
 	err := singleton.db.Update(func(tx *bolt.Tx) error {
@@ -323,7 +324,22 @@ func PromoteServerEntry(ipAddress string) error {
 			return nil
 		}
 
-		return insertRankedServerEntry(tx, ipAddress, 0)
+		err := insertRankedServerEntry(tx, ipAddress, 0)
+		if err != nil {
+			return err
+		}
+
+		// Store the current server entry filter (e.g, region, etc.) that
+		// was in use when the entry was promoted. This is used to detect
+		// when the top ranked server entry was promoted under a different
+		// filter.
+
+		currentFilter, err := makeServerEntryFilterValue(config)
+		if err != nil {
+			return err
+		}
+		bucket = tx.Bucket([]byte(keyValueBucket))
+		return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
 	})
 
 	if err != nil {
@@ -332,6 +348,47 @@ func PromoteServerEntry(ipAddress string) error {
 	return nil
 }
 
+func makeServerEntryFilterValue(config *Config) ([]byte, error) {
+
+	filter, err := json.Marshal(
+		struct {
+			Region   string
+			Protocol string
+		}{config.EgressRegion, config.TunnelProtocol})
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+
+	return filter, nil
+}
+
+func hasServerEntryFilterChanged(config *Config) (bool, error) {
+
+	currentFilter, err := makeServerEntryFilterValue(config)
+	if err != nil {
+		return false, common.ContextError(err)
+	}
+
+	changed := false
+	err = singleton.db.View(func(tx *bolt.Tx) error {
+
+		// previousFilter will be nil not found (not previously
+		// set) which will never match any current filter.
+
+		bucket := tx.Bucket([]byte(keyValueBucket))
+		previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
+		if bytes.Compare(previousFilter, currentFilter) == 0 {
+			changed = true
+		}
+		return nil
+	})
+	if err != nil {
+		return false, common.ContextError(err)
+	}
+
+	return changed, nil
+}
+
 func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
 	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
 	data := bucket.Get([]byte(rankedServerEntriesKey))
@@ -420,8 +477,19 @@ type ServerEntryIterator struct {
 	targetServerEntry           *protocol.ServerEntry
 }
 
-// NewServerEntryIterator creates a new ServerEntryIterator
-func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
+// NewServerEntryIterator creates a new ServerEntryIterator.
+//
+// The boolean return value indicates whether to treat the first server(s)
+// as affinity servers or not. When the server entry selection filter changes
+// such as from a specific region to any region, or when there was no previous
+// filter/iterator, the the first server(s) are arbitrary and should not be
+// given affinity treatment.
+//
+// NewServerEntryIterator and any returned ServerEntryIterator are not
+// designed for concurrent use as not all related datastore operations are
+// performed in a single transaction.
+//
+func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
 
 	// When configured, this target server entry is the only candidate
 	if config.TargetServerEntry != "" {
@@ -429,43 +497,51 @@ func NewServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err
 	}
 
 	checkInitDataStore()
-	iterator = &ServerEntryIterator{
+
+	applyServerAffinity, err := hasServerEntryFilterChanged(config)
+	if err != nil {
+		return false, nil, common.ContextError(err)
+	}
+
+	iterator := &ServerEntryIterator{
 		region:                      config.EgressRegion,
 		protocol:                    config.TunnelProtocol,
 		shuffleHeadLength:           config.TunnelPoolSize,
 		isTargetServerEntryIterator: false,
 	}
+
 	err = iterator.Reset()
 	if err != nil {
-		return nil, err
+		return false, nil, common.ContextError(err)
 	}
-	return iterator, nil
+
+	return applyServerAffinity, iterator, nil
 }
 
 // newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
-func newTargetServerEntryIterator(config *Config) (iterator *ServerEntryIterator, err error) {
+func newTargetServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error) {
 	serverEntry, err := protocol.DecodeServerEntry(
 		config.TargetServerEntry, common.GetCurrentTimestamp(), protocol.SERVER_ENTRY_SOURCE_TARGET)
 	if err != nil {
-		return nil, err
+		return false, nil, common.ContextError(err)
 	}
 	if config.EgressRegion != "" && serverEntry.Region != config.EgressRegion {
-		return nil, errors.New("TargetServerEntry does not support EgressRegion")
+		return false, nil, common.ContextError(errors.New("TargetServerEntry does not support EgressRegion"))
 	}
 	if config.TunnelProtocol != "" {
 		// Note: same capability/protocol mapping as in StoreServerEntry
 		requiredCapability := strings.TrimSuffix(config.TunnelProtocol, "-OSSH")
 		if !common.Contains(serverEntry.Capabilities, requiredCapability) {
-			return nil, errors.New("TargetServerEntry does not support TunnelProtocol")
+			return false, nil, common.ContextError(errors.New("TargetServerEntry does not support TunnelProtocol"))
 		}
 	}
-	iterator = &ServerEntryIterator{
+	iterator := &ServerEntryIterator{
 		isTargetServerEntryIterator: true,
 		hasNextTargetServerEntry:    true,
 		targetServerEntry:           serverEntry,
 	}
 	NoticeInfo("using TargetServerEntry: %s", serverEntry.IpAddress)
-	return iterator, nil
+	return false, iterator, nil
 }
 
 // Reset a NewServerEntryIterator to the start of its cycle. The next
@@ -946,7 +1022,7 @@ func CountUnreportedPersistentStats() int {
 
 	unreported := 0
 
-	err := singleton.db.Update(func(tx *bolt.Tx) error {
+	err := singleton.db.View(func(tx *bolt.Tx) error {
 
 		for _, statType := range persistentStatTypes {