Browse Source

Restore full support for legacy dest_bytes

- Also restore logic to log only destinations still configured in tactics
Rod Hynes 1 year ago
parent
commit
b5d85724bd
2 changed files with 207 additions and 74 deletions
  1. 112 38
      psiphon/server/server_test.go
  2. 95 36
      psiphon/server/tunnelServer.go

+ 112 - 38
psiphon/server/server_test.go

@@ -578,17 +578,32 @@ func TestBurstMonitorAndDestinationBytes(t *testing.T) {
 		})
 }
 
+func TestBurstMonitorAndLegacyDestinationBytes(t *testing.T) {
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:           "OSSH",
+			requireAuthorization:     true,
+			doTunneledWebRequest:     true,
+			doTunneledNTPRequest:     true,
+			doDanglingTCPConn:        true,
+			doBurstMonitor:           true,
+			doLegacyDestinationBytes: true,
+			doLogHostProvider:        true,
+		})
+}
+
 func TestChangeBytesConfig(t *testing.T) {
 	runServer(t,
 		&runServerConfig{
-			tunnelProtocol:       "OSSH",
-			requireAuthorization: true,
-			doTunneledWebRequest: true,
-			doTunneledNTPRequest: true,
-			doDanglingTCPConn:    true,
-			doDestinationBytes:   true,
-			doChangeBytesConfig:  true,
-			doLogHostProvider:    true,
+			tunnelProtocol:           "OSSH",
+			requireAuthorization:     true,
+			doTunneledWebRequest:     true,
+			doTunneledNTPRequest:     true,
+			doDanglingTCPConn:        true,
+			doDestinationBytes:       true,
+			doLegacyDestinationBytes: true,
+			doChangeBytesConfig:      true,
+			doLogHostProvider:        true,
 		})
 }
 
@@ -646,34 +661,35 @@ func TestLegacyAPIEncoding(t *testing.T) {
 }
 
 type runServerConfig struct {
-	tunnelProtocol       string
-	clientTunnelProtocol string
-	passthrough          bool
-	tlsProfile           string
-	doHotReload          bool
-	doDefaultSponsorID   bool
-	denyTrafficRules     bool
-	requireAuthorization bool
-	omitAuthorization    bool
-	doTunneledWebRequest bool
-	doTunneledNTPRequest bool
-	applyPrefix          bool
-	forceFragmenting     bool
-	forceLivenessTest    bool
-	doPruneServerEntries bool
-	doDanglingTCPConn    bool
-	doPacketManipulation bool
-	doBurstMonitor       bool
-	doSplitTunnel        bool
-	limitQUICVersions    bool
-	doDestinationBytes   bool
-	doChangeBytesConfig  bool
-	doLogHostProvider    bool
-	inspectFlows         bool
-	doSteeringIP         bool
-	doTargetBrokerSpecs  bool
-	useLegacyAPIEncoding bool
-	doPersonalPairing    bool
+	tunnelProtocol           string
+	clientTunnelProtocol     string
+	passthrough              bool
+	tlsProfile               string
+	doHotReload              bool
+	doDefaultSponsorID       bool
+	denyTrafficRules         bool
+	requireAuthorization     bool
+	omitAuthorization        bool
+	doTunneledWebRequest     bool
+	doTunneledNTPRequest     bool
+	applyPrefix              bool
+	forceFragmenting         bool
+	forceLivenessTest        bool
+	doPruneServerEntries     bool
+	doDanglingTCPConn        bool
+	doPacketManipulation     bool
+	doBurstMonitor           bool
+	doSplitTunnel            bool
+	limitQUICVersions        bool
+	doDestinationBytes       bool
+	doLegacyDestinationBytes bool
+	doChangeBytesConfig      bool
+	doLogHostProvider        bool
+	inspectFlows             bool
+	doSteeringIP             bool
+	doTargetBrokerSpecs      bool
+	useLegacyAPIEncoding     bool
+	doPersonalPairing        bool
 }
 
 var (
@@ -772,7 +788,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		runConfig.applyPrefix ||
 		runConfig.forceFragmenting ||
 		runConfig.doBurstMonitor ||
-		runConfig.doDestinationBytes
+		runConfig.doDestinationBytes ||
+		runConfig.doLegacyDestinationBytes
 
 	// All servers require a tactics config with valid keys.
 	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
@@ -908,6 +925,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			livenessTestSize,
 			runConfig.doBurstMonitor,
 			runConfig.doDestinationBytes,
+			runConfig.doLegacyDestinationBytes,
 			runConfig.applyPrefix,
 			runConfig.forceFragmenting,
 			"classic",
@@ -1177,6 +1195,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 				livenessTestSize,
 				runConfig.doBurstMonitor,
 				runConfig.doDestinationBytes,
+				runConfig.doLegacyDestinationBytes,
 				runConfig.applyPrefix,
 				runConfig.forceFragmenting,
 				"consistent",
@@ -1615,7 +1634,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 	if runConfig.doChangeBytesConfig {
 
-		if !runConfig.doDestinationBytes {
+		if !runConfig.doDestinationBytes || !runConfig.doLegacyDestinationBytes {
 			t.Fatalf("invalid test configuration")
 		}
 
@@ -1641,6 +1660,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			livenessTestSize,
 			runConfig.doBurstMonitor,
 			false,
+			false,
 			runConfig.applyPrefix,
 			runConfig.forceFragmenting,
 			"consistent",
@@ -1788,6 +1808,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		expectQUICVersion = limitQUICVersions[0]
 	}
 	expectDestinationBytesFields := runConfig.doDestinationBytes && !runConfig.doChangeBytesConfig
+	expectLegacyDestinationBytesFields := runConfig.doLegacyDestinationBytes && !runConfig.doChangeBytesConfig
 	expectMeekHTTPVersion := ""
 	if protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol) {
 		if protocol.TunnelProtocolUsesFrontedMeek(runConfig.tunnelProtocol) {
@@ -1819,6 +1840,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			expectUDPDataTransfer,
 			expectQUICVersion,
 			expectDestinationBytesFields,
+			expectLegacyDestinationBytesFields,
 			passthroughAddress,
 			expectMeekHTTPVersion,
 			inproxyTestConfig,
@@ -2074,6 +2096,7 @@ func checkExpectedServerTunnelLogFields(
 	expectUDPDataTransfer bool,
 	expectQUICVersion string,
 	expectDestinationBytesFields bool,
+	expectLegacyDestinationBytesFields bool,
 	expectPassthroughAddress *string,
 	expectMeekHTTPVersion string,
 	inproxyTestConfig *inproxyTestConfig,
@@ -2634,6 +2657,47 @@ func checkExpectedServerTunnelLogFields(
 		}
 	}
 
+	for _, name := range []string{
+		"dest_bytes_asn",
+		"dest_bytes_up_tcp",
+		"dest_bytes_down_tcp",
+		"dest_bytes_up_udp",
+		"dest_bytes_down_udp",
+		"dest_bytes",
+	} {
+		if expectLegacyDestinationBytesFields && fields[name] == nil {
+			return fmt.Errorf("missing expected field '%s'", name)
+
+		} else if !expectLegacyDestinationBytesFields && fields[name] != nil {
+			return fmt.Errorf("unexpected field '%s'", name)
+		}
+	}
+
+	if expectLegacyDestinationBytesFields {
+		name := "dest_bytes_asn"
+		if fields[name].(string) != testGeoIPASN {
+			return fmt.Errorf("unexpected field value %s: '%v'", name, fields[name])
+		}
+		for _, pair := range [][]string{
+			{"dest_bytes_up_tcp", "bytes_up_tcp"},
+			{"dest_bytes_down_tcp", "bytes_down_tcp"},
+			{"dest_bytes_up_udp", "bytes_up_udp"},
+			{"dest_bytes_down_udp", "bytes_down_udp"},
+			{"dest_bytes", "bytes"},
+		} {
+			value0 := int64(fields[pair[0]].(float64))
+			value1 := int64(fields[pair[1]].(float64))
+			ok := value0 == value1
+			if pair[0] == "dest_bytes_up_udp" || pair[0] == "dest_bytes_down_udp" || pair[0] == "dest_bytes" {
+				// DNS requests are excluded from destination bytes counting
+				ok = value0 > 0 && value0 < value1
+			}
+			if !ok {
+				return fmt.Errorf("unexpected field value %s: %v != %v", pair[0], fields[pair[0]], fields[pair[1]])
+			}
+		}
+	}
+
 	if expectPassthroughAddress != nil {
 		name := "passthrough_address"
 		if fields[name] == nil {
@@ -3287,6 +3351,7 @@ func paveTacticsConfigFile(
 	livenessTestSize int,
 	doBurstMonitor bool,
 	doDestinationBytes bool,
+	doLegacyDestinationBytes bool,
 	applyOsshPrefix bool,
 	enableOsshPrefixFragmenting bool,
 	discoveryStategy string,
@@ -3308,6 +3373,7 @@ func paveTacticsConfigFile(
           %s
           %s
           %s
+          %s
           "LimitTunnelProtocols" : ["%s"],
           "FragmentorLimitProtocols" : ["%s"],
           "FragmentorProbability" : 1.0,
@@ -3392,6 +3458,13 @@ func paveTacticsConfigFile(
 	destinationBytesParameters := ""
 	if doDestinationBytes {
 		destinationBytesParameters = fmt.Sprintf(`
+          "DestinationBytesMetricsASNs" : ["%s"],
+	`, testGeoIPASN)
+	}
+
+	legacyDestinationBytesParameters := ""
+	if doLegacyDestinationBytes {
+		legacyDestinationBytesParameters = fmt.Sprintf(`
           "DestinationBytesMetricsASN" : "%s",
 	`, testGeoIPASN)
 	}
@@ -3415,6 +3488,7 @@ func paveTacticsConfigFile(
 		tacticsRequestObfuscatedKey,
 		burstParameters,
 		destinationBytesParameters,
+		legacyDestinationBytesParameters,
 		osshPrefix,
 		inproxyParametersJSON,
 		tunnelProtocol,

+ 95 - 36
psiphon/server/tunnelServer.go

@@ -3364,31 +3364,88 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 
 	if sshClient.destinationBytesMetrics != nil {
 
-		destBytes := make(map[string]int64)
-		destBytesUpTCP := make(map[string]int64)
-		destBytesDownTCP := make(map[string]int64)
-		destBytesUpUDP := make(map[string]int64)
-		destBytesDownUDP := make(map[string]int64)
-
-		for ASN, destinationBytesMetrics := range sshClient.destinationBytesMetrics {
-
-			bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
-			bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
-			bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
-			bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
-
-			destBytes[ASN] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
-			destBytesUpTCP[ASN] = bytesUpTCP
-			destBytesDownTCP[ASN] = bytesDownTCP
-			destBytesUpUDP[ASN] = bytesUpUDP
-			destBytesDownUDP[ASN] = bytesDownUDP
+		// Only log destination bytes for ASNs that remain enabled in tactics.
+		//
+		// Any counts accumulated before DestinationBytesMetricsASN[s] changes
+		// are lost. At this time we can't change destination byte counting
+		// dynamically, after a tactics hot reload, as there may be
+		// destination bytes port forwards that were in place before the
+		// change, which will continue to count.
+
+		destinationBytesMetricsASNs := []string{}
+		destinationBytesMetricsASN := ""
+		if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
+
+			// Target this using the client, not peer, GeoIP. In the case of
+			// in-proxy tunnel protocols, the client GeoIP fields will be None
+			// if the handshake does not complete. In that case, no bytes will
+			// have transferred.
+
+			p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
+			if err == nil && !p.IsNil() {
+				destinationBytesMetricsASNs = p.Strings(parameters.DestinationBytesMetricsASNs)
+				destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
+			}
+			p.Close()
+		}
+
+		if destinationBytesMetricsASN != "" {
+
+			// Log any parameters.DestinationBytesMetricsASN data in the
+			// legacy log field format.
+
+			destinationBytesMetrics, ok :=
+				sshClient.destinationBytesMetrics[destinationBytesMetricsASN]
+
+			if ok {
+				bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
+				bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
+				bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
+				bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
+
+				logFields["dest_bytes_asn"] = destinationBytesMetricsASN
+				logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
+				logFields["dest_bytes_up_tcp"] = bytesUpTCP
+				logFields["dest_bytes_down_tcp"] = bytesDownTCP
+				logFields["dest_bytes_up_udp"] = bytesUpUDP
+				logFields["dest_bytes_down_udp"] = bytesDownUDP
+			}
 		}
 
-		logFields["asn_dest_bytes"] = destBytes
-		logFields["asn_dest_bytes_up_tcp"] = destBytesUpTCP
-		logFields["asn_dest_bytes_down_tcp"] = destBytesDownTCP
-		logFields["asn_dest_bytes_up_udp"] = destBytesUpUDP
-		logFields["asn_dest_bytes_down_udp"] = destBytesDownUDP
+		if len(destinationBytesMetricsASNs) > 0 {
+
+			destBytes := make(map[string]int64)
+			destBytesUpTCP := make(map[string]int64)
+			destBytesDownTCP := make(map[string]int64)
+			destBytesUpUDP := make(map[string]int64)
+			destBytesDownUDP := make(map[string]int64)
+
+			for _, ASN := range destinationBytesMetricsASNs {
+
+				destinationBytesMetrics, ok :=
+					sshClient.destinationBytesMetrics[ASN]
+				if !ok {
+					continue
+				}
+
+				bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
+				bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
+				bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
+				bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
+
+				destBytes[ASN] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
+				destBytesUpTCP[ASN] = bytesUpTCP
+				destBytesDownTCP[ASN] = bytesDownTCP
+				destBytesUpUDP[ASN] = bytesUpUDP
+				destBytesDownUDP[ASN] = bytesDownUDP
+			}
+
+			logFields["asn_dest_bytes"] = destBytes
+			logFields["asn_dest_bytes_up_tcp"] = destBytesUpTCP
+			logFields["asn_dest_bytes_down_tcp"] = destBytesDownTCP
+			logFields["asn_dest_bytes_up_udp"] = destBytesUpUDP
+			logFields["asn_dest_bytes_down_udp"] = destBytesDownUDP
+		}
 	}
 
 	// Only log fields for peakMetrics when there is data recorded, otherwise
@@ -4094,28 +4151,27 @@ func (sshClient *sshClient) setDestinationBytesMetrics() {
 		return
 	}
 
-	// Future enhancement: for 5 or fewer ASNs, iterate over a slice instead
-	// of using a map? See, for example, stringLookupThreshold in
-	// common/tactics.
-
 	ASNs := p.Strings(parameters.DestinationBytesMetricsASNs)
 
-	// Use the legacy single ASN parameter when DestinationBytesMetricsASNs is
-	// empty.
-	if len(ASNs) == 0 {
-		ASN := p.String(parameters.DestinationBytesMetricsASN)
-		if ASN != "" {
-			ASNs = []string{ASN}
-		}
-	}
+	// Merge in any legacy parameters.DestinationBytesMetricsASN
+	// configuration. Data for this target will be logged using the legacy
+	// log field format; see logTunnel. If an ASN is in _both_ configuration
+	// parameters, its data will be logged in both log field formats.
+	ASN := p.String(parameters.DestinationBytesMetricsASN)
 
-	if len(ASNs) == 0 {
+	if len(ASNs) == 0 && ASN == "" {
 		return
 	}
 
 	sshClient.destinationBytesMetrics = make(map[string]*protocolDestinationBytesMetrics)
 
 	for _, ASN := range ASNs {
+		if ASN != "" {
+			sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
+		}
+	}
+
+	if ASN != "" {
 		sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
 	}
 }
@@ -4130,6 +4186,9 @@ func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType in
 
 	destinationASN := sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN
 
+	// Future enhancement: for 5 or fewer ASNs, iterate over a slice instead
+	// of using a map? See, for example, stringLookupThreshold in
+	// common/tactics.
 	metrics, ok := sshClient.destinationBytesMetrics[destinationASN]
 	if !ok {
 		return nil