فهرست منبع

Avoid spurious upstream proxy error notices

Rod Hynes 5 سال پیش
والد
کامیت
23f6c2780e

+ 3 - 1
psiphon/TCPConn.go

@@ -136,7 +136,9 @@ func proxiedTcpDial(
 	go func() {
 		conn, err := upstreamDialer("tcp", addr)
 		if _, ok := err.(*upstreamproxy.Error); ok {
-			NoticeUpstreamProxyError(err)
+			if config.UpstreamProxyErrorCallback != nil {
+				config.UpstreamProxyErrorCallback(err)
+			}
 		}
 		resultChannel <- upstreamDialResult{
 			conn: conn,

+ 2 - 0
psiphon/common/parameters/clientParameters.go

@@ -86,6 +86,7 @@ const (
 	StaggerConnectionWorkersPeriod                   = "StaggerConnectionWorkersPeriod"
 	StaggerConnectionWorkersJitter                   = "StaggerConnectionWorkersJitter"
 	LimitIntensiveConnectionWorkers                  = "LimitIntensiveConnectionWorkers"
+	UpstreamProxyErrorWaitDuration                   = "UpstreamProxyErrorWaitDuration"
 	IgnoreHandshakeStatsRegexps                      = "IgnoreHandshakeStatsRegexps"
 	PrioritizeTunnelProtocolsProbability             = "PrioritizeTunnelProtocolsProbability"
 	PrioritizeTunnelProtocols                        = "PrioritizeTunnelProtocols"
@@ -281,6 +282,7 @@ var defaultClientParameters = map[string]struct {
 	StaggerConnectionWorkersPeriod:           {value: time.Duration(0), minimum: time.Duration(0)},
 	StaggerConnectionWorkersJitter:           {value: 0.1, minimum: 0.0},
 	LimitIntensiveConnectionWorkers:          {value: 0, minimum: 0},
+	UpstreamProxyErrorWaitDuration:           {value: 30 * time.Second, minimum: time.Duration(0)},
 	IgnoreHandshakeStatsRegexps:              {value: false},
 	TunnelOperateShutdownTimeout:             {value: 1 * time.Second, minimum: 1 * time.Millisecond, flags: useNetworkLatencyMultiplier},
 	TunnelPortForwardDialTimeout:             {value: 10 * time.Second, minimum: 1 * time.Millisecond, flags: useNetworkLatencyMultiplier},

+ 50 - 0
psiphon/controller.go

@@ -1664,6 +1664,55 @@ loop:
 			continue
 		}
 
+		// upstreamProxyErrorCallback will post NoticeUpstreamProxyError when the
+		// tunnel dial fails due to an upstream proxy error. As the upstream proxy
+		// is user configured, the error message may need to be relayed to the user.
+
+		upstreamProxyErrorCallback := func(err error) {
+
+			// Do not post the notice when overall context is canceled or timed-out:
+			// the upstream proxy connection error is likely a result of the
+			// cancellation, and not a condition to be fixed by the user.
+			//
+			// Concurrency note: due to accessing controller.establishCtx,
+			// upstreamProxyErrorCallback must only be called while establishing;
+			// specifically, from the following ConnectTunnel call.
+
+			if controller.establishCtx.Err() != nil {
+				return
+			}
+
+			// Another class of non-fatal upstream proxy error arises from proxies
+			// which limit permitted proxied ports. In this case, some tunnels may fail
+			// due to dial port, while others may eventually succeed. To avoid this
+			// class of errors, delay posting the notice. If the upstream proxy works,
+			// _some_ tunnel should connect. If the upstream proxy configuration is
+			// broken, the error should persist and eventually get posted.
+
+			p := controller.config.GetClientParameters().Get()
+			workerPoolSize := p.Int(parameters.ConnectionWorkerPoolSize)
+			waitDuration := p.Duration(parameters.UpstreamProxyErrorWaitDuration)
+			p.Close()
+
+			controller.concurrentEstablishTunnelsMutex.Lock()
+			establishConnectTunnelCount := controller.establishConnectTunnelCount
+			controller.concurrentEstablishTunnelsMutex.Unlock()
+
+			// Delay until either UpstreamProxyErrorWaitDuration has elapsed (excluding
+			// time spent waiting for network connectivity) or, to post sooner if many
+			// candidates are failing, at least workerPoolSize tunnel connection
+			// attempts have completed. We infer that at least workerPoolSize
+			// candidates have completed by checking that at least 2*workerPoolSize
+			// candidates have started.
+
+			if time.Since(candidateServerEntry.adjustedEstablishStartTime) < waitDuration &&
+				establishConnectTunnelCount < 2*workerPoolSize {
+				return
+			}
+
+			NoticeUpstreamProxyError(err)
+		}
+
 		// Select the tunnel protocol. The selection will be made at random
 		// from protocols supported by the server entry, optionally limited by
 		// LimitTunnelProtocols.
@@ -1727,6 +1776,7 @@ loop:
 
 		dialParams, err := MakeDialParameters(
 			controller.config,
+			upstreamProxyErrorCallback,
 			canReplay,
 			selectProtocol,
 			candidateServerEntry.serverEntry,

+ 5 - 1
psiphon/dialParameters.go

@@ -145,6 +145,7 @@ type DialParameters struct {
 // when establishment is cancelled.
 func MakeDialParameters(
 	config *Config,
+	upstreamProxyErrorCallback func(error),
 	canReplay func(serverEntry *protocol.ServerEntry, replayProtocol string) bool,
 	selectProtocol func(serverEntry *protocol.ServerEntry) (string, bool),
 	serverEntry *protocol.ServerEntry,
@@ -262,8 +263,10 @@ func MakeDialParameters(
 		isReplay = false
 	}
 
-	// Set IsExchanged so that full dial parameters are stored and replayed upon success.
+	// Set IsExchanged such that full dial parameters are stored and replayed
+	// upon success.
 	dialParams.IsExchanged = false
+
 	dialParams.ServerEntry = serverEntry
 	dialParams.NetworkID = networkID
 	dialParams.IsReplay = isReplay
@@ -662,6 +665,7 @@ func MakeDialParameters(
 		IPv6Synthesizer:               config.IPv6Synthesizer,
 		TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
 		FragmentorConfig:              fragmentor.NewUpstreamConfig(p, dialParams.TunnelProtocol, dialParams.FragmentorSeed),
+		UpstreamProxyErrorCallback:    upstreamProxyErrorCallback,
 	}
 
 	// Unconditionally initialize MeekResolvedIPAddress, so a valid string can

+ 20 - 12
psiphon/dialParameters_test.go

@@ -108,7 +108,10 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	// Test: expected dial parameter fields set
 
-	dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	upstreamProxyErrorCallback := func(_ error) {}
+
+	dialParams, err := MakeDialParameters(
+		clientConfig, upstreamProxyErrorCallback, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -201,11 +204,16 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("missing API request fields")
 	}
 
+	dialConfig := dialParams.GetDialConfig()
+	if dialConfig.UpstreamProxyErrorCallback == nil {
+		t.Fatalf("missing upstreamProxyErrorCallback")
+	}
+
 	// Test: no replay after dial reported to fail
 
 	dialParams.Failed(clientConfig)
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -220,7 +228,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	testNetworkID = prng.HexString(8)
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -237,7 +245,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	dialParams.Succeeded()
 
-	replayDialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	replayDialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -323,7 +331,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetClientParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -338,7 +346,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	time.Sleep(1 * time.Second)
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -353,7 +361,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	serverEntries[0].ConfigurationVersion += 1
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -377,14 +385,14 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetClientParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
 
 	dialParams.Succeeded()
 
-	replayDialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	replayDialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -432,7 +440,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 		if i%10 == 0 {
 
-			dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}
@@ -461,7 +469,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 				t.Fatalf("ServerEntryIterator.Next failed: %s", err)
 			}
 
-			dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}
@@ -483,7 +491,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 				t.Fatalf("ServerEntryIterator.Next failed: %s", err)
 			}
 
-			dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}

+ 1 - 0
psiphon/exchange_test.go

@@ -180,6 +180,7 @@ func TestServerEntryExchange(t *testing.T) {
 
 			dialParams, err := MakeDialParameters(
 				config,
+				nil,
 				canReplay,
 				selectProtocol,
 				serverEntry,

+ 5 - 0
psiphon/net.go

@@ -98,6 +98,11 @@ type DialConfig struct {
 	// FragmentorConfig specifies whether to layer a fragmentor.Conn on top
 	// of dialed TCP conns, and the fragmentation configuration to use.
 	FragmentorConfig *fragmentor.Config
+
+	// UpstreamProxyErrorCallback is called when a dial fails due to an upstream
+	// proxy error. As the upstream proxy is user configured, the error message
+	// may need to be relayed to the user.
+	UpstreamProxyErrorCallback func(error)
 }
 
 // NetworkConnectivityChecker defines the interface to the external

+ 1 - 0
psiphon/server/server_test.go

@@ -2199,6 +2199,7 @@ func storePruneServerEntriesTest(
 
 		dialParams, err := psiphon.MakeDialParameters(
 			clientConfig,
+			nil,
 			func(_ *protocol.ServerEntry, _ string) bool { return true },
 			func(serverEntry *protocol.ServerEntry) (string, bool) {
 				return runConfig.tunnelProtocol, true

+ 1 - 0
psiphon/server/sessionID_test.go

@@ -162,6 +162,7 @@ func TestDuplicateSessionID(t *testing.T) {
 
 		dialParams, err := psiphon.MakeDialParameters(
 			clientConfig,
+			nil,
 			func(_ *protocol.ServerEntry, _ string) bool { return false },
 			func(_ *protocol.ServerEntry) (string, bool) { return "OSSH", true },
 			serverEntry,

+ 7 - 0
psiphon/tactics.go

@@ -177,8 +177,15 @@ func fetchTactics(
 		return tacticsProtocols[index], true
 	}
 
+	// No upstreamProxyErrorCallback is set: for tunnel establishment, the
+	// tactics head start is short, and tunnel connections will eventually post
+	// NoticeUpstreamProxyError for any persistent upstream proxy error
+	// conditions. Non-tunnel establishment cases, such as SendFeedback, which
+	// use tactics are not currently expected to post NoticeUpstreamProxyError.
+
 	dialParams, err := MakeDialParameters(
 		config,
+		nil,
 		canReplay,
 		selectProtocol,
 		serverEntry,