Browse Source

Synchronize selectProtocol logic with count of ConnectTunnels

- Fixes: InitialLimitTunnelProtocolsCandidateCount
  wasn't satisfied because candidates skipped due to
  failed selectProtocol were counted as progress

- Add limitProtocols test
Rod Hynes 7 years ago
parent
commit
ce7f610a98
2 changed files with 273 additions and 66 deletions
  1. 60 66
      psiphon/controller.go
  2. 213 0
      psiphon/limitProtocols_test.go

+ 60 - 66
psiphon/controller.go

@@ -59,6 +59,7 @@ type Controller struct {
 	isEstablishing                          bool
 	establishLimitTunnelProtocolsState      *limitTunnelProtocolsState
 	concurrentEstablishTunnelsMutex         sync.Mutex
+	establishConnectTunnelCount             int
 	concurrentEstablishTunnels              int
 	concurrentIntensiveEstablishTunnels     int
 	peakConcurrentEstablishTunnels          int
@@ -985,14 +986,14 @@ func (l *limitTunnelProtocolsState) isCandidate(
 		len(serverEntry.GetSupportedProtocols(l.useUpstreamProxy, l.protocols, excludeIntensive)) > 0
 }
 
-var errNoProtocolSupported = errors.New("server does not support any required protocol(s)")
+var errNoProtocolSupported = errors.New("server does not support any required protocol")
 
 func (l *limitTunnelProtocolsState) selectProtocol(
-	candidateIndex int, excludeIntensive bool, serverEntry *protocol.ServerEntry) (string, error) {
+	connectTunnelCount int, excludeIntensive bool, serverEntry *protocol.ServerEntry) (string, error) {
 
 	limitProtocols := l.protocols
 
-	if len(l.initialProtocols) > 0 && l.initialCandidateCount > candidateIndex {
+	if len(l.initialProtocols) > 0 && l.initialCandidateCount > connectTunnelCount {
 		limitProtocols = l.initialProtocols
 	}
 
@@ -1024,7 +1025,6 @@ func (l *limitTunnelProtocolsState) selectProtocol(
 type candidateServerEntry struct {
 	serverEntry                *protocol.ServerEntry
 	isServerAffinityCandidate  bool
-	candidateIndex             int
 	adjustedEstablishStartTime monotime.Time
 }
 
@@ -1038,6 +1038,7 @@ func (controller *Controller) startEstablishing() {
 	NoticeInfo("start establishing")
 
 	controller.concurrentEstablishTunnelsMutex.Lock()
+	controller.establishConnectTunnelCount = 0
 	controller.concurrentEstablishTunnels = 0
 	controller.concurrentIntensiveEstablishTunnels = 0
 	controller.peakConcurrentEstablishTunnels = 0
@@ -1200,6 +1201,7 @@ func (controller *Controller) stopEstablishing() {
 	controller.concurrentEstablishTunnelsMutex.Lock()
 	peakConcurrent := controller.peakConcurrentEstablishTunnels
 	peakConcurrentIntensive := controller.peakConcurrentIntensiveEstablishTunnels
+	controller.establishConnectTunnelCount = 0
 	controller.concurrentEstablishTunnels = 0
 	controller.concurrentIntensiveEstablishTunnels = 0
 	controller.peakConcurrentEstablishTunnels = 0
@@ -1446,8 +1448,6 @@ func (controller *Controller) establishCandidateGenerator() {
 		close(controller.serverAffinityDoneBroadcast)
 	}
 
-	candidateIndex := 0
-
 loop:
 	// Repeat until stopped
 	for {
@@ -1505,7 +1505,6 @@ loop:
 			candidate := &candidateServerEntry{
 				serverEntry:                serverEntry,
 				isServerAffinityCandidate:  isServerAffinityCandidate,
-				candidateIndex:             candidateIndex,
 				adjustedEstablishStartTime: adjustedEstablishStartTime,
 			}
 
@@ -1518,8 +1517,6 @@ loop:
 			// TODO: here we could generate multiple candidates from the
 			// server entry when there are many MeekFrontingAddresses.
 
-			candidateIndex++
-
 			select {
 			case controller.candidateServerEntries <- candidate:
 			case <-controller.establishCtx.Done():
@@ -1655,10 +1652,6 @@ loop:
 			continue
 		}
 
-		// ConnectTunnel will allocate significant memory, so first attempt to
-		// reclaim as much as possible.
-		defaultGarbageCollection()
-
 		// Select the tunnel protocol. The selection will be made at random from
 		// protocols supported by the server entry, optionally limited by
 		// LimitTunnelProtocols.
@@ -1667,36 +1660,41 @@ loop:
 		// workers, and at the limit, do not select resource intensive
 		// protocols since otherwise the candidate must be skipped.
 		//
-		// If at the limit and unabled to select a non-meek protocol, skip the
+		// If at the limit and unable to select a non-intensive protocol, skip the
 		// candidate entirely and move on to the next. Since candidates are shuffled
-		// it's probable that the next candidate is not meek. In this case, a
+		// it's likely that the next candidate is not intensive. In this case, a
 		// StaggerConnectionWorkersMilliseconds delay may still be incurred.
 
 		limitIntensiveConnectionWorkers := controller.config.clientParameters.Get().Int(
 			parameters.LimitIntensiveConnectionWorkers)
 
-		excludeIntensive := false
 		controller.concurrentEstablishTunnelsMutex.Lock()
+
+		excludeIntensive := false
 		if limitIntensiveConnectionWorkers > 0 &&
-			controller.concurrentIntensiveEstablishTunnels >=
-				limitIntensiveConnectionWorkers {
+			controller.concurrentIntensiveEstablishTunnels >= limitIntensiveConnectionWorkers {
 			excludeIntensive = true
 		}
-		controller.concurrentEstablishTunnelsMutex.Unlock()
 
 		selectedProtocol, err := controller.establishLimitTunnelProtocolsState.selectProtocol(
-			candidateServerEntry.candidateIndex,
+			controller.establishConnectTunnelCount,
 			excludeIntensive,
 			candidateServerEntry.serverEntry)
+		if err != nil {
+
+			controller.concurrentEstablishTunnelsMutex.Unlock()
 
-		if err == errNoProtocolSupported {
 			// selectProtocol returns errNoProtocolSupported when the server
 			// does not support any protocol that remains after applying the
-			// LimitTunnelProtocols parameter and the excludeMeek flag.
-			// Skip this candidate.
+			// LimitTunnelProtocols parameter and the excludeIntensive flag.
+			// Silently skip the candidate in this case.
+			if err != errNoProtocolSupported {
+				NoticeInfo("failed to select protocol for %s: %s",
+					candidateServerEntry.serverEntry.IpAddress, err)
+			}
 
-			// Unblock other candidates immediately when
-			// server affinity candidate is skipped.
+			// Unblock other candidates immediately when server affinity
+			// candidate is skipped.
 			if candidateServerEntry.isServerAffinityCandidate {
 				close(controller.serverAffinityDoneBroadcast)
 			}
@@ -1704,50 +1702,44 @@ loop:
 			continue
 		}
 
-		var tunnel *Tunnel
-		if err == nil {
+		// Increment establishConnectTunnelCount only after selectProtocol has
+		// succeeded to ensure InitialLimitTunnelProtocolsCandidateCount
+		// candidates use InitialLimitTunnelProtocols.
+		controller.establishConnectTunnelCount += 1
+
+		isIntensive := protocol.TunnelProtocolIsResourceIntensive(selectedProtocol)
 
-			isIntensive := protocol.TunnelProtocolIsResourceIntensive(selectedProtocol)
+		if isIntensive {
+			controller.concurrentIntensiveEstablishTunnels += 1
+			if controller.concurrentIntensiveEstablishTunnels > controller.peakConcurrentIntensiveEstablishTunnels {
+				controller.peakConcurrentIntensiveEstablishTunnels = controller.concurrentIntensiveEstablishTunnels
+			}
+		}
+		controller.concurrentEstablishTunnels += 1
+		if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
+			controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
+		}
 
-			controller.concurrentEstablishTunnelsMutex.Lock()
-			if isIntensive {
+		controller.concurrentEstablishTunnelsMutex.Unlock()
 
-				// Recheck the limit now that we know we're selecting the resource
-				// intensive protocol and adjusting concurrentIntensiveEstablishTunnels.
-				if limitIntensiveConnectionWorkers > 0 &&
-					controller.concurrentIntensiveEstablishTunnels >=
-						limitIntensiveConnectionWorkers {
+		// ConnectTunnel will allocate significant memory, so first attempt to
+		// reclaim as much as possible.
+		defaultGarbageCollection()
 
-					// Skip this candidate.
-					controller.concurrentEstablishTunnelsMutex.Unlock()
-					continue
-				}
-				controller.concurrentIntensiveEstablishTunnels += 1
-				if controller.concurrentIntensiveEstablishTunnels > controller.peakConcurrentIntensiveEstablishTunnels {
-					controller.peakConcurrentIntensiveEstablishTunnels = controller.concurrentIntensiveEstablishTunnels
-				}
-			}
-			controller.concurrentEstablishTunnels += 1
-			if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
-				controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
-			}
-			controller.concurrentEstablishTunnelsMutex.Unlock()
+		tunnel, err := ConnectTunnel(
+			controller.establishCtx,
+			controller.config,
+			controller.sessionId,
+			candidateServerEntry.serverEntry,
+			selectedProtocol,
+			candidateServerEntry.adjustedEstablishStartTime)
 
-			tunnel, err = ConnectTunnel(
-				controller.establishCtx,
-				controller.config,
-				controller.sessionId,
-				candidateServerEntry.serverEntry,
-				selectedProtocol,
-				candidateServerEntry.adjustedEstablishStartTime)
-
-			controller.concurrentEstablishTunnelsMutex.Lock()
-			if isIntensive {
-				controller.concurrentIntensiveEstablishTunnels -= 1
-			}
-			controller.concurrentEstablishTunnels -= 1
-			controller.concurrentEstablishTunnelsMutex.Unlock()
+		controller.concurrentEstablishTunnelsMutex.Lock()
+		if isIntensive {
+			controller.concurrentIntensiveEstablishTunnels -= 1
 		}
+		controller.concurrentEstablishTunnels -= 1
+		controller.concurrentEstablishTunnelsMutex.Unlock()
 
 		// Periodically emit memory metrics during the establishment cycle.
 		if !controller.isStopEstablishing() {
@@ -1766,8 +1758,8 @@ loop:
 
 		if err != nil {
 
-			// Unblock other candidates immediately when
-			// server affinity candidate fails.
+			// Unblock other candidates immediately when server affinity
+			// candidate fails.
 			if candidateServerEntry.isServerAffinityCandidate {
 				close(controller.serverAffinityDoneBroadcast)
 			}
@@ -1778,7 +1770,9 @@ loop:
 				break loop
 			}
 
-			NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
+			NoticeInfo("failed to connect to %s: %s",
+				candidateServerEntry.serverEntry.IpAddress, err)
+
 			continue
 		}
 

+ 213 - 0
psiphon/limitProtocols_test.go

@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2018, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server"
+)
+
+func TestLimitTunnelProtocols(t *testing.T) {
+
+	initialLimitTunnelProtocols := protocol.TunnelProtocols{"OSSH", "UNFRONTED-MEEK-HTTPS-OSSH"}
+	initialLimitTunnelProtocolsCandidateCount := 100
+	limitTunnelProtocols := protocol.TunnelProtocols{"SSH", "UNFRONTED-MEEK-OSSH"}
+
+	initialConnectingCount := 0
+	connectingCount := 0
+
+	SetNoticeWriter(NewNoticeReceiver(
+		func(notice []byte) {
+			noticeType, payload, err := GetNotice(notice)
+			if err != nil {
+				return
+			}
+
+			if noticeType == "ConnectingServer" {
+
+				connectingCount += 1
+
+				protocolField, _ := payload["protocol"]
+				protocol := protocolField.(string)
+
+				if common.Contains(initialLimitTunnelProtocols, protocol) {
+					initialConnectingCount += 1
+				}
+
+				if common.Contains(limitTunnelProtocols, protocol) {
+					connectingCount += 1
+				}
+
+				// At the end of the InitialLimit phase, the order of
+				// ConnectingServer notices isn't strictly synchronized and
+				// it's possible for a Limit candidate ConnectingServer notice
+				// to arrive before the last InitialLimit notice. So strict
+				// checking of notice order is performed only up to 90% of
+				// InitialLimitTunnelProtocolsCandidateCount.
+
+				if initialConnectingCount <= (initialLimitTunnelProtocolsCandidateCount*9)/10 {
+
+					var expectedProtocols []string
+					if connectingCount <= initialLimitTunnelProtocolsCandidateCount {
+						expectedProtocols = initialLimitTunnelProtocols
+					} else {
+						expectedProtocols = limitTunnelProtocols
+					}
+
+					if !common.Contains(expectedProtocols, protocol) {
+						t.Fatalf("unexpected protocol: %s (%d %+v)", protocol, connectingCount, expectedProtocols)
+					}
+				}
+			}
+		}))
+
+	testDataDirName, err := ioutil.TempDir("", "psiphon-limit-tunnel-protocols-test")
+	if err != nil {
+		t.Fatalf("TempDir failed: %s", err)
+	}
+	defer os.RemoveAll(testDataDirName)
+
+	os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
+
+	clientConfigJSON := `
+    {
+        "ClientPlatform" : "Windows",
+        "ClientVersion" : "0",
+        "SponsorId" : "0",
+        "PropagationChannelId" : "0",
+        "DisableRemoteServerListFetcher" : true
+    }`
+	clientConfig, err := LoadConfig([]byte(clientConfigJSON))
+	if err != nil {
+		t.Fatalf("error processing configuration file: %s", err)
+	}
+
+	clientConfig.DataStoreDirectory = testDataDirName
+
+	err = clientConfig.Commit()
+	if err != nil {
+		t.Fatalf("error committing configuration file: %s", err)
+	}
+
+	applyParameters := make(map[string]interface{})
+
+	applyParameters[parameters.ConnectionWorkerPoolSize] = initialLimitTunnelProtocolsCandidateCount / 2
+	applyParameters[parameters.LimitIntensiveConnectionWorkers] = initialLimitTunnelProtocolsCandidateCount / 4
+	applyParameters[parameters.TunnelConnectTimeout] = "1s"
+	applyParameters[parameters.EstablishTunnelPausePeriod] = "1s"
+	applyParameters[parameters.InitialLimitTunnelProtocols] = initialLimitTunnelProtocols
+	applyParameters[parameters.InitialLimitTunnelProtocolsCandidateCount] = initialLimitTunnelProtocolsCandidateCount
+	applyParameters[parameters.LimitTunnelProtocols] = limitTunnelProtocols
+
+	err = clientConfig.SetClientParameters("", true, applyParameters)
+	if err != nil {
+		t.Fatalf("error setting client parameters: %s", err)
+	}
+
+	err = OpenDataStore(clientConfig)
+	if err != nil {
+		t.Fatalf("error initializing client datastore: %s", err)
+	}
+	defer CloseDataStore()
+
+	if CountServerEntries() > 0 {
+		t.Fatalf("unexpected server entries")
+	}
+
+	serverEntries := make([]map[string]interface{}, len(protocol.SupportedTunnelProtocols))
+
+	for i, tunnelProtocol := range protocol.SupportedTunnelProtocols {
+
+		_, _, _, _, encodedServerEntry, err := server.GenerateConfig(
+			&server.GenerateConfigParams{
+				ServerIPAddress:      fmt.Sprintf("0.1.0.0"),
+				EnableSSHAPIRequests: true,
+				WebServerPort:        8000,
+				TunnelProtocolPorts:  map[string]int{tunnelProtocol: 4000},
+			})
+		if err != nil {
+			t.Fatalf("error generating server config: %s", err)
+		}
+
+		serverEntryFields, err := protocol.DecodeServerEntryFields(
+			string(encodedServerEntry),
+			common.GetCurrentTimestamp(),
+			protocol.SERVER_ENTRY_SOURCE_REMOTE)
+		if err != nil {
+			t.Fatalf("error decoding server entry: %s", err)
+		}
+
+		serverEntries[i] = serverEntryFields
+	}
+
+	for i := 0; i < 1000; i++ {
+
+		serverEntryFields := serverEntries[i%len(protocol.SupportedTunnelProtocols)]
+
+		serverEntryFields["ipAddress"] = fmt.Sprintf("0.1.%d.%d", (i>>8)&0xFF, i&0xFF)
+
+		err = StoreServerEntry(serverEntryFields, true)
+		if err != nil {
+			t.Fatalf("error storing server entry: %s", err)
+		}
+	}
+
+	controller, err := NewController(clientConfig)
+	if err != nil {
+		t.Fatalf("error creating client controller: %s", err)
+	}
+
+	ctx, cancelFunc := context.WithCancel(context.Background())
+
+	controllerWaitGroup := new(sync.WaitGroup)
+
+	controllerWaitGroup.Add(1)
+	go func() {
+		defer controllerWaitGroup.Done()
+		controller.Run(ctx)
+	}()
+
+	time.Sleep(10 * time.Second)
+
+	cancelFunc()
+
+	controllerWaitGroup.Wait()
+
+	t.Logf("initial-connecting and connecting count: %d/%d", initialConnectingCount, connectingCount)
+
+	if initialConnectingCount != initialLimitTunnelProtocolsCandidateCount {
+		t.Fatalf("unexpected initial-connecting count")
+	}
+
+	if connectingCount < 3*initialLimitTunnelProtocolsCandidateCount {
+		t.Fatalf("unexpected connecting count")
+	}
+}