Browse Source

Add protocol targeting to meek rate limiter

- This change also moves meek rate limit
  enforcement to after the meek cookie
  validation.
Rod Hynes 4 years ago
parent
commit
bd4a1f26a5

+ 17 - 4
psiphon/common/protocol/protocol.go

@@ -253,14 +253,27 @@ func TunnelProtocolMayUseServerPacketManipulation(protocol string) bool {
 		protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET
 }
 
-func UseClientTunnelProtocol(
+func IsValidClientTunnelProtocol(
 	clientProtocol string,
+	listenerProtocol string,
 	serverProtocols TunnelProtocols) bool {
 
+	if !common.Contains(serverProtocols, clientProtocol) {
+		return false
+	}
+
+	// If the client reports the same tunnel protocol as the listener, the value
+	// is valid.
+
+	if clientProtocol == listenerProtocol {
+		return true
+	}
+
 	// When the server is running multiple fronted protocols, and the client
-	// reports a fronted protocol, use the client's reported tunnel protocol
-	// since some CDNs forward several protocols to the same server port; in this
-	// case the server port is not sufficient to distinguish these protocols.
+	// reports a fronted protocol, the client's reported tunnel protocol is
+	// presumed to be valid since some CDNs forward several protocols to the same
+	// server port; in this case the listener port is not sufficient to
+	// distinguish these protocols.
 
 	if !TunnelProtocolUsesFrontedMeek(clientProtocol) {
 		return false

+ 12 - 0
psiphon/server/config.go

@@ -425,6 +425,7 @@ type Config struct {
 	stopEstablishTunnelsEstablishedClientThreshold int
 	dumpProfilesOnStopEstablishTunnelsDone         int32
 	frontingProviderID                             string
+	runningProtocols                               []string
 }
 
 // GetLogFileReopenConfig gets the reopen retries, and create/mode inputs for
@@ -497,6 +498,12 @@ func (config *Config) GetFrontingProviderID() string {
 	return config.frontingProviderID
 }
 
+// GetRunningProtocols returns the list of protcols this server is running.
+// The caller must not mutate the return value.
+func (config *Config) GetRunningProtocols() []string {
+	return config.runningProtocols
+}
+
 // LoadConfig loads and validates a JSON encoded server config.
 func LoadConfig(configJSON []byte) (*Config, error) {
 
@@ -658,6 +665,11 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 		}
 	}
 
+	config.runningProtocols = []string{}
+	for tunnelProtocol := range config.TunnelProtocolPorts {
+		config.runningProtocols = append(config.runningProtocols, tunnelProtocol)
+	}
+
 	return &config, nil
 }
 

+ 42 - 8
psiphon/server/meek.go

@@ -630,10 +630,6 @@ func (server *MeekServer) getSessionOrEndpoint(
 		}
 	}
 
-	if server.rateLimit(clientIP) {
-		return "", nil, nil, "", "", errors.TraceNew("rate limit exceeded")
-	}
-
 	// The session is new (or expired). Treat the cookie value as a new meek
 	// cookie, extract the payload, and create a new session.
 
@@ -651,6 +647,32 @@ func (server *MeekServer) getSessionOrEndpoint(
 		return "", nil, nil, "", "", errors.Trace(err)
 	}
 
+	tunnelProtocol := server.listenerTunnelProtocol
+
+	if clientSessionData.ClientTunnelProtocol != "" {
+
+		if !protocol.IsValidClientTunnelProtocol(
+			clientSessionData.ClientTunnelProtocol,
+			server.listenerTunnelProtocol,
+			server.support.Config.GetRunningProtocols()) {
+
+			return "", nil, nil, "", "", errors.Tracef(
+				"invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
+		}
+
+		tunnelProtocol = clientSessionData.ClientTunnelProtocol
+	}
+
+	// Any rate limit is enforced after the meek cookie is validated, so a prober
+	// without the obfuscation secret will be unable to fingerprint the server
+	// based on response time combined with the rate limit configuration. The
+	// rate limit is primarily intended to limit memory resource consumption and
+	// not the overhead incurred by cookie validation.
+
+	if server.rateLimit(clientIP, tunnelProtocol) {
+		return "", nil, nil, "", "", errors.TraceNew("rate limit exceeded")
+	}
+
 	// Handle endpoints before enforcing CheckEstablishTunnels.
 	// Currently, endpoints are tactics requests, and we allow these to be
 	// handled by servers which would otherwise reject new tunnels.
@@ -729,15 +751,27 @@ func (server *MeekServer) getSessionOrEndpoint(
 	return sessionID, session, underlyingConn, "", "", nil
 }
 
-func (server *MeekServer) rateLimit(clientIP string) bool {
+func (server *MeekServer) rateLimit(clientIP string, tunnelProtocol string) bool {
 
-	historySize, thresholdSeconds, regions, ISPs, cities, GCTriggerCount, _ :=
+	historySize,
+		thresholdSeconds,
+		tunnelProtocols,
+		regions,
+		ISPs,
+		cities,
+		GCTriggerCount, _ :=
 		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
 	if historySize == 0 {
 		return false
 	}
 
+	if len(tunnelProtocols) > 0 {
+		if !common.Contains(tunnelProtocols, tunnelProtocol) {
+			return false
+		}
+	}
+
 	if len(regions) > 0 || len(ISPs) > 0 || len(cities) > 0 {
 
 		// TODO: avoid redundant GeoIP lookups?
@@ -811,7 +845,7 @@ func (server *MeekServer) rateLimit(clientIP string) bool {
 
 func (server *MeekServer) rateLimitWorker() {
 
-	_, _, _, _, _, _, reapFrequencySeconds :=
+	_, _, _, _, _, _, _, reapFrequencySeconds :=
 		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
 	timer := time.NewTimer(time.Duration(reapFrequencySeconds) * time.Second)
@@ -821,7 +855,7 @@ func (server *MeekServer) rateLimitWorker() {
 		select {
 		case <-timer.C:
 
-			_, thresholdSeconds, _, _, _, _, reapFrequencySeconds :=
+			_, thresholdSeconds, _, _, _, _, _, reapFrequencySeconds :=
 				server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
 			server.rateLimitLock.Lock()

+ 29 - 12
psiphon/server/meek_test.go

@@ -37,6 +37,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -400,9 +401,18 @@ func (interruptor *fileDescriptorInterruptor) BindToDevice(fileDescriptor int) (
 }
 
 func TestMeekRateLimiter(t *testing.T) {
+	runTestMeekRateLimiter(t, true)
+	runTestMeekRateLimiter(t, false)
+}
+
+func runTestMeekRateLimiter(t *testing.T, rateLimit bool) {
+
+	attempts := 10
 
 	allowedConnections := 5
-	testDurationSeconds := 10
+	if !rateLimit {
+		allowedConnections = 10
+	}
 
 	// Run meek server
 
@@ -414,14 +424,23 @@ func TestMeekRateLimiter(t *testing.T) {
 	meekCookieEncryptionPrivateKey := base64.StdEncoding.EncodeToString(rawMeekCookieEncryptionPrivateKey[:])
 	meekObfuscatedKey := prng.HexString(SSH_OBFUSCATED_KEY_BYTE_LENGTH)
 
+	tunnelProtocol := protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK
+
+	meekRateLimiterTunnelProtocols := []string{tunnelProtocol}
+	if !rateLimit {
+		meekRateLimiterTunnelProtocols = []string{protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS}
+	}
+
 	mockSupport := &SupportServices{
 		Config: &Config{
 			MeekObfuscatedKey:              meekObfuscatedKey,
 			MeekCookieEncryptionPrivateKey: meekCookieEncryptionPrivateKey,
+			TunnelProtocolPorts:            map[string]int{tunnelProtocol: 0},
 		},
 		TrafficRulesSet: &TrafficRulesSet{
 			MeekRateLimiterHistorySize:                   allowedConnections,
-			MeekRateLimiterThresholdSeconds:              testDurationSeconds,
+			MeekRateLimiterThresholdSeconds:              attempts,
+			MeekRateLimiterTunnelProtocols:               meekRateLimiterTunnelProtocols,
 			MeekRateLimiterGarbageCollectionTriggerCount: 1,
 			MeekRateLimiterReapHistoryFrequencySeconds:   1,
 		},
@@ -444,7 +463,7 @@ func TestMeekRateLimiter(t *testing.T) {
 	server, err := NewMeekServer(
 		mockSupport,
 		listener,
-		"",
+		tunnelProtocol,
 		0,
 		useTLS,
 		isFronted,
@@ -486,15 +505,13 @@ func TestMeekRateLimiter(t *testing.T) {
 	}()
 
 	// Run meek clients:
-	// For 10 seconds, connect once per second vs. rate limit of 5-per-10 seconds,
+	// For 10 attempts, connect once per second vs. rate limit of 5-per-10 seconds,
 	// so about half of the connections should be rejected by the rate limiter.
 
-	stopTime := time.Now().Add(time.Duration(testDurationSeconds) * time.Second)
-
 	totalConnections := 0
 	totalFailures := 0
 
-	for {
+	for i := 0; i < attempts; i++ {
 
 		dialConfig := &psiphon.DialConfig{}
 
@@ -541,14 +558,14 @@ func TestMeekRateLimiter(t *testing.T) {
 			totalConnections += 1
 		}
 
-		if !time.Now().Before(stopTime) {
-			break
+		if i < attempts-1 {
+			time.Sleep(1 * time.Second)
 		}
-
-		time.Sleep(1 * time.Second)
 	}
 
-	if totalConnections != allowedConnections || totalFailures == 0 {
+	if totalConnections != allowedConnections ||
+		totalFailures != attempts-totalConnections {
+
 		t.Fatalf(
 			"Unexpected results: %d connections, %d failures",
 			totalConnections, totalFailures)

+ 19 - 3
psiphon/server/trafficRules.go

@@ -70,8 +70,16 @@ type TrafficRulesSet struct {
 	// any client endpoint request or any request to create a new session, but
 	// not any meek request for an existing session, if the
 	// MeekRateLimiterHistorySize requests occur in
-	// MeekRateLimiterThresholdSeconds. The scope of rate limiting may be
-	// limited using LimitMeekRateLimiterRegions/ISPs/Cities.
+	// MeekRateLimiterThresholdSeconds.
+	//
+	// A use case for the the meek rate limiter is to mitigate dangling resource
+	// usage that results from meek connections that are partially established
+	// and then interrupted (e.g, drop packets after allowing up to the initial
+	// HTTP request and header lines). In the case of CDN fronted meek, the CDN
+	// itself may hold open the interrupted connection.
+	//
+	// The scope of rate limiting may be
+	// limited using LimitMeekRateLimiterTunnelProtocols/Regions/ISPs/Cities.
 	//
 	// Hot reloading a new history size will result in existing history being
 	// truncated.
@@ -81,6 +89,11 @@ type TrafficRulesSet struct {
 	// specification and must be set when MeekRateLimiterHistorySize is set.
 	MeekRateLimiterThresholdSeconds int
 
+	// MeekRateLimiterTunnelProtocols, if set, limits application of the meek
+	// late-stage rate limiter to the specified meek protocols. When omitted or
+	// empty, meek rate limiting is applied to all meek protocols.
+	MeekRateLimiterTunnelProtocols []string
+
 	// MeekRateLimiterRegions, if set, limits application of the meek
 	// late-stage rate limiter to clients in the specified list of GeoIP
 	// countries. When omitted or empty, meek rate limiting, if configured,
@@ -317,6 +330,7 @@ func NewTrafficRulesSet(filename string) (*TrafficRulesSet, error) {
 			// Modify actual traffic rules only after validation
 			set.MeekRateLimiterHistorySize = newSet.MeekRateLimiterHistorySize
 			set.MeekRateLimiterThresholdSeconds = newSet.MeekRateLimiterThresholdSeconds
+			set.MeekRateLimiterTunnelProtocols = newSet.MeekRateLimiterTunnelProtocols
 			set.MeekRateLimiterRegions = newSet.MeekRateLimiterRegions
 			set.MeekRateLimiterISPs = newSet.MeekRateLimiterISPs
 			set.MeekRateLimiterCities = newSet.MeekRateLimiterCities
@@ -906,7 +920,8 @@ func (rules *TrafficRules) allowSubnet(remoteIP net.IP) bool {
 
 // GetMeekRateLimiterConfig gets a snapshot of the meek rate limiter
 // configuration values.
-func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (int, int, []string, []string, []string, int, int) {
+func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (
+	int, int, []string, []string, []string, []string, int, int) {
 
 	set.ReloadableFile.RLock()
 	defer set.ReloadableFile.RUnlock()
@@ -924,6 +939,7 @@ func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (int, int, []string, []st
 
 	return set.MeekRateLimiterHistorySize,
 		set.MeekRateLimiterThresholdSeconds,
+		set.MeekRateLimiterTunnelProtocols,
 		set.MeekRateLimiterRegions,
 		set.MeekRateLimiterISPs,
 		set.MeekRateLimiterCities,

+ 7 - 25
psiphon/server/tunnelServer.go

@@ -462,11 +462,6 @@ func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
 // occurs, it will send the error to the listenerError channel.
 func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
 
-	runningProtocols := make([]string, 0)
-	for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
-		runningProtocols = append(runningProtocols, tunnelProtocol)
-	}
-
 	handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {
 
 		// Note: establish tunnel limiter cannot simply stop TCP
@@ -479,28 +474,15 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 			return
 		}
 
-		// The tunnelProtocol passed to handleClient is used for stats,
-		// throttling, etc. When the tunnel protocol can be determined
-		// unambiguously from the listening port, use that protocol and
-		// don't use any client-declared value. Only use the client's
-		// value, if present, in special cases where the listening port
-		// cannot distinguish the protocol.
+		// tunnelProtocol is used for stats and traffic rules. In many cases, its
+		// value is unambiguously determined by the listener port. In certain cases,
+		// such as multiple fronted protocols with a single backend listener, the
+		// client's reported tunnel protocol value is used. The caller must validate
+		// clientTunnelProtocol with protocol.IsValidClientTunnelProtocol.
+
 		tunnelProtocol := sshListener.tunnelProtocol
 		if clientTunnelProtocol != "" {
-
-			if !common.Contains(runningProtocols, clientTunnelProtocol) {
-				log.WithTraceFields(
-					LogFields{
-						"clientTunnelProtocol": clientTunnelProtocol}).
-					Warning("invalid client tunnel protocol")
-				clientConn.Close()
-				return
-			}
-
-			if protocol.UseClientTunnelProtocol(
-				clientTunnelProtocol, runningProtocols) {
-				tunnelProtocol = clientTunnelProtocol
-			}
+			tunnelProtocol = clientTunnelProtocol
 		}
 
 		// sshListener.tunnelProtocol indictes the tunnel protocol run by the