|
|
@@ -31,6 +31,7 @@ import (
|
|
|
"io"
|
|
|
"net"
|
|
|
"net/http"
|
|
|
+ "runtime"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
@@ -93,16 +94,20 @@ const (
|
|
|
// HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
|
|
|
// the meekConn struct.
|
|
|
type MeekServer struct {
|
|
|
- support *SupportServices
|
|
|
- listener net.Listener
|
|
|
- tlsConfig *utls.Config
|
|
|
- clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
|
|
|
- openConns *common.Conns
|
|
|
- stopBroadcast <-chan struct{}
|
|
|
- sessionsLock sync.RWMutex
|
|
|
- sessions map[string]*meekSession
|
|
|
- checksumTable *crc64.Table
|
|
|
- bufferPool *CachedResponseBufferPool
|
|
|
+ support *SupportServices
|
|
|
+ listener net.Listener
|
|
|
+ tlsConfig *utls.Config
|
|
|
+ clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
|
|
|
+ openConns *common.Conns
|
|
|
+ stopBroadcast <-chan struct{}
|
|
|
+ sessionsLock sync.RWMutex
|
|
|
+ sessions map[string]*meekSession
|
|
|
+ checksumTable *crc64.Table
|
|
|
+ bufferPool *CachedResponseBufferPool
|
|
|
+ rateLimitLock sync.Mutex
|
|
|
+ rateLimitHistory map[string][]monotime.Time
|
|
|
+ rateLimitCount int
|
|
|
+ rateLimitSignalGC chan struct{}
|
|
|
}
|
|
|
|
|
|
// NewMeekServer initializes a new meek server.
|
|
|
@@ -128,14 +133,16 @@ func NewMeekServer(
|
|
|
bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
|
|
|
|
|
|
meekServer := &MeekServer{
|
|
|
- support: support,
|
|
|
- listener: listener,
|
|
|
- clientHandler: clientHandler,
|
|
|
- openConns: new(common.Conns),
|
|
|
- stopBroadcast: stopBroadcast,
|
|
|
- sessions: make(map[string]*meekSession),
|
|
|
- checksumTable: checksumTable,
|
|
|
- bufferPool: bufferPool,
|
|
|
+ support: support,
|
|
|
+ listener: listener,
|
|
|
+ clientHandler: clientHandler,
|
|
|
+ openConns: new(common.Conns),
|
|
|
+ stopBroadcast: stopBroadcast,
|
|
|
+ sessions: make(map[string]*meekSession),
|
|
|
+ checksumTable: checksumTable,
|
|
|
+ bufferPool: bufferPool,
|
|
|
+ rateLimitHistory: make(map[string][]monotime.Time),
|
|
|
+ rateLimitSignalGC: make(chan struct{}, 1),
|
|
|
}
|
|
|
|
|
|
if useTLS {
|
|
|
@@ -158,12 +165,11 @@ func NewMeekServer(
|
|
|
// signal specified in NewMeekServer.
|
|
|
func (server *MeekServer) Run() error {
|
|
|
|
|
|
- // Expire sessions
|
|
|
+ waitGroup := new(sync.WaitGroup)
|
|
|
|
|
|
- reaperWaitGroup := new(sync.WaitGroup)
|
|
|
- reaperWaitGroup.Add(1)
|
|
|
+ waitGroup.Add(1)
|
|
|
go func() {
|
|
|
- defer reaperWaitGroup.Done()
|
|
|
+ defer waitGroup.Done()
|
|
|
ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
|
|
|
defer ticker.Stop()
|
|
|
for {
|
|
|
@@ -176,9 +182,14 @@ func (server *MeekServer) Run() error {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- // Serve HTTP or HTTPS
|
|
|
+ waitGroup.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer waitGroup.Done()
|
|
|
+ server.rateLimitWorker()
|
|
|
+ }()
|
|
|
|
|
|
- // Notes:
|
|
|
+ // Serve HTTP or HTTPS
|
|
|
+ //
|
|
|
// - WriteTimeout may include time awaiting request, as per:
|
|
|
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts
|
|
|
// - Legacy meek-server wrapped each client HTTP connection with an explicit idle
|
|
|
@@ -222,7 +233,7 @@ func (server *MeekServer) Run() error {
|
|
|
server.listener.Close()
|
|
|
server.openConns.CloseAll()
|
|
|
|
|
|
- reaperWaitGroup.Wait()
|
|
|
+ waitGroup.Wait()
|
|
|
|
|
|
return err
|
|
|
}
|
|
|
@@ -504,37 +515,19 @@ func checkRangeHeader(request *http.Request) (int, bool) {
|
|
|
func (server *MeekServer) getSessionOrEndpoint(
|
|
|
request *http.Request, meekCookie *http.Cookie) (string, *meekSession, string, string, error) {
|
|
|
|
|
|
- // Check for an existing session
|
|
|
+ // Check for an existing session.
|
|
|
|
|
|
server.sessionsLock.RLock()
|
|
|
existingSessionID := meekCookie.Value
|
|
|
session, ok := server.sessions[existingSessionID]
|
|
|
server.sessionsLock.RUnlock()
|
|
|
if ok {
|
|
|
+ // TODO: can multiple http client connections using same session cookie
|
|
|
+ // cause race conditions on session struct?
|
|
|
session.touch()
|
|
|
return existingSessionID, session, "", "", nil
|
|
|
}
|
|
|
|
|
|
- // TODO: can multiple http client connections using same session cookie
|
|
|
- // cause race conditions on session struct?
|
|
|
-
|
|
|
- // The session is new (or expired). Treat the cookie value as a new meek
|
|
|
- // cookie, extract the payload, and create a new session.
|
|
|
-
|
|
|
- payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
|
|
|
- if err != nil {
|
|
|
- return "", nil, "", "", common.ContextError(err)
|
|
|
- }
|
|
|
-
|
|
|
- // Note: this meek server ignores legacy values PsiphonClientSessionId
|
|
|
- // and PsiphonServerAddress.
|
|
|
- var clientSessionData protocol.MeekCookieData
|
|
|
-
|
|
|
- err = json.Unmarshal(payloadJSON, &clientSessionData)
|
|
|
- if err != nil {
|
|
|
- return "", nil, "", "", common.ContextError(err)
|
|
|
- }
|
|
|
-
|
|
|
// Determine the client remote address, which is used for geolocation
|
|
|
// and stats. When an intermediate proxy or CDN is in use, we may be
|
|
|
// able to determine the original client address by inspecting HTTP
|
|
|
@@ -560,6 +553,27 @@ func (server *MeekServer) getSessionOrEndpoint(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if server.rateLimit(clientIP) {
|
|
|
+ return "", nil, "", "", common.ContextError(errors.New("rate limit exceeded"))
|
|
|
+ }
|
|
|
+
|
|
|
+ // The session is new (or expired). Treat the cookie value as a new meek
|
|
|
+ // cookie, extract the payload, and create a new session.
|
|
|
+
|
|
|
+ payloadJSON, err := getMeekCookiePayload(server.support, meekCookie.Value)
|
|
|
+ if err != nil {
|
|
|
+ return "", nil, "", "", common.ContextError(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Note: this meek server ignores legacy values PsiphonClientSessionId
|
|
|
+ // and PsiphonServerAddress.
|
|
|
+ var clientSessionData protocol.MeekCookieData
|
|
|
+
|
|
|
+ err = json.Unmarshal(payloadJSON, &clientSessionData)
|
|
|
+ if err != nil {
|
|
|
+ return "", nil, "", "", common.ContextError(err)
|
|
|
+ }
|
|
|
+
|
|
|
// Handle endpoints before enforcing the GetEstablishTunnels check.
|
|
|
// Currently, endpoints are tactics requests, and we allow these to be
|
|
|
// handled by servers which would otherwise reject new tunnels.
|
|
|
@@ -637,6 +651,118 @@ func (server *MeekServer) getSessionOrEndpoint(
|
|
|
return sessionID, session, "", "", nil
|
|
|
}
|
|
|
|
|
|
+func (server *MeekServer) rateLimit(clientIP string) bool {
|
|
|
+
|
|
|
+ historySize, thresholdSeconds, regions, GCTriggerCount, _ :=
|
|
|
+ server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
|
|
|
+
|
|
|
+ if historySize == 0 {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(regions) > 0 {
|
|
|
+ // TODO: avoid redundant GeoIP lookups?
|
|
|
+ if !common.Contains(regions, server.support.GeoIPService.Lookup(clientIP).Country) {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ limit := true
|
|
|
+ triggerGC := false
|
|
|
+
|
|
|
+ now := monotime.Now()
|
|
|
+ threshold := now.Add(-time.Duration(thresholdSeconds) * time.Second)
|
|
|
+
|
|
|
+ server.rateLimitLock.Lock()
|
|
|
+
|
|
|
+ history, ok := server.rateLimitHistory[clientIP]
|
|
|
+ if !ok || len(history) != historySize {
|
|
|
+ history = make([]monotime.Time, historySize)
|
|
|
+ server.rateLimitHistory[clientIP] = history
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := 0; i < len(history); i++ {
|
|
|
+ if history[i] == 0 || history[i].Before(threshold) {
|
|
|
+ limit = false
|
|
|
+ }
|
|
|
+ if i == len(history)-1 {
|
|
|
+ history[i] = now
|
|
|
+ } else {
|
|
|
+ history[i] = history[i+1]
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if limit {
|
|
|
+
|
|
|
+ server.rateLimitCount += 1
|
|
|
+
|
|
|
+ if server.rateLimitCount >= GCTriggerCount {
|
|
|
+ triggerGC = true
|
|
|
+ server.rateLimitCount = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ server.rateLimitLock.Unlock()
|
|
|
+
|
|
|
+ if triggerGC {
|
|
|
+ select {
|
|
|
+ case server.rateLimitSignalGC <- *new(struct{}):
|
|
|
+ default:
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return limit
|
|
|
+}
|
|
|
+
|
|
|
+func (server *MeekServer) rateLimitWorker() {
|
|
|
+
|
|
|
+ _, _, _, _, reapFrequencySeconds :=
|
|
|
+ server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
|
|
|
+
|
|
|
+ timer := time.NewTimer(time.Duration(reapFrequencySeconds) * time.Second)
|
|
|
+ defer timer.Stop()
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-timer.C:
|
|
|
+
|
|
|
+ _, thresholdSeconds, _, _, reapFrequencySeconds :=
|
|
|
+ server.support.TrafficRulesSet.GetMeekRateLimiterConfig()
|
|
|
+
|
|
|
+ server.rateLimitLock.Lock()
|
|
|
+
|
|
|
+ threshold := monotime.Now().Add(-time.Duration(thresholdSeconds) * time.Second)
|
|
|
+
|
|
|
+ for key, history := range server.rateLimitHistory {
|
|
|
+ reap := true
|
|
|
+ for i := 0; i < len(history); i++ {
|
|
|
+ if history[i] != 0 && !history[i].Before(threshold) {
|
|
|
+ reap = false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if reap {
|
|
|
+ delete(server.rateLimitHistory, key)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Enable rate limit history map to be garbage collected when possible.
|
|
|
+ if len(server.rateLimitHistory) == 0 {
|
|
|
+ server.rateLimitHistory = make(map[string][]monotime.Time)
|
|
|
+ }
|
|
|
+
|
|
|
+ server.rateLimitLock.Unlock()
|
|
|
+
|
|
|
+ timer.Reset(time.Duration(reapFrequencySeconds) * time.Second)
|
|
|
+
|
|
|
+ case <-server.rateLimitSignalGC:
|
|
|
+ runtime.GC()
|
|
|
+
|
|
|
+ case <-server.stopBroadcast:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (server *MeekServer) deleteSession(sessionID string) {
|
|
|
|
|
|
// Don't obtain the server.sessionsLock write lock until modifying
|