ソースを参照

Add meek rate limiter

Rod Hynes 7 年 前
コミット
650d48b6c7
2 ファイル変更212 行追加48 行削除
  1. 38 0
      psiphon/server/config.go
  2. 174 48
      psiphon/server/meek.go

+ 38 - 0
psiphon/server/config.go

@@ -214,6 +214,33 @@ type Config struct {
 	// is 0.
 	// is 0.
 	MeekCachedResponsePoolBufferCount int
 	MeekCachedResponsePoolBufferCount int
 
 
+	// MeekRateLimiterHistorySize enables the late-stage meek rate limiter and
+	// sets its history size. The late-stage meek rate limiter acts on client
+	// IPs relayed in MeekProxyForwardedForHeaders, and so it must wait for
+	// the HTTP headers to be read. This rate limiter immediately terminates
+	// any client endpoint request or any request to create a new session, but
+	// not any meek request for an existing session, once
+	// MeekRateLimiterHistorySize such requests from the same client IP have
+	// occurred within the last MeekRateLimiterThresholdSeconds seconds.
+	MeekRateLimiterHistorySize int
+
+	// MeekRateLimiterThresholdSeconds is part of the meek rate limiter
+	// specification and must be set when MeekRateLimiterHistorySize is set.
+	MeekRateLimiterThresholdSeconds int
+
+	// MeekRateLimiterGarbageCollectionTriggerCount specifies the number of
+	// rate limit events after which garbage collection is manually triggered
+	// in order to reclaim memory used by rate limited and other rejected
+	// requests.
+	// A default of 1000 is used when
+	// MeekRateLimiterGarbageCollectionTriggerCount is 0.
+	MeekRateLimiterGarbageCollectionTriggerCount int
+
+	// MeekRateLimiterReapHistoryFrequencySeconds specifies a schedule for
+	// reaping old records from the rate limit history.
+	// A default of 600 is used when
+	// MeekRateLimiterReapHistoryFrequencySeconds is 0.
+	MeekRateLimiterReapHistoryFrequencySeconds int
+
 	// UDPInterceptUdpgwServerAddress specifies the network address of
 	// UDPInterceptUdpgwServerAddress specifies the network address of
 	// a udpgw server which clients may be port forwarding to. When
 	// a udpgw server which clients may be port forwarding to. When
 	// specified, these TCP port forwards are intercepted and handled
 	// specified, these TCP port forwards are intercepted and handled
@@ -322,6 +349,11 @@ func (config *Config) RunPeriodicGarbageCollection() bool {
 	return config.PeriodicGarbageCollectionSeconds > 0
 	return config.PeriodicGarbageCollectionSeconds > 0
 }
 }
 
 
+// RunMeekRateLimiter indicates whether the meek rate limiter is enabled.
+func (config *Config) RunMeekRateLimiter() bool {
+	return config.MeekRateLimiterHistorySize > 0
+}
+
 // LoadConfig loads and validates a JSON encoded server config.
 // LoadConfig loads and validates a JSON encoded server config.
 func LoadConfig(configJSON []byte) (*Config, error) {
 func LoadConfig(configJSON []byte) (*Config, error) {
 
 
@@ -408,6 +440,12 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 			"AccessControlVerificationKeyRing is invalid: %s", err)
 			"AccessControlVerificationKeyRing is invalid: %s", err)
 	}
 	}
 
 
+	if config.MeekRateLimiterHistorySize > 0 {
+		if config.MeekRateLimiterThresholdSeconds <= 0 {
+			return nil, errors.New("Meek rate limiter requires MeekRateLimiterThresholdSeconds")
+		}
+	}
+
 	return &config, nil
 	return &config, nil
 }
 }
 
 

+ 174 - 48
psiphon/server/meek.go

@@ -31,6 +31,7 @@ import (
 	"io"
 	"io"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
+	"runtime"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
@@ -69,16 +70,18 @@ const (
 	// when retrying a request for a partially downloaded response payload.
 	// when retrying a request for a partially downloaded response payload.
 	MEEK_PROTOCOL_VERSION_3 = 3
 	MEEK_PROTOCOL_VERSION_3 = 3
 
 
-	MEEK_MAX_REQUEST_PAYLOAD_LENGTH     = 65536
-	MEEK_TURN_AROUND_TIMEOUT            = 20 * time.Millisecond
-	MEEK_EXTENDED_TURN_AROUND_TIMEOUT   = 100 * time.Millisecond
-	MEEK_MAX_SESSION_STALENESS          = 45 * time.Second
-	MEEK_HTTP_CLIENT_IO_TIMEOUT         = 45 * time.Second
-	MEEK_MIN_SESSION_ID_LENGTH          = 8
-	MEEK_MAX_SESSION_ID_LENGTH          = 20
-	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
-	MEEK_DEFAULT_POOL_BUFFER_LENGTH     = 65536
-	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 2048
+	MEEK_MAX_REQUEST_PAYLOAD_LENGTH                           = 65536
+	MEEK_TURN_AROUND_TIMEOUT                                  = 20 * time.Millisecond
+	MEEK_EXTENDED_TURN_AROUND_TIMEOUT                         = 100 * time.Millisecond
+	MEEK_MAX_SESSION_STALENESS                                = 45 * time.Second
+	MEEK_HTTP_CLIENT_IO_TIMEOUT                               = 45 * time.Second
+	MEEK_MIN_SESSION_ID_LENGTH                                = 8
+	MEEK_MAX_SESSION_ID_LENGTH                                = 20
+	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH                       = 65536
+	MEEK_DEFAULT_POOL_BUFFER_LENGTH                           = 65536
+	MEEK_DEFAULT_POOL_BUFFER_COUNT                            = 2048
+	MEEK_DEFAULT_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT = 1000
+	MEEK_DEFAULT_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS  = 600
 )
 )
 
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -93,16 +96,20 @@ const (
 // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
 // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
 // the meekConn struct.
 // the meekConn struct.
 type MeekServer struct {
 type MeekServer struct {
-	support       *SupportServices
-	listener      net.Listener
-	tlsConfig     *utls.Config
-	clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
-	openConns     *common.Conns
-	stopBroadcast <-chan struct{}
-	sessionsLock  sync.RWMutex
-	sessions      map[string]*meekSession
-	checksumTable *crc64.Table
-	bufferPool    *CachedResponseBufferPool
+	support           *SupportServices
+	listener          net.Listener
+	tlsConfig         *utls.Config
+	clientHandler     func(clientTunnelProtocol string, clientConn net.Conn)
+	openConns         *common.Conns
+	stopBroadcast     <-chan struct{}
+	sessionsLock      sync.RWMutex
+	sessions          map[string]*meekSession
+	checksumTable     *crc64.Table
+	bufferPool        *CachedResponseBufferPool
+	rateLimitLock     sync.Mutex
+	rateLimitHistory  map[string][]monotime.Time
+	rateLimitCount    int
+	rateLimitSignalGC chan struct{}
 }
 }
 
 
 // NewMeekServer initializes a new meek server.
 // NewMeekServer initializes a new meek server.
@@ -138,6 +145,11 @@ func NewMeekServer(
 		bufferPool:    bufferPool,
 		bufferPool:    bufferPool,
 	}
 	}
 
 
+	if support.Config.RunMeekRateLimiter() {
+		meekServer.rateLimitHistory = make(map[string][]monotime.Time)
+		meekServer.rateLimitSignalGC = make(chan struct{}, 1)
+	}
+
 	if useTLS {
 	if useTLS {
 		tlsConfig, err := makeMeekTLSConfig(
 		tlsConfig, err := makeMeekTLSConfig(
 			support, useObfuscatedSessionTickets)
 			support, useObfuscatedSessionTickets)
@@ -158,12 +170,11 @@ func NewMeekServer(
 // signal specified in NewMeekServer.
 // signal specified in NewMeekServer.
 func (server *MeekServer) Run() error {
 func (server *MeekServer) Run() error {
 
 
-	// Expire sessions
+	waitGroup := new(sync.WaitGroup)
 
 
-	reaperWaitGroup := new(sync.WaitGroup)
-	reaperWaitGroup.Add(1)
+	waitGroup.Add(1)
 	go func() {
 	go func() {
-		defer reaperWaitGroup.Done()
+		defer waitGroup.Done()
 		ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
 		ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
 		defer ticker.Stop()
 		defer ticker.Stop()
 		for {
 		for {
@@ -176,9 +187,16 @@ func (server *MeekServer) Run() error {
 		}
 		}
 	}()
 	}()
 
 
-	// Serve HTTP or HTTPS
+	if server.support.Config.RunMeekRateLimiter() {
+		waitGroup.Add(1)
+		go func() {
+			defer waitGroup.Done()
+			server.rateLimitWorker()
+		}()
+	}
 
 
-	// Notes:
+	// Serve HTTP or HTTPS
+	//
 	// - WriteTimeout may include time awaiting request, as per:
 	// - WriteTimeout may include time awaiting request, as per:
 	//   https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
 	//   https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
 	// - Legacy meek-server wrapped each client HTTP connection with an explicit idle
 	// - Legacy meek-server wrapped each client HTTP connection with an explicit idle
@@ -222,7 +240,7 @@ func (server *MeekServer) Run() error {
 	server.listener.Close()
 	server.listener.Close()
 	server.openConns.CloseAll()
 	server.openConns.CloseAll()
 
 
-	reaperWaitGroup.Wait()
+	waitGroup.Wait()
 
 
 	return err
 	return err
 }
 }
@@ -504,37 +522,19 @@ func checkRangeHeader(request *http.Request) (int, bool) {
 func (server *MeekServer) getSessionOrEndpoint(
 func (server *MeekServer) getSessionOrEndpoint(
 	request *http.Request, meekCookie *http.Cookie) (string, *meekSession, string, string, error) {
 	request *http.Request, meekCookie *http.Cookie) (string, *meekSession, string, string, error) {
 
 
-	// Check for an existing session
+	// Check for an existing session.
 
 
 	server.sessionsLock.RLock()
 	server.sessionsLock.RLock()
 	existingSessionID := meekCookie.Value
 	existingSessionID := meekCookie.Value
 	session, ok := server.sessions[existingSessionID]
 	session, ok := server.sessions[existingSessionID]
 	server.sessionsLock.RUnlock()
 	server.sessionsLock.RUnlock()
 	if ok {
 	if ok {
+		// TODO: can multiple http client connections using same session cookie
+		// cause race conditions on session struct?
 		session.touch()
 		session.touch()
 		return existingSessionID, session, "", "", nil
 		return existingSessionID, session, "", "", nil
 	}
 	}
 
 
-	// TODO: can multiple http client connections using same session cookie
-	// cause race conditions on session struct?
-
-	// The session is new (or expired). Treat the cookie value as a new meek
-	// cookie, extract the payload, and create a new session.
-
-	payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
-	if err != nil {
-		return "", nil, "", "", common.ContextError(err)
-	}
-
-	// Note: this meek server ignores legacy values PsiphonClientSessionId
-	// and PsiphonServerAddress.
-	var clientSessionData protocol.MeekCookieData
-
-	err = json.Unmarshal(payloadJSON, &clientSessionData)
-	if err != nil {
-		return "", nil, "", "", common.ContextError(err)
-	}
-
 	// Determine the client remote address, which is used for geolocation
 	// Determine the client remote address, which is used for geolocation
 	// and stats. When an intermediate proxy or CDN is in use, we may be
 	// and stats. When an intermediate proxy or CDN is in use, we may be
 	// able to determine the original client address by inspecting HTTP
 	// able to determine the original client address by inspecting HTTP
@@ -560,6 +560,27 @@ func (server *MeekServer) getSessionOrEndpoint(
 		}
 		}
 	}
 	}
 
 
+	if server.rateLimit(clientIP) {
+		return "", nil, "", "", common.ContextError(errors.New("rate limit exceeded"))
+	}
+
+	// The session is new (or expired). Treat the cookie value as a new meek
+	// cookie, extract the payload, and create a new session.
+
+	payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
+	if err != nil {
+		return "", nil, "", "", common.ContextError(err)
+	}
+
+	// Note: this meek server ignores legacy values PsiphonClientSessionId
+	// and PsiphonServerAddress.
+	var clientSessionData protocol.MeekCookieData
+
+	err = json.Unmarshal(payloadJSON, &clientSessionData)
+	if err != nil {
+		return "", nil, "", "", common.ContextError(err)
+	}
+
 	// Handle endpoints before enforcing the GetEstablishTunnels check.
 	// Handle endpoints before enforcing the GetEstablishTunnels check.
 	// Currently, endpoints are tactics requests, and we allow these to be
 	// Currently, endpoints are tactics requests, and we allow these to be
 	// handled by servers which would otherwise reject new tunnels.
 	// handled by servers which would otherwise reject new tunnels.
@@ -637,6 +658,111 @@ func (server *MeekServer) getSessionOrEndpoint(
 	return sessionID, session, "", "", nil
 	return sessionID, session, "", "", nil
 }
 }
 
 
+// rateLimit records a request from clientIP in the rate limit history and
+// reports whether that request exceeds the configured limit. It always
+// returns false when the meek rate limiter is not enabled.
+//
+// Each client IP has a fixed-size history of request times, oldest first. A
+// request is limited only when every slot in the history holds a time that
+// is within the threshold period. The current request is recorded whether
+// or not it is limited. After a configured number of limited requests, the
+// rate limit worker is signaled, without blocking, to run garbage
+// collection.
+func (server *MeekServer) rateLimit(clientIP string) bool {
+
+	if !server.support.Config.RunMeekRateLimiter() {
+		return false
+	}
+
+	period := time.Duration(server.support.Config.MeekRateLimiterThresholdSeconds) * time.Second
+	now := monotime.Now()
+	cutoff := now.Add(-period)
+
+	signalGC := false
+
+	server.rateLimitLock.Lock()
+
+	history, found := server.rateLimitHistory[clientIP]
+	if !found {
+		history = make([]monotime.Time, server.support.Config.MeekRateLimiterHistorySize)
+		server.rateLimitHistory[clientIP] = history
+	}
+
+	// An unused (zero) slot or a slot older than the cutoff means fewer than
+	// len(history) requests occurred within the threshold period.
+	exceeded := true
+	for _, requestTime := range history {
+		if requestTime == 0 || requestTime.Before(cutoff) {
+			exceeded = false
+			break
+		}
+	}
+
+	// Drop the oldest entry and append the current request time. The slice
+	// shares its backing array with the map value, so this updates the
+	// stored history in place.
+	if last := len(history) - 1; last >= 0 {
+		copy(history, history[1:])
+		history[last] = now
+	}
+
+	if exceeded {
+		server.rateLimitCount++
+
+		gcTriggerCount := server.support.Config.MeekRateLimiterGarbageCollectionTriggerCount
+		if gcTriggerCount <= 0 {
+			gcTriggerCount = MEEK_DEFAULT_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT
+		}
+		if server.rateLimitCount >= gcTriggerCount {
+			server.rateLimitCount = 0
+			signalGC = true
+		}
+	}
+
+	server.rateLimitLock.Unlock()
+
+	// Signal outside the lock; skip the send when a signal is already pending.
+	if signalGC {
+		select {
+		case server.rateLimitSignalGC <- struct{}{}:
+		default:
+		}
+	}
+
+	return exceeded
+}
+
+// rateLimitWorker services the meek rate limiter until stopBroadcast is
+// signaled. On a periodic schedule it removes rate limit history entries
+// that contain no request time within the threshold period, and it runs
+// garbage collection whenever rateLimit signals rateLimitSignalGC.
+func (server *MeekServer) rateLimitWorker() {
+
+	reapSeconds := server.support.Config.MeekRateLimiterReapHistoryFrequencySeconds
+	if reapSeconds <= 0 {
+		reapSeconds = MEEK_DEFAULT_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS
+	}
+
+	reapTicker := time.NewTicker(time.Duration(reapSeconds) * time.Second)
+	defer reapTicker.Stop()
+
+	for {
+		select {
+		case <-server.stopBroadcast:
+			return
+
+		case <-server.rateLimitSignalGC:
+			runtime.GC()
+
+		case <-reapTicker.C:
+
+			server.rateLimitLock.Lock()
+
+			period := time.Duration(server.support.Config.MeekRateLimiterThresholdSeconds) * time.Second
+			cutoff := monotime.Now().Add(-period)
+
+		nextClient:
+			for clientIP, history := range server.rateLimitHistory {
+				for _, requestTime := range history {
+					// Any recorded time at or after the cutoff keeps the entry.
+					if requestTime != 0 && !requestTime.Before(cutoff) {
+						continue nextClient
+					}
+				}
+				delete(server.rateLimitHistory, clientIP)
+			}
+
+			// Replace an emptied map so that the old map can be garbage
+			// collected when possible.
+			if len(server.rateLimitHistory) == 0 {
+				server.rateLimitHistory = make(map[string][]monotime.Time)
+			}
+
+			server.rateLimitLock.Unlock()
+		}
+	}
+}
+
 func (server *MeekServer) deleteSession(sessionID string) {
 func (server *MeekServer) deleteSession(sessionID string) {
 
 
 	// Don't obtain the server.sessionsLock write lock until modifying
 	// Don't obtain the server.sessionsLock write lock until modifying