Browse Source

Add DestinationBytesMetricsASNs

Rod Hynes 1 year ago
parent
commit
acab254345
3 changed files with 76 additions and 61 deletions
  1. 3 1
      psiphon/common/parameters/parameters.go
  2. 15 17
      psiphon/server/server_test.go
  3. 58 43
      psiphon/server/tunnelServer.go

+ 3 - 1
psiphon/common/parameters/parameters.go

@@ -327,6 +327,7 @@ const (
 	RestrictDirectProviderIDsClientProbability         = "RestrictDirectProviderIDsClientProbability"
 	UpstreamProxyAllowAllServerEntrySources            = "UpstreamProxyAllowAllServerEntrySources"
 	DestinationBytesMetricsASN                         = "DestinationBytesMetricsASN"
+	DestinationBytesMetricsASNs                        = "DestinationBytesMetricsASNs"
 	DNSResolverAttemptsPerServer                       = "DNSResolverAttemptsPerServer"
 	DNSResolverAttemptsPerPreferredServer              = "DNSResolverAttemptsPerPreferredServer"
 	DNSResolverRequestTimeout                          = "DNSResolverRequestTimeout"
@@ -825,7 +826,8 @@ var defaultParameters = map[string]struct {
 
 	UpstreamProxyAllowAllServerEntrySources: {value: false},
 
-	DestinationBytesMetricsASN: {value: "", flags: serverSideOnly},
+	DestinationBytesMetricsASN:  {value: "", flags: serverSideOnly},
+	DestinationBytesMetricsASNs: {value: []string{}, flags: serverSideOnly},
 
 	DNSResolverAttemptsPerServer:                {value: 2, minimum: 1},
 	DNSResolverAttemptsPerPreferredServer:       {value: 1, minimum: 1},

+ 15 - 17
psiphon/server/server_test.go

@@ -2596,12 +2596,11 @@ func checkExpectedServerTunnelLogFields(
 	}
 
 	for _, name := range []string{
-		"dest_bytes_asn",
-		"dest_bytes_up_tcp",
-		"dest_bytes_down_tcp",
-		"dest_bytes_up_udp",
-		"dest_bytes_down_udp",
-		"dest_bytes",
+		"asn_dest_bytes",
+		"asn_dest_bytes_up_tcp",
+		"asn_dest_bytes_down_tcp",
+		"asn_dest_bytes_up_udp",
+		"asn_dest_bytes_down_udp",
 	} {
 		if expectDestinationBytesFields && fields[name] == nil {
 			return fmt.Errorf("missing expected field '%s'", name)
@@ -2612,21 +2611,20 @@ func checkExpectedServerTunnelLogFields(
 	}
 
 	if expectDestinationBytesFields {
-		name := "dest_bytes_asn"
-		if fields[name].(string) != testGeoIPASN {
-			return fmt.Errorf("unexpected field value %s: '%v'", name, fields[name])
-		}
 		for _, pair := range [][]string{
-			{"dest_bytes_up_tcp", "bytes_up_tcp"},
-			{"dest_bytes_down_tcp", "bytes_down_tcp"},
-			{"dest_bytes_up_udp", "bytes_up_udp"},
-			{"dest_bytes_down_udp", "bytes_down_udp"},
-			{"dest_bytes", "bytes"},
+			{"asn_dest_bytes", "bytes"},
+			{"asn_dest_bytes_up_tcp", "bytes_up_tcp"},
+			{"asn_dest_bytes_down_tcp", "bytes_down_tcp"},
+			{"asn_dest_bytes_up_udp", "bytes_up_udp"},
+			{"asn_dest_bytes_down_udp", "bytes_down_udp"},
 		} {
-			value0 := int64(fields[pair[0]].(float64))
+			if _, ok := fields[pair[0]].(map[string]any)[testGeoIPASN].(float64); !ok {
+				return fmt.Errorf("missing field entry %s: '%v'", pair[0], testGeoIPASN)
+			}
+			value0 := int64(fields[pair[0]].(map[string]any)[testGeoIPASN].(float64))
 			value1 := int64(fields[pair[1]].(float64))
 			ok := value0 == value1
-			if pair[0] == "dest_bytes_up_udp" || pair[0] == "dest_bytes_down_udp" || pair[0] == "dest_bytes" {
+			if pair[0] == "asn_dest_bytes_up_udp" || pair[0] == "asn_dest_bytes_down_udp" || pair[0] == "asn_dest_bytes" {
 				// DNS requests are excluded from destination bytes counting
 				ok = value0 > 0 && value0 < value1
 			}

+ 58 - 43
psiphon/server/tunnelServer.go

@@ -1804,9 +1804,7 @@ type sshClient struct {
 	sendAlertRequests                    chan protocol.AlertRequest
 	sentAlertRequests                    map[string]bool
 	peakMetrics                          peakMetrics
-	destinationBytesMetricsASN           string
-	tcpDestinationBytesMetrics           destinationBytesMetrics
-	udpDestinationBytesMetrics           destinationBytesMetrics
+	destinationBytesMetrics              map[string]*protocolDestinationBytesMetrics
 }
 
 type trafficState struct {
@@ -1936,6 +1934,11 @@ type handshakeState struct {
 	inproxyRelayLogFields   common.LogFields
 }
 
+type protocolDestinationBytesMetrics struct {
+	tcpMetrics destinationBytesMetrics
+	udpMetrics destinationBytesMetrics
+}
+
 type destinationBytesMetrics struct {
 	bytesUp   int64
 	bytesDown int64
@@ -3359,47 +3362,33 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 	logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
 	logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
 
-	if sshClient.destinationBytesMetricsASN != "" {
+	if sshClient.destinationBytesMetrics != nil {
 
-		// Check if the configured DestinationBytesMetricsASN has changed
-		// (or been cleared). If so, don't log and discard the accumulated
-		// bytes to ensure we don't continue to record stats as previously
-		// configured.
-		//
-		// Any counts accumulated before the DestinationBytesMetricsASN change
-		// are lost. At this time we can't change
-		// sshClient.destinationBytesMetricsASN dynamically, after a tactics
-		// hot reload, as there may be destination bytes port forwards that
-		// were in place before the change, which will continue to count.
+		destBytes := make(map[string]int64)
+		destBytesUpTCP := make(map[string]int64)
+		destBytesDownTCP := make(map[string]int64)
+		destBytesUpUDP := make(map[string]int64)
+		destBytesDownUDP := make(map[string]int64)
 
-		logDestBytes := true
-		if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
+		for ASN, destinationBytesMetrics := range sshClient.destinationBytesMetrics {
 
-			// Target this using the client, not peer, GeoIP. In the case of
-			// in-proxy tunnel protocols, the client GeoIP fields will be None
-			// if the handshake does not complete. In that case, no bytes will
-			// have transferred.
+			bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
+			bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
+			bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
+			bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
 
-			p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
-			if err != nil || p.IsNil() ||
-				sshClient.destinationBytesMetricsASN != p.String(parameters.DestinationBytesMetricsASN) {
-				logDestBytes = false
-			}
+			destBytes[ASN] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
+			destBytesUpTCP[ASN] = bytesUpTCP
+			destBytesDownTCP[ASN] = bytesDownTCP
+			destBytesUpUDP[ASN] = bytesUpUDP
+			destBytesDownUDP[ASN] = bytesDownUDP
 		}
 
-		if logDestBytes {
-			bytesUpTCP := sshClient.tcpDestinationBytesMetrics.getBytesUp()
-			bytesDownTCP := sshClient.tcpDestinationBytesMetrics.getBytesDown()
-			bytesUpUDP := sshClient.udpDestinationBytesMetrics.getBytesUp()
-			bytesDownUDP := sshClient.udpDestinationBytesMetrics.getBytesDown()
-
-			logFields["dest_bytes_asn"] = sshClient.destinationBytesMetricsASN
-			logFields["dest_bytes_up_tcp"] = bytesUpTCP
-			logFields["dest_bytes_down_tcp"] = bytesDownTCP
-			logFields["dest_bytes_up_udp"] = bytesUpUDP
-			logFields["dest_bytes_down_udp"] = bytesDownUDP
-			logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
-		}
+		logFields["asn_dest_bytes"] = destBytes
+		logFields["asn_dest_bytes_up_tcp"] = destBytesUpTCP
+		logFields["asn_dest_bytes_down_tcp"] = destBytesDownTCP
+		logFields["asn_dest_bytes_up_udp"] = destBytesUpUDP
+		logFields["asn_dest_bytes_down_udp"] = destBytesDownUDP
 	}
 
 	// Only log fields for peakMetrics when there is data recorded, otherwise
@@ -4105,26 +4094,52 @@ func (sshClient *sshClient) setDestinationBytesMetrics() {
 		return
 	}
 
-	sshClient.destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
+	// Future enhancement: for 5 or fewer ASNs, iterate over a slice instead
+	// of using a map? See, for example, stringLookupThreshold in
+	// common/tactics.
+
+	ASNs := p.Strings(parameters.DestinationBytesMetricsASNs)
+
+	// Use the legacy single ASN parameter when DestinationBytesMetricsASNs is
+	// empty.
+	if len(ASNs) == 0 {
+		ASN := p.String(parameters.DestinationBytesMetricsASN)
+		if ASN != "" {
+			ASNs = []string{ASN}
+		}
+	}
+
+	if len(ASNs) == 0 {
+		return
+	}
+
+	sshClient.destinationBytesMetrics = make(map[string]*protocolDestinationBytesMetrics)
+
+	for _, ASN := range ASNs {
+		sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
+	}
 }
 
 func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
 	sshClient.Lock()
 	defer sshClient.Unlock()
 
-	if sshClient.destinationBytesMetricsASN == "" {
+	if sshClient.destinationBytesMetrics == nil {
 		return nil
 	}
 
-	if sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN != sshClient.destinationBytesMetricsASN {
+	destinationASN := sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN
+
+	metrics, ok := sshClient.destinationBytesMetrics[destinationASN]
+	if !ok {
 		return nil
 	}
 
 	if portForwardType == portForwardTypeTCP {
-		return &sshClient.tcpDestinationBytesMetrics
+		return &metrics.tcpMetrics
 	}
 
-	return &sshClient.udpDestinationBytesMetrics
+	return &metrics.udpMetrics
 }
 
 func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {