ソースを参照

Meek server fixes

- Correctly parse IPv6 peer addresses
- Add max entries to cap the rate limit history size
- Rate limit by /56 for IPv6
- Use only trustworthy Forwarded-For-type header data
Rod Hynes 4 年 前
コミット
938e7964c3

+ 3 - 0
psiphon/common/refraction/refraction.go

@@ -73,6 +73,9 @@ type Listener struct {
 // RemoteAddr _must_ be called non-concurrently before calling Read on
 // accepted conns as the HAProxy proxy protocol header reading logic sets
 // SetReadDeadline and performs a Read.
+//
+// Psiphon server hosts should be configured to accept tunnel connections only
+// from Refraction Networking stations.
 func Listen(address string) (net.Listener, error) {
 
 	tcpListener, err := net.Listen("tcp", address)

+ 81 - 83
psiphon/server/meek.go

@@ -48,6 +48,8 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
 	tris "github.com/Psiphon-Labs/tls-tris"
+	lrucache "github.com/cognusion/go-cache-lru"
+	"github.com/juju/ratelimit"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -120,7 +122,7 @@ type MeekServer struct {
 	checksumTable                   *crc64.Table
 	bufferPool                      *CachedResponseBufferPool
 	rateLimitLock                   sync.Mutex
-	rateLimitHistory                map[string][]time.Time
+	rateLimitHistory                *lrucache.Cache
 	rateLimitCount                  int
 	rateLimitSignalGC               chan struct{}
 }
@@ -178,6 +180,14 @@ func NewMeekServer(
 		bufferCount = support.Config.MeekCachedResponsePoolBufferCount
 	}
 
+	_, thresholdSeconds, _, _, _, _, _, reapFrequencySeconds, maxEntries :=
+		support.TrafficRulesSet.GetMeekRateLimiterConfig()
+
+	rateLimitHistory := lrucache.NewWithLRU(
+		time.Duration(thresholdSeconds)*time.Second,
+		time.Duration(reapFrequencySeconds)*time.Second,
+		maxEntries)
+
 	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
 
 	meekServer := &MeekServer{
@@ -198,7 +208,7 @@ func NewMeekServer(
 		sessions:                        make(map[string]*meekSession),
 		checksumTable:                   checksumTable,
 		bufferPool:                      bufferPool,
-		rateLimitHistory:                make(map[string][]time.Time),
+		rateLimitHistory:                rateLimitHistory,
 		rateLimitSignalGC:               make(chan struct{}, 1),
 	}
 
@@ -620,38 +630,59 @@ func (server *MeekServer) getSessionOrEndpoint(
 	}
 
 	// Determine the client remote address, which is used for geolocation
-	// and stats. When an intermediate proxy or CDN is in use, we may be
+	// stats, rate limiting, anti-probing, discovery, and tactics selection
+	// logic.
+	//
+	// When an intermediate proxy or CDN is in use, we may be
 	// able to determine the original client address by inspecting HTTP
 	// headers such as X-Forwarded-For.
+	//
+	// We trust only headers provided by CDNs. Fronted Psiphon server hosts
+	// should be configured to accept tunnel connections only from CDN edges.
+	// When the CDN passes along a chain of IPs, as in X-Forwarded-For, we
+	// trust only the right-most IP, which is provided by the CDN.
 
-	clientIP := strings.Split(request.RemoteAddr, ":")[0]
-	usedProxyForwardedForHeader := false
-	var geoIPData GeoIPData
+	clientIP, _, err := net.SplitHostPort(request.RemoteAddr)
+	if err != nil {
+		return "", nil, nil, "", nil, errors.Trace(err)
+	}
+	if net.ParseIP(clientIP) == nil {
+		return "", nil, nil, "", nil, errors.TraceNew("invalid IP address")
+	}
 
-	if len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
+	if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
+		len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
+
+		// When there are multiple header names in MeekProxyForwardedForHeaders,
+		// the first valid match is preferred. MeekProxyForwardedForHeaders should be
+		// configured to use header names that are always provided by the CDN(s) and
+		// not header names that may be passed through from clients.
 		for _, header := range server.support.Config.MeekProxyForwardedForHeaders {
-			value := request.Header.Get(header)
-			if len(value) > 0 {
+
+			// In the case where there are multiple headers,
+			// request.Header.Get returns the first header, but we want the
+			// last header; so use request.Header.Values and select the last
+			// value. As per RFC 2616 section 4.2, a proxy must not change
+			// the order of field values, which implies that it should append
+			// values to the last header.
+			values := request.Header.Values(header)
+			if len(values) > 0 {
+				value := values[len(values)-1]
+
 				// Some headers, such as X-Forwarded-For, are a comma-separated
-				// list of IPs (each proxy in a chain). The first IP should be
-				// the client IP.
-				proxyClientIP := strings.Split(value, ",")[0]
-				if net.ParseIP(proxyClientIP) != nil {
-					proxyClientGeoIPData := server.support.GeoIPService.Lookup(proxyClientIP)
-					if proxyClientGeoIPData.Country != GEOIP_UNKNOWN_VALUE {
-						usedProxyForwardedForHeader = true
-						clientIP = proxyClientIP
-						geoIPData = proxyClientGeoIPData
-						break
-					}
+				// list of IPs (each proxy in a chain). Select the last IP.
+				IPs := strings.Split(value, ",")
+				IP := IPs[len(IPs)-1]
+
+				if net.ParseIP(IP) != nil {
+					clientIP = IP
+					break
 				}
 			}
 		}
 	}
 
-	if !usedProxyForwardedForHeader {
-		geoIPData = server.support.GeoIPService.Lookup(clientIP)
-	}
+	geoIPData := server.support.GeoIPService.Lookup(clientIP)
 
 	// The session is new (or expired). Treat the cookie value as a new meek
 	// cookie, extract the payload, and create a new session.
@@ -818,7 +849,7 @@ func (server *MeekServer) rateLimit(
 		regions,
 		ISPs,
 		cities,
-		GCTriggerCount, _ :=
+		GCTriggerCount, _, _ :=
 		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
 
 	if historySize == 0 {
@@ -852,35 +883,40 @@ func (server *MeekServer) rateLimit(
 		}
 	}
 
-	limit := true
-	triggerGC := false
-
-	now := time.Now()
-	threshold := now.Add(-time.Duration(thresholdSeconds) * time.Second)
+	// With IPv6, individual users or sites are users commonly allocated a /64
+	// or /56, so rate limit by /56.
+	rateLimitIP := clientIP
+	IP := net.ParseIP(clientIP)
+	if IP != nil && IP.To4() == nil {
+		rateLimitIP = IP.Mask(net.CIDRMask(56, 128)).String()
+	}
 
+	// go-cache-lru is safe for concurrent access, but lacks an atomic
+	// compare-and-set type operations to check if an entry exists before
+	// adding a new one. This mutex ensures the Get and Add are atomic
+	// (as well as synchronizing access to rateLimitCount).
 	server.rateLimitLock.Lock()
 
-	history, ok := server.rateLimitHistory[clientIP]
-	if !ok || len(history) != historySize {
-		history = make([]time.Time, historySize)
-		server.rateLimitHistory[clientIP] = history
+	var rateLimiter *ratelimit.Bucket
+	entry, ok := server.rateLimitHistory.Get(rateLimitIP)
+	if ok {
+		rateLimiter = entry.(*ratelimit.Bucket)
+	} else {
+		rateLimiter = ratelimit.NewBucketWithQuantum(
+			time.Duration(thresholdSeconds)*time.Second,
+			int64(historySize),
+			int64(historySize))
+		server.rateLimitHistory.Set(
+			rateLimitIP,
+			rateLimiter,
+			time.Duration(thresholdSeconds)*time.Second)
 	}
 
-	for i := 0; i < len(history); i++ {
-		if history[i].IsZero() || history[i].Before(threshold) {
-			limit = false
-		}
-		if i == len(history)-1 {
-			history[i] = now
-		} else {
-			history[i] = history[i+1]
-		}
-	}
+	limit := rateLimiter.TakeAvailable(1) < 1
 
+	triggerGC := false
 	if limit {
-
 		server.rateLimitCount += 1
-
 		if server.rateLimitCount >= GCTriggerCount {
 			triggerGC = true
 			server.rateLimitCount = 0
@@ -900,48 +936,10 @@ func (server *MeekServer) rateLimit(
 }
 
 func (server *MeekServer) rateLimitWorker() {
-
-	_, _, _, _, _, _, _, reapFrequencySeconds :=
-		server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
-
-	timer := time.NewTimer(time.Duration(reapFrequencySeconds) * time.Second)
-	defer timer.Stop()
-
 	for {
 		select {
-		case <-timer.C:
-
-			_, thresholdSeconds, _, _, _, _, _, reapFrequencySeconds :=
-				server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
-
-			server.rateLimitLock.Lock()
-
-			threshold := time.Now().Add(-time.Duration(thresholdSeconds) * time.Second)
-
-			for key, history := range server.rateLimitHistory {
-				reap := true
-				for i := 0; i < len(history); i++ {
-					if !history[i].IsZero() && !history[i].Before(threshold) {
-						reap = false
-					}
-				}
-				if reap {
-					delete(server.rateLimitHistory, key)
-				}
-			}
-
-			// Enable rate limit history map to be garbage collected when possible.
-			if len(server.rateLimitHistory) == 0 {
-				server.rateLimitHistory = make(map[string][]time.Time)
-			}
-
-			server.rateLimitLock.Unlock()
-
-			timer.Reset(time.Duration(reapFrequencySeconds) * time.Second)
-
 		case <-server.rateLimitSignalGC:
 			runtime.GC()
-
 		case <-server.stopBroadcast:
 			return
 		}

+ 28 - 9
psiphon/server/trafficRules.go

@@ -36,7 +36,8 @@ const (
 	DEFAULT_MAX_TCP_PORT_FORWARD_COUNT                        = 512
 	DEFAULT_MAX_UDP_PORT_FORWARD_COUNT                        = 32
 	DEFAULT_MEEK_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT = 5000
-	DEFAULT_MEEK_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS  = 600
+	DEFAULT_MEEK_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS  = 300
+	DEFAULT_MEEK_RATE_LIMITER_MAX_ENTRIES                     = 1000000
 )
 
 // TrafficRulesSet represents the various traffic rules to
@@ -81,8 +82,10 @@ type TrafficRulesSet struct {
 	// The scope of rate limiting may be
 	// limited using LimitMeekRateLimiterTunnelProtocols/Regions/ISPs/Cities.
 	//
-	// Hot reloading a new history size will result in existing history being
-	// truncated.
+	// Upon hot reload,
+	// MeekRateLimiterHistorySize/MeekRateLimiterThresholdSeconds are not
+	// changed for currently tracked client IPs; new values will apply to
+	// newly tracked client IPs.
 	MeekRateLimiterHistorySize int
 
 	// MeekRateLimiterThresholdSeconds is part of the meek rate limiter
@@ -116,15 +119,24 @@ type TrafficRulesSet struct {
 	// rate limit events after which garbage collection is manually triggered
 	// in order to reclaim memory used by rate limited and other rejected
 	// requests.
-	// A default of 5000 is used when
-	// MeekRateLimiterGarbageCollectionTriggerCount is 0.
+	//
+	// A default of DEFAULT_MEEK_RATE_LIMITER_GARBAGE_COLLECTOR_TRIGGER_COUNT
+	// is used when MeekRateLimiterGarbageCollectionTriggerCount is 0.
 	MeekRateLimiterGarbageCollectionTriggerCount int
 
 	// MeekRateLimiterReapHistoryFrequencySeconds specifies a schedule for
 	// reaping old records from the rate limit history.
-	// A default of 600 is used when
-	// MeekRateLimiterReapHistoryFrequencySeconds is 0.
+	//
+	// A default of DEFAULT_MEEK_RATE_LIMITER_REAP_HISTORY_FREQUENCY_SECONDS
+	// is used when MeekRateLimiterReapHistoryFrequencySeconds is 0.
+	//
+	// MeekRateLimiterReapHistoryFrequencySeconds is not applied upon hot
+	// reload.
 	MeekRateLimiterReapHistoryFrequencySeconds int
+
+	// MeekRateLimiterMaxEntries specifies a maximum size for the rate limit
+	// history.
+	MeekRateLimiterMaxEntries int
 }
 
 // TrafficRulesFilter defines a filter to match against client attributes.
@@ -846,7 +858,7 @@ func (rules *TrafficRules) allowSubnet(remoteIP net.IP) bool {
 // GetMeekRateLimiterConfig gets a snapshot of the meek rate limiter
 // configuration values.
 func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (
-	int, int, []string, []string, []string, []string, int, int) {
+	int, int, []string, []string, []string, []string, int, int, int) {
 
 	set.ReloadableFile.RLock()
 	defer set.ReloadableFile.RUnlock()
@@ -862,6 +874,12 @@ func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (
 
 	}
 
+	maxEntries := set.MeekRateLimiterMaxEntries
+	if maxEntries <= 0 {
+		maxEntries = DEFAULT_MEEK_RATE_LIMITER_MAX_ENTRIES
+
+	}
+
 	return set.MeekRateLimiterHistorySize,
 		set.MeekRateLimiterThresholdSeconds,
 		set.MeekRateLimiterTunnelProtocols,
@@ -869,5 +887,6 @@ func (set *TrafficRulesSet) GetMeekRateLimiterConfig() (
 		set.MeekRateLimiterISPs,
 		set.MeekRateLimiterCities,
 		GCTriggerCount,
-		reapFrequencySeconds
+		reapFrequencySeconds,
+		maxEntries
 }