Эх сурвалжийг харах

Add destination bytes metrics test

Rod Hynes 3 жил өмнө
parent
commit
1cb5645430

Файлын зөрүү хэтэрхий том тул дарагдсан байна
+ 72 - 30
psiphon/server/geoip_test.go


+ 99 - 11
psiphon/server/server_test.go

@@ -136,6 +136,7 @@ func TestSSH(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -159,6 +160,7 @@ func TestOSSH(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -182,6 +184,7 @@ func TestFragmentedOSSH(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -205,6 +208,7 @@ func TestUnfrontedMeek(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -229,6 +233,7 @@ func TestUnfrontedMeekHTTPS(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -253,6 +258,7 @@ func TestUnfrontedMeekHTTPSTLS13(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -277,6 +283,7 @@ func TestUnfrontedMeekSessionTicket(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -301,6 +308,7 @@ func TestUnfrontedMeekSessionTicketTLS13(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -327,6 +335,7 @@ func TestQUICOSSH(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -353,6 +362,7 @@ func TestLimitedQUICOSSH(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    true,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -376,6 +386,7 @@ func TestWebTransportAPIRequests(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -399,6 +410,7 @@ func TestHotReload(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -422,6 +434,7 @@ func TestDefaultSponsorID(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -445,6 +458,7 @@ func TestDenyTrafficRules(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -468,6 +482,7 @@ func TestOmitAuthorization(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -491,6 +506,7 @@ func TestNoAuthorization(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -514,6 +530,7 @@ func TestUnusedAuthorization(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -537,6 +554,7 @@ func TestTCPOnlySLOK(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -560,6 +578,7 @@ func TestUDPOnlySLOK(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -583,6 +602,7 @@ func TestLivenessTest(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -606,10 +626,11 @@ func TestPruneServerEntries(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
-func TestBurstMonitor(t *testing.T) {
+func TestBurstMonitorAndDestinationBytes(t *testing.T) {
 	runServer(t,
 		&runServerConfig{
 			tunnelProtocol:       "OSSH",
@@ -629,6 +650,7 @@ func TestBurstMonitor(t *testing.T) {
 			doBurstMonitor:       true,
 			doSplitTunnel:        false,
 			limitQUICVersions:    false,
+			doDestinationBytes:   true,
 		})
 }
 
@@ -652,6 +674,7 @@ func TestSplitTunnel(t *testing.T) {
 			doBurstMonitor:       false,
 			doSplitTunnel:        true,
 			limitQUICVersions:    false,
+			doDestinationBytes:   false,
 		})
 }
 
@@ -674,6 +697,7 @@ type runServerConfig struct {
 	doBurstMonitor       bool
 	doSplitTunnel        bool
 	limitQUICVersions    bool
+	doDestinationBytes   bool
 }
 
 var (
@@ -723,7 +747,10 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// establish.
 
 	doClientTactics := protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol)
-	doServerTactics := doClientTactics || runConfig.forceFragmenting || runConfig.doBurstMonitor
+	doServerTactics := doClientTactics ||
+		runConfig.forceFragmenting ||
+		runConfig.doBurstMonitor ||
+		runConfig.doDestinationBytes
 
 	// All servers require a tactics config with valid keys.
 	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
@@ -822,7 +849,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			runConfig.tunnelProtocol,
 			propagationChannelID,
 			livenessTestSize,
-			runConfig.doBurstMonitor)
+			runConfig.doBurstMonitor,
+			runConfig.doDestinationBytes)
 	}
 
 	blocklistFilename := filepath.Join(testDataDirName, "blocklist.csv")
@@ -831,13 +859,19 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	var serverConfig map[string]interface{}
 	json.Unmarshal(serverConfigJSON, &serverConfig)
 
-	// The test GeoIP database maps all IPs to a single, non-"None" country. When
-	// split tunnel mode is enabled, this should cause port forwards to be
-	// untunneled. When split tunnel mode is not enabled, port forwards should be
-	// tunneled despite the country match.
-	geoIPDatabaseFilename := filepath.Join(testDataDirName, "geoip_database.mmbd")
-	paveGeoIPDatabaseFile(t, geoIPDatabaseFilename)
-	serverConfig["GeoIPDatabaseFilenames"] = []string{geoIPDatabaseFilename}
+	// The test GeoIP databases map all IPs to a single, non-"None" country
+	// and ASN.
+	//
+	// When split tunnel mode is enabled, this should cause port forwards to
+	// be untunneled. When split tunnel mode is not enabled, port forwards
+	// should be tunneled despite the country match.
+	//
+	// When destination bytes metrics are enabled, all traffic will map to the
+	// single ASN.
+	geoIPCityDatabaseFilename := filepath.Join(testDataDirName, "geoip_city_database.mmbd")
+	geoIPISPDatabaseFilename := filepath.Join(testDataDirName, "geoip_isp_database.mmbd")
+	paveGeoIPDatabaseFiles(t, geoIPCityDatabaseFilename, geoIPISPDatabaseFilename)
+	serverConfig["GeoIPDatabaseFilenames"] = []string{geoIPCityDatabaseFilename, geoIPISPDatabaseFilename}
 
 	serverConfig["PsinetDatabaseFilename"] = psinetFilename
 	serverConfig["TrafficRulesFilename"] = trafficRulesFilename
@@ -1395,6 +1429,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	if runConfig.limitQUICVersions {
 		expectQUICVersion = limitQUICVersions[0]
 	}
+	expectDestinationBytesFields := runConfig.doDestinationBytes
 
 	select {
 	case logFields := <-serverTunnelLog:
@@ -1408,6 +1443,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			expectTCPDataTransfer,
 			expectUDPDataTransfer,
 			expectQUICVersion,
+			expectDestinationBytesFields,
 			logFields)
 		if err != nil {
 			t.Fatalf("invalid server tunnel log fields: %s", err)
@@ -1469,6 +1505,7 @@ func checkExpectedServerTunnelLogFields(
 	expectTCPDataTransfer bool,
 	expectUDPDataTransfer bool,
 	expectQUICVersion string,
+	expectDestinationBytesFields bool,
 	fields map[string]interface{}) error {
 
 	// Limitations:
@@ -1870,6 +1907,47 @@ func checkExpectedServerTunnelLogFields(
 		}
 	}
 
+	for _, name := range []string{
+		"dest_bytes_asn",
+		"dest_bytes_up_tcp",
+		"dest_bytes_down_tcp",
+		"dest_bytes_up_udp",
+		"dest_bytes_down_udp",
+		"dest_bytes",
+	} {
+		if expectDestinationBytesFields && fields[name] == nil {
+			return fmt.Errorf("missing expected field '%s'", name)
+
+		} else if !expectDestinationBytesFields && fields[name] != nil {
+			return fmt.Errorf("unexpected field '%s'", name)
+		}
+	}
+
+	if expectDestinationBytesFields {
+		name := "dest_bytes_asn"
+		if fields[name].(string) != testGeoIPASN {
+			return fmt.Errorf("unexpected field value %s: '%v'", name, fields[name])
+		}
+		for _, pair := range [][]string{
+			[]string{"dest_bytes_up_tcp", "bytes_up_tcp"},
+			[]string{"dest_bytes_down_tcp", "bytes_down_tcp"},
+			[]string{"dest_bytes_up_udp", "bytes_up_udp"},
+			[]string{"dest_bytes_down_udp", "bytes_down_udp"},
+			[]string{"dest_bytes", "bytes"},
+		} {
+			value0 := int64(fields[pair[0]].(float64))
+			value1 := int64(fields[pair[1]].(float64))
+			ok := value0 == value1
+			if pair[0] == "dest_bytes_up_udp" || pair[0] == "dest_bytes_down_udp" || pair[0] == "dest_bytes" {
+				// DNS requests are excluded from destination bytes counting
+				ok = value0 > 0 && value0 < value1
+			}
+			if !ok {
+				return fmt.Errorf("unexpected field value %s: %v != %v", pair[0], fields[pair[0]], fields[pair[1]])
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -2398,7 +2476,8 @@ func paveTacticsConfigFile(
 	tunnelProtocol string,
 	propagationChannelID string,
 	livenessTestSize int,
-	doBurstMonitor bool) {
+	doBurstMonitor bool,
+	doDestinationBytes bool) {
 
 	// Setting LimitTunnelProtocols passively exercises the
 	// server-side LimitTunnelProtocols enforcement.
@@ -2412,6 +2491,7 @@ func paveTacticsConfigFile(
         "TTL" : "60s",
         "Probability" : 1.0,
         "Parameters" : {
+          %s
           %s
           "LimitTunnelProtocols" : ["%s"],
           "FragmentorLimitProtocols" : ["%s"],
@@ -2493,10 +2573,18 @@ func paveTacticsConfigFile(
 	`
 	}
 
+	destinationBytesParameters := ""
+	if doDestinationBytes {
+		destinationBytesParameters = fmt.Sprintf(`
+          "DestinationBytesMetricsASN" : "%s",
+	`, testGeoIPASN)
+	}
+
 	tacticsConfigJSON := fmt.Sprintf(
 		tacticsConfigJSONFormat,
 		tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
 		burstParameters,
+		destinationBytesParameters,
 		tunnelProtocol,
 		tunnelProtocol,
 		tunnelProtocol,

+ 22 - 8
psiphon/server/tunnelServer.go

@@ -1569,10 +1569,21 @@ type destinationBytesMetrics struct {
 func (d *destinationBytesMetrics) UpdateProgress(
 	downstreamBytes, upstreamBytes, _ int64) {
 
+	// Concurrency: UpdateProgress may be called without holding the sshClient
+	// lock; all accesses to bytesUp/bytesDown must use atomic operations.
+
 	atomic.AddInt64(&d.bytesUp, upstreamBytes)
 	atomic.AddInt64(&d.bytesDown, downstreamBytes)
 }
 
+func (d *destinationBytesMetrics) getBytesUp() int64 {
+	return atomic.LoadInt64(&d.bytesUp)
+}
+
+func (d *destinationBytesMetrics) getBytesDown() int64 {
+	return atomic.LoadInt64(&d.bytesDown)
+}
+
 type splitTunnelLookup struct {
 	regions       []string
 	regionsLookup map[string]bool
@@ -2895,15 +2906,18 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 	logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
 
 	if sshClient.destinationBytesMetricsASN != "" {
+
+		bytesUpTCP := sshClient.tcpDestinationBytesMetrics.getBytesUp()
+		bytesDownTCP := sshClient.tcpDestinationBytesMetrics.getBytesDown()
+		bytesUpUDP := sshClient.udpDestinationBytesMetrics.getBytesUp()
+		bytesDownUDP := sshClient.udpDestinationBytesMetrics.getBytesDown()
+
 		logFields["dest_bytes_asn"] = sshClient.destinationBytesMetricsASN
-		logFields["dest_bytes_up_tcp"] = sshClient.tcpDestinationBytesMetrics.bytesUp
-		logFields["dest_bytes_down_tcp"] = sshClient.tcpDestinationBytesMetrics.bytesDown
-		logFields["dest_bytes_up_udp"] = sshClient.udpDestinationBytesMetrics.bytesUp
-		logFields["dest_bytes_down_udp"] = sshClient.udpDestinationBytesMetrics.bytesDown
-		logFields["dest_bytes"] = sshClient.tcpDestinationBytesMetrics.bytesUp +
-			sshClient.tcpDestinationBytesMetrics.bytesDown +
-			sshClient.udpDestinationBytesMetrics.bytesUp +
-			sshClient.udpDestinationBytesMetrics.bytesDown
+		logFields["dest_bytes_up_tcp"] = bytesUpTCP
+		logFields["dest_bytes_down_tcp"] = bytesDownTCP
+		logFields["dest_bytes_up_udp"] = bytesUpUDP
+		logFields["dest_bytes_down_udp"] = bytesDownUDP
+		logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
 	}
 
 	// Only log fields for peakMetrics when there is data recorded, otherwise

Энэ ялгаанд хэт олон файл өөрчлөгдсөн тул зарим файлыг харуулаагүй болно