Sfoglia il codice sorgente

Update packet tunnel flow tracking

- DNS flow tracking may now be enabled with a
  config parameter. This enables packet tunnel
  DNS quality metrics.

- Fix: consistently use upstream address space
  for flow IDs. This ensures that up- and
  downstream flow components have the same ID.

- Fix: flows no longer expire when only one
  direction is idle. This fixes an issue where
  isTrackingFlow could immediately expire a
  flow.

- Fix: transparent DNS forwarding for a TCP flow
  sends all packets to the same resolver.
Rod Hynes 4 anni fa
parent
commit
e1bf3dcf22

+ 146 - 70
psiphon/common/tun/tun.go

@@ -210,6 +210,12 @@ type ServerConfig struct {
 	// IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
 	GetDNSResolverIPv6Addresses func() []net.IP
 
+	// EnableDNSFlowTracking specifies whether to apply flow tracking to DNS
+	// flows, as required for DNS quality metrics. Typically there are many
+	// short-lived DNS flows to track and each tracked flow adds some overhead,
+	// so this defaults to off.
+	EnableDNSFlowTracking bool
+
 	// DownstreamPacketQueueSize specifies the size of the downstream
 	// packet queue. The packet tunnel server multiplexes all client
 	// packets through a single tun device, so when a packet is read,
@@ -459,11 +465,21 @@ func (server *Server) ClientConnected(
 			lastActivity:             int64(monotime.Now()),
 			sessionID:                sessionID,
 			metrics:                  new(packetMetrics),
+			enableDNSFlowTracking:    server.config.EnableDNSFlowTracking,
 			DNSResolverIPv4Addresses: append([]net.IP(nil), DNSResolverIPv4Addresses...),
 			DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
 			workers:                  new(sync.WaitGroup),
 		}
 
+		// One-time, for this session, random resolver selection for TCP transparent
+		// DNS forwarding. See comment in processPacket.
+		if len(clientSession.DNSResolverIPv4Addresses) > 0 {
+			clientSession.TCPDNSResolverIPv4Index = prng.Intn(len(clientSession.DNSResolverIPv4Addresses))
+		}
+		if len(clientSession.DNSResolverIPv6Addresses) > 0 {
+			clientSession.TCPDNSResolverIPv6Index = prng.Intn(len(clientSession.DNSResolverIPv6Addresses))
+		}
+
 		// allocateIndex initializes session.index, session.assignedIPv4Address,
 		// and session.assignedIPv6Address; and updates server.indexToSession and
 		// server.sessionIDToIndex.
@@ -698,6 +714,9 @@ func (server *Server) removeSession(session *session) {
 	server.sessionIDToIndex.Delete(session.sessionID)
 	server.indexToSession.Delete(session.index)
 	server.interruptSession(session)
+
+	// Delete flows to ensure any pending flow metrics are reported.
+	session.deleteFlows()
 }
 
 func (server *Server) runOrphanMetricsCheckpointer() {
@@ -1094,11 +1113,14 @@ type session struct {
 	metrics                  *packetMetrics
 	sessionID                string
 	index                    int32
+	enableDNSFlowTracking    bool
 	DNSResolverIPv4Addresses []net.IP
+	TCPDNSResolverIPv4Index  int
 	assignedIPv4Address      net.IP
 	setOriginalIPv4Address   int32
 	originalIPv4Address      net.IP
 	DNSResolverIPv6Addresses []net.IP
+	TCPDNSResolverIPv6Index  int
 	assignedIPv6Address      net.IP
 	setOriginalIPv6Address   int32
 	originalIPv6Address      net.IP
@@ -1284,7 +1306,12 @@ type flowState struct {
 
 func (flowState *flowState) expired(idleExpiry time.Duration) bool {
 	now := monotime.Now()
-	return (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastUpstreamPacketTime))) > idleExpiry) ||
+
+	// Traffic in either direction keeps the flow alive. Initially, only one of
+	// lastUpstreamPacketTime or lastDownstreamPacketTime will be set by
+	// startTrackingFlow, and the other value will be 0 and evaluate as expired.
+
+	return (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastUpstreamPacketTime))) > idleExpiry) &&
 		(now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastDownstreamPacketTime))) > idleExpiry)
 }
 
@@ -1445,7 +1472,9 @@ func (session *session) deleteFlow(ID flowID, flowState *flowState) {
 			// a valid response; failure occurs when the resolver fails to provide a
 			// response; a "no such host" response is still a success. Limitations: we
 			// assume a resolver will not respond when, e.g., rate limiting; we ignore
-			// subsequent requests made via the same UDP/TCP flow.
+			// subsequent requests made via the same UDP/TCP flow; deleteFlow may be
+			// called only after the flow has expired, which adds some delay to the
+			// recording of the DNS metric.
 
 			dnsEndTime := monotime.Time(
 				atomic.LoadInt64(&flowState.firstDownstreamPacketTime))
@@ -1479,6 +1508,14 @@ func (session *session) reapFlows() {
 	})
 }
 
+// deleteFlows deletes all flows.
+func (session *session) deleteFlows() {
+	session.flows.Range(func(key, value interface{}) bool {
+		session.deleteFlow(key.(flowID), value.(*flowState))
+		return true
+	})
+}
+
 type packetMetrics struct {
 	upstreamRejectReasons   [packetRejectReasonCount]int64
 	downstreamRejectReasons [packetRejectReasonCount]int64
@@ -2371,8 +2408,8 @@ func processPacket(
 	// Check if the packet qualifies for transparent DNS rewriting
 	//
 	// - Both TCP and UDP DNS packets may qualify
-	// - Transparent DNS flows are not tracked, as most DNS
-	//   resolutions are very-short lived exchanges
+	// - Unless configured, transparent DNS flows are not tracked,
+	//   as most DNS resolutions are very-short lived exchanges
 	// - The traffic rules checks are bypassed, since transparent
 	//   DNS is essential
 
@@ -2385,7 +2422,9 @@ func processPacket(
 			// will be rewritten to go to one of the server's resolvers.
 
 			if destinationPort == portNumberDNS {
-				if version == 4 && destinationIPAddress.Equal(transparentDNSResolverIPv4Address) {
+				if version == 4 &&
+					destinationIPAddress.Equal(transparentDNSResolverIPv4Address) {
+
 					numResolvers := len(session.DNSResolverIPv4Addresses)
 					if numResolvers > 0 {
 						doTransparentDNS = true
@@ -2394,7 +2433,9 @@ func processPacket(
 						return false
 					}
 
-				} else if version == 6 && destinationIPAddress.Equal(transparentDNSResolverIPv6Address) {
+				} else if version == 6 &&
+					destinationIPAddress.Equal(transparentDNSResolverIPv6Address) {
+
 					numResolvers := len(session.DNSResolverIPv6Addresses)
 					if numResolvers > 0 {
 						doTransparentDNS = true
@@ -2456,9 +2497,82 @@ func processPacket(
 		}
 	}
 
+	// Apply rewrites before determining flow ID to ensure that corresponding up-
+	// and downstream flows yield the same flow ID.
+
+	var rewriteSourceIPAddress, rewriteDestinationIPAddress net.IP
+
+	if direction == packetDirectionServerUpstream {
+
+		// Store original source IP address to be replaced in
+		// downstream rewriting.
+
+		if version == 4 {
+			session.setOriginalIPv4AddressIfNotSet(sourceIPAddress)
+			rewriteSourceIPAddress = session.assignedIPv4Address
+		} else { // version == 6
+			session.setOriginalIPv6AddressIfNotSet(sourceIPAddress)
+			rewriteSourceIPAddress = session.assignedIPv6Address
+		}
+
+		// Rewrite DNS packets destinated for the transparent DNS target addresses
+		// to go to one of the server's resolvers. This random selection uses
+		// math/rand to minimize overhead.
+		//
+		// Limitation: TCP packets are always assigned to the same resolver, as
+		// currently there is no method for tracking the assigned resolver per TCP
+		// flow.
+
+		if doTransparentDNS {
+			if version == 4 {
+
+				index := session.TCPDNSResolverIPv4Index
+				if protocol == internetProtocolUDP {
+					index = rand.Intn(len(session.DNSResolverIPv4Addresses))
+				}
+				rewriteDestinationIPAddress = session.DNSResolverIPv4Addresses[index]
+
+			} else { // version == 6
+
+				index := session.TCPDNSResolverIPv6Index
+				if protocol == internetProtocolUDP {
+					index = rand.Intn(len(session.DNSResolverIPv6Addresses))
+				}
+				rewriteDestinationIPAddress = session.DNSResolverIPv6Addresses[index]
+			}
+		}
+
+	} else if direction == packetDirectionServerDownstream {
+
+		// Destination address will be original source address.
+
+		if version == 4 {
+			rewriteDestinationIPAddress = session.getOriginalIPv4Address()
+		} else { // version == 6
+			rewriteDestinationIPAddress = session.getOriginalIPv6Address()
+		}
+
+		if rewriteDestinationIPAddress == nil {
+			metrics.rejectedPacket(direction, packetRejectNoOriginalAddress)
+			return false
+		}
+
+		// Rewrite source address of packets from servers' resolvers
+		// to transparent DNS target address.
+
+		if doTransparentDNS {
+
+			if version == 4 {
+				rewriteSourceIPAddress = transparentDNSResolverIPv4Address
+			} else { // version == 6
+				rewriteSourceIPAddress = transparentDNSResolverIPv6Address
+			}
+		}
+	}
+
 	// Check if flow is tracked before checking traffic permission
 
-	doFlowTracking := !doTransparentDNS && isServer
+	doFlowTracking := isServer && (!doTransparentDNS || session.enableDNSFlowTracking)
 
 	// TODO: verify this struct is stack allocated
 	var ID flowID
@@ -2468,12 +2582,32 @@ func processPacket(
 	if doFlowTracking {
 
 		if direction == packetDirectionServerUpstream {
-			ID.set(
-				sourceIPAddress, sourcePort, destinationIPAddress, destinationPort, protocol)
+
+			// Reflect rewrites in the upstream case and don't reflect rewrites in the
+			// following downstream case: all flow IDs are in the upstream space, with
+			// the assigned private IP for the client and, in the case of DNS, the
+			// actual resolver IP.
+
+			srcIP := sourceIPAddress
+			if rewriteSourceIPAddress != nil {
+				srcIP = rewriteSourceIPAddress
+			}
+
+			destIP := destinationIPAddress
+			if rewriteDestinationIPAddress != nil {
+				destIP = rewriteDestinationIPAddress
+			}
+
+			ID.set(srcIP, sourcePort, destIP, destinationPort, protocol)
 
 		} else if direction == packetDirectionServerDownstream {
+
 			ID.set(
-				destinationIPAddress, destinationPort, sourceIPAddress, sourcePort, protocol)
+				destinationIPAddress,
+				destinationPort,
+				sourceIPAddress,
+				sourcePort,
+				protocol)
 		}
 
 		isTrackingFlow = session.isTrackingFlow(ID)
@@ -2561,68 +2695,10 @@ func processPacket(
 		}
 	}
 
-	// Configure rewriting.
+	// Apply packet rewrites. IP (v4 only) and TCP/UDP all have packet
+	// checksums which are updated to relect the rewritten headers.
 
 	var checksumAccumulator int32
-	var rewriteSourceIPAddress, rewriteDestinationIPAddress net.IP
-
-	if direction == packetDirectionServerUpstream {
-
-		// Store original source IP address to be replaced in
-		// downstream rewriting.
-
-		if version == 4 {
-			session.setOriginalIPv4AddressIfNotSet(sourceIPAddress)
-			rewriteSourceIPAddress = session.assignedIPv4Address
-		} else { // version == 6
-			session.setOriginalIPv6AddressIfNotSet(sourceIPAddress)
-			rewriteSourceIPAddress = session.assignedIPv6Address
-		}
-
-		// Rewrite DNS packets destinated for the transparent DNS target
-		// addresses to go to one of the server's resolvers.
-
-		if doTransparentDNS {
-
-			if version == 4 {
-				rewriteDestinationIPAddress = session.DNSResolverIPv4Addresses[rand.Intn(
-					len(session.DNSResolverIPv4Addresses))]
-			} else { // version == 6
-				rewriteDestinationIPAddress = session.DNSResolverIPv6Addresses[rand.Intn(
-					len(session.DNSResolverIPv6Addresses))]
-			}
-		}
-
-	} else if direction == packetDirectionServerDownstream {
-
-		// Destination address will be original source address.
-
-		if version == 4 {
-			rewriteDestinationIPAddress = session.getOriginalIPv4Address()
-		} else { // version == 6
-			rewriteDestinationIPAddress = session.getOriginalIPv6Address()
-		}
-
-		if rewriteDestinationIPAddress == nil {
-			metrics.rejectedPacket(direction, packetRejectNoOriginalAddress)
-			return false
-		}
-
-		// Rewrite source address  of packets from servers' resolvers
-		// to transparent DNS target address.
-
-		if doTransparentDNS {
-
-			if version == 4 {
-				rewriteSourceIPAddress = transparentDNSResolverIPv4Address
-			} else { // version == 6
-				rewriteSourceIPAddress = transparentDNSResolverIPv6Address
-			}
-		}
-	}
-
-	// Apply rewrites. IP (v4 only) and TCP/UDP all have packet
-	// checksums which are updated to relect the rewritten headers.
 
 	if rewriteSourceIPAddress != nil {
 		checksumAccumulate(sourceIPAddress, false, &checksumAccumulator)

+ 4 - 0
psiphon/server/config.go

@@ -335,6 +335,10 @@ type Config struct {
 	// PacketTunnelEgressInterface specifies tun.ServerConfig.EgressInterface.
 	PacketTunnelEgressInterface string
 
+	// PacketTunnelEnableDNSFlowTracking sets
+	// tun.ServerConfig.EnableDNSFlowTracking.
+	PacketTunnelEnableDNSFlowTracking bool
+
 	// PacketTunnelDownstreamPacketQueueSize specifies
 	// tun.ServerConfig.DownStreamPacketQueueSize.
 	PacketTunnelDownstreamPacketQueueSize int

+ 1 - 0
psiphon/server/services.go

@@ -98,6 +98,7 @@ func RunServices(configJSON []byte) (retErr error) {
 			SudoNetworkConfigCommands:   config.PacketTunnelSudoNetworkConfigCommands,
 			GetDNSResolverIPv4Addresses: support.DNSResolver.GetAllIPv4,
 			GetDNSResolverIPv6Addresses: support.DNSResolver.GetAllIPv6,
+			EnableDNSFlowTracking:       config.PacketTunnelEnableDNSFlowTracking,
 			EgressInterface:             config.PacketTunnelEgressInterface,
 			DownstreamPacketQueueSize:   config.PacketTunnelDownstreamPacketQueueSize,
 			SessionIdleExpirySeconds:    config.PacketTunnelSessionIdleExpirySeconds,

+ 2 - 1
psiphon/server/tunnelServer.go

@@ -908,7 +908,8 @@ func (sshServer *sshServer) getLoadStats() (
 
 		// DNS metrics limitations:
 		// - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
-		// - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure.
+		// - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,
+		//   and there may be some delay before these code paths report DNS metrics.
 
 		// Every client.qualityMetrics DNS map has an "ALL" entry.