Просмотр исходного кода

Meek server changes

- Add a distinct timeout for fronted protocols

- Don't terminate fronted persistent connections due to client errors

- Add MeekRequiredHeaders
Rod Hynes 3 лет назад
Родитель
Сommit
1fd8f2db0d
3 измененных файлов с 101 добавлено и 17 удалено
  1. 9 0
      psiphon/server/config.go
  2. 66 12
      psiphon/server/meek.go
  3. 26 5
      psiphon/server/meek_test.go

+ 9 - 0
psiphon/server/config.go

@@ -228,6 +228,10 @@ type Config struct {
 	// request fails. This is used to defend against abuse.
 	MeekProhibitedHeaders []string
 
+	// MeekRequiredHeaders is a list of HTTP header names and values that must
+	// appear in requests. This is used to defend against abuse.
+	MeekRequiredHeaders map[string]string
+
 	// MeekProxyForwardedForHeaders is a list of HTTP headers which
 	// may be added by downstream HTTP proxies or CDNs in front
 	// of clients. These headers supply the original client IP
@@ -266,6 +270,11 @@ type Config struct {
 	// timeouts. The default is MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT.
 	MeekHTTPClientIOTimeoutMilliseconds *int
 
+	// MeekFrontedHTTPClientIOTimeoutMilliseconds specifies meek HTTP server
+	// I/O timeouts for fronted protocols. The default is
+	// MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT.
+	MeekFrontedHTTPClientIOTimeoutMilliseconds *int
+
 	// MeekCachedResponseBufferSize is the size of a private,
 	// fixed-size buffer allocated for every meek client. The buffer
 	// is used to cache response payload, allowing the client to retry

+ 66 - 12
psiphon/server/meek.go

@@ -85,6 +85,7 @@ const (
 	MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD = 8192
 	MEEK_DEFAULT_MAX_SESSION_STALENESS               = 45 * time.Second
 	MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT              = 45 * time.Second
+	MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT      = 360 * time.Second
 	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH              = 65536
 	MEEK_DEFAULT_POOL_BUFFER_LENGTH                  = 65536
 	MEEK_DEFAULT_POOL_BUFFER_COUNT                   = 2048
@@ -106,6 +107,7 @@ type MeekServer struct {
 	listener                        net.Listener
 	listenerTunnelProtocol          string
 	listenerPort                    int
+	isFronted                       bool
 	passthroughAddress              string
 	turnAroundTimeout               time.Duration
 	extendedTurnAroundTimeout       time.Duration
@@ -162,10 +164,24 @@ func NewMeekServer(
 			*support.Config.MeekMaxSessionStalenessMilliseconds) * time.Millisecond
 	}
 
-	httpClientIOTimeout := MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
-	if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
-		httpClientIOTimeout = time.Duration(
-			*support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
+	var httpClientIOTimeout time.Duration
+	if isFronted {
+
+		// Fronted has a distinct timeout, and the default is higher since new
+		// clients may connect to a CDN edge and start using an existing
+		// persistent connection.
+
+		httpClientIOTimeout = MEEK_DEFAULT_FRONTED_HTTP_CLIENT_IO_TIMEOUT
+		if support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds != nil {
+			httpClientIOTimeout = time.Duration(
+				*support.Config.MeekFrontedHTTPClientIOTimeoutMilliseconds) * time.Millisecond
+		}
+	} else {
+		httpClientIOTimeout = MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
+		if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
+			httpClientIOTimeout = time.Duration(
+				*support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
+		}
 	}
 
 	checksumTable := crc64.MakeTable(crc64.ECMA)
@@ -195,6 +211,7 @@ func NewMeekServer(
 		listener:                        listener,
 		listenerTunnelProtocol:          listenerTunnelProtocol,
 		listenerPort:                    listenerPort,
+		isFronted:                       isFronted,
 		passthroughAddress:              passthroughAddress,
 		turnAroundTimeout:               turnAroundTimeout,
 		extendedTurnAroundTimeout:       extendedTurnAroundTimeout,
@@ -321,6 +338,25 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 	// Note: no longer requiring that the request method is POST
 
+	// Check for required headers and values. For fronting, required headers
+	// may be used to identify a CDN edge. When this check fails,
+	// TerminateHTTPConnection is called instead of handleError, so any
+	// persistent connection is always closed.
+
+	if len(server.support.Config.MeekRequiredHeaders) > 0 {
+		for header, value := range server.support.Config.MeekRequiredHeaders {
+			requestValue := request.Header.Get(header)
+			if requestValue != value {
+				log.WithTraceFields(LogFields{
+					"header": header,
+					"value":  requestValue,
+				}).Warning("invalid required meek header")
+				common.TerminateHTTPConnection(responseWriter, request)
+				return
+			}
+		}
+	}
+
 	// Check for the expected meek/session ID cookie.
 	// Also check for prohibited HTTP headers.
 
@@ -343,7 +379,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 					"header": header,
 					"value":  value,
 				}).Warning("prohibited meek header")
-				common.TerminateHTTPConnection(responseWriter, request)
+				server.handleError(responseWriter, request)
 				return
 			}
 		}
@@ -370,7 +406,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		// Debug since session cookie errors commonly occur during
 		// normal operation.
 		log.WithTraceFields(LogFields{"error": err}).Debug("session lookup failed")
-		common.TerminateHTTPConnection(responseWriter, request)
+		server.handleError(responseWriter, request)
 		return
 	}
 
@@ -383,7 +419,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 			endPoint, common.GeoIPData(*endPointGeoIPData), responseWriter, request)
 		if !handled {
 			log.WithTraceFields(LogFields{"endPoint": endPoint}).Info("unhandled endpoint")
-			common.TerminateHTTPConnection(responseWriter, request)
+			server.handleError(responseWriter, request)
 		}
 		return
 	}
@@ -518,7 +554,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 		if !session.cachedResponse.HasPosition(position) {
 			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
-			common.TerminateHTTPConnection(responseWriter, request)
+			server.handleError(responseWriter, request)
 			session.delete(true)
 			return
 		}
@@ -572,7 +608,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 			// also, golang network error messages may contain client IP.
 			log.WithTraceFields(LogFields{"error": responseError}).Debug("write response failed")
 		}
-		common.TerminateHTTPConnection(responseWriter, request)
+		server.handleError(responseWriter, request)
 
 		// Note: keep session open to allow client to retry
 
@@ -580,6 +616,20 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	}
 }
 
+func (server *MeekServer) handleError(responseWriter http.ResponseWriter, request *http.Request) {
+
+	// When fronted, keep the persistent connection open since it may be used
+	// by many clients coming through the same edge. For performance reasons,
+	// an error, including invalid input, from one client shouldn't close the
+	// persistent connection used by other clients.
+
+	if server.isFronted {
+		http.NotFound(responseWriter, request)
+		return
+	}
+	common.TerminateHTTPConnection(responseWriter, request)
+}
+
 func checkRangeHeader(request *http.Request) (int, bool) {
 	rangeHeader := request.Header.Get("Range")
 	if rangeHeader == "" {
@@ -1729,9 +1779,13 @@ func (conn *meekConn) Close() error {
 	if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
 		close(conn.closeBroadcast)
 
-		// In general, we reply on "net/http" to close underlying TCP conns. In this
-		// case, we can directly close the first once, if it's still open.
-		conn.firstUnderlyingConn.Close()
+		// In general, we reply on "net/http" to close underlying TCP conns.
+		// In this case, we can directly close the first once, if it's still
+		// open. Don't close a persistent connection when fronted, as it may
+		// be still be used by other clients.
+		if !conn.meekServer.isFronted {
+			conn.firstUnderlyingConn.Close()
+		}
 	}
 	return nil
 }

+ 26 - 5
psiphon/server/meek_test.go

@@ -28,6 +28,7 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"net"
+	"net/http"
 	"path/filepath"
 	"sync"
 	"sync/atomic"
@@ -407,17 +408,23 @@ func (interruptor *fileDescriptorInterruptor) BindToDevice(fileDescriptor int) (
 	return "", nil
 }
 
+func TestMeekServer(t *testing.T) {
+	runTestMeekAccessControl(t, false, false, false)
+}
+
 func TestMeekRateLimiter(t *testing.T) {
-	runTestMeekAccessControl(t, true, false)
-	runTestMeekAccessControl(t, false, false)
+	runTestMeekAccessControl(t, true, false, false)
 }
 
 func TestMeekRestrictFrontingProviders(t *testing.T) {
-	runTestMeekAccessControl(t, false, true)
-	runTestMeekAccessControl(t, false, false)
+	runTestMeekAccessControl(t, false, true, false)
 }
 
-func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider bool) {
+func TestMeekMissingRequiredHeaders(t *testing.T) {
+	runTestMeekAccessControl(t, false, false, true)
+}
+
+func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider, missingRequiredHeaders bool) {
 
 	attempts := 10
 
@@ -431,6 +438,10 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider bool) {
 		allowedConnections = 0
 	}
 
+	if missingRequiredHeaders {
+		allowedConnections = 0
+	}
+
 	// Configure tactics
 
 	frontingProviderID := prng.HexString(8)
@@ -492,10 +503,14 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider bool) {
 		meekRateLimiterTunnelProtocols = []string{protocol.TUNNEL_PROTOCOL_FRONTED_MEEK}
 	}
 
+	requiredHeaderName := "X-Psiphon-Required-Header"
+	requiredHeaderValue := prng.Base64String(32)
+
 	mockSupport := &SupportServices{
 		Config: &Config{
 			MeekObfuscatedKey:              meekObfuscatedKey,
 			MeekCookieEncryptionPrivateKey: meekCookieEncryptionPrivateKey,
+			MeekRequiredHeaders:            map[string]string{requiredHeaderName: requiredHeaderValue},
 			TunnelProtocolPorts:            map[string]int{tunnelProtocol: 0},
 			frontingProviderID:             frontingProviderID,
 		},
@@ -590,6 +605,12 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider bool) {
 			},
 		}
 
+		if !missingRequiredHeaders {
+			headers := make(http.Header)
+			headers.Add(requiredHeaderName, requiredHeaderValue)
+			dialConfig.CustomHeaders = headers
+		}
+
 		params, err := parameters.NewParameters(nil)
 		if err != nil {
 			t.Fatalf("NewParameters failed: %s", err)