Просмотр исходного кода

Report regional server_load stats

Rod Hynes 9 лет назад
Родитель
Сommit
12a36f68a1
1 измененных файлов с 54 добавлено и 30 удалено
  1. 54 30
      psiphon/server/tunnelServer.go

+ 54 - 30
psiphon/server/tunnelServer.go

@@ -191,7 +191,7 @@ func (server *TunnelServer) Run() error {
 // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
 // include current connected client count, total number of current port
 // forwards.
-func (server *TunnelServer) GetLoadStats() map[string]map[string]int64 {
+func (server *TunnelServer) GetLoadStats() map[string]interface{} {
 	return server.sshServer.getLoadStats()
 }
 
@@ -239,7 +239,7 @@ type sshServer struct {
 	sshHostKey           ssh.Signer
 	clientsMutex         sync.Mutex
 	stoppingClients      bool
-	acceptedClientCounts map[string]int64
+	acceptedClientCounts map[string]map[string]int64
 	clients              map[string]*sshClient
 }
 
@@ -263,7 +263,7 @@ func newSSHServer(
 		establishTunnels:     1,
 		shutdownBroadcast:    shutdownBroadcast,
 		sshHostKey:           signer,
-		acceptedClientCounts: make(map[string]int64),
+		acceptedClientCounts: make(map[string]map[string]int64),
 		clients:              make(map[string]*sshClient),
 	}, nil
 }
@@ -375,20 +375,24 @@ func (sshServer *sshServer) runListener(
 
 // An accepted client has completed a direct TCP or meek connection and has a net.Conn. Registration
 // is for tracking the number of connections.
-func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol string) {
+func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
 
 	sshServer.clientsMutex.Lock()
 	defer sshServer.clientsMutex.Unlock()
 
-	sshServer.acceptedClientCounts[tunnelProtocol] += 1
+	if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
+		sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
+	}
+
+	sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
 }
 
-func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol string) {
+func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
 
 	sshServer.clientsMutex.Lock()
 	defer sshServer.clientsMutex.Unlock()
 
-	sshServer.acceptedClientCounts[tunnelProtocol] -= 1
+	sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
 }
 
 // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
@@ -437,43 +441,57 @@ func (sshServer *sshServer) unregisterEstablishedClient(sessionID string) {
 	}
 }
 
-func (sshServer *sshServer) getLoadStats() map[string]map[string]int64 {
+func (sshServer *sshServer) getLoadStats() map[string]interface{} {
 
 	sshServer.clientsMutex.Lock()
 	defer sshServer.clientsMutex.Unlock()
 
-	loadStats := make(map[string]map[string]int64)
+	protocolStats := make(map[string]map[string]map[string]int64)
 
 	// Explicitly populate with zeros to get 0 counts in log messages derived from getLoadStats()
 
 	for tunnelProtocol, _ := range sshServer.support.Config.TunnelProtocolPorts {
-		loadStats[tunnelProtocol] = make(map[string]int64)
-		loadStats[tunnelProtocol]["accepted_clients"] = 0
-		loadStats[tunnelProtocol]["established_clients"] = 0
-		loadStats[tunnelProtocol]["tcp_port_forwards"] = 0
-		loadStats[tunnelProtocol]["total_tcp_port_forwards"] = 0
-		loadStats[tunnelProtocol]["udp_port_forwards"] = 0
-		loadStats[tunnelProtocol]["total_udp_port_forwards"] = 0
+		protocolStats[tunnelProtocol] = make(map[string]map[string]int64)
+		protocolStats[tunnelProtocol]["ALL"] = make(map[string]int64)
+		protocolStats[tunnelProtocol]["ALL"]["accepted_clients"] = 0
+		protocolStats[tunnelProtocol]["ALL"]["established_clients"] = 0
+		protocolStats[tunnelProtocol]["ALL"]["tcp_port_forwards"] = 0
+		protocolStats[tunnelProtocol]["ALL"]["total_tcp_port_forwards"] = 0
+		protocolStats[tunnelProtocol]["ALL"]["udp_port_forwards"] = 0
+		protocolStats[tunnelProtocol]["ALL"]["total_udp_port_forwards"] = 0
 	}
 
 	// Note: as currently tracked/counted, each established client is also an accepted client
 
-	for tunnelProtocol, acceptedClientCount := range sshServer.acceptedClientCounts {
-		loadStats[tunnelProtocol]["accepted_clients"] = acceptedClientCount
+	for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
+		total := int64(0)
+		for region, acceptedClientCount := range regionAcceptedClientCounts {
+			if protocolStats[tunnelProtocol][region] == nil {
+				protocolStats[tunnelProtocol][region] = make(map[string]int64)
+			}
+			protocolStats[tunnelProtocol][region]["accepted_clients"] = acceptedClientCount
+			total += acceptedClientCount
+		}
+		protocolStats[tunnelProtocol]["ALL"]["accepted_clients"] = total
 	}
 
 	var aggregatedQualityMetrics qualityMetrics
 
 	for _, client := range sshServer.clients {
-		// Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
-		loadStats[client.tunnelProtocol]["established_clients"] += 1
 
 		client.Lock()
 
-		loadStats[client.tunnelProtocol]["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
-		loadStats[client.tunnelProtocol]["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
-		loadStats[client.tunnelProtocol]["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
-		loadStats[client.tunnelProtocol]["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount
+		for _, region := range []string{"ALL", client.geoIPData.Country} {
+
+			// Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
+			protocolStats[client.tunnelProtocol][region]["established_clients"] += 1
+
+			protocolStats[client.tunnelProtocol][region]["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
+			protocolStats[client.tunnelProtocol][region]["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
+			protocolStats[client.tunnelProtocol][region]["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
+			protocolStats[client.tunnelProtocol][region]["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount
+
+		}
 
 		aggregatedQualityMetrics.tcpPortForwardDialedCount += client.qualityMetrics.tcpPortForwardDialedCount
 		aggregatedQualityMetrics.tcpPortForwardDialedDuration +=
@@ -504,13 +522,19 @@ func (sshServer *sshServer) getLoadStats() map[string]map[string]int64 {
 	allProtocolsStats["tcp_port_forward_failed_count"] = aggregatedQualityMetrics.tcpPortForwardFailedCount
 	allProtocolsStats["tcp_port_forward_failed_duration"] = int64(aggregatedQualityMetrics.tcpPortForwardFailedDuration)
 
-	for _, stats := range loadStats {
-		for name, value := range stats {
-			allProtocolsStats[name] += value
+	for _, regionStats := range protocolStats {
+		for _, stats := range regionStats {
+			for name, value := range stats {
+				allProtocolsStats[name] += value
+			}
 		}
 	}
 
+	loadStats := make(map[string]interface{})
 	loadStats["ALL"] = allProtocolsStats
+	for tunnelProtocol, stats := range protocolStats {
+		loadStats[tunnelProtocol] = stats
+	}
 
 	return loadStats
 }
@@ -577,12 +601,12 @@ func (sshServer *sshServer) stopClients() {
 
 func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.Conn) {
 
-	sshServer.registerAcceptedClient(tunnelProtocol)
-	defer sshServer.unregisterAcceptedClient(tunnelProtocol)
-
 	geoIPData := sshServer.support.GeoIPService.Lookup(
 		common.IPAddressFromAddr(clientConn.RemoteAddr()))
 
+	sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)
+	defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)
+
 	sshClient := newSshClient(sshServer, tunnelProtocol, geoIPData)
 
 	sshClient.run(clientConn)