Explorar el Código

TrafficRules changes for meek rate limiter

- Move meek rate limit configuration to hot-reloadable
  traffic rules

- Add MeekRateLimiterRegions

- Add additional TrafficRules validation

- Fix: AllowSubnets in FilteredRules not applied

- Increase default MeekRateLimiterGarbageCollectionTriggerCount

- Passively exercise meek rate limiter in server_test
Rod Hynes hace 7 años
padre
commit
26897ec8f9

+ 0 - 38
psiphon/server/config.go

@@ -214,33 +214,6 @@ type Config struct {
 	// is 0.
 	MeekCachedResponsePoolBufferCount int
 
-	// MeekRateLimiterHistorySize enables the late-stage meek rate limiter and
-	// sets its history size. The late-stage meek rate limiter acts on client
-	// IPs relayed in MeekProxyForwardedForHeaders, and so it must wait for
-	// the HTTP headers to be read. This rate limiter immediately terminates
-	// any client endpoint request or any request to create a new session, but
-	// not any meek request for an existing session, if the
-	// MeekRateLimiterHistorySize requests occur in MeekRateLimiterThresholdSeconds.
-	MeekRateLimiterHistorySize int
-
-	// MeekRateLimiterThresholdSeconds is part of the meek rate limiter
-	// specification and must be set when MeekRateLimiterHistorySize is set.
-	MeekRateLimiterThresholdSeconds int
-
-	// MeekRateLimiterGarbageCollectionTriggerCount specifies the number of
-	// rate limit events after which garbage collection is manually triggered
-	// in order to reclaim memory used by rate limited and other rejected
-	// requests.
-	// A default of 1000 is used when
-	// MeekRateLimiterGarbageCollectionTriggerCount is 0.
-	MeekRateLimiterGarbageCollectionTriggerCount int
-
-	// MeekRateLimiterReapHistoryFrequencySeconds specifies a schedule for
-	// reaping old records from the rate limit history.
-	// A default of 600 is used when
-	// MeekRateLimiterReapHistoryFrequencySeconds is 0.
-	MeekRateLimiterReapHistoryFrequencySeconds int
-
 	// UDPInterceptUdpgwServerAddress specifies the network address of
 	// a udpgw server which clients may be port forwarding to. When
 	// specified, these TCP port forwards are intercepted and handled
@@ -349,11 +322,6 @@ func (config *Config) RunPeriodicGarbageCollection() bool {
 	return config.PeriodicGarbageCollectionSeconds > 0
 }
 
-// ...
-func (config *Config) RunMeekRateLimiter() bool {
-	return config.MeekRateLimiterHistorySize > 0
-}
-
 // LoadConfig loads and validates a JSON encoded server config.
 func LoadConfig(configJSON []byte) (*Config, error) {
 
@@ -440,12 +408,6 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 			"AccessControlVerificationKeyRing is invalid: %s", err)
 	}
 
-	if config.MeekRateLimiterHistorySize > 0 {
-		if config.MeekRateLimiterThresholdSeconds <= 0 {
-			return nil, errors.New("Meek rate limiter requires MeekRateLimiterThresholdSeconds")
-		}
-	}
-
 	return &config, nil
 }
 

+ 51 - 51
psiphon/server/meek.go

@@ -70,18 +70,16 @@ const (
 	// when retrying a request for a partially downloaded response payload.
 	MEEK_PROTOCOL_VERSION_3 = 3
 
-	MEEK_MAX_REQUEST_PAYLOAD_LENGTH                           = 65536
-	MEEK_TURN_AROUND_TIMEOUT                                  = 20 * time.Millisecond
-	MEEK_EXTENDED_TURN_AROUND_TIMEOUT                         = 100 * time.Millisecond
-	MEEK_MAX_SESSION_STALENESS                                = 45 * time.Second
-	MEEK_HTTP_CLIENT_IO_TIMEOUT                               = 45 * time.Second
-	MEEK_MIN_SESSION_ID_LENGTH                                = 8
-	MEEK_MAX_SESSION_ID_LENGTH                                = 20
-	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH                       = 65536
-	MEEK_DEFAULT_POOL_BUFFER_LENGTH                           = 65536
-	MEEK_DEFAULT_POOL_BUFFER_COUNT                            = 2048
-	MEEK_DEFAULT_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT = 1000
-	MEEK_DEFAULT_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS  = 600
+	MEEK_MAX_REQUEST_PAYLOAD_LENGTH     = 65536
+	MEEK_TURN_AROUND_TIMEOUT            = 20 * time.Millisecond
+	MEEK_EXTENDED_TURN_AROUND_TIMEOUT   = 100 * time.Millisecond
+	MEEK_MAX_SESSION_STALENESS          = 45 * time.Second
+	MEEK_HTTP_CLIENT_IO_TIMEOUT         = 45 * time.Second
+	MEEK_MIN_SESSION_ID_LENGTH          = 8
+	MEEK_MAX_SESSION_ID_LENGTH          = 20
+	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
+	MEEK_DEFAULT_POOL_BUFFER_LENGTH     = 65536
+	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 2048
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -135,19 +133,16 @@ func NewMeekServer(
 	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
 
 	meekServer := &MeekServer{
-		support:       support,
-		listener:      listener,
-		clientHandler: clientHandler,
-		openConns:     new(common.Conns),
-		stopBroadcast: stopBroadcast,
-		sessions:      make(map[string]*meekSession),
-		checksumTable: checksumTable,
-		bufferPool:    bufferPool,
-	}
-
-	if support.Config.RunMeekRateLimiter() {
-		meekServer.rateLimitHistory = make(map[string][]monotime.Time)
-		meekServer.rateLimitSignalGC = make(chan struct{}, 1)
+		support:           support,
+		listener:          listener,
+		clientHandler:     clientHandler,
+		openConns:         new(common.Conns),
+		stopBroadcast:     stopBroadcast,
+		sessions:          make(map[string]*meekSession),
+		checksumTable:     checksumTable,
+		bufferPool:        bufferPool,
+		rateLimitHistory:  make(map[string][]monotime.Time),
+		rateLimitSignalGC: make(chan struct{}, 1),
 	}
 
 	if useTLS {
@@ -187,13 +182,11 @@ func (server *MeekServer) Run() error {
 		}
 	}()
 
-	if server.support.Config.RunMeekRateLimiter() {
-		waitGroup.Add(1)
-		go func() {
-			defer waitGroup.Done()
-			server.rateLimitWorker()
-		}()
-	}
+	waitGroup.Add(1)
+	go func() {
+		defer waitGroup.Done()
+		server.rateLimitWorker()
+	}()
 
 	// Serve HTTP or HTTPS
 	//
@@ -660,22 +653,31 @@ func (server *MeekServer) getSessionOrEndpoint(
 
 func (server *MeekServer) rateLimit(clientIP string) bool {
 
-	if !server.support.Config.RunMeekRateLimiter() {
+	historySize, thresholdSeconds, regions, GCTriggerCount, _ :=
+		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
+
+	if historySize == 0 {
 		return false
 	}
 
+	if len(regions) > 0 {
+		// TODO: avoid redundant GeoIP lookups?
+		if !common.Contains(regions, server.support.GeoIPService.Lookup(clientIP).Country) {
+			return false
+		}
+	}
+
 	limit := true
 	triggerGC := false
 
 	now := monotime.Now()
-	threshold := now.Add(
-		-time.Duration(server.support.Config.MeekRateLimiterThresholdSeconds) * time.Second)
+	threshold := now.Add(-time.Duration(thresholdSeconds) * time.Second)
 
 	server.rateLimitLock.Lock()
 
 	history, ok := server.rateLimitHistory[clientIP]
-	if !ok {
-		history = make([]monotime.Time, server.support.Config.MeekRateLimiterHistorySize)
+	if !ok || len(history) != historySize {
+		history = make([]monotime.Time, historySize)
 		server.rateLimitHistory[clientIP] = history
 	}
 
@@ -694,11 +696,7 @@ func (server *MeekServer) rateLimit(clientIP string) bool {
 
 		server.rateLimitCount += 1
 
-		triggerCount := server.support.Config.MeekRateLimiterGarbageCollectionTriggerCount
-		if triggerCount <= 0 {
-			triggerCount = MEEK_DEFAULT_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT
-		}
-		if server.rateLimitCount >= triggerCount {
+		if server.rateLimitCount >= GCTriggerCount {
 			triggerGC = true
 			server.rateLimitCount = 0
 		}
@@ -718,22 +716,22 @@ func (server *MeekServer) rateLimit(clientIP string) bool {
 
 func (server *MeekServer) rateLimitWorker() {
 
-	frequencySeconds := server.support.Config.MeekRateLimiterReapHistoryFrequencySeconds
-	if frequencySeconds <= 0 {
-		frequencySeconds = MEEK_DEFAULT_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS
-	}
+	_, _, _, _, reapFrequencySeconds :=
+		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
-	ticker := time.NewTicker(time.Duration(frequencySeconds) * time.Second)
-	defer ticker.Stop()
+	timer := time.NewTimer(time.Duration(reapFrequencySeconds) * time.Second)
+	defer timer.Stop()
 
 	for {
 		select {
-		case <-ticker.C:
+		case <-timer.C:
+
+			_, thresholdSeconds, _, _, reapFrequencySeconds :=
+				server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
 			server.rateLimitLock.Lock()
 
-			threshold := monotime.Now().Add(
-				-time.Duration(server.support.Config.MeekRateLimiterThresholdSeconds) * time.Second)
+			threshold := monotime.Now().Add(-time.Duration(thresholdSeconds) * time.Second)
 
 			for key, history := range server.rateLimitHistory {
 				reap := true
@@ -754,6 +752,8 @@ func (server *MeekServer) rateLimitWorker() {
 
 			server.rateLimitLock.Unlock()
 
+			timer.Reset(time.Duration(reapFrequencySeconds) * time.Second)
+
 		case <-server.rateLimitSignalGC:
 			runtime.GC()
 

+ 1 - 0
psiphon/server/meek_test.go

@@ -240,6 +240,7 @@ func TestMeekResiliency(t *testing.T) {
 			MeekObfuscatedKey:              meekObfuscatedKey,
 			MeekCookieEncryptionPrivateKey: meekCookieEncryptionPrivateKey,
 		},
+		TrafficRulesSet: &TrafficRulesSet{},
 	}
 
 	listener, err := net.Listen("tcp", "127.0.0.1:0")

+ 6 - 1
psiphon/server/server_test.go

@@ -1037,7 +1037,12 @@ func paveTrafficRulesFile(
                 "WriteBytesPerSecond": 16384
             },
             "AllowTCPPorts" : [0],
-            "AllowUDPPorts" : [0]
+            "AllowUDPPorts" : [0],
+            "MeekRateLimiterHistorySize" : 10,
+            "MeekRateLimiterThresholdSeconds" : 1,
+            "MeekRateLimiterGarbageCollectionTriggerCount" : 1,
+            "MeekRateLimiterReapHistoryFrequencySeconds" : 1,
+            "MeekRateLimiterRegions" : []
         },
         "FilteredRules" : [
             {

+ 111 - 6
psiphon/server/trafficRules.go

@@ -21,6 +21,7 @@ package server
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net"
 
@@ -28,12 +29,14 @@ import (
 )
 
 const (
-	DEFAULT_IDLE_TCP_PORT_FORWARD_TIMEOUT_MILLISECONDS = 30000
-	DEFAULT_IDLE_UDP_PORT_FORWARD_TIMEOUT_MILLISECONDS = 30000
-	DEFAULT_DIAL_TCP_PORT_FORWARD_TIMEOUT_MILLISECONDS = 10000
-	DEFAULT_MAX_TCP_DIALING_PORT_FORWARD_COUNT         = 64
-	DEFAULT_MAX_TCP_PORT_FORWARD_COUNT                 = 512
-	DEFAULT_MAX_UDP_PORT_FORWARD_COUNT                 = 32
+	DEFAULT_IDLE_TCP_PORT_FORWARD_TIMEOUT_MILLISECONDS        = 30000
+	DEFAULT_IDLE_UDP_PORT_FORWARD_TIMEOUT_MILLISECONDS        = 30000
+	DEFAULT_DIAL_TCP_PORT_FORWARD_TIMEOUT_MILLISECONDS        = 10000
+	DEFAULT_MAX_TCP_DIALING_PORT_FORWARD_COUNT                = 64
+	DEFAULT_MAX_TCP_PORT_FORWARD_COUNT                        = 512
+	DEFAULT_MAX_UDP_PORT_FORWARD_COUNT                        = 32
+	DEFAULT_MEEK_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT = 5000
+	DEFAULT_MEEK_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS  = 600
 )
 
 // TrafficRulesSet represents the various traffic rules to
@@ -59,6 +62,44 @@ type TrafficRulesSet struct {
 		Filter TrafficRulesFilter
 		Rules  TrafficRules
 	}
+
+	// MeekRateLimiterHistorySize enables the late-stage meek rate limiter and
+	// sets its history size. The late-stage meek rate limiter acts on client
+	// IPs relayed in MeekProxyForwardedForHeaders, and so it must wait for
+	// the HTTP headers to be read. This rate limiter immediately terminates
+	// any client endpoint request or any request to create a new session, but
+	// not any meek request for an existing session, if the
+	// MeekRateLimiterHistorySize requests occur in
+	// MeekRateLimiterThresholdSeconds. The scope of rate limiting may be
+	// limited using LimitMeekRateLimiterRegions.
+	//
+	// Hot reloading a new history size will result in existing history being
+	// truncated.
+	MeekRateLimiterHistorySize int
+
+	// MeekRateLimiterThresholdSeconds is part of the meek rate limiter
+	// specification and must be set when MeekRateLimiterHistorySize is set.
+	MeekRateLimiterThresholdSeconds int
+
+	// MeekRateLimiterRegions, if set, limits application of the meek
+	// late-stage rate limiter to clients in the specified list of GeoIP
+	// countries. When omitted or empty, meek rate limiting, if configured,
+	// is applied to all clients.
+	MeekRateLimiterRegions []string
+
+	// MeekRateLimiterGarbageCollectionTriggerCount specifies the number of
+	// rate limit events after which garbage collection is manually triggered
+	// in order to reclaim memory used by rate limited and other rejected
+	// requests.
+	// A default of 5000 is used when
+	// MeekRateLimiterGarbageCollectionTriggerCount is 0.
+	MeekRateLimiterGarbageCollectionTriggerCount int
+
+	// MeekRateLimiterReapHistoryFrequencySeconds specifies a schedule for
+	// reaping old records from the rate limit history.
+	// A default of 600 is used when
+	// MeekRateLimiterReapHistoryFrequencySeconds is 0.
+	MeekRateLimiterReapHistoryFrequencySeconds int
 }
 
 // TrafficRulesFilter defines a filter to match against client attributes.
@@ -232,7 +273,37 @@ func NewTrafficRulesSet(filename string) (*TrafficRulesSet, error) {
 // Validate checks for correct input formats in a TrafficRulesSet.
 func (set *TrafficRulesSet) Validate() error {
 
+	if set.MeekRateLimiterHistorySize < 0 ||
+		set.MeekRateLimiterThresholdSeconds < 0 ||
+		set.MeekRateLimiterGarbageCollectionTriggerCount < 0 ||
+		set.MeekRateLimiterReapHistoryFrequencySeconds < 0 {
+		return common.ContextError(
+			errors.New("MeekRateLimiter values must be >= 0"))
+	}
+
+	if set.MeekRateLimiterHistorySize > 0 {
+		if set.MeekRateLimiterThresholdSeconds <= 0 {
+			return common.ContextError(
+				errors.New("MeekRateLimiterThresholdSeconds must be > 0"))
+		}
+	}
+
 	validateTrafficRules := func(rules *TrafficRules) error {
+
+		if (rules.RateLimits.ReadUnthrottledBytes != nil && *rules.RateLimits.ReadUnthrottledBytes < 0) ||
+			(rules.RateLimits.ReadBytesPerSecond != nil && *rules.RateLimits.ReadBytesPerSecond < 0) ||
+			(rules.RateLimits.WriteUnthrottledBytes != nil && *rules.RateLimits.WriteUnthrottledBytes < 0) ||
+			(rules.RateLimits.WriteBytesPerSecond != nil && *rules.RateLimits.WriteBytesPerSecond < 0) ||
+			(rules.DialTCPPortForwardTimeoutMilliseconds != nil && *rules.DialTCPPortForwardTimeoutMilliseconds < 0) ||
+			(rules.IdleTCPPortForwardTimeoutMilliseconds != nil && *rules.IdleTCPPortForwardTimeoutMilliseconds < 0) ||
+			(rules.IdleUDPPortForwardTimeoutMilliseconds != nil && *rules.IdleUDPPortForwardTimeoutMilliseconds < 0) ||
+			(rules.MaxTCPDialingPortForwardCount != nil && *rules.MaxTCPDialingPortForwardCount < 0) ||
+			(rules.MaxTCPPortForwardCount != nil && *rules.MaxTCPPortForwardCount < 0) ||
+			(rules.MaxUDPPortForwardCount != nil && *rules.MaxUDPPortForwardCount < 0) {
+			return common.ContextError(
+				errors.New("TrafficRules values must be >= 0"))
+		}
+
 		for _, subnet := range rules.AllowSubnets {
 			_, _, err := net.ParseCIDR(subnet)
 			if err != nil {
@@ -240,6 +311,7 @@ func (set *TrafficRulesSet) Validate() error {
 					fmt.Errorf("invalid subnet: %s %s", subnet, err))
 			}
 		}
+
 		return nil
 	}
 
@@ -369,6 +441,10 @@ func (set *TrafficRulesSet) GetTrafficRules(
 		trafficRules.AllowUDPPorts = make([]int, 0)
 	}
 
+	if trafficRules.AllowSubnets == nil {
+		trafficRules.AllowSubnets = make([]string, 0)
+	}
+
 	// TODO: faster lookup?
 	for _, filteredRules := range set.FilteredRules {
 
@@ -496,6 +572,10 @@ func (set *TrafficRulesSet) GetTrafficRules(
 			trafficRules.AllowUDPPorts = filteredRules.Rules.AllowUDPPorts
 		}
 
+		if filteredRules.Rules.AllowSubnets != nil {
+			trafficRules.AllowSubnets = filteredRules.Rules.AllowSubnets
+		}
+
 		break
 	}
 
@@ -508,3 +588,28 @@ func (set *TrafficRulesSet) GetTrafficRules(
 
 	return trafficRules
 }
+
+// GetMeekRateLimiterConfig gets a snapshot of the meek rate limiter
+// configuration values.
+func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (int, int, []string, int, int) {
+
+	set.ReloadableFile.RLock()
+	defer set.ReloadableFile.RUnlock()
+
+	GCTriggerCount := set.MeekRateLimiterGarbageCollectionTriggerCount
+	if GCTriggerCount <= 0 {
+		GCTriggerCount = DEFAULT_MEEK_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT
+	}
+
+	reapFrequencySeconds := set.MeekRateLimiterReapHistoryFrequencySeconds
+	if reapFrequencySeconds <= 0 {
+		reapFrequencySeconds = DEFAULT_MEEK_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS
+
+	}
+
+	return set.MeekRateLimiterHistorySize,
+		set.MeekRateLimiterThresholdSeconds,
+		set.MeekRateLimiterRegions,
+		GCTriggerCount,
+		reapFrequencySeconds
+}