Przeglądaj źródła

Implement server affinity scheme

* Make an extra effort to connect to the same server as the previous session
* Doing this benefits Psiphon stats calculations as well as user application behavior
* The scheme allows extra time for the server affinity candidate to successfully connect
  before before an established tunnel is activated
Rod Hynes 10 lat temu
rodzic
commit
9d9b889b97
3 zmienionych plików z 88 dodań i 11 usunięć
  1. 1 0
      psiphon/config.go
  2. 77 7
      psiphon/controller.go
  3. 10 4
      psiphon/tunnel.go

+ 1 - 0
psiphon/config.go

@@ -45,6 +45,7 @@ const (
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS               = 300
 	ESTABLISH_TUNNEL_WORK_TIME                     = 60 * time.Second
 	ESTABLISH_TUNNEL_PAUSE_PERIOD                  = 5 * time.Second
+	ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD  = 500 * time.Millisecond
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT               = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST       = 50
 	FETCH_REMOTE_SERVER_LIST_TIMEOUT               = 30 * time.Second

+ 77 - 7
psiphon/controller.go

@@ -51,7 +51,7 @@ type Controller struct {
 	isEstablishing                 bool
 	establishWaitGroup             *sync.WaitGroup
 	stopEstablishingBroadcast      chan struct{}
-	candidateServerEntries         chan *ServerEntry
+	candidateServerEntries         chan *candidateServerEntry
 	establishPendingConns          *Conns
 	untunneledPendingConns         *Conns
 	untunneledDialConfig           *DialConfig
@@ -59,6 +59,12 @@ type Controller struct {
 	signalFetchRemoteServerList    chan struct{}
 	impairedProtocolClassification map[string]int
 	signalReportConnected          chan struct{}
+	serverAffinityDoneBroadcast    chan struct{}
+}
+
+type candidateServerEntry struct {
+	serverEntry               *ServerEntry
+	isServerAffinityCandidate bool
 }
 
 // NewController initializes a new controller.
@@ -615,6 +621,10 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 	controller.tunnels = append(controller.tunnels, tunnel)
 	NoticeTunnels(len(controller.tunnels))
 
+	// Promote this successful tunnel to first rank so it's one
+	// of the first candidates next time establish runs.
+	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+
 	return len(controller.tunnels), true
 }
 
@@ -758,9 +768,32 @@ func (controller *Controller) startEstablishing() {
 	controller.isEstablishing = true
 	controller.establishWaitGroup = new(sync.WaitGroup)
 	controller.stopEstablishingBroadcast = make(chan struct{})
-	controller.candidateServerEntries = make(chan *ServerEntry)
+	controller.candidateServerEntries = make(chan *candidateServerEntry)
 	controller.establishPendingConns.Reset()
 
+	// The server affinity mechanism attempts to favor the previously
+	// used server when reconnecting. This is useful for server-side
+	// session duration stats calculation and also beneficial for user
+	// applications which expect consistency in user IP address (for
+	// example, a web site which prompts for additional user
+	// authentication when the IP address changes).
+	//
+	// Only the very first server, as determined by
+	// datastore.PromoteServerEntry(), is the server affinity candidate.
+	// Concurrent connections attempts to many servers are launched
+	// without delay, in case the affinity server connection fails.
+	// While the affinity server connection is outstanding, when any
+	// other connection is established, there is a short grace period
+	// delay before delivering the established tunnel; this allows some
+	// time for the affinity server connection to succeed first.
+	// When the affinity server connection fails, any other established
+	// tunnel is registered without delay.
+	//
+	// Note: the establishTunnelWorker that receives the affinity
+	// candidate is solely resonsible for closing
+	// controller.serverAffinityDoneBroadcast.
+	controller.serverAffinityDoneBroadcast = make(chan struct{})
+
 	for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
 		controller.establishWaitGroup.Add(1)
 		go controller.establishTunnelWorker()
@@ -791,6 +824,7 @@ func (controller *Controller) stopEstablishing() {
 	controller.establishWaitGroup = nil
 	controller.stopEstablishingBroadcast = nil
 	controller.candidateServerEntries = nil
+	controller.serverAffinityDoneBroadcast = nil
 }
 
 // establishCandidateGenerator populates the candidate queue with server entries
@@ -808,6 +842,14 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	}
 	defer iterator.Close()
 
+	isServerAffinityCandidate := true
+
+	// TODO: reconcile server affinity scheme with multi-tunnel mode
+	if controller.config.TunnelPoolSize > 1 {
+		isServerAffinityCandidate = false
+		close(controller.serverAffinityDoneBroadcast)
+	}
+
 loop:
 	// Repeat until stopped
 	for i := 0; ; i++ {
@@ -853,11 +895,16 @@ loop:
 				}
 			}
 
+			// Note: there must be only one server affinity candidate, as it
+			// closes the serverAffinityDoneBroadcast channel.
+			candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate}
+			isServerAffinityCandidate = false
+
 			// TODO: here we could generate multiple candidates from the
 			// server entry when there are many MeekFrontingAddresses.
 
 			select {
-			case controller.candidateServerEntries <- serverEntry:
+			case controller.candidateServerEntries <- candidate:
 			case <-controller.stopEstablishingBroadcast:
 				break loop
 			case <-controller.shutdownBroadcast:
@@ -911,7 +958,7 @@ loop:
 func (controller *Controller) establishTunnelWorker() {
 	defer controller.establishWaitGroup.Done()
 loop:
-	for serverEntry := range controller.candidateServerEntries {
+	for candidateServerEntry := range controller.candidateServerEntries {
 		// Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
 		// in the same select, since we want to prioritize receiving the stop signal
 		if controller.isStopEstablishingBroadcast() {
@@ -919,7 +966,7 @@ loop:
 		}
 
 		// There may already be a tunnel to this candidate. If so, skip it.
-		if controller.isActiveTunnelServerEntry(serverEntry) {
+		if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
 			continue
 		}
 
@@ -928,18 +975,35 @@ loop:
 			controller.untunneledDialConfig,
 			controller.sessionId,
 			controller.establishPendingConns,
-			serverEntry,
+			candidateServerEntry.serverEntry,
 			controller) // TunnelOwner
 		if err != nil {
+
+			// Unblock other candidates immediately when
+			// server affinity candidate fails.
+			if candidateServerEntry.isServerAffinityCandidate {
+				close(controller.serverAffinityDoneBroadcast)
+			}
+
 			// Before emitting error, check if establish interrupted, in which
 			// case the error is noise.
 			if controller.isStopEstablishingBroadcast() {
 				break loop
 			}
-			NoticeInfo("failed to connect to %s: %s", serverEntry.IpAddress, err)
+			NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
 			continue
 		}
 
+		// Block for server affinity grace period before delivering.
+		if !candidateServerEntry.isServerAffinityCandidate {
+			timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
+			select {
+			case <-timer.C:
+			case <-controller.serverAffinityDoneBroadcast:
+			case <-controller.stopEstablishingBroadcast:
+			}
+		}
+
 		// Deliver established tunnel.
 		// Don't block. Assumes the receiver has a buffer large enough for
 		// the number of desired tunnels. If there's no room, the tunnel must
@@ -949,6 +1013,12 @@ loop:
 		default:
 			controller.discardTunnel(tunnel)
 		}
+
+		// Unblock other candidates only after delivering when
+		// server affinity candidate succeeds.
+		if candidateServerEntry.isServerAffinityCandidate {
+			close(controller.serverAffinityDoneBroadcast)
+		}
 	}
 	NoticeInfo("stopped establish worker")
 }

+ 10 - 4
psiphon/tunnel.go

@@ -153,10 +153,6 @@ func EstablishTunnel(
 	// Now that network operations are complete, cancel interruptibility
 	pendingConns.Remove(conn)
 
-	// Promote this successful tunnel to first rank so it's one
-	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
-
 	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
 	tunnel.operateWaitGroup.Add(1)
 	go tunnel.operateTunnel(tunnelOwner)
@@ -773,6 +769,11 @@ func sendStats(tunnel *Tunnel, isConnected bool) bool {
 		return true
 	}
 
+	// TODO: reconcile session duration scheme with multi-tunnel mode
+	if tunnel.config.TunnelPoolSize > 1 && !isConnected {
+		return true
+	}
+
 	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
 	err := tunnel.session.DoStatusRequest(payload, isConnected)
 	if err != nil {
@@ -798,6 +799,11 @@ func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
 		return
 	}
 
+	// TODO: reconcile session duration scheme with multi-tunnel mode
+	if tunnel.config.TunnelPoolSize > 1 {
+		return
+	}
+
 	payload := transferstats.GetForServer(tunnel.serverEntry.IpAddress)
 	err := TryUntunneledStatusRequest(tunnel, payload, isShutdown)
 	if err != nil {