Просмотр исходного кода

Server fixes

- SSH KEX randomization changes
  - Avoid negotiating weak MACs
  - Insert new algorithms into server KEX while maintaining backwards
    compatibility
  - Fix server-side NoEncryptThenMACHash logic

- Session ID changes
  - Don't expect or use session_id API param when it's already sent in the SSH
    payload
  - Explicitly add session_id param to cases where there is no SSH payload
  - Remove obsolete code: handshake response no longer returns session ID;
    remove client_session_id

- Use packet buffer pool for udpgw to reduce GC churn (motivated by heap
  profiling)

- Add more IsLogLevelDebug checks to reduce overhead in hot paths

- Document why remote address is ignored in ssh.CertChecker

- Document OSSH specName integrity protection limitation

- Fix anti-replay strict mode for OSSH; enable strict mode by default

- Fix tun packet length checks

- Don't resolve .local mDNS domains; never use the cgo resolver

- Add per-client meek extended response buffer limit

- Add simple, sanity check limits for number of SSH clients and meek sessions;
  document expected external rate and load limiting implementation
Rod Hynes 1 год назад
Родитель
Сommit
80689bf6db

+ 188 - 59
psiphon/common/crypto/ssh/handshake.go

@@ -16,6 +16,7 @@ import (
 
 	// [Psiphon]
 
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 )
 
@@ -467,6 +468,12 @@ const (
 	kexStrictServer = "kex-strict-s-v00@openssh.com"
 )
 
+// [Psiphon]
+// For testing only. Enables testing support for legacy clients, which have
+// only the legacy algorithm lists and no weak-MAC or new-server-algos logic.
+// Not safe for concurrent access.
+var testLegacyClient = false
+
 // sendKexInit sends a key change message.
 func (t *handshakeTransport) sendKexInit() error {
 	t.mu.Lock()
@@ -550,8 +557,8 @@ func (t *handshakeTransport) sendKexInit() error {
 	// its KEX using the specified seed; deterministically adjust own
 	// randomized KEX to ensure negotiation succeeds.
 	//
-	// When NoEncryptThenMACHash is specified, do not use Encrypt-then-MAC has
-	// algorithms.
+	// When NoEncryptThenMACHash is specified, do not use Encrypt-then-MAC
+	// hash algorithms.
 	//
 	// Limitations:
 	//
@@ -632,6 +639,59 @@ func (t *handshakeTransport) sendKexInit() error {
 			return list
 		}
 
+		// avoid returns list arranged so that, when possible, its first
+		// (highest priority) item is not a member of avoidList.
+		//
+		// list may be modified in place (by a swap) and the returned slice
+		// should be used in place of the input. addList supplies candidate
+		// items to insert when every item in list is on avoidList. PRNG is
+		// only used, via permute, in that last case, so callers that must
+		// reproduce a peer's PRNG state need to invoke avoid with the same
+		// inputs as the peer.
+		avoid := func(PRNG *prng.PRNG, list, avoidList, addList []string) []string {
+
+			// Avoid negotiating items in avoidList, by moving a non-avoid
+			// item to the front of the list; either by swapping with a
+			// later, non-avoid item, or inserting a new item.
+
+			if len(list) < 1 {
+				return list
+			}
+			if !common.Contains(avoidList, list[0]) {
+				// The first item isn't on the avoid list; nothing to do.
+				return list
+			}
+			for i := 1; i < len(list); i++ {
+				if !common.Contains(avoidList, list[i]) {
+					// Swap with the earliest later, existing non-avoid item.
+					list[0], list[i] = list[i], list[0]
+					return list
+				}
+			}
+			for _, item := range permute(PRNG, addList) {
+				if !common.Contains(avoidList, item) {
+					// Insert, at the front, the first non-avoid item from the
+					// permuted addList. A new slice is returned.
+					return append([]string{item}, list...)
+				}
+			}
+			// Can't avoid: neither list nor addList has a non-avoid item.
+			return list
+		}
+
+		// addSome returns list with a PRNG-selected subset of addList
+		// inserted at PRNG-selected positions.
+		//
+		// The items of addList are considered in order. For each item, one
+		// PRNG.FlipCoin call decides whether it is inserted and, if so, one
+		// PRNG.Range call picks the insertion index. The result may share a
+		// backing array with list, so callers should use only the returned
+		// slice.
+		addSome := func(PRNG *prng.PRNG, list, addList []string) []string {
+			newList := list
+			for _, item := range addList {
+				if PRNG.FlipCoin() {
+					// NOTE(review): presumably PRNG.Range includes the upper
+					// bound, which would allow insertion at the end of the
+					// list -- confirm against the prng package.
+					index := PRNG.Range(0, len(newList))
+					// The inner append copies item and the tail into a fresh
+					// slice before the outer append writes after
+					// newList[:index], so the tail is not clobbered.
+					newList = append(
+						newList[:index],
+						append([]string{item}, newList[index:]...)...)
+				}
+			}
+			return newList
+		}
+
+		// toFront returns list with item as its first element. When item is
+		// already present, its first occurrence is swapped, in place, with
+		// the current first element; otherwise a new slice with item
+		// prepended is returned.
+		toFront := func(list []string, item string) []string {
+			for index, existingItem := range list {
+				if existingItem == item {
+					list[0], list[index] = list[index], list[0]
+					return list
+				}
+			}
+			// item is not in list (this includes an empty list).
+			return append([]string{item}, list...)
+		}
+
 		firstKexAlgo := func(kexAlgos []string) (string, bool) {
 			for _, kexAlgo := range kexAlgos {
 				switch kexAlgo {
@@ -662,10 +722,9 @@ func (t *handshakeTransport) sendKexInit() error {
 		// server's algorithms; (b) random truncation by the server doesn't
 		// select only new algorithms unknown to existing clients.
 		//
-		// TODO: add a versioning mechanism, such as a SSHv2 capability, to
-		// allow for servers with new algorithm lists, where older clients
-		// won't try to connect to these servers, and new clients know to use
-		// non-legacy lists in the PeerKEXPRNGSeed mechanism.
+		// New algorithms are then randomly inserted only after the legacy
+		// lists are processed in legacy PRNG state order.
+
 		legacyServerKexAlgos := []string{
 			kexAlgoCurve25519SHA256LibSSH,
 			kexAlgoECDH256, kexAlgoECDH384, kexAlgoECDH521,
@@ -681,9 +740,11 @@ func (t *handshakeTransport) sendKexInit() error {
 			"hmac-sha2-256", "hmac-sha1", "hmac-sha1-96",
 		}
 		legacyServerNoEncryptThenMACs := []string{
-			"hmac-sha2-256", "hmac-sha1", "hmac-sha1-96"}
-
-		isServer := len(t.hostKeys) > 0
+			"hmac-sha2-256", "hmac-sha1", "hmac-sha1-96",
+		}
+		if t.config.NoEncryptThenMACHash {
+			legacyServerMACs = legacyServerNoEncryptThenMACs
+		}
 
 		PRNG := prng.NewPRNGWithSeed(t.config.KEXPRNGSeed)
 
@@ -691,95 +752,163 @@ func (t *handshakeTransport) sendKexInit() error {
 		startingCiphers := msg.CiphersClientServer
 		startingMACs := msg.MACsClientServer
 
-		if isServer {
+		// testLegacyClient: legacy clients are older clients which start with
+		// the same algorithm lists as legacyServer and have neither the
+		// newServer-algorithm nor the weak-MAC KEX prediction logic.
+
+		if isServer || testLegacyClient {
 			startingKexAlgos = legacyServerKexAlgos
 			startingCiphers = legacyServerCiphers
 			startingMACs = legacyServerMACs
+			if t.config.NoEncryptThenMACHash {
+				startingMACs = legacyServerNoEncryptThenMACs
+			}
 		}
 
-		msg.KexAlgos = selectKexAlgos(PRNG, startingKexAlgos)
+		kexAlgos := selectKexAlgos(PRNG, startingKexAlgos)
 
 		ciphers := truncate(PRNG, permute(PRNG, startingCiphers))
-		msg.CiphersClientServer = ciphers
-		msg.CiphersServerClient = ciphers
 
 		MACs := truncate(PRNG, permute(PRNG, startingMACs))
-		msg.MACsClientServer = MACs
-		msg.MACsServerClient = MACs
 
+		var hostKeyAlgos []string
 		if isServer {
-			msg.ServerHostKeyAlgos = permute(PRNG, msg.ServerHostKeyAlgos)
+			hostKeyAlgos = permute(PRNG, msg.ServerHostKeyAlgos)
 		} else {
 			// Must offer KeyAlgoRSA to Psiphon server.
-			msg.ServerHostKeyAlgos = retain(
+			hostKeyAlgos = retain(
 				PRNG,
 				truncate(PRNG, permute(PRNG, msg.ServerHostKeyAlgos)),
 				KeyAlgoRSA)
 		}
 
-		if !isServer && t.config.PeerKEXPRNGSeed != nil {
+		// To ensure compatibility with server KEX prediction in legacy
+		// clients, all preceding PRNG operations must be performed in the
+		// given order, and all before the following operations.
 
-			// Generate the peer KEX and make adjustments if negotiation would
-			// fail. This assumes that PeerKEXPRNGSeed remains static (in
-			// Psiphon, the peer is the server and PeerKEXPRNGSeed is derived
-			// from the server entry); and that the PRNG is invoked in the
-			// exact same order on the peer (i.e., the code block immediately
-			// above is what the peer runs); and that the peer sets
-			// NoEncryptThenMACHash in the same cases.
+		// Avoid negotiating weak MAC algorithms. Servers will ensure that no
+		// weakMACs are the highest priority item. Clients will make
+		// adjustments after predicting the server KEX.
 
-			PeerPRNG := prng.NewPRNGWithSeed(t.config.PeerKEXPRNGSeed)
+		weakMACs := []string{"hmac-sha1-96"}
 
+		if isServer {
+			MACs = avoid(PRNG, MACs, weakMACs, startingMACs)
+		}
+
+		// Randomly insert new algorithms. For servers, the preceding legacy
+		// operations will ensure selection of at least one legacy algorithm
+		// of each type, ensuring compatibility with legacy clients.
+
+		newServerKexAlgos := []string{
+			kexAlgoCurve25519SHA256, kexAlgoDH16SHA512,
+			"kex-strict-s-v00@openssh.com",
+		}
+		newServerCiphers := []string{
+			gcm256CipherID,
+		}
+		newServerMACs := []string{
+			"hmac-sha2-512-etm@openssh.com", "hmac-sha2-512",
+		}
+		newServerNoEncryptThenMACs := []string{
+			"hmac-sha2-512",
+		}
+		if t.config.NoEncryptThenMACHash {
+			newServerMACs = newServerNoEncryptThenMACs
+		}
+
+		if isServer {
+			kexAlgos = addSome(PRNG, kexAlgos, newServerKexAlgos)
+			ciphers = addSome(PRNG, ciphers, newServerCiphers)
+			MACs = addSome(PRNG, MACs, newServerMACs)
+		}
+
+		msg.KexAlgos = kexAlgos
+		msg.CiphersClientServer = ciphers
+		msg.CiphersServerClient = ciphers
+		msg.MACsClientServer = MACs
+		msg.MACsServerClient = MACs
+		msg.ServerHostKeyAlgos = hostKeyAlgos
+
+		if !isServer && t.config.PeerKEXPRNGSeed != nil {
+
+			// Generate the server KEX and make adjustments if negotiation
+			// would fail. This assumes that PeerKEXPRNGSeed remains static
+			// (in Psiphon, the peer is the server and PeerKEXPRNGSeed is
+			// derived from the server entry); and that the PRNG is invoked
+			// in the exact same order on the server (i.e., the code block
+			// immediately above is what the peer runs); and that the server
+			// sets NoEncryptThenMACHash in the same cases.
+			//
 			// Note that only the client sends "ext-info-c"
 			// and "kex-strict-c-v00@openssh.com" and only the server
 			// sends "kex-strict-s-v00@openssh.com", so these will never
 			// match and do not need to be filtered out before findCommon.
-			//
-			// The following assumes that the server always starts with the
-			// default preferredKexAlgos along with
-			// "kex-strict-s-v00@openssh.com" appended before randomizing.
-
-			serverKexAlgos := append(
-				append([]string(nil), preferredKexAlgos...),
-				"kex-strict-s-v00@openssh.com")
-			serverCiphers := preferredCiphers
-			serverMACS := supportedMACs
-			serverNoEncryptThenMACs := noEncryptThenMACs
-
-			// Switch to using the legacy algorithms that the server currently
-			// downgrades to (see comment above).
-			//
-			// TODO: for servers without legacy backwards compatibility
-			// concerns, skip the following lines.
-			serverKexAlgos = legacyServerKexAlgos
-			serverCiphers = legacyServerCiphers
-			serverMACS = legacyServerMACs
-			serverNoEncryptThenMACs = legacyServerNoEncryptThenMACs
 
-			serverKexAlgos = selectKexAlgos(PeerPRNG, serverKexAlgos)
+			PeerPRNG := prng.NewPRNGWithSeed(t.config.PeerKEXPRNGSeed)
+
+			startingKexAlgos := legacyServerKexAlgos
+			startingCiphers := legacyServerCiphers
+			startingMACs := legacyServerMACs
+			if t.config.NoEncryptThenMACHash {
+				startingMACs = legacyServerNoEncryptThenMACs
+			}
+
+			// The server populates msg.ServerHostKeyAlgos based on the host
+			// key type, which, for Psiphon servers, is "ssh-rsa", so
+			// algorithmsForKeyFormat("ssh-rsa") predicts the server
+			// msg.ServerHostKeyAlgos value.
+			startingHostKeyAlgos := algorithmsForKeyFormat("ssh-rsa")
+
+			serverKexAlgos := selectKexAlgos(PeerPRNG, startingKexAlgos)
+			serverCiphers := truncate(PeerPRNG, permute(PeerPRNG, startingCiphers))
+			serverMACs := truncate(PeerPRNG, permute(PeerPRNG, startingMACs))
+
+			if !testLegacyClient {
+
+				// This value is not used, but the identical PRNG operation must be
+				// performed in order to predict the PeerPRNG state.
+				_ = permute(PeerPRNG, startingHostKeyAlgos)
+
+				serverMACs = avoid(PeerPRNG, serverMACs, weakMACs, startingMACs)
+
+				serverKexAlgos = addSome(PeerPRNG, serverKexAlgos, newServerKexAlgos)
+				serverCiphers = addSome(PeerPRNG, serverCiphers, newServerCiphers)
+				serverMACs = addSome(PeerPRNG, serverMACs, newServerMACs)
+			}
+
+			// Adjust to ensure compatibility with the server KEX.
 
 			if _, err := findCommon("", msg.KexAlgos, serverKexAlgos); err != nil {
 				if kexAlgo, ok := firstKexAlgo(serverKexAlgos); ok {
-					msg.KexAlgos = retain(PRNG, msg.KexAlgos, kexAlgo)
+					kexAlgos = retain(PRNG, msg.KexAlgos, kexAlgo)
 				}
 			}
 
-			serverCiphers = truncate(PeerPRNG, permute(PeerPRNG, serverCiphers))
 			if _, err := findCommon("", ciphers, serverCiphers); err != nil {
 				ciphers = retain(PRNG, ciphers, serverCiphers[0])
-				msg.CiphersClientServer = ciphers
-				msg.CiphersServerClient = ciphers
 			}
 
-			if t.config.NoEncryptThenMACHash {
-				serverMACS = serverNoEncryptThenMACs
+			if _, err := findCommon("", MACs, serverMACs); err != nil {
+				MACs = retain(PRNG, MACs, serverMACs[0])
 			}
 
-			serverMACS = truncate(PeerPRNG, permute(PeerPRNG, serverMACS))
-			if _, err := findCommon("", MACs, serverMACS); err != nil {
-				MACs = retain(PRNG, MACs, serverMACS[0])
-				msg.MACsClientServer = MACs
-				msg.MACsServerClient = MACs
+			// Avoid negotiating weak MAC algorithms.
+			//
+			// Legacy clients, without this logic, may still select only weak
+			// MACs or predict only weak MACs for the server KEX.
+
+			commonMAC, _ := findCommon("", MACs, serverMACs)
+			if common.Contains(weakMACs, commonMAC) {
+				// serverMACs[0] is not in weakMACs.
+				MACs = toFront(MACs, serverMACs[0])
 			}
+
+			msg.KexAlgos = kexAlgos
+			msg.CiphersClientServer = ciphers
+			msg.CiphersServerClient = ciphers
+			msg.MACsClientServer = MACs
+			msg.MACsServerClient = MACs
 		}
 
 		// Offer "zlib@openssh.com", which is offered by OpenSSH. Compression

+ 46 - 8
psiphon/common/crypto/ssh/randomized_kex_test.go

@@ -33,15 +33,31 @@ import (
 )
 
+// TestRandomizedSSHKEXes runs runTestRandomizedSSHKEXes with legacyClient
+// set to false and reports any returned error as a test failure.
 func TestRandomizedSSHKEXes(t *testing.T) {
+	err := runTestRandomizedSSHKEXes(false)
+	if err != nil {
+		t.Errorf("runTestRandomizedSSHKEXes failed: %s", err)
+		return
+	}
+}
+
+// TestLegacyRandomizedSSHKEXes runs runTestRandomizedSSHKEXes with
+// legacyClient set to true and reports any returned error as a test failure.
+func TestLegacyRandomizedSSHKEXes(t *testing.T) {
+	err := runTestRandomizedSSHKEXes(true)
+	if err != nil {
+		t.Errorf("runTestRandomizedSSHKEXes failed: %s", err)
+		return
+	}
+}
+
+func runTestRandomizedSSHKEXes(legacyClient bool) error {
 
 	rsaKey, err := rsa.GenerateKey(rand.Reader, 4096)
 	if err != nil {
-		t.Fatalf("rsa.GenerateKey failed: %s", err)
+		return errors.Trace(err)
 	}
 
 	signer, err := NewSignerFromKey(rsaKey)
 	if err != nil {
-		t.Fatalf("NewSignerFromKey failed: %s", err)
+		return errors.Trace(err)
 	}
 
 	publicKey := signer.PublicKey()
@@ -49,6 +65,11 @@ func TestRandomizedSSHKEXes(t *testing.T) {
 	username := "username"
 	password := "password"
 
+	testLegacyClient = legacyClient
+	defer func() {
+		testLegacyClient = false
+	}()
+
 	for _, doPeerKEXPRNGSeed := range []bool{true, false} {
 
 		failed := false
@@ -57,17 +78,17 @@ func TestRandomizedSSHKEXes(t *testing.T) {
 
 			clientSeed, err := prng.NewSeed()
 			if err != nil {
-				t.Fatalf("prng.NewSeed failed: %s", err)
+				return errors.Trace(err)
 			}
 
 			serverSeed, err := prng.NewSeed()
 			if err != nil {
-				t.Fatalf("prng.NewSeed failed: %s", err)
+				return errors.Trace(err)
 			}
 
 			clientConn, serverConn, err := netPipe()
 			if err != nil {
-				t.Fatalf("netPipe failed: %s", err)
+				return errors.Trace(err)
 			}
 
 			testGroup, _ := errgroup.WithContext(context.Background())
@@ -102,6 +123,23 @@ func TestRandomizedSSHKEXes(t *testing.T) {
 					return errors.Trace(err)
 				}
 
+				if !legacyClient {
+					// Ensure weak MAC is not negotiated
+					for _, p := range []packetCipher{
+						clientSSHConn.(*connection).transport.conn.(*transport).reader.packetCipher,
+						clientSSHConn.(*connection).transport.conn.(*transport).writer.packetCipher} {
+						switch c := p.(type) {
+						case *gcmCipher, *chacha20Poly1305Cipher:
+							// No weak MAC.
+						case *streamPacketCipher:
+							// The only weak MAC, "hmac-sha1-96", is also the only truncatingMAC.
+							if _, ok := c.mac.(truncatingMAC); ok {
+								return errors.TraceNew("weak MAC negotiated")
+							}
+						}
+					}
+				}
+
 				clientSSHConn.Close()
 				clientConn.Close()
 				return nil
@@ -140,8 +178,7 @@ func TestRandomizedSSHKEXes(t *testing.T) {
 
 				// Expect no failure to negotiate when setting PeerKEXPRNGSeed.
 				if doPeerKEXPRNGSeed {
-					t.Fatalf("goroutine failed: %s", err)
-
+					return errors.Tracef("unexpected failure to negotiate: %v", err)
 				} else {
 					failed = true
 					break
@@ -151,7 +188,8 @@ func TestRandomizedSSHKEXes(t *testing.T) {
 
 		// Expect at least one failure to negotiate when not setting PeerKEXPRNGSeed.
 		if !doPeerKEXPRNGSeed && !failed {
-			t.Fatalf("unexpected success")
+			// Return the error so the calling test fails. Constructing the
+			// error without returning it would silently discard it and this
+			// check could never fail.
+			return errors.TraceNew("unexpected success")
 		}
 	}
+	return nil
 }

+ 4 - 1
psiphon/common/obfuscator/obfuscatedSshConn.go

@@ -175,7 +175,10 @@ func NewObfuscatedSSHConn(
 		}
 
 	} else {
-		// NewServerObfuscator reads a seed message from conn
+		// NewServerObfuscator reads a seed message from conn.
+		//
+		// DisableStrictHistoryMode is not set, as legitimate clients never
+		// retry OSSH dials using a previous seed.
 		obfuscator, err = NewServerObfuscator(
 			&ObfuscatorConfig{
 				Keyword:           obfuscationKeyword,

+ 22 - 12
psiphon/common/obfuscator/obfuscator.go

@@ -80,11 +80,14 @@ type OSSHPrefixSplitConfig struct {
 // stream ciphers for:
 // https://github.com/brl/obfuscated-openssh/blob/master/README.obfuscation
 //
-// Limitation: the RC4 cipher is vulnerable to ciphertext malleability and
-// the "magic" value provides only weak authentication due to its small
-// size. Increasing the size of the magic field will break compatibility
-// with legacy clients. New protocols and schemes should not use this
-// obfuscator.
+// Limitations:
+//   - The RC4 cipher is vulnerable to ciphertext malleability and the "magic"
+//     value provides only weak authentication due to its small size.
+//     Increasing the size of the magic field will break compatibility with
+//     legacy clients.
+//   - The RC4 cipher does not provide integrity protection for the client
+//     preamble, particularly the prefix header.
+//   - New protocols and schemes should not use this obfuscator.
 type Obfuscator struct {
 	preamble []byte
 
@@ -120,9 +123,9 @@ type ObfuscatorConfig struct {
 	// SeedHistory and IrregularLogger are optional parameters used only by
 	// server obfuscators.
 
-	SeedHistory       *SeedHistory
-	StrictHistoryMode bool
-	IrregularLogger   func(clientIP string, err error, logFields common.LogFields)
+	SeedHistory              *SeedHistory
+	DisableStrictHistoryMode bool
+	IrregularLogger          func(clientIP string, err error, logFields common.LogFields)
 }
 
 // NewClientObfuscator creates a new Obfuscator, staging a seed message to be
@@ -344,7 +347,7 @@ func deriveKey(obfuscatorSeed, keyword, iv []byte) ([]byte, error) {
 // makeClientPreamble generates the preamble bytes for the Obfuscated SSH protocol.
 //
 // If a prefix is applied, preamble bytes refer to the prefix, prefix terminator,
-// followed by the Obufscted SSH initial client message, followed by the
+// followed by the Obfuscated SSH initial client message, followed by the
 // prefix header.
 //
 // If a prefix is not applied, preamble bytes refer to the Obfuscated SSH
@@ -369,6 +372,13 @@ func deriveKey(obfuscatorSeed, keyword, iv []byte) ([]byte, error) {
 //
 // Returns the preamble, the prefix header if a prefix was generated,
 // and the padding length.
+//
+// Limitation: as the RC4 stream cipher does not provide integrity protection,
+// the prefix header is not protected from manipulation. The prefix header is
+// treated, by the server, as untrusted input, so a corrupt or invalid prefix
+// header will result in a failed connection, as would happen with attempts
+// to corrupt the underlying SSH connection. However, a man-in-the-middle can
+// cause the server to respond with a different prefix.
 func makeClientPreamble(
 	keyword string,
 	prefixSpec *OSSHPrefixSpec,
@@ -431,7 +441,7 @@ func makeClientPreamble(
 
 	preamble := buffer.Bytes()
 
-	// Encryptes what comes after the magic value.
+	// Encrypts what comes after the magic value.
 	clientToServerCipher.XORKeyStream(
 		preamble[magicValueStartIndex:],
 		preamble[magicValueStartIndex:])
@@ -551,7 +561,7 @@ func readPreambleHelper(
 			// Adds the seed to the seed history only if the magic value is valid.
 			// This is to prevent malicious clients from filling up the history cache.
 			ok, duplicateLogFields := config.SeedHistory.AddNew(
-				config.StrictHistoryMode, clientIP, "obfuscator-seed", osshSeed)
+				!config.DisableStrictHistoryMode, clientIP, "obfuscator-seed", osshSeed)
 			errStr := "duplicate obfuscation seed"
 			if duplicateLogFields != nil {
 				if config.IrregularLogger != nil {
@@ -686,7 +696,7 @@ func makeTerminator(keyword string, b []byte, direction string) ([]byte, error)
 	return terminator, nil
 }
 
-// makeTerminatedPrefixWithPadding generates bytes starting with the prefix bytes defiend
+// makeTerminatedPrefixWithPadding generates bytes starting with the prefix bytes defined
 // by spec and ending with the generated terminator.
 // If the generated prefix is shorter than PREAMBLE_HEADER_LENGTH, it is padded
 // with random bytes.

+ 0 - 1
psiphon/common/protocol/protocol.go

@@ -732,7 +732,6 @@ func (transports ConjureTransports) PruneInvalid() ConjureTransports {
 }
 
 type HandshakeResponse struct {
-	SSHSessionID             string              `json:"ssh_session_id"`
 	Homepages                []string            `json:"homepages"`
 	UpgradeClientVersion     string              `json:"upgrade_client_version"`
 	PageViewRegexes          []map[string]string `json:"page_view_regexes"`

+ 3 - 1
psiphon/common/quic/quic.go

@@ -219,8 +219,10 @@ func Listen(
 		// The non-strict case where ok is true and logFields is not nil is
 		// ignored, and nothing is logged in that scenario.
 
+		strictMode := false
+
 		ok, logFields := clientRandomHistory.AddNew(
-			false, remoteAddr.String(), "client-hello-random", clientHelloRandom)
+			strictMode, remoteAddr.String(), "client-hello-random", clientHelloRandom)
 		if !ok && logFields != nil {
 			irregularTunnelLogger(
 				common.IPAddressFromAddr(remoteAddr),

+ 2 - 2
psiphon/common/tun/tun.go

@@ -2377,7 +2377,7 @@ func processPacket(
 		dataOffset := 0
 
 		if protocol == internetProtocolTCP {
-			if len(packet) < 33 {
+			if len(packet) < 38 {
 				metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
 				return false
 			}
@@ -2431,7 +2431,7 @@ func processPacket(
 		dataOffset := 0
 
 		if protocol == internetProtocolTCP {
-			if len(packet) < 53 {
+			if len(packet) < 58 {
 				metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
 				return false
 			}

+ 2 - 1
psiphon/controller.go

@@ -2734,7 +2734,8 @@ func (controller *Controller) inproxyGetProxyAPIParameters() (
 
 	// TODO: include broker fronting dial parameters to be logged by the
 	// broker.
-	params := getBaseAPIParameters(baseParametersNoDialParameters, controller.config, nil)
+	params := getBaseAPIParameters(
+		baseParametersNoDialParameters, true, controller.config, nil)
 
 	if controller.config.DisableTactics {
 		return params, "", nil

+ 54 - 60
psiphon/server/api.go

@@ -140,24 +140,13 @@ func sshAPIRequestHandler(
 
 var handshakeRequestParams = append(
 	append(
-		append(
-			[]requestParamSpec{
-				// Legacy clients may not send "session_id" in handshake
-				{"session_id", isHexDigits, requestParamOptional},
-				{"missing_server_entry_signature", isBase64String, requestParamOptional},
-				{"missing_server_entry_provider_id", isBase64String, requestParamOptional},
-			},
-			baseParams...),
-		baseDialParams...),
+		[]requestParamSpec{
+			{"missing_server_entry_signature", isBase64String, requestParamOptional},
+			{"missing_server_entry_provider_id", isBase64String, requestParamOptional},
+		},
+		baseAndDialParams...),
 	tacticsParams...)
 
-// inproxyHandshakeRequestParams adds inproxyDialParams to handshakeRequestParams.
-var inproxyHandshakeRequestParams = append(
-	append(
-		[]requestParamSpec{},
-		handshakeRequestParams...),
-	inproxyDialParams...)
-
 // handshakeAPIRequestHandler implements the "handshake" API request.
 // Clients make the handshake immediately after establishing a tunnel
 // connection; the response tells the client what homepage to open, what
@@ -229,17 +218,11 @@ func handshakeAPIRequestHandler(
 
 	// Note: ignoring legacy "known_servers" params
 
-	expectedParams := handshakeRequestParams
-	if sshClient.isInproxyTunnelProtocol {
-		expectedParams = inproxyHandshakeRequestParams
-	}
-
-	err := validateRequestParams(support.Config, params, expectedParams)
+	err := validateRequestParams(support.Config, params, handshakeRequestParams)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	sessionID, _ := getStringRequestParam(params, "client_session_id")
 	sponsorID, _ := getStringRequestParam(params, "sponsor_id")
 	clientVersion, _ := getStringRequestParam(params, "client_version")
 	clientPlatform, _ := getStringRequestParam(params, "client_platform")
@@ -304,7 +287,7 @@ func handshakeAPIRequestHandler(
 	// Flag the SSH client as having completed its handshake. This
 	// may reselect traffic rules and starts allowing port forwards.
 
-	apiParams := copyBaseSessionAndDialParams(params)
+	apiParams := copyBaseAndDialParams(params)
 
 	handshakeStateInfo, err := sshClient.setHandshakeState(
 		handshakeState{
@@ -333,13 +316,16 @@ func handshakeAPIRequestHandler(
 	// common API parameters and "handshake_completed" flag, this handshake
 	// log is mostly redundant and set to debug level.
 
-	log.WithTraceFields(
-		getRequestLogFields(
+	if IsLogLevelDebug() {
+		logFields := getRequestLogFields(
 			"",
+			sshClient.sessionID,
 			clientGeoIPData,
 			handshakeStateInfo.authorizedAccessTypes,
 			params,
-			handshakeRequestParams)).Debug("handshake")
+			handshakeRequestParams)
+		log.WithTraceFields(logFields).Debug("handshake")
+	}
 
 	pad_response, _ := getPaddingSizeRequestParam(params, "pad_response")
 
@@ -431,7 +417,6 @@ func handshakeAPIRequestHandler(
 	}
 
 	handshakeResponse := protocol.HandshakeResponse{
-		SSHSessionID:             sessionID,
 		Homepages:                homepages,
 		UpgradeClientVersion:     db.GetUpgradeClientVersion(clientVersion, normalizedPlatform),
 		PageViewRegexes:          make([]map[string]string, 0),
@@ -562,7 +547,7 @@ func doHandshakeInproxyBrokerRelay(
 var uniqueUserParams = append(
 	[]requestParamSpec{
 		{"last_connected", isLastConnected, 0}},
-	baseSessionParams...)
+	baseParams...)
 
 var connectedRequestParams = append(
 	[]requestParamSpec{
@@ -640,6 +625,7 @@ func connectedAPIRequestHandler(
 		log.LogRawFieldsWithTimestamp(
 			getRequestLogFields(
 				"unique_user",
+				sshClient.sessionID,
 				sshClient.getClientGeoIPData(),
 				authorizedAccessTypes,
 				params,
@@ -661,10 +647,12 @@ func connectedAPIRequestHandler(
 	return responsePayload, nil
 }
 
-var statusRequestParams = baseSessionParams
+var statusRequestParams = baseParams
 
 var remoteServerListStatParams = append(
 	[]requestParamSpec{
+		// Legacy clients don't record the session_id with remote_server_list_stats entries.
+		{"session_id", isHexDigits, requestParamOptional},
 		{"client_download_timestamp", isISO8601Date, 0},
 		{"tunneled", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
 		{"url", isAnyString, 0},
@@ -684,7 +672,7 @@ var remoteServerListStatParams = append(
 		{"tls_fragmented", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
 	},
 
-	baseSessionParams...)
+	baseParams...)
 
 // Backwards compatibility case: legacy clients do not include these fields in
 // the remote_server_list_stats entries. Use the values from the outer status
@@ -693,7 +681,6 @@ var remoteServerListStatParams = append(
 // recording time). Note that all but client_build_rev, device_region, and
 // device_location are required fields.
 var remoteServerListStatBackwardsCompatibilityParamNames = []string{
-	"session_id",
 	"propagation_channel_id",
 	"sponsor_id",
 	"client_version",
@@ -717,7 +704,7 @@ var failedTunnelStatParams = append(
 		{"bytes_up", isIntString, requestParamOptional | requestParamLogStringAsInt},
 		{"bytes_down", isIntString, requestParamOptional | requestParamLogStringAsInt},
 		{"tunnel_error", isAnyString, 0}},
-	baseSessionAndDialParams...)
+	baseAndDialParams...)
 
 // statusAPIRequestHandler implements the "status" API request.
 // Clients make periodic status requests which deliver client-side
@@ -768,6 +755,7 @@ func statusAPIRequestHandler(
 
 			domainBytesFields := getRequestLogFields(
 				"domain_bytes",
+				sshClient.sessionID,
 				sshClient.getClientGeoIPData(),
 				authorizedAccessTypes,
 				params,
@@ -802,10 +790,6 @@ func statusAPIRequestHandler(
 				}
 			}
 
-			// For validation, copy expected fields from the outer
-			// statusRequestParams.
-			remoteServerListStat["client_session_id"] = params["client_session_id"]
-
 			err := validateRequestParams(support.Config, remoteServerListStat, remoteServerListStatParams)
 			if err != nil {
 				// Occasionally, clients may send corrupt persistent stat data. Do not
@@ -816,6 +800,7 @@ func statusAPIRequestHandler(
 
 			remoteServerListFields := getRequestLogFields(
 				"remote_server_list",
+				"", // Use the session_id the client recorded with the event
 				sshClient.getClientGeoIPData(),
 				authorizedAccessTypes,
 				remoteServerListStat,
@@ -856,6 +841,7 @@ func statusAPIRequestHandler(
 
 			failedTunnelFields := getRequestLogFields(
 				"failed_tunnel",
+				"", // Use the session_id the client recorded with the event
 				sshClient.getClientGeoIPData(),
 				authorizedAccessTypes,
 				failedTunnelStat,
@@ -941,9 +927,9 @@ func statusAPIRequestHandler(
 // clientVerificationAPIRequestHandler is just a compliance stub
 // for older Android clients that still send verification requests
 func clientVerificationAPIRequestHandler(
-	support *SupportServices,
-	sshClient *sshClient,
-	params common.APIParameters) ([]byte, error) {
+	_ *SupportServices,
+	_ *sshClient,
+	_ common.APIParameters) ([]byte, error) {
 	return make([]byte, 0), nil
 }
 
@@ -954,9 +940,10 @@ var tacticsParams = []requestParamSpec{
 
 var tacticsRequestParams = append(
 	append(
-		[]requestParamSpec(nil),
+		[]requestParamSpec{
+			{"session_id", isHexDigits, 0}},
 		tacticsParams...),
-	baseSessionAndDialParams...)
+	baseAndDialParams...)
 
 func getTacticsAPIParameterValidator(config *Config) common.APIParameterValidator {
 	return func(params common.APIParameters) error {
@@ -970,6 +957,7 @@ func getTacticsAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormat
 
 		logFields := getRequestLogFields(
 			tactics.TACTICS_METRIC_EVENT_NAME,
+			"", // Use the session_id the client reported
 			GeoIPData(geoIPData),
 			nil, // authorizedAccessTypes are not known yet
 			params,
@@ -981,9 +969,10 @@ func getTacticsAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormat
 
 var inproxyBrokerRequestParams = append(
 	append(
-		[]requestParamSpec{},
+		[]requestParamSpec{
+			{"session_id", isHexDigits, 0}},
 		tacticsParams...),
-	baseSessionParams...)
+	baseParams...)
 
 func getInproxyBrokerAPIParameterValidator(config *Config) common.APIParameterValidator {
 	return func(params common.APIParameters) error {
@@ -997,6 +986,7 @@ func getInproxyBrokerAPIParameterLogFieldFormatter() common.APIParameterLogField
 
 		logFields := getRequestLogFields(
 			"inproxy_broker",
+			"", // Use the session_id the client reported
 			GeoIPData(geoIPData),
 			nil,
 			params,
@@ -1031,7 +1021,6 @@ const (
 // baseParams are the basic request parameters that are expected for all API
 // requests and log events.
 var baseParams = []requestParamSpec{
-	{"client_session_id", isHexDigits, requestParamNotLogged},
 	{"propagation_channel_id", isHexDigits, 0},
 	{"sponsor_id", isHexDigits, 0},
 	{"client_version", isIntString, requestParamLogStringAsInt},
@@ -1044,14 +1033,6 @@ var baseParams = []requestParamSpec{
 	{tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME, isAnyString, requestParamOptional},
 }
 
-// baseSessionParams adds to baseParams the required session_id parameter. For
-// all requests except handshake, all existing clients are expected to send
-// session_id. Legacy clients may not send "session_id" in handshake.
-var baseSessionParams = append(
-	[]requestParamSpec{
-		{"session_id", isHexDigits, 0}},
-	baseParams...)
-
 // baseDialParams are the dial parameters, per-tunnel network protocol and
 // obfuscation metrics which are logged with server_tunnel, failed_tunnel, and
 // tactics.
@@ -1167,12 +1148,12 @@ var inproxyDialParams = []requestParamSpec{
 	{"inproxy_webrtc_remote_ice_candidate_port", isIntString, requestParamOptional | requestParamLogStringAsInt},
 }
 
-// baseSessionAndDialParams adds baseDialParams and inproxyDialParams to baseSessionParams.
-var baseSessionAndDialParams = append(
+// baseAndDialParams adds baseDialParams and inproxyDialParams to baseParams.
+var baseAndDialParams = append(
 	append(
 		append(
 			[]requestParamSpec{},
-			baseSessionParams...),
+			baseParams...),
 		baseDialParams...),
 	inproxyDialParams...)
 
@@ -1213,14 +1194,14 @@ func validateRequestParams(
 	return nil
 }
 
-// copyBaseSessionAndDialParams makes a copy of the params which includes only
-// the baseSessionAndDialParams.
-func copyBaseSessionAndDialParams(params common.APIParameters) common.APIParameters {
+// copyBaseAndDialParams makes a copy of the params which includes only
+// the baseAndDialParams.
+func copyBaseAndDialParams(params common.APIParameters) common.APIParameters {
 
 	// Note: not a deep copy; assumes baseSessionAndDialParams values are all
 	// scalar types (int, string, etc.)
 	paramsCopy := make(common.APIParameters)
-	for _, baseParam := range baseSessionAndDialParams {
+	for _, baseParam := range baseAndDialParams {
 		value := params[baseParam.name]
 		if value == nil {
 			continue
@@ -1281,6 +1262,7 @@ func validateStringArrayRequestParam(
 // the legacy psi_web and current ELK naming conventions.
 func getRequestLogFields(
 	eventName string,
+	sessionID string,
 	geoIPData GeoIPData,
 	authorizedAccessTypes []string,
 	params common.APIParameters,
@@ -1288,6 +1270,18 @@ func getRequestLogFields(
 
 	logFields := make(LogFields)
 
+	// A sessionID is specified for SSH API requests, where the Psiphon server
+	// has already received a session ID in the SSH auth payload. In this
+	// case, use that session ID.
+	//
+	// sessionID is "" for other, non-SSH server cases including tactics,
+	// in-proxy broker, and client-side store and forward events including
+	// remote server list and failed tunnel.
+
+	if sessionID != "" {
+		logFields["session_id"] = sessionID
+	}
+
 	if eventName != "" {
 		logFields["event_name"] = eventName
 	}

+ 5 - 0
psiphon/server/config.go

@@ -297,6 +297,11 @@ type Config struct {
 	// is 0.
 	MeekCachedResponsePoolBufferCount int
 
+	// MeekCachedResponsePoolBufferClientLimit is the maximum number of
+	// shared buffers a single client may consume at once. A default of 32 is
+	// used when MeekCachedResponsePoolBufferClientLimit is 0.
+	MeekCachedResponsePoolBufferClientLimit int
+
 	// UDPInterceptUdpgwServerAddress specifies the network address of
 	// a udpgw server which clients may be port forwarding to. When
 	// specified, these TCP port forwards are intercepted and handled

+ 45 - 7
psiphon/server/meek.go

@@ -94,7 +94,9 @@ const (
 	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH              = 65536
 	MEEK_DEFAULT_POOL_BUFFER_LENGTH                  = 65536
 	MEEK_DEFAULT_POOL_BUFFER_COUNT                   = 2048
+	MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT            = 32
 	MEEK_ENDPOINT_MAX_REQUEST_PAYLOAD_LENGTH         = 65536
+	MEEK_MAX_SESSION_COUNT                           = 1000000
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -216,6 +218,11 @@ func NewMeekServer(
 		bufferCount = support.Config.MeekCachedResponsePoolBufferCount
 	}
 
+	bufferPoolClientLimit := MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT
+	if support.Config.MeekCachedResponsePoolBufferClientLimit != 0 {
+		bufferPoolClientLimit = support.Config.MeekCachedResponsePoolBufferClientLimit
+	}
+
 	_, thresholdSeconds, _, _, _, _, _, _, reapFrequencySeconds, maxEntries :=
 		support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
@@ -224,7 +231,14 @@ func NewMeekServer(
 		time.Duration(reapFrequencySeconds)*time.Second,
 		maxEntries)
 
-	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
+	bufferPool := NewCachedResponseBufferPool(
+		bufferLength, bufferCount, bufferPoolClientLimit)
+
+	// Limitation: rate limiting and resource limiting are handled by external
+	// components, and MeekServer enforces only a sanity check limit on the
+	// number of entries in MeekServer.sessions.
+	//
+	// See comment in newSSHServer for more details.
 
 	meekServer := &MeekServer{
 		support:                         support,
@@ -784,12 +798,8 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 		responseWriter.WriteHeader(http.StatusPartialContent)
 
-		// TODO:
-		// - enforce a max extended buffer count per client, for
-		//   fairness? Throttling may make this unnecessary.
-		// - cachedResponse can now start releasing extended buffers,
-		//   as response bytes before "position" will never be requested
-		//   again?
+		// TODO: cachedResponse can now start releasing extended buffers, as
+		// response bytes before "position" will never be requested again?
 
 		responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
 		greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
@@ -819,6 +829,19 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
 		// write its downstream traffic through to the response body.
 
+		// Limitation: pumpWrites may write more response bytes than can be
+		// cached for future retries, either due to no extended buffers
+		// available, or exceeding the per-client extended buffer limit. In
+		// practice, with throttling in place and servers running under load
+		// limiting, metrics indicate that this rarely occurs. A potential
+		// future enhancement could be for pumpWrites to stop writing and
+		// send the response once there are no buffers remaining, favoring
+		// connection resilience over performance.
+		//
+		// TODO: use geo-targeted per-client extended buffer limit to reserve
+		// extended cache buffers for regions or ISPs with active or expected
+		// network connection interruptions?
+
 		responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround)
 		greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
 		greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
@@ -1202,6 +1225,17 @@ func (server *MeekServer) getSessionOrEndpoint(
 	}
 
 	server.sessionsLock.Lock()
+
+	// MEEK_MAX_SESSION_COUNT is a simple sanity check and failsafe. Load
+	// limiting tuned to each server's host resources is provided by external
+	// components. See comment in newSSHServer for more details.
+	if len(server.sessions) >= MEEK_MAX_SESSION_COUNT {
+		server.sessionsLock.Unlock()
+		err := std_errors.New("MEEK_MAX_SESSION_COUNT exceeded")
+		log.WithTrace().Warning(err.Error())
+		return "", nil, nil, "", "", nil, errors.Trace(err)
+	}
+
 	server.sessions[sessionID] = session
 	server.sessionsLock.Unlock()
 
@@ -1439,6 +1473,10 @@ func (server *MeekServer) getMeekCookiePayload(
 					errors.Trace(err),
 					LogFields(logFields))
 			},
+
+			// To allow for meek retries, replay of the same meek cookie is
+			// permitted (but only from the same source IP).
+			DisableStrictHistoryMode: true,
 		},
 		clientIP,
 		reader)

+ 9 - 4
psiphon/server/meekBuffer.go

@@ -225,9 +225,12 @@ func (response *CachedResponse) Write(data []byte) (int, error) {
 			if response.writeBufferIndex == len(response.buffers)-1 &&
 				!response.overwriting {
 
-				extendedBuffer := response.extendedBufferPool.Get()
-				if extendedBuffer != nil {
-					response.buffers = append(response.buffers, extendedBuffer)
+				extendedBufferCount := len(response.buffers) - 1
+				if extendedBufferCount < response.extendedBufferPool.limit {
+					extendedBuffer := response.extendedBufferPool.Get()
+					if extendedBuffer != nil {
+						response.buffers = append(response.buffers, extendedBuffer)
+					}
 				}
 			}
 
@@ -257,13 +260,14 @@ func (response *CachedResponse) Write(data []byte) (int, error) {
 type CachedResponseBufferPool struct {
 	bufferSize int
 	buffers    chan []byte
+	limit      int
 }
 
 // NewCachedResponseBufferPool creates a new CachedResponseBufferPool
 // with the specified number of buffers. Buffers are allocated on
 // demand and once allocated remain allocated.
 func NewCachedResponseBufferPool(
-	bufferSize, bufferCount int) *CachedResponseBufferPool {
+	bufferSize, bufferCount int, limit int) *CachedResponseBufferPool {
 
 	buffers := make(chan []byte, bufferCount)
 	for i := 0; i < bufferCount; i++ {
@@ -273,6 +277,7 @@ func NewCachedResponseBufferPool(
 	return &CachedResponseBufferPool{
 		bufferSize: bufferSize,
 		buffers:    buffers,
+		limit:      limit,
 	}
 }
 

+ 24 - 15
psiphon/server/meek_test.go

@@ -59,45 +59,54 @@ func TestCachedResponse(t *testing.T) {
 		bufferSize          int
 		extendedBufferSize  int
 		extendedBufferCount int
+		extendedBufferLimit int
 		minBytesPerWrite    int
 		maxBytesPerWrite    int
 		copyPosition        int
 		expectedSuccess     bool
 	}{
-		{1, 16, 16, 0, 0, 1, 1, 0, true},
+		{1, 16, 16, 0, 0, -1, 1, 1, 0, true},
 
-		{1, 31, 16, 0, 0, 1, 1, 15, true},
+		{1, 31, 16, 0, 0, -1, 1, 1, 15, true},
 
-		{1, 16, 2, 2, 7, 1, 1, 0, true},
+		{1, 16, 2, 2, 7, -1, 1, 1, 0, true},
 
-		{1, 31, 15, 3, 5, 1, 1, 1, true},
+		{1, 31, 15, 3, 5, -1, 1, 1, 1, true},
 
-		{1, 16, 16, 0, 0, 1, 1, 16, true},
+		{1, 16, 16, 0, 0, -1, 1, 1, 16, true},
 
-		{1, 64*KB + 1, 64 * KB, 64 * KB, 1, 1, 1 * KB, 64 * KB, true},
+		{1, 64*KB + 1, 64 * KB, 64 * KB, 1, -1, 1, 1 * KB, 64 * KB, true},
 
-		{1, 10 * MB, 64 * KB, 64 * KB, 158, 1, 32 * KB, 0, false},
+		{1, 10 * MB, 64 * KB, 64 * KB, 158, -1, 1, 32 * KB, 0, false},
 
-		{1, 10 * MB, 64 * KB, 64 * KB, 159, 1, 32 * KB, 0, true},
+		{1, 10 * MB, 64 * KB, 64 * KB, 159, -1, 1, 32 * KB, 0, true},
 
-		{1, 10 * MB, 64 * KB, 64 * KB, 160, 1, 32 * KB, 0, true},
+		{1, 10 * MB, 64 * KB, 64 * KB, 160, -1, 1, 32 * KB, 0, true},
 
-		{1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 64 * KB, true},
+		{1, 128 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 64 * KB, true},
 
-		{1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 63 * KB, false},
+		{1, 128 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 63 * KB, false},
 
-		{1, 200 * KB, 64 * KB, 0, 0, 1, 1 * KB, 136 * KB, true},
+		{1, 200 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 136 * KB, true},
 
-		{10, 10 * MB, 64 * KB, 64 * KB, 1589, 1, 32 * KB, 0, false},
+		{10, 10 * MB, 64 * KB, 64 * KB, 1589, -1, 1, 32 * KB, 0, false},
 
-		{10, 10 * MB, 64 * KB, 64 * KB, 1590, 1, 32 * KB, 0, true},
+		{10, 10 * MB, 64 * KB, 64 * KB, 1590, -1, 1, 32 * KB, 0, true},
+
+		{10, 10 * MB, 64 * KB, 64 * KB, 1590, 32, 1, 32 * KB, 0, false},
 	}
 
 	for _, testCase := range testCases {
 		description := fmt.Sprintf("test case: %+v", testCase)
 		t.Run(description, func(t *testing.T) {
 
-			pool := NewCachedResponseBufferPool(testCase.extendedBufferSize, testCase.extendedBufferCount)
+			limit := testCase.extendedBufferCount
+			if testCase.extendedBufferLimit != -1 {
+				limit = testCase.extendedBufferLimit
+			}
+
+			pool := NewCachedResponseBufferPool(
+				testCase.extendedBufferSize, testCase.extendedBufferCount, limit)
 
 			responses := make([]*CachedResponse, testCase.concurrentResponses)
 			for i := 0; i < testCase.concurrentResponses; i++ {

+ 2 - 1
psiphon/server/tlsTunnel.go

@@ -172,9 +172,10 @@ func (server *TLSTunnelServer) makeTLSTunnelConfig() (*tls.Config, error) {
 
 			// strictMode is true as legitimate clients never retry TLS
 			// connections using a previous random value.
+			strictMode := true
 
 			ok, logFields := server.obfuscatorSeedHistory.AddNewWithTTL(
-				true,
+				strictMode,
 				clientIP,
 				"client-random",
 				clientRandom,

+ 92 - 29
psiphon/server/tunnelServer.go

@@ -32,6 +32,7 @@ import (
 	"io/ioutil"
 	"net"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"syscall"
@@ -74,6 +75,7 @@ const (
 	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
 	RANDOM_STREAM_MAX_BYTES               = 10485760
 	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
+	SSH_MAX_CLIENT_COUNT                  = 100000
 )
 
 // TunnelServer is the main server that accepts Psiphon client
@@ -451,6 +453,26 @@ func newSSHServer(
 		}
 	}
 
+	// Limitation: rate limiting and resource limiting are handled by external
+	// components, and sshServer enforces only a sanity check limit on the
+	// number of entries in sshServer.clients; and no limit on the number of
+	// entries in sshServer.geoIPSessionCache or sshServer.oslSessionCache.
+	//
+	// To avoid resource exhaustion, this implementation relies on:
+	//
+	// - Per-peer IP address and/or overall network connection rate limiting,
+	//   provided by iptables as configured by Psiphon automation
+	//   (https://github.com/Psiphon-Inc/psiphon-automation/blob/
+	//   4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L1451).
+	//
+	// - Host CPU/memory/network monitoring and signalling, installed by
+	//   Psiphon automation
+	//   (https://github.com/Psiphon-Inc/psiphon-automation/blob/
+	//    4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L935).
+	//   When resource usage meets certain thresholds, the monitoring signals
+	//   this process with SIGTSTP or SIGCONT, and handlers call
+	//   sshServer.setEstablishTunnels to stop or resume accepting new clients.
+
 	sshServer := &sshServer{
 		support:                 support,
 		establishTunnels:        1,
@@ -528,7 +550,9 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 		// span multiple TCP connections.
 
 		if !sshServer.checkEstablishTunnels() {
-			log.WithTrace().Debug("not establishing tunnels")
+			if IsLogLevelDebug() {
+				log.WithTrace().Debug("not establishing tunnels")
+			}
 			conn.Close()
 			return
 		}
@@ -916,6 +940,14 @@ func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
 		return false
 	}
 
+	// SSH_MAX_CLIENT_COUNT is a simple sanity check and failsafe. Load
+	// limiting tuned to each server's host resources is provided by external
+	// components. See comment in newSSHServer for more details.
+	if len(sshServer.clients) >= SSH_MAX_CLIENT_COUNT {
+		log.WithTrace().Warning("SSH_MAX_CLIENT_COUNT exceeded")
+		return false
+	}
+
 	sshServer.clients[client.sessionID] = client
 
 	return true
@@ -3220,7 +3252,7 @@ var serverTunnelStatParams = append(
 	[]requestParamSpec{
 		{"last_connected", isLastConnected, requestParamOptional},
 		{"establishment_duration", isIntString, requestParamOptional}},
-	baseSessionAndDialParams...)
+	baseAndDialParams...)
 
 func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 
@@ -3232,6 +3264,7 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 
 	logFields := getRequestLogFields(
 		"server_tunnel",
+		sshClient.sessionID,
 		sshClient.clientGeoIPData,
 		sshClient.handshakeState.authorizedAccessTypes,
 		sshClient.handshakeState.apiParams,
@@ -3260,7 +3293,6 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 	if sshClient.sshListener.BPFProgramName != "" {
 		logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
 	}
-	logFields["session_id"] = sshClient.sessionID
 	logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
 	logFields["handshake_completed"] = sshClient.handshakeState.completed
 	logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
@@ -3386,7 +3418,6 @@ var blocklistHitsStatParams = []requestParamSpec{
 	{"device_region", isAnyString, requestParamOptional},
 	{"device_location", isGeoHashString, requestParamOptional},
 	{"egress_region", isRegionCode, requestParamOptional},
-	{"session_id", isHexDigits, 0},
 	{"last_connected", isLastConnected, requestParamOptional},
 }
 
@@ -3401,13 +3432,12 @@ func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []Bl
 
 	logFields := getRequestLogFields(
 		"server_blocklist_hit",
+		sshClient.sessionID,
 		sshClient.clientGeoIPData,
 		sshClient.handshakeState.authorizedAccessTypes,
 		sshClient.handshakeState.apiParams,
 		blocklistHitsStatParams)
 
-	logFields["session_id"] = sshClient.sessionID
-
 	// Note: see comment in logTunnel regarding unlock and concurrent access.
 
 	sshClient.Unlock()
@@ -3603,12 +3633,14 @@ func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessa
 	reason := ssh.Prohibited
 
 	// Note: Debug level, as logMessage may contain user traffic destination address information
-	log.WithTraceFields(
-		LogFields{
-			"channelType":  newChannel.ChannelType(),
-			"logMessage":   logMessage,
-			"rejectReason": reason.String(),
-		}).Debug("reject new channel")
+	if IsLogLevelDebug() {
+		log.WithTraceFields(
+			LogFields{
+				"channelType":  newChannel.ChannelType(),
+				"logMessage":   logMessage,
+				"rejectReason": reason.String(),
+			}).Debug("reject new channel")
+	}
 
 	// Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
 	newChannel.Reject(reason, reason.String())
@@ -4214,11 +4246,13 @@ func (sshClient *sshClient) isPortForwardPermitted(
 
 	sshClient.enqueueDisallowedTrafficAlertRequest()
 
-	log.WithTraceFields(
-		LogFields{
-			"type": portForwardType,
-			"port": port,
-		}).Debug("port forward denied by traffic rules")
+	if IsLogLevelDebug() {
+		log.WithTraceFields(
+			LogFields{
+				"type": portForwardType,
+				"port": port,
+			}).Debug("port forward denied by traffic rules")
+	}
 
 	return false
 }
@@ -4236,6 +4270,13 @@ func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
 		return false, "invalid domain name"
 	}
 
+	// Don't even attempt to resolve the default mDNS top-level domain.
+	// Non-default cases won't be caught here but should fail to resolve due
+	// to the PreferGo setting in net.Resolver.
+	if strings.HasSuffix(domain, ".local") {
+		return false, "port forward not permitted"
+	}
+
 	tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
 	if len(tags) > 0 {
 
@@ -4424,7 +4465,10 @@ func (sshClient *sshClient) establishedPortForward(
 	if !sshClient.allocatePortForward(portForwardType) {
 
 		portForwardLRU.CloseOldest()
-		log.WithTrace().Debug("closed LRU port forward")
+
+		if IsLogLevelDebug() {
+			log.WithTrace().Debug("closed LRU port forward")
+		}
 
 		state.availablePortForwardCond.L.Lock()
 		for !sshClient.allocatePortForward(portForwardType) {
@@ -4595,10 +4639,19 @@ func (sshClient *sshClient) handleTCPChannel(
 
 		// Resolve the hostname
 
-		log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
+		// PreferGo, equivalent to GODEBUG=netdns=go, is specified in order to
+		// avoid any cases where Go's resolver fails over to the cgo-based
+		// resolver (see https://pkg.go.dev/net#hdr-Name_Resolution). Such
+		// cases, if they resolve at all, may be expected to resolve to bogon
+		// IPs that won't be permitted; but the cgo invocation will consume
+		// an OS thread, which is a performance hit we can avoid.
+
+		if IsLogLevelDebug() {
+			log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
+		}
 
 		ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
-		IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)
+		IPs, err := (&net.Resolver{PreferGo: true}).LookupIPAddr(ctx, hostToConnect)
 		cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
 
 		resolveElapsedTime := time.Since(dialStartTime)
@@ -4715,7 +4768,9 @@ func (sshClient *sshClient) handleTCPChannel(
 
 	remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
 
-	log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
+	if IsLogLevelDebug() {
+		log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
+	}
 
 	ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
 	fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
@@ -4792,7 +4847,9 @@ func (sshClient *sshClient) handleTCPChannel(
 
 	// Relay channel to forwarded connection.
 
-	log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
+	if IsLogLevelDebug() {
+		log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
+	}
 
 	// TODO: relay errors to fwdChannel.Stderr()?
 	relayWaitGroup := new(sync.WaitGroup)
@@ -4807,7 +4864,9 @@ func (sshClient *sshClient) handleTCPChannel(
 		atomic.AddInt64(&bytesDown, bytes)
 		if err != nil && err != io.EOF {
 			// Debug since errors such as "connection reset by peer" occur during normal operation
-			log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
+			if IsLogLevelDebug() {
+				log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
+			}
 		}
 		// Interrupt upstream io.Copy when downstream is shutting down.
 		// TODO: this is done to quickly cleanup the port forward when
@@ -4819,7 +4878,9 @@ func (sshClient *sshClient) handleTCPChannel(
 		fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
 	atomic.AddInt64(&bytesUp, bytes)
 	if err != nil && err != io.EOF {
-		log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
+		if IsLogLevelDebug() {
+			log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
+		}
 	}
 	// Shutdown special case: fwdChannel will be closed and return EOF when
 	// the SSH connection is closed, but we need to explicitly close fwdConn
@@ -4829,9 +4890,11 @@ func (sshClient *sshClient) handleTCPChannel(
 
 	relayWaitGroup.Wait()
 
-	log.WithTraceFields(
-		LogFields{
-			"remoteAddr": remoteAddr,
-			"bytesUp":    atomic.LoadInt64(&bytesUp),
-			"bytesDown":  atomic.LoadInt64(&bytesDown)}).Debug("exiting")
+	if IsLogLevelDebug() {
+		log.WithTraceFields(
+			LogFields{
+				"remoteAddr": remoteAddr,
+				"bytesUp":    atomic.LoadInt64(&bytesUp),
+				"bytesDown":  atomic.LoadInt64(&bytesDown)}).Debug("exiting")
+	}
 }

+ 16 - 2
psiphon/server/udp.go

@@ -365,6 +365,12 @@ type udpgwPortForward struct {
 	mux               *udpgwPortForwardMultiplexer
 }
 
+var udpgwBufferPool = &sync.Pool{
+	New: func() any {
+		return make([]byte, udpgwProtocolMaxMessageSize)
+	},
+}
+
 func (portForward *udpgwPortForward) relayDownstream() {
 	defer portForward.relayWaitGroup.Done()
 	defer portForward.mux.relayWaitGroup.Done()
@@ -378,7 +384,13 @@ func (portForward *udpgwPortForward) relayDownstream() {
 	// Note: there is one downstream buffer per UDP port forward,
 	// while for upstream there is one buffer per client.
 	// TODO: is the buffer size larger than necessary?
-	buffer := make([]byte, udpgwProtocolMaxMessageSize)
+
+	// Use a buffer pool to minimize GC churn resulting from frequent,
+	// short-lived UDP flows, including DNS requests.
+	buffer := udpgwBufferPool.Get().([]byte)
+	clear(buffer)
+	defer udpgwBufferPool.Put(buffer)
+
 	packetBuffer := buffer[portForward.preambleSize:udpgwProtocolMaxMessageSize]
 	for {
 		// TODO: if read buffer is too small, excess bytes are discarded?
@@ -389,7 +401,9 @@ func (portForward *udpgwPortForward) relayDownstream() {
 		if err != nil {
 			if err != io.EOF {
 				// Debug since errors such as "use of closed network connection" occur during normal operation
-				log.WithTraceFields(LogFields{"error": err}).Debug("downstream UDP relay failed")
+				if IsLogLevelDebug() {
+					log.WithTraceFields(LogFields{"error": err}).Debug("downstream UDP relay failed")
+				}
 			}
 			break
 		}

+ 14 - 7
psiphon/serverApi.go

@@ -117,7 +117,7 @@ func NewServerContext(tunnel *Tunnel) (*ServerContext, error) {
 // stored -- and sponsor info (home pages, stat regexes).
 func (serverContext *ServerContext) doHandshakeRequest(ignoreStatsRegexps bool) error {
 
-	params := serverContext.getBaseAPIParameters(baseParametersAll)
+	params := serverContext.getBaseAPIParameters(baseParametersAll, false)
 
 	// The server will return a signed copy of its own server entry when the
 	// client specifies this 'missing_server_entry_signature' parameter.
@@ -491,7 +491,7 @@ func (serverContext *ServerContext) DoConnectedRequest() error {
 	defer serverContext.tunnel.SetInFlightConnectedRequest(nil)
 
 	params := serverContext.getBaseAPIParameters(
-		baseParametersOnlyUpstreamFragmentorDialParameters)
+		baseParametersOnlyUpstreamFragmentorDialParameters, false)
 
 	lastConnected, err := getLastConnected()
 	if err != nil {
@@ -563,7 +563,8 @@ func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
 // DoStatusRequest makes a "status" API request to the server, sending session stats.
 func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
 
-	params := serverContext.getBaseAPIParameters(baseParametersNoDialParameters)
+	params := serverContext.getBaseAPIParameters(
+		baseParametersNoDialParameters, false)
 
 	// Note: ensure putBackStatusRequestPayload is called, to replace
 	// payload for future attempt, in all failure cases.
@@ -847,7 +848,7 @@ func RecordFailedTunnelStat(
 		return errors.Trace(err)
 	}
 
-	params := getBaseAPIParameters(baseParametersAll, config, dialParams)
+	params := getBaseAPIParameters(baseParametersAll, true, config, dialParams)
 
 	delete(params, "server_secret")
 	params["server_entry_tag"] = dialParams.ServerEntry.Tag
@@ -978,10 +979,12 @@ const (
 )
 
 func (serverContext *ServerContext) getBaseAPIParameters(
-	filter baseParametersFilter) common.APIParameters {
+	filter baseParametersFilter,
+	includeSessionID bool) common.APIParameters {
 
 	params := getBaseAPIParameters(
 		filter,
+		includeSessionID,
 		serverContext.tunnel.config,
 		serverContext.tunnel.dialParams)
 
@@ -1016,13 +1019,17 @@ func (serverContext *ServerContext) getBaseAPIParameters(
 // baseParametersNoDialParameters.
 func getBaseAPIParameters(
 	filter baseParametersFilter,
+	includeSessionID bool,
 	config *Config,
 	dialParams *DialParameters) common.APIParameters {
 
 	params := make(common.APIParameters)
 
-	params["session_id"] = config.SessionID
-	params["client_session_id"] = config.SessionID
+	if includeSessionID {
+		// The session ID is included in non-SSH API requests only. For SSH
+		// API requests, the Psiphon server already has the client's session ID.
+		params["session_id"] = config.SessionID
+	}
 	params["propagation_channel_id"] = config.PropagationChannelId
 	params["sponsor_id"] = config.GetSponsorID()
 	params["client_version"] = config.ClientVersion

+ 1 - 1
psiphon/tactics.go

@@ -271,7 +271,7 @@ func fetchTactics(
 	defer meekConn.Close()
 
 	apiParams := getBaseAPIParameters(
-		baseParametersAll, config, dialParams)
+		baseParametersAll, true, config, dialParams)
 
 	tacticsRecord, err := tactics.FetchTactics(
 		ctx,

+ 10 - 2
psiphon/tunnel.go

@@ -1057,6 +1057,13 @@ func dialTunnel(
 			return false
 		},
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
+
+			// The remote address input isn't checked. In the case of fronted
+			// protocols, the immediate remote peer won't be the Psiphon
+			// server. In direct cases, the client has just dialed the IP
+			// address and expected public key both taken from the same
+			// trusted, signed server entry.
+
 			if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
 				return errors.TraceNew("unexpected host public key")
 			}
@@ -1104,7 +1111,7 @@ func dialTunnel(
 	} else {
 		// For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
 		// its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
-		// betweem two randomized KEXes.
+		// between two randomized KEXes.
 		if dialParams.ServerEntry.SshObfuscatedKey != "" {
 			sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
 				dialParams.ServerEntry.SshObfuscatedKey)
@@ -1541,7 +1548,8 @@ func dialInproxy(
 	// TODO: include broker fronting dial parameters to be logged by the
 	// broker -- as successful parameters might not otherwise by logged via
 	// server_tunnel if the subsequent WebRTC dials fail.
-	params := getBaseAPIParameters(baseParametersNoDialParameters, config, nil)
+	params := getBaseAPIParameters(
+		baseParametersNoDialParameters, true, config, nil)
 
 	// The debugLogging flag is passed to both NoticeCommonLogger and to the
 	// inproxy package as well; skipping debug logs in the inproxy package,