Răsfoiți Sursa

Release packet buffer memory when clients disconnect

- Too much memory is consumed by maintaining
  packet buffers for sessions with no active
  client, as in many cases the client is gone
  and not reconnecting.

- Downstream packets are dropped for sessions
  with no connected client.
Rod Hynes 8 ani în urmă
părinte
comite
4c191ed53b
1 a modificat fișierele cu 42 adăugiri și 16 ștergeri
  1. 42 16
      psiphon/common/tun/tun.go

+ 42 - 16
psiphon/common/tun/tun.go

@@ -135,6 +135,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -411,11 +412,6 @@ func (server *Server) ClientConnected(
 
 	} else {
 
-		downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
-		if server.config.DownstreamPacketQueueSize > 0 {
-			downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
-		}
-
 		// Store IPv4 resolver addresses in 4-byte representation
 		// for use in rewritting.
 		resolvers := server.config.GetDNSResolverIPv4Addresses()
@@ -434,7 +430,6 @@ func (server *Server) ClientConnected(
 			checkAllowedTCPPortFunc:  checkAllowedTCPPortFunc,
 			checkAllowedUDPPortFunc:  checkAllowedUDPPortFunc,
 			flowActivityUpdaterMaker: flowActivityUpdaterMaker,
-			downstreamPackets:        NewPacketQueue(downstreamPacketQueueSize),
 			workers:                  new(sync.WaitGroup),
 		}
 
@@ -491,6 +486,13 @@ func (server *Server) resumeSession(session *session, channel *Channel) {
 	session.mutex.Lock()
 	defer session.mutex.Unlock()
 
+	downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
+	if server.config.DownstreamPacketQueueSize > 0 {
+		downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
+	}
+	downstreamPackets := NewPacketQueue(downstreamPacketQueueSize)
+	atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(downstreamPackets))
+
 	session.channel = channel
 
 	// Parent context is not server.runContext so that session workers
@@ -538,6 +540,14 @@ func (server *Server) interruptSession(session *session) {
 		session.channel = nil
 	}
 
+	// Release the downstream packet buffer, so the associated
+	// memory is not consumed while no client is connected.
+	//
+	// Since runDeviceDownstream continues to run and will access
+	// session.downstreamPackets, an atomic pointer is used to
+	// synchronize access.
+	atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(nil))
+
 	// interruptSession may be called for idle sessions, to ensure
 	// the session is in an expected state: in ClientConnected,
 	// and in server.Stop(); don't log in those cases.
@@ -684,6 +694,17 @@ func (server *Server) runDeviceDownstream() {
 
 		session := s.(*session)
 
+		downstreamPackets := (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
+
+		// No downstreamPackets buffer is maintained when no client is
+		// connected, so the packet is dropped.
+
+		if downstreamPackets == nil {
+			server.orphanMetrics.rejectedPacket(
+				packetDirectionServerDownstream, packetRejectNoClient)
+			continue
+		}
+
 		// Simply enqueue the packet for client handling, and move on to
 		// read the next packet. The packet tunnel server multiplexes all
 		// client packets through a single tun device, so we must not block
@@ -692,9 +713,6 @@ func (server *Server) runDeviceDownstream() {
 		// When the queue is full, the packet is dropped. This is standard
 		// behavior for routers, VPN servers, etc.
 		//
-		// We allow packets to enqueue in an idle session in case a client
-		// is in the process of reconnecting.
-		//
 		// TODO: processPacket is performed here, instead of runClientDownstream,
 		// since packets are packed contiguously into the packet queue and if
 		// the packet it to be omitted, that should be done before enqueuing.
@@ -718,7 +736,7 @@ func (server *Server) runDeviceDownstream() {
 			continue
 		}
 
-		session.downstreamPackets.Enqueue(readPacket)
+		downstreamPackets.Enqueue(readPacket)
 	}
 }
 
@@ -793,7 +811,12 @@ func (server *Server) runClientDownstream(session *session) {
 
 	for {
 
-		packetBuffer, ok := session.downstreamPackets.DequeueFramedPackets(session.runContext)
+		downstreamPackets := (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
+
+		// Note: downstreamPackets will not be nil, since this goroutine only
+		// runs while the session has a connected client.
+
+		packetBuffer, ok := downstreamPackets.DequeueFramedPackets(session.runContext)
 		if !ok {
 			// Dequeue aborted due to session.runContext.Done()
 			return
@@ -806,7 +829,7 @@ func (server *Server) runClientDownstream(session *session) {
 			server.config.Logger.WithContextFields(
 				common.LogFields{"error": err}).Debug("write channel packets failed")
 
-			session.downstreamPackets.Replace(packetBuffer)
+			downstreamPackets.Replace(packetBuffer)
 
 			// Tear down the session. Must be invoked asynchronously.
 			go server.interruptSession(session)
@@ -816,7 +839,7 @@ func (server *Server) runClientDownstream(session *session) {
 
 		session.touch()
 
-		session.downstreamPackets.Replace(packetBuffer)
+		downstreamPackets.Replace(packetBuffer)
 	}
 }
 
@@ -982,7 +1005,7 @@ type session struct {
 	checkAllowedTCPPortFunc  AllowedPortChecker
 	checkAllowedUDPPortFunc  AllowedPortChecker
 	flowActivityUpdaterMaker FlowActivityUpdaterMaker
-	downstreamPackets        *PacketQueue
+	downstreamPackets        unsafe.Pointer
 	flows                    sync.Map
 	workers                  *sync.WaitGroup
 	mutex                    sync.Mutex
@@ -1817,8 +1840,9 @@ const (
 	packetRejectUDPPort            = 9
 	packetRejectNoOriginalAddress  = 10
 	packetRejectNoDNSResolvers     = 11
-	packetRejectReasonCount        = 12
-	packetOk                       = 12
+	packetRejectNoClient           = 12
+	packetRejectReasonCount        = 13
+	packetOk                       = 13
 )
 
 type packetDirection int
@@ -1855,6 +1879,8 @@ func packetRejectReasonDescription(reason packetRejectReason) string {
 		return "no_original_address"
 	case packetRejectNoDNSResolvers:
 		return "no_dns_resolvers"
+	case packetRejectNoClient:
+		return "no_client"
 	}
 
 	return "unknown_reason"