Просмотр исходного кода

Add meek upstream flow optimization

Rod Hynes 4 лет назад
Родитель
Сommit
9a95be9360
2 измененных файлов с 89 добавлено и 57 удалено
  1. 6 0
      psiphon/server/config.go
  2. 83 57
      psiphon/server/meek.go

+ 6 - 0
psiphon/server/config.go

@@ -252,6 +252,12 @@ type Config struct {
 	// default is MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT.
 	MeekExtendedTurnAroundTimeoutMilliseconds *int
 
+	// MeekSkipExtendedTurnAroundThresholdBytes specifies when to skip the
+	// extended turn around. When the number of bytes received in the client
+	// request meets the threshold, optimize for upstream flows with quicker
+	// round trip turn arounds.
+	MeekSkipExtendedTurnAroundThresholdBytes *int
+
 	// MeekMaxSessionStalenessMilliseconds specifies the TTL for meek sessions.
 	// The default is MEEK_DEFAULT_MAX_SESSION_STALENESS.
 	MeekMaxSessionStalenessMilliseconds *int

+ 83 - 57
psiphon/server/meek.go

@@ -75,16 +75,17 @@ const (
 	// when retrying a request for a partially downloaded response payload.
 	MEEK_PROTOCOL_VERSION_3 = 3
 
-	MEEK_MAX_REQUEST_PAYLOAD_LENGTH           = 65536
-	MEEK_MIN_SESSION_ID_LENGTH                = 8
-	MEEK_MAX_SESSION_ID_LENGTH                = 20
-	MEEK_DEFAULT_TURN_AROUND_TIMEOUT          = 20 * time.Millisecond
-	MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
-	MEEK_DEFAULT_MAX_SESSION_STALENESS        = 45 * time.Second
-	MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT       = 45 * time.Second
-	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH       = 65536
-	MEEK_DEFAULT_POOL_BUFFER_LENGTH           = 65536
-	MEEK_DEFAULT_POOL_BUFFER_COUNT            = 2048
+	MEEK_MAX_REQUEST_PAYLOAD_LENGTH                  = 65536
+	MEEK_MIN_SESSION_ID_LENGTH                       = 8
+	MEEK_MAX_SESSION_ID_LENGTH                       = 20
+	MEEK_DEFAULT_TURN_AROUND_TIMEOUT                 = 10 * time.Millisecond
+	MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT        = 100 * time.Millisecond
+	MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD = 16384
+	MEEK_DEFAULT_MAX_SESSION_STALENESS               = 45 * time.Second
+	MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT              = 45 * time.Second
+	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH              = 65536
+	MEEK_DEFAULT_POOL_BUFFER_LENGTH                  = 65536
+	MEEK_DEFAULT_POOL_BUFFER_COUNT                   = 2048
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -99,28 +100,29 @@ const (
 // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
 // the meekConn struct.
 type MeekServer struct {
-	support                   *SupportServices
-	listener                  net.Listener
-	listenerTunnelProtocol    string
-	listenerPort              int
-	passthroughAddress        string
-	turnAroundTimeout         time.Duration
-	extendedTurnAroundTimeout time.Duration
-	maxSessionStaleness       time.Duration
-	httpClientIOTimeout       time.Duration
-	tlsConfig                 *tris.Config
-	obfuscatorSeedHistory     *obfuscator.SeedHistory
-	clientHandler             func(clientTunnelProtocol string, clientConn net.Conn)
-	openConns                 *common.Conns
-	stopBroadcast             <-chan struct{}
-	sessionsLock              sync.RWMutex
-	sessions                  map[string]*meekSession
-	checksumTable             *crc64.Table
-	bufferPool                *CachedResponseBufferPool
-	rateLimitLock             sync.Mutex
-	rateLimitHistory          map[string][]time.Time
-	rateLimitCount            int
-	rateLimitSignalGC         chan struct{}
+	support                         *SupportServices
+	listener                        net.Listener
+	listenerTunnelProtocol          string
+	listenerPort                    int
+	passthroughAddress              string
+	turnAroundTimeout               time.Duration
+	extendedTurnAroundTimeout       time.Duration
+	skipExtendedTurnAroundThreshold int
+	maxSessionStaleness             time.Duration
+	httpClientIOTimeout             time.Duration
+	tlsConfig                       *tris.Config
+	obfuscatorSeedHistory           *obfuscator.SeedHistory
+	clientHandler                   func(clientTunnelProtocol string, clientConn net.Conn)
+	openConns                       *common.Conns
+	stopBroadcast                   <-chan struct{}
+	sessionsLock                    sync.RWMutex
+	sessions                        map[string]*meekSession
+	checksumTable                   *crc64.Table
+	bufferPool                      *CachedResponseBufferPool
+	rateLimitLock                   sync.Mutex
+	rateLimitHistory                map[string][]time.Time
+	rateLimitCount                  int
+	rateLimitSignalGC               chan struct{}
 }
 
 // NewMeekServer initializes a new meek server.
@@ -147,6 +149,11 @@ func NewMeekServer(
 			*support.Config.MeekExtendedTurnAroundTimeoutMilliseconds) * time.Millisecond
 	}
 
+	skipExtendedTurnAroundThreshold := MEEK_DEFAULT_SKIP_EXTENDED_TURN_AROUND_THRESHOLD
+	if support.Config.MeekSkipExtendedTurnAroundThresholdBytes != nil {
+		skipExtendedTurnAroundThreshold = *support.Config.MeekSkipExtendedTurnAroundThresholdBytes
+	}
+
 	maxSessionStaleness := MEEK_DEFAULT_MAX_SESSION_STALENESS
 	if support.Config.MeekMaxSessionStalenessMilliseconds != nil {
 		maxSessionStaleness = time.Duration(
@@ -174,24 +181,25 @@ func NewMeekServer(
 	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
 
 	meekServer := &MeekServer{
-		support:                   support,
-		listener:                  listener,
-		listenerTunnelProtocol:    listenerTunnelProtocol,
-		listenerPort:              listenerPort,
-		passthroughAddress:        passthroughAddress,
-		turnAroundTimeout:         turnAroundTimeout,
-		extendedTurnAroundTimeout: extendedTurnAroundTimeout,
-		maxSessionStaleness:       maxSessionStaleness,
-		httpClientIOTimeout:       httpClientIOTimeout,
-		obfuscatorSeedHistory:     obfuscator.NewSeedHistory(nil),
-		clientHandler:             clientHandler,
-		openConns:                 common.NewConns(),
-		stopBroadcast:             stopBroadcast,
-		sessions:                  make(map[string]*meekSession),
-		checksumTable:             checksumTable,
-		bufferPool:                bufferPool,
-		rateLimitHistory:          make(map[string][]time.Time),
-		rateLimitSignalGC:         make(chan struct{}, 1),
+		support:                         support,
+		listener:                        listener,
+		listenerTunnelProtocol:          listenerTunnelProtocol,
+		listenerPort:                    listenerPort,
+		passthroughAddress:              passthroughAddress,
+		turnAroundTimeout:               turnAroundTimeout,
+		extendedTurnAroundTimeout:       extendedTurnAroundTimeout,
+		skipExtendedTurnAroundThreshold: skipExtendedTurnAroundThreshold,
+		maxSessionStaleness:             maxSessionStaleness,
+		httpClientIOTimeout:             httpClientIOTimeout,
+		obfuscatorSeedHistory:           obfuscator.NewSeedHistory(nil),
+		clientHandler:                   clientHandler,
+		openConns:                       common.NewConns(),
+		stopBroadcast:                   stopBroadcast,
+		sessions:                        make(map[string]*meekSession),
+		checksumTable:                   checksumTable,
+		bufferPool:                      bufferPool,
+		rateLimitHistory:                make(map[string][]time.Time),
+		rateLimitSignalGC:               make(chan struct{}, 1),
 	}
 
 	if useTLS {
@@ -430,7 +438,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// interruption, without knowing whether the server has received or
 	// relayed the data.
 
-	err = session.clientConn.pumpReads(request.Body)
+	requestSize, err := session.clientConn.pumpReads(request.Body)
 	if err != nil {
 		if err != io.EOF {
 			// Debug since errors such as "i/o timeout" occur during normal operation;
@@ -444,6 +452,12 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		return
 	}
 
+	// The extended turn around mechanism optimizes for downstream flows by
+	// sending more data in the response as long as it's available. As a
+	// heuristic, when the request size meets a threshold, optimize instead
+	// of upstream flows by skipping the extended turn around.
+	skipExtendedTurnAround := requestSize >= int64(server.skipExtendedTurnAroundThreshold)
+
 	// Set cookie before writing the response.
 
 	if session.meekProtocolVersion >= MEEK_PROTOCOL_VERSION_2 && !session.sessionIDSent {
@@ -536,7 +550,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
 		// write its downstream traffic through to the response body.
 
-		responseSize, responseError = session.clientConn.pumpWrites(multiWriter)
+		responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround)
 		greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
 		greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
 	}
@@ -1476,8 +1490,9 @@ func (conn *meekConn) GetReplay() (*prng.Seed, bool) {
 // without a Read() immediately consuming the bytes, but there's still
 // a possibility of a stall if no Read() calls are made after this
 // read buffer is full.
+// Returns the number of request bytes read.
 // Note: assumes only one concurrent call to pumpReads
-func (conn *meekConn) pumpReads(reader io.Reader) error {
+func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 
 	// Use either an empty or partial buffer. By using a partial
 	// buffer, pumpReads will not block if the Read() caller has
@@ -1488,7 +1503,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 	case readBuffer = <-conn.emptyReadBuffer:
 	case readBuffer = <-conn.partialReadBuffer:
 	case <-conn.closeBroadcast:
-		return io.EOF
+		return 0, io.EOF
 	}
 
 	newDataOffset := readBuffer.Len()
@@ -1512,7 +1527,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 	if err != nil {
 		readBuffer.Truncate(newDataOffset)
 		conn.replaceReadBuffer(readBuffer)
-		return errors.Trace(err)
+		return 0, errors.Trace(err)
 	}
 
 	// Check if request payload checksum matches immediately
@@ -1536,7 +1551,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 
 	conn.replaceReadBuffer(readBuffer)
 
-	return nil
+	return n, nil
 }
 
 var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")
@@ -1579,8 +1594,10 @@ func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
 // to the specified writer. This function blocks until the meek response
 // body limits (size for protocol v1, turn around time for protocol v2+)
 // are met, or the meekConn is closed.
+//
 // Note: channel scheme assumes only one concurrent call to pumpWrites
-func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
+func (conn *meekConn) pumpWrites(
+	writer io.Writer, skipExtendedTurnAround bool) (int, error) {
 
 	startTime := time.Now()
 	timeout := time.NewTimer(conn.meekServer.turnAroundTimeout)
@@ -1606,13 +1623,22 @@ func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
 				// MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
 				return n, nil
 			}
+
+			if skipExtendedTurnAround {
+				// When fast turn around is indicated, skip the extended turn
+				// around timeout. This optimizes for upstream flows.
+				return n, nil
+			}
+
 			totalElapsedTime := time.Since(startTime) / time.Millisecond
 			if totalElapsedTime >= conn.meekServer.extendedTurnAroundTimeout {
 				return n, nil
 			}
 			timeout.Reset(conn.meekServer.turnAroundTimeout)
+
 		case <-timeout.C:
 			return n, nil
+
 		case <-conn.closeBroadcast:
 			return n, errors.Trace(errMeekConnectionHasClosed)
 		}