Просмотр исходного кода

Additional fragmentor metrics tests and fixes

Rod Hynes 7 лет назад
Родитель
Сommit
ecbc59fb42

+ 1 - 1
psiphon/TCPConn.go

@@ -82,7 +82,7 @@ func DialTCP(
 	}
 
 	if config.FragmentorConfig.IsFragmenting() {
-		fragmentor.NewConn(
+		conn = fragmentor.NewConn(
 			config.FragmentorConfig,
 			func(message string) { NoticeInfo(message) },
 			conn)

+ 1 - 1
psiphon/common/fragmentor/fragmentor.go

@@ -89,7 +89,7 @@ func newConfig(
 	coinFlip := p.WeightedCoinFlip(probability)
 	tunnelProtocols := p.TunnelProtocols(limitProtocols)
 
-	if !coinFlip || (len(tunnelProtocols) > 0 && common.Contains(tunnelProtocols, tunnelProtocol)) {
+	if !coinFlip || (len(tunnelProtocols) > 0 && !common.Contains(tunnelProtocols, tunnelProtocol)) {
 		return nil
 	}
 

+ 64 - 7
psiphon/server/server_test.go

@@ -129,6 +129,7 @@ func TestSSH(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -144,6 +145,23 @@ func TestOSSH(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
+		})
+}
+
+func TestFragmentedOSSH(t *testing.T) {
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:       "OSSH",
+			enableSSHAPIRequests: true,
+			doHotReload:          false,
+			doDefaultSponsorID:   false,
+			denyTrafficRules:     false,
+			requireAuthorization: true,
+			omitAuthorization:    false,
+			doTunneledWebRequest: true,
+			doTunneledNTPRequest: true,
+			forceFragmenting:     true,
 		})
 }
 
@@ -159,6 +177,7 @@ func TestUnfrontedMeek(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -175,6 +194,7 @@ func TestUnfrontedMeekHTTPS(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -191,6 +211,7 @@ func TestUnfrontedMeekHTTPSTLS13(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -207,6 +228,7 @@ func TestUnfrontedMeekSessionTicket(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -223,6 +245,7 @@ func TestUnfrontedMeekSessionTicketTLS13(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -238,6 +261,7 @@ func TestQUICOSSH(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -256,6 +280,7 @@ func TestMarionetteOSSH(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -271,6 +296,7 @@ func TestWebTransportAPIRequests(t *testing.T) {
 			omitAuthorization:    true,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -286,6 +312,7 @@ func TestHotReload(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -301,6 +328,7 @@ func TestDefaultSessionID(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -316,6 +344,7 @@ func TestDenyTrafficRules(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -331,6 +360,7 @@ func TestOmitAuthorization(t *testing.T) {
 			omitAuthorization:    true,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -346,6 +376,7 @@ func TestNoAuthorization(t *testing.T) {
 			omitAuthorization:    true,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -361,6 +392,7 @@ func TestUnusedAuthorization(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -376,6 +408,7 @@ func TestTCPOnlySLOK(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: false,
+			forceFragmenting:     false,
 		})
 }
 
@@ -391,6 +424,7 @@ func TestUDPOnlySLOK(t *testing.T) {
 			omitAuthorization:    false,
 			doTunneledWebRequest: false,
 			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
 		})
 }
 
@@ -405,6 +439,7 @@ type runServerConfig struct {
 	omitAuthorization    bool
 	doTunneledWebRequest bool
 	doTunneledNTPRequest bool
+	forceFragmenting     bool
 }
 
 func runServer(t *testing.T, runConfig *runServerConfig) {
@@ -438,7 +473,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// succeed, overriding the nonfunctional values, for the tunnel to
 	// establish.
 
-	doTactics := protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol)
+	doClientTactics := protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol)
+	doServerTactics := doClientTactics || runConfig.forceFragmenting
 
 	// All servers require a tactics config with valid keys.
 	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
@@ -468,7 +504,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		generateConfigParams.MarionetteFormat = "http_simple_nonblocking"
 	}
 
-	if doTactics {
+	if doServerTactics {
 		generateConfigParams.TacticsRequestPublicKey = tacticsRequestPublicKey
 		generateConfigParams.TacticsRequestObfuscatedKey = tacticsRequestObfuscatedKey
 	}
@@ -501,7 +537,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 	// Only pave the tactics config when tactics are required. This exercises the
 	// case where the tactics config is omitted.
-	if doTactics {
+	if doServerTactics {
 		tacticsConfigFilename = filepath.Join(testDataDirName, "tactics_config.json")
 		paveTacticsConfigFile(
 			t, tacticsConfigFilename,
@@ -516,7 +552,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	serverConfig["PsinetDatabaseFilename"] = psinetFilename
 	serverConfig["TrafficRulesFilename"] = trafficRulesFilename
 	serverConfig["OSLConfigFilename"] = oslConfigFilename
-	if doTactics {
+	if doServerTactics {
 		serverConfig["TacticsConfigFilename"] = tacticsConfigFilename
 	}
 	serverConfig["LogFilename"] = filepath.Join(testDataDirName, "psiphond.log")
@@ -567,7 +603,9 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		}
 	}()
 
-	// TODO: monitor logs for more robust wait-until-loaded
+	// TODO: monitor logs for more robust wait-until-loaded. For example,
+	// especially with the race detector on, QUIC-OSSH tests can fail as the
+	// client sends its initial pacjet before the server is ready.
 	time.Sleep(1 * time.Second)
 
 	// Test: hot reload (of psinet and traffic rules)
@@ -607,7 +645,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	localHTTPProxyPort := 8081
 
 	jsonNetworkID := ""
-	if doTactics {
+	if doClientTactics {
 		// Use a distinct prefix for network ID for each test run to
 		// ensure tactics from different runs don't apply; this is
 		// a workaround for the singleton datastore.
@@ -660,7 +698,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		t.Fatalf("error committing configuration file: %s", err)
 	}
 
-	if doTactics {
+	if doClientTactics {
 		// Configure nonfunctional values that must be overridden by tactics.
 
 		applyParameters := make(map[string]interface{})
@@ -668,6 +706,25 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		applyParameters[parameters.TunnelConnectTimeout] = "1s"
 		applyParameters[parameters.TunnelRateLimits] = common.RateLimits{WriteBytesPerSecond: 1}
 
+		err = clientConfig.SetClientParameters("", true, applyParameters)
+		if err != nil {
+			t.Fatalf("SetClientParameters failed: %s", err)
+		}
+
+	} else if runConfig.forceFragmenting {
+		// Directly apply same parameters that would've come from tactics.
+
+		applyParameters := make(map[string]interface{})
+
+		applyParameters[parameters.FragmentorLimitProtocols] = protocol.TunnelProtocols{runConfig.tunnelProtocol}
+		applyParameters[parameters.FragmentorProbability] = 1.0
+		applyParameters[parameters.FragmentorMinTotalBytes] = 1000
+		applyParameters[parameters.FragmentorMaxTotalBytes] = 2000
+		applyParameters[parameters.FragmentorMinWriteBytes] = 1
+		applyParameters[parameters.FragmentorMaxWriteBytes] = 100
+		applyParameters[parameters.FragmentorMinDelay] = 1 * time.Millisecond
+		applyParameters[parameters.FragmentorMaxDelay] = 10 * time.Millisecond
+
 		err = clientConfig.SetClientParameters("", true, applyParameters)
 		if err != nil {
 			t.Fatalf("SetClientParameters failed: %s", err)

+ 26 - 21
psiphon/server/tunnelServer.go

@@ -1024,7 +1024,7 @@ func newSshClient(
 }
 
 func (sshClient *sshClient) run(
-	clientConn net.Conn, onSSHHandshakeFinished func()) {
+	baseConn net.Conn, onSSHHandshakeFinished func()) {
 
 	// onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
 	defer func() {
@@ -1036,6 +1036,8 @@ func (sshClient *sshClient) run(
 	// Set initial traffic rules, pre-handshake, based on currently known info.
 	sshClient.setTrafficRules()
 
+	conn := baseConn
+
 	// Wrap the base client connection with an ActivityMonitoredConn which will
 	// terminate the connection if no data is received before the deadline. This
 	// timeout is in effect for the entire duration of the SSH connection. Clients
@@ -1044,22 +1046,22 @@ func (sshClient *sshClient) run(
 	// due to buffering.
 
 	activityConn, err := common.NewActivityMonitoredConn(
-		clientConn,
+		conn,
 		SSH_CONNECTION_READ_DEADLINE,
 		false,
 		nil,
 		nil)
 	if err != nil {
-		clientConn.Close()
+		conn.Close()
 		log.WithContextFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
 		return
 	}
-	clientConn = activityConn
+	conn = activityConn
 
 	// Further wrap the connection in a rate limiting ThrottledConn.
 
-	throttledConn := common.NewThrottledConn(clientConn, sshClient.rateLimits())
-	clientConn = throttledConn
+	throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())
+	conn = throttledConn
 
 	// Run the initial [obfuscated] SSH handshake in a goroutine so we can both
 	// respect shutdownBroadcast and implement a specific handshake timeout.
@@ -1067,11 +1069,11 @@ func (sshClient *sshClient) run(
 	// too long.
 
 	type sshNewServerConnResult struct {
-		conn     net.Conn
-		sshConn  *ssh.ServerConn
-		channels <-chan ssh.NewChannel
-		requests <-chan *ssh.Request
-		err      error
+		obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
+		sshConn           *ssh.ServerConn
+		channels          <-chan ssh.NewChannel
+		requests          <-chan *ssh.Request
+		err               error
 	}
 
 	resultChannel := make(chan *sshNewServerConnResult, 2)
@@ -1112,7 +1114,7 @@ func (sshClient *sshClient) run(
 		if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
 			// Note: NewObfuscatedSSHConn blocks on network I/O
 			// TODO: ensure this won't block shutdown
-			conn, result.err = obfuscator.NewObfuscatedSSHConn(
+			result.obfuscatedSSHConn, result.err = obfuscator.NewObfuscatedSSHConn(
 				obfuscator.OBFUSCATION_CONN_MODE_SERVER,
 				conn,
 				sshClient.sshServer.support.Config.ObfuscatedSSHKey,
@@ -1120,6 +1122,7 @@ func (sshClient *sshClient) run(
 			if result.err != nil {
 				result.err = common.ContextError(result.err)
 			}
+			conn = result.obfuscatedSSHConn
 		}
 
 		if result.err == nil {
@@ -1129,7 +1132,7 @@ func (sshClient *sshClient) run(
 
 		resultChannel <- result
 
-	}(clientConn)
+	}(conn)
 
 	var result *sshNewServerConnResult
 	select {
@@ -1137,7 +1140,7 @@ func (sshClient *sshClient) run(
 	case <-sshClient.sshServer.shutdownBroadcast:
 		// Close() will interrupt an ongoing handshake
 		// TODO: wait for SSH handshake goroutines to exit before returning?
-		clientConn.Close()
+		conn.Close()
 		return
 	}
 
@@ -1146,7 +1149,7 @@ func (sshClient *sshClient) run(
 	}
 
 	if result.err != nil {
-		clientConn.Close()
+		conn.Close()
 		// This is a Debug log due to noise. The handshake often fails due to I/O
 		// errors as clients frequently interrupt connections in progress when
 		// client-side load balancing completes a connection to a different server.
@@ -1168,7 +1171,7 @@ func (sshClient *sshClient) run(
 	sshClient.Unlock()
 
 	if !sshClient.sshServer.registerEstablishedClient(sshClient) {
-		clientConn.Close()
+		conn.Close()
 		log.WithContext().Warning("register failed")
 		return
 	}
@@ -1183,15 +1186,17 @@ func (sshClient *sshClient) run(
 	// Some conns report additional metrics. Meek conns report resiliency
 	// metrics and fragmentor.Conns report fragmentor configs.
 	//
-	// Limitation: for meek, GetMetrics from underlying fragmentor.Conns
+	// Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
 	// should be called in order to log fragmentor metrics for meek sessions.
 
 	var additionalMetrics []LogFields
-	if metricsSource, ok := clientConn.(common.MetricsSource); ok {
-		additionalMetrics = append(additionalMetrics, LogFields(metricsSource.GetMetrics()))
+	if metricsSource, ok := baseConn.(common.MetricsSource); ok {
+		additionalMetrics = append(
+			additionalMetrics, LogFields(metricsSource.GetMetrics()))
 	}
-	if metricsSource, ok := sshClient.sshConn.(common.MetricsSource); ok {
-		additionalMetrics = append(additionalMetrics, LogFields(metricsSource.GetMetrics()))
+	if result.obfuscatedSSHConn != nil {
+		additionalMetrics = append(
+			additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
 	}
 
 	sshClient.logTunnel(additionalMetrics)

+ 2 - 2
psiphon/serverApi.go

@@ -786,14 +786,14 @@ func getBaseAPIParameters(
 	if dialStats.DialConnMetrics != nil {
 		metrics := dialStats.DialConnMetrics.GetMetrics()
 		for name, value := range metrics {
-			params[name] = fmt.Sprintf("%s", value)
+			params[name] = fmt.Sprintf("%v", value)
 		}
 	}
 
 	if dialStats.ObfuscatedSSHConnMetrics != nil {
 		metrics := dialStats.ObfuscatedSSHConnMetrics.GetMetrics()
 		for name, value := range metrics {
-			params[name] = fmt.Sprintf("%s", value)
+			params[name] = fmt.Sprintf("%v", value)
 		}
 	}
 

+ 5 - 0
psiphon/tunnel.go

@@ -955,6 +955,11 @@ func dialSsh(
 		}
 	}
 
+	// Some conns report additional metrics. fragmentor.Conns report
+	// fragmentor configs.
+	//
+	// Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
+	// should be called in order to log fragmentor metrics for meek sessions.
 	if metricsSource, ok := dialConn.(common.MetricsSource); ok {
 		dialStats.DialConnMetrics = metricsSource
 	}