Parcourir la source

Move upstream metrics to top level of server_load

- These metrics were already logged only once per
  server_load, under the non-region, "ALL" protocol.
Rod Hynes il y a 4 ans
Parent
commit
51f3b2cdaf
2 fichiers modifiés avec 59 ajouts et 45 suppressions
  1. 5 1
      psiphon/server/services.go
  2. 54 44
      psiphon/server/tunnelServer.go

+ 5 - 1
psiphon/server/services.go

@@ -376,9 +376,13 @@ func logServerLoad(support *SupportServices) {
 
 
 	serverLoad.Add(support.ServerTacticsParametersCache.GetMetrics())
 	serverLoad.Add(support.ServerTacticsParametersCache.GetMetrics())
 
 
-	protocolStats, regionStats :=
+	upstreamStats, protocolStats, regionStats :=
 		support.TunnelServer.GetLoadStats()
 		support.TunnelServer.GetLoadStats()
 
 
+	for name, value := range upstreamStats {
+		serverLoad[name] = value
+	}
+
 	for protocol, stats := range protocolStats {
 	for protocol, stats := range protocolStats {
 		serverLoad[protocol] = stats
 		serverLoad[protocol] = stats
 	}
 	}

+ 54 - 44
psiphon/server/tunnelServer.go

@@ -264,7 +264,9 @@ func (server *TunnelServer) Run() error {
 // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
 // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
 // include current connected client count, total number of current port
 // include current connected client count, total number of current port
 // forwards.
 // forwards.
-func (server *TunnelServer) GetLoadStats() (ProtocolStats, RegionStats) {
+func (server *TunnelServer) GetLoadStats() (
+	UpstreamStats, ProtocolStats, RegionStats) {
+
 	return server.sshServer.getLoadStats()
 	return server.sshServer.getLoadStats()
 }
 }
 
 
@@ -717,17 +719,19 @@ func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
 	client.stop()
 	client.stop()
 }
 }
 
 
+type UpstreamStats map[string]interface{}
 type ProtocolStats map[string]map[string]interface{}
 type ProtocolStats map[string]map[string]interface{}
 type RegionStats map[string]map[string]map[string]interface{}
 type RegionStats map[string]map[string]map[string]interface{}
 
 
-func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
+func (sshServer *sshServer) getLoadStats() (
+	UpstreamStats, ProtocolStats, RegionStats) {
 
 
 	sshServer.clientsMutex.Lock()
 	sshServer.clientsMutex.Lock()
 	defer sshServer.clientsMutex.Unlock()
 	defer sshServer.clientsMutex.Unlock()
 
 
 	// Explicitly populate with zeros to ensure 0 counts in log messages.
 	// Explicitly populate with zeros to ensure 0 counts in log messages.
 
 
-	zeroStats := func() map[string]interface{} {
+	zeroClientStats := func() map[string]interface{} {
 		stats := make(map[string]interface{})
 		stats := make(map[string]interface{})
 		stats["accepted_clients"] = int64(0)
 		stats["accepted_clients"] = int64(0)
 		stats["established_clients"] = int64(0)
 		stats["established_clients"] = int64(0)
@@ -739,10 +743,9 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 	// enumeration for zeroing is a best effort.
 	// enumeration for zeroing is a best effort.
 	resolverIPs := sshServer.support.DNSResolver.GetAll()
 	resolverIPs := sshServer.support.DNSResolver.GetAll()
 
 
-	// Only the non-region "ALL" log has the following fields, which are
-	// primarily concerned with upstream/egress performance.
-	zeroStatsAll := func() map[string]interface{} {
-		stats := zeroStats()
+	// Fields which are primarily concerned with upstream/egress performance.
+	zeroUpstreamStats := func() map[string]interface{} {
+		stats := make(map[string]interface{})
 		stats["dialing_tcp_port_forwards"] = int64(0)
 		stats["dialing_tcp_port_forwards"] = int64(0)
 		stats["tcp_port_forwards"] = int64(0)
 		stats["tcp_port_forwards"] = int64(0)
 		stats["total_tcp_port_forwards"] = int64(0)
 		stats["total_tcp_port_forwards"] = int64(0)
@@ -779,15 +782,11 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		return stats
 		return stats
 	}
 	}
 
 
-	zeroProtocolStats := func(nonRegion bool) map[string]map[string]interface{} {
+	zeroProtocolStats := func() map[string]map[string]interface{} {
 		stats := make(map[string]map[string]interface{})
 		stats := make(map[string]map[string]interface{})
-		if nonRegion {
-			stats["ALL"] = zeroStatsAll()
-		} else {
-			stats["ALL"] = zeroStats()
-		}
+		stats["ALL"] = zeroClientStats()
 		for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
 		for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
-			stats[tunnelProtocol] = zeroStats()
+			stats[tunnelProtocol] = zeroClientStats()
 		}
 		}
 		return stats
 		return stats
 	}
 	}
@@ -796,8 +795,10 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		stats[name] = stats[name].(int64) + value
 		stats[name] = stats[name].(int64) + value
 	}
 	}
 
 
+	upstreamStats := zeroUpstreamStats()
+
 	// [<protocol or ALL>][<stat name>] -> count
 	// [<protocol or ALL>][<stat name>] -> count
-	protocolStats := zeroProtocolStats(true)
+	protocolStats := zeroProtocolStats()
 
 
 	// [<region][<protocol or ALL>][<stat name>] -> count
 	// [<region][<protocol or ALL>][<stat name>] -> count
 	regionStats := make(RegionStats)
 	regionStats := make(RegionStats)
@@ -809,7 +810,7 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 
 
 			if acceptedClientCount > 0 {
 			if acceptedClientCount > 0 {
 				if regionStats[region] == nil {
 				if regionStats[region] == nil {
-					regionStats[region] = zeroProtocolStats(false)
+					regionStats[region] = zeroProtocolStats()
 				}
 				}
 
 
 				addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
 				addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
@@ -829,7 +830,7 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		region := client.geoIPData.Country
 		region := client.geoIPData.Country
 
 
 		if regionStats[region] == nil {
 		if regionStats[region] == nil {
-			regionStats[region] = zeroProtocolStats(false)
+			regionStats[region] = zeroProtocolStats()
 		}
 		}
 
 
 		for _, stats := range []map[string]interface{}{
 		for _, stats := range []map[string]interface{}{
@@ -841,59 +842,68 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 			addInt64(stats, "established_clients", 1)
 			addInt64(stats, "established_clients", 1)
 		}
 		}
 
 
-		stats := protocolStats["ALL"]
-
 		// Note:
 		// Note:
 		// - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
 		// - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
 		// - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
 		// - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
 
 
-		addInt64(stats, "dialing_tcp_port_forwards", client.tcpTrafficState.concurrentDialingPortForwardCount)
+		addInt64(upstreamStats, "dialing_tcp_port_forwards",
+			client.tcpTrafficState.concurrentDialingPortForwardCount)
 
 
-		addInt64(stats, "tcp_port_forwards", client.tcpTrafficState.concurrentPortForwardCount)
+		addInt64(upstreamStats, "tcp_port_forwards",
+			client.tcpTrafficState.concurrentPortForwardCount)
 
 
-		addInt64(stats, "total_tcp_port_forwards", client.tcpTrafficState.totalPortForwardCount)
+		addInt64(upstreamStats, "total_tcp_port_forwards",
+			client.tcpTrafficState.totalPortForwardCount)
 
 
-		addInt64(stats, "udp_port_forwards", client.udpTrafficState.concurrentPortForwardCount)
+		addInt64(upstreamStats, "udp_port_forwards",
+			client.udpTrafficState.concurrentPortForwardCount)
 
 
-		addInt64(stats, "total_udp_port_forwards", client.udpTrafficState.totalPortForwardCount)
+		addInt64(upstreamStats, "total_udp_port_forwards",
+			client.udpTrafficState.totalPortForwardCount)
 
 
-		addInt64(stats, "tcp_port_forward_dialed_count", client.qualityMetrics.TCPPortForwardDialedCount)
+		addInt64(upstreamStats, "tcp_port_forward_dialed_count",
+			client.qualityMetrics.TCPPortForwardDialedCount)
 
 
-		addInt64(stats, "tcp_port_forward_dialed_duration",
+		addInt64(upstreamStats, "tcp_port_forward_dialed_duration",
 			int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
 
 
-		addInt64(stats, "tcp_port_forward_failed_count", client.qualityMetrics.TCPPortForwardFailedCount)
+		addInt64(upstreamStats, "tcp_port_forward_failed_count",
+			client.qualityMetrics.TCPPortForwardFailedCount)
 
 
-		addInt64(stats, "tcp_port_forward_failed_duration",
+		addInt64(upstreamStats, "tcp_port_forward_failed_duration",
 			int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
 
 
-		addInt64(stats, "tcp_port_forward_rejected_dialing_limit_count",
+		addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",
 			client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
 			client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
 
 
-		addInt64(stats, "tcp_port_forward_rejected_disallowed_count",
+		addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",
 			client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
 			client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
 
 
-		addInt64(stats, "udp_port_forward_rejected_disallowed_count",
+		addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",
 			client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
 			client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
 
 
-		addInt64(stats, "tcp_ipv4_port_forward_dialed_count", client.qualityMetrics.TCPIPv4PortForwardDialedCount)
+		addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",
+			client.qualityMetrics.TCPIPv4PortForwardDialedCount)
 
 
-		addInt64(stats, "tcp_ipv4_port_forward_dialed_duration",
+		addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",
 			int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
 
 
-		addInt64(stats, "tcp_ipv4_port_forward_failed_count", client.qualityMetrics.TCPIPv4PortForwardFailedCount)
+		addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",
+			client.qualityMetrics.TCPIPv4PortForwardFailedCount)
 
 
-		addInt64(stats, "tcp_ipv4_port_forward_failed_duration",
+		addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",
 			int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
 
 
-		addInt64(stats, "tcp_ipv6_port_forward_dialed_count", client.qualityMetrics.TCPIPv6PortForwardDialedCount)
+		addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",
+			client.qualityMetrics.TCPIPv6PortForwardDialedCount)
 
 
-		addInt64(stats, "tcp_ipv6_port_forward_dialed_duration",
+		addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",
 			int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
 
 
-		addInt64(stats, "tcp_ipv6_port_forward_failed_count", client.qualityMetrics.TCPIPv6PortForwardFailedCount)
+		addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",
+			client.qualityMetrics.TCPIPv6PortForwardFailedCount)
 
 
-		addInt64(stats, "tcp_ipv6_port_forward_failed_duration",
+		addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",
 			int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
 			int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
 
 
 		// DNS metrics limitations:
 		// DNS metrics limitations:
@@ -903,19 +913,19 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		// Every client.qualityMetrics DNS map has an "ALL" entry.
 		// Every client.qualityMetrics DNS map has an "ALL" entry.
 
 
 		for key, value := range client.qualityMetrics.DNSCount {
 		for key, value := range client.qualityMetrics.DNSCount {
-			stats["dns_count"].(map[string]int64)[key] += value
+			upstreamStats["dns_count"].(map[string]int64)[key] += value
 		}
 		}
 
 
 		for key, value := range client.qualityMetrics.DNSDuration {
 		for key, value := range client.qualityMetrics.DNSDuration {
-			stats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
+			upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
 		}
 		}
 
 
 		for key, value := range client.qualityMetrics.DNSFailedCount {
 		for key, value := range client.qualityMetrics.DNSFailedCount {
-			stats["dns_failed_count"].(map[string]int64)[key] += value
+			upstreamStats["dns_failed_count"].(map[string]int64)[key] += value
 		}
 		}
 
 
 		for key, value := range client.qualityMetrics.DNSFailedDuration {
 		for key, value := range client.qualityMetrics.DNSFailedDuration {
-			stats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
+			upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
 		}
 		}
 
 
 		client.qualityMetrics.reset()
 		client.qualityMetrics.reset()
@@ -923,7 +933,7 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		client.Unlock()
 		client.Unlock()
 	}
 	}
 
 
-	return protocolStats, regionStats
+	return upstreamStats, protocolStats, regionStats
 }
 }
 
 
 func (sshServer *sshServer) getEstablishedClientCount() int {
 func (sshServer *sshServer) getEstablishedClientCount() int {