Explorar o código

Fix: mitigate meek resiliency bugs

Rod Hynes hai 2 semanas
pai
achega
cbfb901104
Modificáronse 4 ficheiros con 57 adicións e 15 borrados
  1. 9 2
      psiphon/meekConn.go
  2. 33 10
      psiphon/server/meek.go
  3. 6 1
      psiphon/server/meekBuffer.go
  4. 9 2
      psiphon/server/meek_test.go

+ 9 - 2
psiphon/meekConn.go

@@ -1778,11 +1778,18 @@ func (meek *MeekConn) relayRoundTrip(
 				totalPaddingSize += readPaddingSize
 
 				if err != nil {
-					NoticeWarning("meek read padding failed: %v", err)
 					response.Body.Close()
+
+					if more == false {
+						// Fail on unexpected errors including
+						// "unknown padding prefix".
+						return 0, false, errors.Trace(err)
+					}
+
+					NoticeWarning("meek read padding failed: %v", err)
+
 					// ...continue to retry
 					continue
-
 				}
 			}
 

+ 33 - 10
psiphon/server/meek.go

@@ -764,7 +764,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	//
 	// pumpReads also handles discarding meek request payload padding.
 
-	requestSize, err := session.clientConn.pumpReads(request.Body)
+	requestSize, allowCachedResponse, err := session.clientConn.pumpReads(request.Body)
 	if err != nil {
 		if err != io.EOF {
 			// Debug since errors such as "i/o timeout" occur during normal operation;
@@ -825,11 +825,22 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// The client is not expected to send position > 0 when there is
 	// no cached response; let that case fall through to the next
 	// HasPosition check which will fail and close the session.
+	//
+	// Limitation: when a client sends a request that never reaches the
+	// server, it will retry with a Range header and position 0. This case
+	// leads to a flaw in the resiliency design, since the server may have a
+	// stale cached response from a _previous_ request and this can lead to
+	// sending a response body twice, breaking the stream. To partially
+	// mitigate this, the pumpReads allowCachedResponse value is used as a
+	// heuristic to distinguish Range 0 cases that should not use cached
+	// responses.
 
 	var responseSize int
 	var responseError error
 
-	if isRetry && (hasCompleteCachedResponse || position > 0) {
+	if isRetry &&
+		(hasCompleteCachedResponse || position > 0) &&
+		allowCachedResponse {
 
 		if !session.cachedResponse.HasPosition(position) {
 			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
@@ -2510,10 +2521,16 @@ func (conn *meekConn) StopFragmenting() {
 // read buffer is full.
 //
 // Returns the number of request bytes read, excluding any payload padding
-// bytes.
+// bytes, and whether an existing cached response _may_ be safely used for
+// this request.
+//
+// It's safe to use cached response if the request body is a duplicate,
+// ambiguous when the request body is empty with no padding, and not safe
+// when the request body is not a duplicate. pumpReads returns true in the
+// first two cases.
 //
 // Note: assumes only one concurrent call to pumpReads
-func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
+func (conn *meekConn) pumpReads(reader io.Reader) (int64, bool, error) {
 
 	// Use either an empty or partial buffer. By using a partial
 	// buffer, pumpReads will not block if the Read() caller has
@@ -2524,7 +2541,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 	case readBuffer = <-conn.emptyReadBuffer:
 	case readBuffer = <-conn.partialReadBuffer:
 	case <-conn.closeBroadcast:
-		return 0, io.EOF
+		return 0, false, io.EOF
 	}
 
 	newDataOffset := readBuffer.Len()
@@ -2555,10 +2572,16 @@ func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 	// padding mode, this handles the case when padding is omitted for an
 	// empty payload.
 
-	if err != nil || n == 0 {
+	if err != nil {
+		readBuffer.Truncate(newDataOffset)
+		conn.replaceReadBuffer(readBuffer)
+		return 0, false, errors.Trace(err)
+	}
+
+	if n == 0 {
 		readBuffer.Truncate(newDataOffset)
 		conn.replaceReadBuffer(readBuffer)
-		return 0, errors.Trace(err)
+		return 0, true, nil
 	}
 
 	// Check if request payload checksum matches immediately
@@ -2580,7 +2603,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 	} else if *conn.lastReadChecksum == checksum {
 		readBuffer.Truncate(newDataOffset)
 		conn.replaceReadBuffer(readBuffer)
-		return 0, errors.Trace(err)
+		return 0, true, nil
 	}
 
 	*conn.lastReadChecksum = checksum
@@ -2625,7 +2648,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 		if err != nil {
 			readBuffer.Truncate(newDataOffset)
 			conn.replaceReadBuffer(readBuffer)
-			return 0, errors.Trace(err)
+			return 0, false, errors.Trace(err)
 		}
 
 		// Return only the actual payload size read, which is important for
@@ -2646,7 +2669,7 @@ func (conn *meekConn) pumpReads(reader io.Reader) (int64, error) {
 
 	conn.replaceReadBuffer(readBuffer)
 
-	return n, nil
+	return n, false, nil
 }
 
 var errMeekConnectionHasClosed = std_errors.New("meek connection has closed")

+ 6 - 1
psiphon/server/meekBuffer.go

@@ -93,7 +93,12 @@ func (response *CachedResponse) Available() int {
 // response data starting at or before the specified
 // position.
 func (response *CachedResponse) HasPosition(position int) bool {
-	return response.readAvailable > 0 && response.readPosition <= position
+	if response.readAvailable == 0 {
+		return false
+	}
+	start := response.readPosition
+	end := response.readPosition + response.readAvailable
+	return position >= start && position <= end
 }
 
 // CopyFromPosition writes the response data, starting at

+ 9 - 2
psiphon/server/meek_test.go

@@ -222,10 +222,12 @@ func testMeekResiliency(
 	useHTTPNormalizer bool,
 	enablePayloadPadding bool) {
 
-	upstreamData := make([]byte, 5*MB)
+	totalSize := 5 * MB
+
+	upstreamData := make([]byte, totalSize)
 	_, _ = rand.Read(upstreamData)
 
-	downstreamData := make([]byte, 5*MB)
+	downstreamData := make([]byte, totalSize)
 	_, _ = rand.Read(downstreamData)
 
 	minWrite, maxWrite := 1, 128*KB
@@ -298,6 +300,11 @@ func testMeekResiliency(
 				protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK: 0,
 			},
 			runningProtocols: []string{protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK},
+
+			// Default MeekCachedResponsePoolBufferSize, 64K, may be
+			// insufficient for total downstream. Leave private pool at
+			// default size but add sufficiently large shared buffers.
+			MeekCachedResponsePoolBufferSize: totalSize,
 		},
 		TrafficRulesSet: &TrafficRulesSet{},
 	}