Explorar el Código

Add DNS quality metrics

- Log DNS performance counters, including which
  resolvers are providing responses.

- To limit log size, remove all upstream/egress-
  oriented performance counters from per-protocol
  and per-region load logs.
Rod Hynes hace 4 años
padre
commit
7bcfa4857c
Se han modificado 5 ficheros con 421 adiciones y 134 borrados
  1. 101 17
      psiphon/common/tun/tun.go
  2. 4 1
      psiphon/common/tun/tun_test.go
  3. 18 5
      psiphon/server/dns.go
  4. 250 102
      psiphon/server/tunnelServer.go
  5. 48 9
      psiphon/server/udp.go

+ 101 - 17
psiphon/common/tun/tun.go

@@ -357,6 +357,12 @@ type MetricsUpdater func(
 	TCPApplicationBytesDown, TCPApplicationBytesUp,
 	UDPApplicationBytesDown, UDPApplicationBytesUp int64)
 
+// DNSQualityReporter is a function which receives a DNS quality report:
+// whether a DNS request received a reponse, the elapsed time, and the
+// resolver used.
+type DNSQualityReporter func(
+	receivedResponse bool, requestDuration time.Duration, resolverIP net.IP)
+
 // ClientConnected handles new client connections, creating or resuming
 // a session and returns with client packet handlers running.
 //
@@ -394,7 +400,8 @@ func (server *Server) ClientConnected(
 	checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
 	checkAllowedDomainFunc AllowedDomainChecker,
 	flowActivityUpdaterMaker FlowActivityUpdaterMaker,
-	metricsUpdater MetricsUpdater) error {
+	metricsUpdater MetricsUpdater,
+	dnsQualityReporter DNSQualityReporter) error {
 
 	// It's unusual to call both sync.WaitGroup.Add() _and_ Done() in the same
 	// goroutine. There's no other place to call Add() since ClientConnected is
@@ -479,7 +486,8 @@ func (server *Server) ClientConnected(
 		checkAllowedUDPPortFunc,
 		checkAllowedDomainFunc,
 		flowActivityUpdaterMaker,
-		metricsUpdater)
+		metricsUpdater,
+		dnsQualityReporter)
 
 	return nil
 }
@@ -518,7 +526,8 @@ func (server *Server) resumeSession(
 	checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
 	checkAllowedDomainFunc AllowedDomainChecker,
 	flowActivityUpdaterMaker FlowActivityUpdaterMaker,
-	metricsUpdater MetricsUpdater) {
+	metricsUpdater MetricsUpdater,
+	dnsQualityReporter DNSQualityReporter) {
 
 	session.mutex.Lock()
 	defer session.mutex.Unlock()
@@ -560,6 +569,8 @@ func (server *Server) resumeSession(
 
 	session.setMetricsUpdater(&metricsUpdater)
 
+	session.setDNSQualityReporter(&dnsQualityReporter)
+
 	session.channel = channel
 
 	// Parent context is not server.runContext so that session workers
@@ -1071,12 +1082,13 @@ type session struct {
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
 	lastActivity             int64
 	lastFlowReapIndex        int64
+	downstreamPackets        unsafe.Pointer
 	checkAllowedTCPPortFunc  unsafe.Pointer
 	checkAllowedUDPPortFunc  unsafe.Pointer
 	checkAllowedDomainFunc   unsafe.Pointer
 	flowActivityUpdaterMaker unsafe.Pointer
 	metricsUpdater           unsafe.Pointer
-	downstreamPackets        unsafe.Pointer
+	dnsQualityReporter       unsafe.Pointer
 
 	allowBogons              bool
 	metrics                  *packetMetrics
@@ -1138,6 +1150,14 @@ func (session *session) getOriginalIPv6Address() net.IP {
 	return session.originalIPv6Address
 }
 
+func (session *session) setDownstreamPackets(p *PacketQueue) {
+	atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(p))
+}
+
+func (session *session) getDownstreamPackets() *PacketQueue {
+	return (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
+}
+
 func (session *session) setCheckAllowedTCPPortFunc(p *AllowedPortChecker) {
 	atomic.StorePointer(&session.checkAllowedTCPPortFunc, unsafe.Pointer(p))
 }
@@ -1198,12 +1218,16 @@ func (session *session) getMetricsUpdater() MetricsUpdater {
 	return *p
 }
 
-func (session *session) setDownstreamPackets(p *PacketQueue) {
-	atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(p))
+func (session *session) setDNSQualityReporter(p *DNSQualityReporter) {
+	atomic.StorePointer(&session.dnsQualityReporter, unsafe.Pointer(p))
 }
 
-func (session *session) getDownstreamPackets() *PacketQueue {
-	return (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
+func (session *session) getDNSQualityReporter() DNSQualityReporter {
+	p := (*DNSQualityReporter)(atomic.LoadPointer(&session.dnsQualityReporter))
+	if p == nil {
+		return nil
+	}
+	return *p
 }
 
 // flowID identifies an IP traffic flow using the conventional
@@ -1249,9 +1273,13 @@ type flowState struct {
 	// Note: 64-bit ints used with atomic operations are placed
 	// at the start of struct to ensure 64-bit alignment.
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastUpstreamPacketTime   int64
-	lastDownstreamPacketTime int64
-	activityUpdaters         []FlowActivityUpdater
+	firstUpstreamPacketTime   int64
+	lastUpstreamPacketTime    int64
+	firstDownstreamPacketTime int64
+	lastDownstreamPacketTime  int64
+	isDNS                     bool
+	dnsQualityReporter        DNSQualityReporter
+	activityUpdaters          []FlowActivityUpdater
 }
 
 func (flowState *flowState) expired(idleExpiry time.Duration) bool {
@@ -1271,7 +1299,7 @@ func (session *session) isTrackingFlow(ID flowID) bool {
 
 	// Check if flow is expired but not yet reaped.
 	if flowState.expired(FLOW_IDLE_EXPIRY) {
-		session.flows.Delete(ID)
+		session.deleteFlow(ID, flowState)
 		return false
 	}
 
@@ -1285,6 +1313,7 @@ func (session *session) isTrackingFlow(ID flowID) bool {
 // - one-time permissions checks for a flow
 // - OSLs
 // - domain bytes transferred [TODO]
+// - DNS quality metrics
 //
 // The applicationData from the first packet in the flow is
 // inspected to determine any associated hostname, using HTTP or
@@ -1305,7 +1334,10 @@ func (session *session) isTrackingFlow(ID flowID) bool {
 // startTrackingFlow may be called from concurrent goroutines; if
 // the flow is already tracked, it is simply updated.
 func (session *session) startTrackingFlow(
-	ID flowID, direction packetDirection, applicationData []byte) {
+	ID flowID,
+	direction packetDirection,
+	applicationData []byte,
+	isDNS bool) {
 
 	now := int64(monotime.Now())
 
@@ -1334,12 +1366,16 @@ func (session *session) startTrackingFlow(
 	}
 
 	flowState := &flowState{
-		activityUpdaters: activityUpdaters,
+		isDNS:              isDNS,
+		activityUpdaters:   activityUpdaters,
+		dnsQualityReporter: session.getDNSQualityReporter(),
 	}
 
 	if direction == packetDirectionServerUpstream {
+		flowState.firstUpstreamPacketTime = now
 		flowState.lastUpstreamPacketTime = now
 	} else {
+		flowState.firstDownstreamPacketTime = now
 		flowState.lastDownstreamPacketTime = now
 	}
 
@@ -1350,7 +1386,9 @@ func (session *session) startTrackingFlow(
 }
 
 func (session *session) updateFlow(
-	ID flowID, direction packetDirection, applicationData []byte) {
+	ID flowID,
+	direction packetDirection,
+	applicationData []byte) {
 
 	f, ok := session.flows.Load(ID)
 	if !ok {
@@ -1366,10 +1404,16 @@ func (session *session) updateFlow(
 
 	if direction == packetDirectionServerUpstream {
 		upstreamBytes = int64(len(applicationData))
+
+		atomic.CompareAndSwapInt64(&flowState.firstUpstreamPacketTime, 0, now)
+
 		atomic.StoreInt64(&flowState.lastUpstreamPacketTime, now)
+
 	} else {
 		downstreamBytes = int64(len(applicationData))
 
+		atomic.CompareAndSwapInt64(&flowState.firstDownstreamPacketTime, 0, now)
+
 		// Follows common.ActivityMonitoredConn semantics, where
 		// duration is updated only for downstream activity. This
 		// is intened to produce equivalent behaviour for port
@@ -1384,12 +1428,52 @@ func (session *session) updateFlow(
 	}
 }
 
+// deleteFlow stops tracking a flow and logs any outstanding metrics.
+// flowState is passed in to avoid duplicating the lookup that all callers
+// have already performed.
+func (session *session) deleteFlow(ID flowID, flowState *flowState) {
+
+	if flowState.isDNS {
+
+		dnsStartTime := monotime.Time(
+			atomic.LoadInt64(&flowState.firstUpstreamPacketTime))
+
+		if dnsStartTime > 0 {
+
+			// Record DNS quality metrics using a heuristic: if a packet was sent and
+			// then a packet was received, assume the DNS request successfully received
+			// a valid response; failure occurs when the resolver fails to provide a
+			// response; a "no such host" response is still a success. Limitations: we
+			// assume a resolver will not respond when, e.g., rate limiting; we ignore
+			// subsequent requests made via the same UDP/TCP flow.
+
+			dnsEndTime := monotime.Time(
+				atomic.LoadInt64(&flowState.firstDownstreamPacketTime))
+
+			dnsSuccess := true
+			if dnsEndTime == 0 {
+				dnsSuccess = false
+				dnsEndTime = monotime.Now()
+			}
+
+			resolveElapsedTime := dnsEndTime.Sub(dnsStartTime)
+
+			flowState.dnsQualityReporter(
+				dnsSuccess,
+				resolveElapsedTime,
+				net.IP(ID.upstreamIPAddress[:]))
+		}
+	}
+
+	session.flows.Delete(ID)
+}
+
 // reapFlows removes expired idle flows.
 func (session *session) reapFlows() {
 	session.flows.Range(func(key, value interface{}) bool {
 		flowState := value.(*flowState)
 		if flowState.expired(FLOW_IDLE_EXPIRY) {
-			session.flows.Delete(key)
+			session.deleteFlow(key.(flowID), flowState)
 		}
 		return true
 	})
@@ -2570,7 +2654,7 @@ func processPacket(
 
 	if doFlowTracking {
 		if !isTrackingFlow {
-			session.startTrackingFlow(ID, direction, applicationData)
+			session.startTrackingFlow(ID, direction, applicationData, doTransparentDNS)
 		} else {
 			session.updateFlow(ID, direction, applicationData)
 		}

+ 4 - 1
psiphon/common/tun/tun_test.go

@@ -409,6 +409,8 @@ func (server *testServer) run() {
 			checkAllowedPortFunc := func(net.IP, int) bool { return true }
 			checkAllowedDomainFunc := func(string) bool { return true }
 
+			dnsQualityReporter := func(_ bool, _ time.Duration, _ net.IP) {}
+
 			server.tunServer.ClientConnected(
 				sessionID,
 				signalConn,
@@ -416,7 +418,8 @@ func (server *testServer) run() {
 				checkAllowedPortFunc,
 				checkAllowedDomainFunc,
 				server.updaterMaker,
-				server.metricsUpdater)
+				server.metricsUpdater,
+				dnsQualityReporter)
 
 			signalConn.Wait()
 

+ 18 - 5
psiphon/server/dns.go

@@ -172,21 +172,28 @@ func (dns *DNSResolver) reloadWhenStale() {
 	}
 }
 
+// GetAll returns a list of all DNS resolver addresses. Cached values are
+// updated if they're stale. If reloading fails, the previous values are
+// used.
+func (dns *DNSResolver) GetAll() []net.IP {
+	return dns.getAll(true, true)
+}
+
 // GetAllIPv4 returns a list of all IPv4 DNS resolver addresses.
 // Cached values are updated if they're stale. If reloading fails,
 // the previous values are used.
 func (dns *DNSResolver) GetAllIPv4() []net.IP {
-	return dns.getAll(false)
+	return dns.getAll(true, false)
 }
 
 // GetAllIPv6 returns a list of all IPv6 DNS resolver addresses.
 // Cached values are updated if they're stale. If reloading fails,
 // the previous values are used.
 func (dns *DNSResolver) GetAllIPv6() []net.IP {
-	return dns.getAll(true)
+	return dns.getAll(false, true)
 }
 
-func (dns *DNSResolver) getAll(wantIPv6 bool) []net.IP {
+func (dns *DNSResolver) getAll(wantIPv4, wantIPv6 bool) []net.IP {
 
 	dns.reloadWhenStale()
 
@@ -195,8 +202,14 @@ func (dns *DNSResolver) getAll(wantIPv6 bool) []net.IP {
 
 	resolvers := make([]net.IP, 0)
 	for _, resolver := range dns.resolvers {
-		if (resolver.To4() == nil) == wantIPv6 {
-			resolvers = append(resolvers, resolver)
+		if resolver.To4() != nil {
+			if wantIPv4 {
+				resolvers = append(resolvers, resolver)
+			}
+		} else {
+			if wantIPv6 {
+				resolvers = append(resolvers, resolver)
+			}
 		}
 	}
 	return resolvers

+ 250 - 102
psiphon/server/tunnelServer.go

@@ -717,53 +717,87 @@ func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
 	client.stop()
 }
 
-type ProtocolStats map[string]map[string]int64
-type RegionStats map[string]map[string]map[string]int64
+type ProtocolStats map[string]map[string]interface{}
+type RegionStats map[string]map[string]map[string]interface{}
 
 func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 
 	sshServer.clientsMutex.Lock()
 	defer sshServer.clientsMutex.Unlock()
 
-	// Explicitly populate with zeros to ensure 0 counts in log messages
-	zeroStats := func() map[string]int64 {
-		stats := make(map[string]int64)
-		stats["accepted_clients"] = 0
-		stats["established_clients"] = 0
-		stats["dialing_tcp_port_forwards"] = 0
-		stats["tcp_port_forwards"] = 0
-		stats["total_tcp_port_forwards"] = 0
-		stats["udp_port_forwards"] = 0
-		stats["total_udp_port_forwards"] = 0
-		stats["tcp_port_forward_dialed_count"] = 0
-		stats["tcp_port_forward_dialed_duration"] = 0
-		stats["tcp_port_forward_failed_count"] = 0
-		stats["tcp_port_forward_failed_duration"] = 0
-		stats["tcp_port_forward_rejected_dialing_limit_count"] = 0
-		stats["tcp_port_forward_rejected_disallowed_count"] = 0
-		stats["udp_port_forward_rejected_disallowed_count"] = 0
-		stats["tcp_ipv4_port_forward_dialed_count"] = 0
-		stats["tcp_ipv4_port_forward_dialed_duration"] = 0
-		stats["tcp_ipv4_port_forward_failed_count"] = 0
-		stats["tcp_ipv4_port_forward_failed_duration"] = 0
-		stats["tcp_ipv6_port_forward_dialed_count"] = 0
-		stats["tcp_ipv6_port_forward_dialed_duration"] = 0
-		stats["tcp_ipv6_port_forward_failed_count"] = 0
-		stats["tcp_ipv6_port_forward_failed_duration"] = 0
+	// Explicitly populate with zeros to ensure 0 counts in log messages.
+
+	zeroStats := func() map[string]interface{} {
+		stats := make(map[string]interface{})
+		stats["accepted_clients"] = int64(0)
+		stats["established_clients"] = int64(0)
+		return stats
+	}
+
+	// Due to hot reload and changes to the underlying system configuration, the
+	// set of resolver IPs may change between getLoadStats calls, so this
+	// enumeration for zeroing is a best effort.
+	resolverIPs := sshServer.support.DNSResolver.GetAll()
+
+	// Only the non-region "ALL" log has the following fields, which are
+	// primarily concerned with upstream/egress performance.
+	zeroStatsAll := func() map[string]interface{} {
+		stats := zeroStats()
+		stats["dialing_tcp_port_forwards"] = int64(0)
+		stats["tcp_port_forwards"] = int64(0)
+		stats["total_tcp_port_forwards"] = int64(0)
+		stats["udp_port_forwards"] = int64(0)
+		stats["total_udp_port_forwards"] = int64(0)
+		stats["tcp_port_forward_dialed_count"] = int64(0)
+		stats["tcp_port_forward_dialed_duration"] = int64(0)
+		stats["tcp_port_forward_failed_count"] = int64(0)
+		stats["tcp_port_forward_failed_duration"] = int64(0)
+		stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)
+		stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)
+		stats["udp_port_forward_rejected_disallowed_count"] = int64(0)
+		stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)
+		stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)
+		stats["tcp_ipv4_port_forward_failed_count"] = int64(0)
+		stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)
+		stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)
+		stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)
+		stats["tcp_ipv6_port_forward_failed_count"] = int64(0)
+		stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)
+
+		zeroDNSStats := func() map[string]int64 {
+			m := map[string]int64{"ALL": 0}
+			for _, resolverIP := range resolverIPs {
+				m[resolverIP.String()] = 0
+			}
+			return m
+		}
+
+		stats["dns_count"] = zeroDNSStats()
+		stats["dns_duration"] = zeroDNSStats()
+		stats["dns_failed_count"] = zeroDNSStats()
+		stats["dns_failed_duration"] = zeroDNSStats()
 		return stats
 	}
 
-	zeroProtocolStats := func() map[string]map[string]int64 {
-		stats := make(map[string]map[string]int64)
-		stats["ALL"] = zeroStats()
+	zeroProtocolStats := func(nonRegion bool) map[string]map[string]interface{} {
+		stats := make(map[string]map[string]interface{})
+		if nonRegion {
+			stats["ALL"] = zeroStatsAll()
+		} else {
+			stats["ALL"] = zeroStats()
+		}
 		for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
 			stats[tunnelProtocol] = zeroStats()
 		}
 		return stats
 	}
 
+	addInt64 := func(stats map[string]interface{}, name string, value int64) {
+		stats[name] = stats[name].(int64) + value
+	}
+
 	// [<protocol or ALL>][<stat name>] -> count
-	protocolStats := zeroProtocolStats()
+	protocolStats := zeroProtocolStats(true)
 
 	// [<region][<protocol or ALL>][<stat name>] -> count
 	regionStats := make(RegionStats)
@@ -775,14 +809,14 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 
 			if acceptedClientCount > 0 {
 				if regionStats[region] == nil {
-					regionStats[region] = zeroProtocolStats()
+					regionStats[region] = zeroProtocolStats(false)
 				}
 
-				protocolStats["ALL"]["accepted_clients"] += acceptedClientCount
-				protocolStats[tunnelProtocol]["accepted_clients"] += acceptedClientCount
+				addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
+				addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)
 
-				regionStats[region]["ALL"]["accepted_clients"] += acceptedClientCount
-				regionStats[region][tunnelProtocol]["accepted_clients"] += acceptedClientCount
+				addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)
+				addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)
 			}
 		}
 	}
@@ -795,73 +829,96 @@ func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
 		region := client.geoIPData.Country
 
 		if regionStats[region] == nil {
-			regionStats[region] = zeroProtocolStats()
+			regionStats[region] = zeroProtocolStats(false)
 		}
 
-		stats := []map[string]int64{
+		for _, stats := range []map[string]interface{}{
 			protocolStats["ALL"],
 			protocolStats[tunnelProtocol],
 			regionStats[region]["ALL"],
-			regionStats[region][tunnelProtocol]}
-
-		for _, stat := range stats {
-
-			stat["established_clients"] += 1
-
-			// Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
-
-			stat["dialing_tcp_port_forwards"] += client.tcpTrafficState.concurrentDialingPortForwardCount
-			stat["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
-			stat["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
-			// client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
-			stat["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
-			stat["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount
-
-			stat["tcp_port_forward_dialed_count"] += client.qualityMetrics.TCPPortForwardDialedCount
-			stat["tcp_port_forward_dialed_duration"] +=
-				int64(client.qualityMetrics.TCPPortForwardDialedDuration / time.Millisecond)
-			stat["tcp_port_forward_failed_count"] += client.qualityMetrics.TCPPortForwardFailedCount
-			stat["tcp_port_forward_failed_duration"] +=
-				int64(client.qualityMetrics.TCPPortForwardFailedDuration / time.Millisecond)
-			stat["tcp_port_forward_rejected_dialing_limit_count"] +=
-				client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount
-			stat["tcp_port_forward_rejected_disallowed_count"] +=
-				client.qualityMetrics.TCPPortForwardRejectedDisallowedCount
-			stat["udp_port_forward_rejected_disallowed_count"] +=
-				client.qualityMetrics.UDPPortForwardRejectedDisallowedCount
-
-			stat["tcp_ipv4_port_forward_dialed_count"] += client.qualityMetrics.TCPIPv4PortForwardDialedCount
-			stat["tcp_ipv4_port_forward_dialed_duration"] +=
-				int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration / time.Millisecond)
-			stat["tcp_ipv4_port_forward_failed_count"] += client.qualityMetrics.TCPIPv4PortForwardFailedCount
-			stat["tcp_ipv4_port_forward_failed_duration"] +=
-				int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration / time.Millisecond)
-
-			stat["tcp_ipv6_port_forward_dialed_count"] += client.qualityMetrics.TCPIPv6PortForwardDialedCount
-			stat["tcp_ipv6_port_forward_dialed_duration"] +=
-				int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration / time.Millisecond)
-			stat["tcp_ipv6_port_forward_failed_count"] += client.qualityMetrics.TCPIPv6PortForwardFailedCount
-			stat["tcp_ipv6_port_forward_failed_duration"] +=
-				int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration / time.Millisecond)
-		}
-
-		client.qualityMetrics.TCPPortForwardDialedCount = 0
-		client.qualityMetrics.TCPPortForwardDialedDuration = 0
-		client.qualityMetrics.TCPPortForwardFailedCount = 0
-		client.qualityMetrics.TCPPortForwardFailedDuration = 0
-		client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount = 0
-		client.qualityMetrics.TCPPortForwardRejectedDisallowedCount = 0
-		client.qualityMetrics.UDPPortForwardRejectedDisallowedCount = 0
-
-		client.qualityMetrics.TCPIPv4PortForwardDialedCount = 0
-		client.qualityMetrics.TCPIPv4PortForwardDialedDuration = 0
-		client.qualityMetrics.TCPIPv4PortForwardFailedCount = 0
-		client.qualityMetrics.TCPIPv4PortForwardFailedDuration = 0
-
-		client.qualityMetrics.TCPIPv6PortForwardDialedCount = 0
-		client.qualityMetrics.TCPIPv6PortForwardDialedDuration = 0
-		client.qualityMetrics.TCPIPv6PortForwardFailedCount = 0
-		client.qualityMetrics.TCPIPv6PortForwardFailedDuration = 0
+			regionStats[region][tunnelProtocol]} {
+
+			addInt64(stats, "established_clients", 1)
+		}
+
+		stats := protocolStats["ALL"]
+
+		// Note:
+		// - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
+		// - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
+
+		addInt64(stats, "dialing_tcp_port_forwards", client.tcpTrafficState.concurrentDialingPortForwardCount)
+
+		addInt64(stats, "tcp_port_forwards", client.tcpTrafficState.concurrentPortForwardCount)
+
+		addInt64(stats, "total_tcp_port_forwards", client.tcpTrafficState.totalPortForwardCount)
+
+		addInt64(stats, "udp_port_forwards", client.udpTrafficState.concurrentPortForwardCount)
+
+		addInt64(stats, "total_udp_port_forwards", client.udpTrafficState.totalPortForwardCount)
+
+		addInt64(stats, "tcp_port_forward_dialed_count", client.qualityMetrics.TCPPortForwardDialedCount)
+
+		addInt64(stats, "tcp_port_forward_dialed_duration",
+			int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
+
+		addInt64(stats, "tcp_port_forward_failed_count", client.qualityMetrics.TCPPortForwardFailedCount)
+
+		addInt64(stats, "tcp_port_forward_failed_duration",
+			int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
+
+		addInt64(stats, "tcp_port_forward_rejected_dialing_limit_count",
+			client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
+
+		addInt64(stats, "tcp_port_forward_rejected_disallowed_count",
+			client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
+
+		addInt64(stats, "udp_port_forward_rejected_disallowed_count",
+			client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
+
+		addInt64(stats, "tcp_ipv4_port_forward_dialed_count", client.qualityMetrics.TCPIPv4PortForwardDialedCount)
+
+		addInt64(stats, "tcp_ipv4_port_forward_dialed_duration",
+			int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
+
+		addInt64(stats, "tcp_ipv4_port_forward_failed_count", client.qualityMetrics.TCPIPv4PortForwardFailedCount)
+
+		addInt64(stats, "tcp_ipv4_port_forward_failed_duration",
+			int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
+
+		addInt64(stats, "tcp_ipv6_port_forward_dialed_count", client.qualityMetrics.TCPIPv6PortForwardDialedCount)
+
+		addInt64(stats, "tcp_ipv6_port_forward_dialed_duration",
+			int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
+
+		addInt64(stats, "tcp_ipv6_port_forward_failed_count", client.qualityMetrics.TCPIPv6PortForwardFailedCount)
+
+		addInt64(stats, "tcp_ipv6_port_forward_failed_duration",
+			int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
+
+		// DNS metrics limitations:
+		// - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
+		// - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure.
+
+		// Every client.qualityMetrics DNS map has an "ALL" entry.
+
+		for key, value := range client.qualityMetrics.DNSCount {
+			stats["dns_count"].(map[string]int64)[key] += value
+		}
+
+		for key, value := range client.qualityMetrics.DNSDuration {
+			stats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
+		}
+
+		for key, value := range client.qualityMetrics.DNSFailedCount {
+			stats["dns_failed_count"].(map[string]int64)[key] += value
+		}
+
+		for key, value := range client.qualityMetrics.DNSFailedDuration {
+			stats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
+		}
+
+		client.qualityMetrics.reset()
 
 		client.Unlock()
 	}
@@ -1215,7 +1272,7 @@ type sshClient struct {
 	trafficRules                         TrafficRules
 	tcpTrafficState                      trafficState
 	udpTrafficState                      trafficState
-	qualityMetrics                       qualityMetrics
+	qualityMetrics                       *qualityMetrics
 	tcpPortForwardLRU                    *common.LRUConns
 	oslClientSeedState                   *osl.ClientSeedState
 	signalIssueSLOKs                     chan struct{}
@@ -1271,6 +1328,57 @@ type qualityMetrics struct {
 	TCPIPv6PortForwardDialedDuration        time.Duration
 	TCPIPv6PortForwardFailedCount           int64
 	TCPIPv6PortForwardFailedDuration        time.Duration
+	DNSCount                                map[string]int64
+	DNSDuration                             map[string]time.Duration
+	DNSFailedCount                          map[string]int64
+	DNSFailedDuration                       map[string]time.Duration
+}
+
+func newQualityMetrics() *qualityMetrics {
+	return &qualityMetrics{
+		DNSCount:          make(map[string]int64),
+		DNSDuration:       make(map[string]time.Duration),
+		DNSFailedCount:    make(map[string]int64),
+		DNSFailedDuration: make(map[string]time.Duration),
+	}
+}
+
+func (q *qualityMetrics) reset() {
+
+	q.TCPPortForwardDialedCount = 0
+	q.TCPPortForwardDialedDuration = 0
+	q.TCPPortForwardFailedCount = 0
+	q.TCPPortForwardFailedDuration = 0
+	q.TCPPortForwardRejectedDialingLimitCount = 0
+	q.TCPPortForwardRejectedDisallowedCount = 0
+
+	q.UDPPortForwardRejectedDisallowedCount = 0
+
+	q.TCPIPv4PortForwardDialedCount = 0
+	q.TCPIPv4PortForwardDialedDuration = 0
+	q.TCPIPv4PortForwardFailedCount = 0
+	q.TCPIPv4PortForwardFailedDuration = 0
+
+	q.TCPIPv6PortForwardDialedCount = 0
+	q.TCPIPv6PortForwardDialedDuration = 0
+	q.TCPIPv6PortForwardFailedCount = 0
+	q.TCPIPv6PortForwardFailedDuration = 0
+
+	// Retain existing maps to avoid memory churn. The Go compiler optimizes map
+	// clearing operations of the following form.
+
+	for k := range q.DNSCount {
+		delete(q.DNSCount, k)
+	}
+	for k := range q.DNSDuration {
+		delete(q.DNSDuration, k)
+	}
+	for k := range q.DNSFailedCount {
+		delete(q.DNSFailedCount, k)
+	}
+	for k := range q.DNSFailedDuration {
+		delete(q.DNSFailedDuration, k)
+	}
 }
 
 type handshakeState struct {
@@ -1316,6 +1424,7 @@ func newSshClient(
 		clientAddr:                       clientAddr,
 		geoIPData:                        geoIPData,
 		isFirstTunnelInSession:           true,
+		qualityMetrics:                   newQualityMetrics(),
 		tcpPortForwardLRU:                common.NewLRUConns(),
 		signalIssueSLOKs:                 make(chan struct{}, 1),
 		runCtx:                           runCtx,
@@ -2347,6 +2456,8 @@ func (sshClient *sshClient) handleNewPacketTunnelChannel(
 		sshClient.Unlock()
 	}
 
+	dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult
+
 	err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
 		sshClient.sessionID,
 		packetTunnelChannel,
@@ -2354,7 +2465,8 @@ func (sshClient *sshClient) handleNewPacketTunnelChannel(
 		checkAllowedUDPPortFunc,
 		checkAllowedDomainFunc,
 		flowActivityUpdaterMaker,
-		metricUpdater)
+		metricUpdater,
+		dnsQualityReporter)
 	if err != nil {
 		log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
 		sshClient.setPacketTunnelChannel(nil)
@@ -3506,6 +3618,33 @@ func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
 	sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
 }
 
+func (sshClient *sshClient) updateQualityMetricsWithDNSResult(
+	success bool, duration time.Duration, resolverIP net.IP) {
+
+	sshClient.Lock()
+	defer sshClient.Unlock()
+
+	resolver := ""
+	if resolverIP != nil {
+		resolver = resolverIP.String()
+	}
+	if success {
+		sshClient.qualityMetrics.DNSCount["ALL"] += 1
+		sshClient.qualityMetrics.DNSDuration["ALL"] += duration
+		if resolver != "" {
+			sshClient.qualityMetrics.DNSCount[resolver] += 1
+			sshClient.qualityMetrics.DNSDuration[resolver] += duration
+		}
+	} else {
+		sshClient.qualityMetrics.DNSFailedCount["ALL"] += 1
+		sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration
+		if resolver != "" {
+			sshClient.qualityMetrics.DNSFailedCount[resolver] += 1
+			sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration
+		}
+	}
+}
+
 func (sshClient *sshClient) handleTCPChannel(
 	remainingDialTimeout time.Duration,
 	hostToConnect string,
@@ -3583,6 +3722,17 @@ func (sshClient *sshClient) handleTCPChannel(
 	IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)
 	cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
 
+	resolveElapsedTime := time.Since(dialStartTime)
+
+	// Record DNS metrics. If LookupIPAddr returns net.DNSError.IsNotFound, this
+	// is "no such host" and not a DNS failure. Limitation: the resolver IP is
+	// not known.
+
+	dnsErr, ok := err.(*net.DNSError)
+	dnsNotFound := ok && dnsErr.IsNotFound
+	dnsSuccess := err == nil || dnsNotFound
+	sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)
+
 	// IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
 	// selected and attempted only when there's no IPv4 option.
 	// TODO: shuffle list to try other IPs?
@@ -3603,8 +3753,6 @@ func (sshClient *sshClient) handleTCPChannel(
 		err = std_errors.New("no IP address")
 	}
 
-	resolveElapsedTime := time.Since(dialStartTime)
-
 	if err != nil {
 
 		// Record a port forward failure

+ 48 - 9
psiphon/server/udp.go

@@ -29,6 +29,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/Psiphon-Labs/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
@@ -247,6 +248,11 @@ func (mux *udpPortForwardMultiplexer) run() {
 				bytesDown:    0,
 				mux:          mux,
 			}
+
+			if message.forwardDNS {
+				portForward.dnsFirstWriteTime = int64(monotime.Now())
+			}
+
 			mux.portForwardsMutex.Lock()
 			mux.portForwards[portForward.connID] = portForward
 			mux.portForwardsMutex.Unlock()
@@ -291,15 +297,17 @@ type udpPortForward struct {
 	// Note: 64-bit ints used with atomic operations are placed
 	// at the start of struct to ensure 64-bit alignment.
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	bytesUp      int64
-	bytesDown    int64
-	connID       uint16
-	preambleSize int
-	remoteIP     []byte
-	remotePort   uint16
-	conn         net.Conn
-	lruEntry     *common.LRUConnsEntry
-	mux          *udpPortForwardMultiplexer
+	dnsFirstWriteTime int64
+	dnsFirstReadTime  int64
+	bytesUp           int64
+	bytesDown         int64
+	connID            uint16
+	preambleSize      int
+	remoteIP          []byte
+	remotePort        uint16
+	conn              net.Conn
+	lruEntry          *common.LRUConnsEntry
+	mux               *udpPortForwardMultiplexer
 }
 
 func (portForward *udpPortForward) relayDownstream() {
@@ -330,6 +338,11 @@ func (portForward *udpPortForward) relayDownstream() {
 			break
 		}
 
+		if atomic.LoadInt64(&portForward.dnsFirstWriteTime) > 0 &&
+			atomic.LoadInt64(&portForward.dnsFirstReadTime) == 0 { // Check if already set before invoking Now.
+			atomic.CompareAndSwapInt64(&portForward.dnsFirstReadTime, 0, int64(monotime.Now()))
+		}
+
 		err = writeUdpgwPreamble(
 			portForward.preambleSize,
 			0,
@@ -369,6 +382,32 @@ func (portForward *udpPortForward) relayDownstream() {
 	bytesDown := atomic.LoadInt64(&portForward.bytesDown)
 	portForward.mux.sshClient.closedPortForward(portForwardTypeUDP, bytesUp, bytesDown)
 
+	dnsStartTime := monotime.Time(atomic.LoadInt64(&portForward.dnsFirstWriteTime))
+	if dnsStartTime > 0 {
+
+		// Record DNS metrics using a heuristic: if a UDP packet was written and
+		// then a packet was read, assume the DNS request successfully received a
+		// valid response; failure occurs when the resolver fails to provide a
+		// response; a "no such host" response is still a success. Limitations: we
+		// assume a resolver will not respond when, e.g., rate limiting; we ignore
+		// subsequent requests made via the same UDP port forward.
+
+		dnsEndTime := monotime.Time(atomic.LoadInt64(&portForward.dnsFirstReadTime))
+
+		dnsSuccess := true
+		if dnsEndTime == 0 {
+			dnsSuccess = false
+			dnsEndTime = monotime.Now()
+		}
+
+		resolveElapsedTime := dnsEndTime.Sub(dnsStartTime)
+
+		portForward.mux.sshClient.updateQualityMetricsWithDNSResult(
+			dnsSuccess,
+			resolveElapsedTime,
+			net.IP(portForward.remoteIP))
+	}
+
 	log.WithTraceFields(
 		LogFields{
 			"remoteAddr": fmt.Sprintf("%s:%d",