Explorar o código

Merge pull request #270 from rod-hynes/master

psiphond: log TCP port forward dial durations as a quality metric
Rod Hynes %!s(int64=9) %!d(string=hai) anos
pai
achega
c08432380c
Modificáronse 1 ficheiros con 55 adicións e 3 borrados
  1. 55 3
      psiphon/server/tunnelServer.go

+ 55 - 3
psiphon/server/tunnelServer.go

@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/Psiphon-Inc/crypto/ssh"
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
@@ -433,14 +434,30 @@ func (sshServer *sshServer) getLoadStats() map[string]map[string]int64 {
 		loadStats[tunnelProtocol]["accepted_clients"] = acceptedClientCount
 	}
 
+	var aggregatedQualityMetrics qualityMetrics
+
 	for _, client := range sshServer.clients {
 		// Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
 		loadStats[client.tunnelProtocol]["established_clients"] += 1
+
 		client.Lock()
+
 		loadStats[client.tunnelProtocol]["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
 		loadStats[client.tunnelProtocol]["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
 		loadStats[client.tunnelProtocol]["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
 		loadStats[client.tunnelProtocol]["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount
+
+		aggregatedQualityMetrics.tcpPortForwardDialedCount += client.qualityMetrics.tcpPortForwardDialedCount
+		aggregatedQualityMetrics.tcpPortForwardDialedDuration +=
+			client.qualityMetrics.tcpPortForwardDialedDuration / time.Millisecond
+		aggregatedQualityMetrics.tcpPortForwardFailedCount += client.qualityMetrics.tcpPortForwardFailedCount
+		aggregatedQualityMetrics.tcpPortForwardFailedDuration +=
+			client.qualityMetrics.tcpPortForwardFailedDuration / time.Millisecond
+		client.qualityMetrics.tcpPortForwardDialedCount = 0
+		client.qualityMetrics.tcpPortForwardDialedDuration = 0
+		client.qualityMetrics.tcpPortForwardFailedCount = 0
+		client.qualityMetrics.tcpPortForwardFailedDuration = 0
+
 		client.Unlock()
 	}
 
@@ -455,6 +472,11 @@ func (sshServer *sshServer) getLoadStats() map[string]map[string]int64 {
 	}
 	loadStats["ALL"] = allProtocolsStats
 
+	loadStats["ALL"]["tcp_port_forward_dialed_count"] = aggregatedQualityMetrics.tcpPortForwardDialedCount
+	loadStats["ALL"]["tcp_port_forward_dialed_duration"] = int64(aggregatedQualityMetrics.tcpPortForwardDialedDuration)
+	loadStats["ALL"]["tcp_port_forward_failed_count"] = aggregatedQualityMetrics.tcpPortForwardFailedCount
+	loadStats["ALL"]["tcp_port_forward_failed_duration"] = int64(aggregatedQualityMetrics.tcpPortForwardFailedDuration)
+
 	return loadStats
 }
 
@@ -649,15 +671,13 @@ type sshClient struct {
 	trafficRules            TrafficRules
 	tcpTrafficState         trafficState
 	udpTrafficState         trafficState
+	qualityMetrics          qualityMetrics
 	channelHandlerWaitGroup *sync.WaitGroup
 	tcpPortForwardLRU       *common.LRUConns
 	stopBroadcast           chan struct{}
 }
 
 type trafficState struct {
-	// Note: 64-bit ints used with atomic operations are at placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
 	bytesUp                        int64
 	bytesDown                      int64
 	concurrentPortForwardCount     int64
@@ -665,6 +685,18 @@ type trafficState struct {
 	totalPortForwardCount          int64
 }
 
+// qualityMetrics records upstream TCP dial attempts and
+// elapsed time. Elapsed time includes the full TCP handshake
+// and, in aggregate, is a measure of the quality of the
+// upstream link. These stats are recorded by each sshClient
+// and then reported and reset in sshServer.getLoadStats().
+type qualityMetrics struct {
+	tcpPortForwardDialedCount    int64
+	tcpPortForwardDialedDuration time.Duration
+	tcpPortForwardFailedCount    int64
+	tcpPortForwardFailedDuration time.Duration
+}
+
 type handshakeState struct {
 	completed   bool
 	apiProtocol string
@@ -1075,6 +1107,22 @@ func (sshClient *sshClient) openedPortForward(
 	state.totalPortForwardCount += 1
 }
 
+func (sshClient *sshClient) updateQualityMetrics(
+	tcpPortForwardDialSuccess bool, dialDuration time.Duration) {
+
+	sshClient.Lock()
+	defer sshClient.Unlock()
+
+	if tcpPortForwardDialSuccess {
+		sshClient.qualityMetrics.tcpPortForwardDialedCount += 1
+		sshClient.qualityMetrics.tcpPortForwardDialedDuration += dialDuration
+
+	} else {
+		sshClient.qualityMetrics.tcpPortForwardFailedCount += 1
+		sshClient.qualityMetrics.tcpPortForwardFailedDuration += dialDuration
+	}
+}
+
 func (sshClient *sshClient) closedPortForward(
 	portForwardType int, bytesUp, bytesDown int64) {
 
@@ -1184,6 +1232,7 @@ func (sshClient *sshClient) handleTCPChannel(
 	}
 
 	resultChannel := make(chan *dialTcpResult, 1)
+	dialStartTime := monotime.Now()
 
 	go func() {
 		// TODO: on EADDRNOTAVAIL, temporarily suspend new clients
@@ -1201,6 +1250,9 @@ func (sshClient *sshClient) handleTCPChannel(
 		return
 	}
 
+	sshClient.updateQualityMetrics(
+		result.err == nil, monotime.Since(dialStartTime))
+
 	if result.err != nil {
 		sshClient.rejectNewChannel(newChannel, ssh.ConnectionFailed, result.err.Error())
 		return