Просмотр исходного кода

Add meek connection interruption resiliency

- Client round trip retries can now succeed when
  interrupted in the middle of sending the request,
  awaiting the response, or reading the response.
- Server can recognize resent request payloads
  and discard; server caches response payload and
  will resend when client retries.
- Update meekConn to use Context.
Rod Hynes 9 лет назад
Родитель
Commit
5b367f531b
4 измененных файлов с 843 добавлено и 150 удалено
  1. 187 115
      psiphon/meekConn.go
  2. 202 35
      psiphon/server/meek.go
  3. 296 0
      psiphon/server/meekBuffer.go
  4. 158 0
      psiphon/server/meek_test.go

+ 187 - 115
psiphon/meekConn.go

@@ -21,6 +21,7 @@ package psiphon
 
 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"encoding/base64"
 	"encoding/json"
@@ -59,7 +60,7 @@ const (
 	MAX_POLL_INTERVAL_JITTER       = 0.1
 	POLL_INTERVAL_MULTIPLIER       = 1.5
 	POLL_INTERVAL_JITTER           = 0.1
-	MEEK_ROUND_TRIP_RETRY_DEADLINE = 1 * time.Second
+	MEEK_ROUND_TRIP_RETRY_DEADLINE = 5 * time.Second
 	MEEK_ROUND_TRIP_RETRY_DELAY    = 50 * time.Millisecond
 	MEEK_ROUND_TRIP_TIMEOUT        = 20 * time.Second
 )
@@ -125,7 +126,8 @@ type MeekConn struct {
 	transport            transporter
 	mutex                sync.Mutex
 	isClosed             bool
-	broadcastClosed      chan struct{}
+	runContext           context.Context
+	stopRunning          context.CancelFunc
 	relayWaitGroup       *sync.WaitGroup
 	emptyReceiveBuffer   chan *bytes.Buffer
 	partialReceiveBuffer chan *bytes.Buffer
@@ -298,6 +300,8 @@ func DialMeek(
 		return nil, common.ContextError(err)
 	}
 
+	runContext, stopRunning := context.WithCancel(context.Background())
+
 	// The main loop of a MeekConn is run in the relay() goroutine.
 	// A MeekConn implements net.Conn concurrency semantics:
 	// "Multiple goroutines may invoke methods on a Conn simultaneously."
@@ -321,7 +325,8 @@ func DialMeek(
 		pendingConns:         pendingConns,
 		transport:            transport,
 		isClosed:             false,
-		broadcastClosed:      make(chan struct{}),
+		runContext:           runContext,
+		stopRunning:          stopRunning,
 		relayWaitGroup:       new(sync.WaitGroup),
 		emptyReceiveBuffer:   make(chan *bytes.Buffer, 1),
 		partialReceiveBuffer: make(chan *bytes.Buffer, 1),
@@ -356,7 +361,7 @@ func (meek *MeekConn) Close() (err error) {
 	meek.mutex.Unlock()
 
 	if !isClosed {
-		close(meek.broadcastClosed)
+		meek.stopRunning()
 		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
 		meek.transport.CloseIdleConnections()
@@ -386,7 +391,7 @@ func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
 	select {
 	case receiveBuffer = <-meek.partialReceiveBuffer:
 	case receiveBuffer = <-meek.fullReceiveBuffer:
-	case <-meek.broadcastClosed:
+	case <-meek.runContext.Done():
 		return 0, common.ContextError(errors.New("meek connection has closed"))
 	}
 	n, err = receiveBuffer.Read(buffer)
@@ -408,7 +413,7 @@ func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
 		select {
 		case sendBuffer = <-meek.emptySendBuffer:
 		case sendBuffer = <-meek.partialSendBuffer:
-		case <-meek.broadcastClosed:
+		case <-meek.runContext.Done():
 			return 0, common.ContextError(errors.New("meek connection has closed"))
 		}
 		writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len()
@@ -490,6 +495,7 @@ func (meek *MeekConn) relay() {
 
 	for {
 		timeout.Reset(interval)
+
 		// Block until there is payload to send or it is time to poll
 		var sendBuffer *bytes.Buffer
 		select {
@@ -497,10 +503,17 @@ func (meek *MeekConn) relay() {
 		case sendBuffer = <-meek.fullSendBuffer:
 		case <-timeout.C:
 			// In the polling case, send an empty payload
-		case <-meek.broadcastClosed:
-			// TODO: timeout case may be selected when broadcastClosed is set?
+		case <-meek.runContext.Done():
+			// Drop through to second Done() check
+		}
+
+		// Check Done() again, to ensure it takes precedence
+		select {
+		case <-meek.runContext.Done():
 			return
+		default:
 		}
+
 		sendPayloadSize := 0
 		if sendBuffer != nil {
 			var err error
@@ -512,17 +525,15 @@ func (meek *MeekConn) relay() {
 				return
 			}
 		}
-		receivedPayload, err := meek.roundTrip(sendPayload[:sendPayloadSize])
-		if err != nil {
-			NoticeAlert("%s", common.ContextError(err))
-			go meek.Close()
-			return
-		}
-		if receivedPayload == nil {
-			// In this case, meek.roundTrip encountered broadcastClosed. Exit without error.
+
+		receivedPayloadSize, err := meek.roundTrip(sendPayload[:sendPayloadSize])
+
+		select {
+		case <-meek.runContext.Done():
+			// In this case, meek.roundTrip encountered Done(). Exit without logging error.
 			return
+		default:
 		}
-		receivedPayloadSize, err := meek.readPayload(receivedPayload)
 		if err != nil {
 			NoticeAlert("%s", common.ContextError(err))
 			go meek.Close()
@@ -566,137 +577,198 @@ func (meek *MeekConn) relay() {
 	}
 }
 
-// readPayload reads the HTTP response  in chunks, making the read buffer available
-// to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
-// flow back to the reader as soon as possible instead of buffering the entire payload.
-func (meek *MeekConn) readPayload(receivedPayload io.ReadCloser) (totalSize int64, err error) {
-	defer receivedPayload.Close()
-	totalSize = 0
-	for {
-		reader := io.LimitReader(receivedPayload, READ_PAYLOAD_CHUNK_LENGTH)
-		// Block until there is capacity in the receive buffer
-		var receiveBuffer *bytes.Buffer
-		select {
-		case receiveBuffer = <-meek.emptyReceiveBuffer:
-		case receiveBuffer = <-meek.partialReceiveBuffer:
-		case <-meek.broadcastClosed:
-			return 0, nil
-		}
-		// Note: receiveBuffer size may exceed FULL_RECEIVE_BUFFER_LENGTH by up to the size
-		// of one received payload. The FULL_RECEIVE_BUFFER_LENGTH value is just a threshold.
-		n, err := receiveBuffer.ReadFrom(reader)
-		meek.replaceReceiveBuffer(receiveBuffer)
-		if err != nil {
-			return 0, common.ContextError(err)
-		}
-		totalSize += n
-		if n == 0 {
-			break
-		}
-	}
-	return totalSize, nil
-}
-
 // roundTrip configures and makes the actual HTTP POST request
-func (meek *MeekConn) roundTrip(sendPayload []byte) (io.ReadCloser, error) {
+func (meek *MeekConn) roundTrip(sendPayload []byte) (int64, error) {
 
-	// The retry mitigates intermittent failures between the client and front/server.
+	// Retries are made when the round trip fails. This adds resiliency
+	// to connection interruption and intermittent failures.
+	//
+	// At least one retry is always attempted, and retries continue
+	// while still within a brief deadline -- 5 seconds, currently the
+	// deadline for an actively probed SSH connection to timeout. There
+	// is a brief delay between retries, allowing for intermittent
+	// failure states to resolve.
+	//
+	// Failure may occur at various stages of the HTTP request:
+	//
+	// 1. Before the request begins. In this case, the entire request
+	//    may be rerun.
 	//
-	// Note: Retry will only be effective if entire request failed (underlying transport protocol
-	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
-	// success followed by retry).
-	// At least one retry is always attempted. We retry when still within a brief deadline and wait
-	// for a short time before re-dialing.
+	// 2. While sending the request payload. In this case, the client
+	//    must resend its request payload. The server will not have
+	//    relayed its partially received request payload.
 	//
-	// TODO: in principle, we could retry for min(TUNNEL_WRITE_TIMEOUT, meek-server.MAX_SESSION_STALENESS),
-	// i.e., as long as the underlying tunnel has not timed out and as long as the server has not
-	// expired the current meek session. Presently not doing this to avoid excessive connection attempts
-	// through the first hop. In addition, this will require additional support for timely shutdown.
+	// 3. After sending the request payload but before receiving
+	//    a response. The client cannot distinguish between case 2 and
+	//    this case, case 3. The client resends its payload and the
+	//    server detects this and skips relaying the request payload.
+	//
+	// 4. While reading the response payload. The client will omit its
+	//    request payload when retrying, as the server has already
+	//    acknowledged it. The client will also indicate to the server
+	//    the amount of response payload already received, and the
+	//    server will skip resending the indicated amount of response
+	//    payload.
+	//
+	// Retries are indicated to the server by adding a Range header,
+	// which includes the response payload resend position.
+
 	retries := uint(0)
 	retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
+	serverAcknowlegedRequestPayload := false
+	receivedPayloadSize := int64(0)
 
-	var err error
-	var response *http.Response
-	for {
+	for try := 0; ; try++ {
+
+		// Omit the request payload when retrying after receiving a
+		// partial server response.
+
+		var sendPayloadReader io.Reader
+		if !serverAcknowlegedRequestPayload {
+			sendPayloadReader = bytes.NewReader(sendPayload)
+		}
 
 		var request *http.Request
-		request, err = http.NewRequest("POST", meek.url.String(), bytes.NewReader(sendPayload))
+		request, err := http.NewRequest("POST", meek.url.String(), sendPayloadReader)
 		if err != nil {
 			// Don't retry when can't initialize a Request
-			break
+			return 0, common.ContextError(err)
 		}
 
-		request.Header.Set("Content-Type", "application/octet-stream")
+		// Note: meek.stopRunning() will abort a round trip in flight
+		request = request.WithContext(meek.runContext)
 
-		// Set additional headers to the HTTP request using the same method we use for adding
-		// custom headers to HTTP proxy requests
-		for name, value := range meek.additionalHeaders {
-			// hack around special case of "Host" header
-			// https://golang.org/src/net/http/request.go#L474
-			// using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915
-			if name == "Host" {
-				if len(value) > 0 {
-					if request.URL.Opaque == "" {
-						request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
-					}
-					request.Host = value[0]
-				}
-			} else {
-				request.Header[name] = value
-			}
-		}
+		meek.addAdditionalHeaders(request)
 
+		request.Header.Set("Content-Type", "application/octet-stream")
 		request.AddCookie(meek.cookie)
 
-		// The http.Transport.RoundTrip is run in a goroutine to enable cancelling a request in-flight.
-		type roundTripResponse struct {
-			response *http.Response
-			err      error
+		expectedStatusCode := http.StatusOK
+
+		// When retrying, add a Range header to indicate how much
+		// of the response was already received.
+
+		if try > 0 {
+			expectedStatusCode = http.StatusPartialContent
+			request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
 		}
-		roundTripResponseChannel := make(chan *roundTripResponse, 1)
-		roundTripWaitGroup := new(sync.WaitGroup)
-		roundTripWaitGroup.Add(1)
-		go func() {
-			defer roundTripWaitGroup.Done()
-			r, err := meek.transport.RoundTrip(request)
-			roundTripResponseChannel <- &roundTripResponse{r, err}
-		}()
-		select {
-		case roundTripResponse := <-roundTripResponseChannel:
-			response = roundTripResponse.response
-			err = roundTripResponse.err
-		case <-meek.broadcastClosed:
-			meek.transport.CancelRequest(request)
-			return nil, nil
+
+		response, err := meek.transport.RoundTrip(request)
+		if err != nil {
+			NoticeAlert("meek round trip failed: %s", err)
+			// ...continue to retry
 		}
-		roundTripWaitGroup.Wait()
 
 		if err == nil {
-			break
+
+			if response.StatusCode != expectedStatusCode {
+				// Don't retry when the status code is incorrect
+				response.Body.Close()
+				return 0, common.ContextError(
+					fmt.Errorf(
+						"unexpected status code: %d instead of %d ",
+						response.StatusCode, expectedStatusCode))
+			}
+
+			// Update meek session cookie
+			for _, c := range response.Cookies() {
+				if meek.cookie.Name == c.Name {
+					meek.cookie.Value = c.Value
+					break
+				}
+			}
+
+			// Received the response status code, so the server
+			// must have received the request payload.
+			serverAcknowlegedRequestPayload = true
+
+			readPayloadSize, err := meek.readPayload(response.Body)
+			if err != nil {
+				NoticeAlert("meek read payload failed: %s", err)
+				// ...continue to retry
+			}
+			response.Body.Close()
+
+			// receivedPayloadSize is the number of response
+			// payload bytes received and relayed. A retry can
+			// resume after this position.
+
+			receivedPayloadSize += readPayloadSize
+
+			if err == nil {
+				// Round trip completed successfully
+				break
+			}
 		}
 
+		// Either the request failed entirely, or there was a failure
+		// streaming the response payload. Retry, if time remains.
+
 		if retries >= 1 && monotime.Now().After(retryDeadline) {
-			break
+			return 0, common.ContextError(err)
 		}
 		retries += 1
 
 		time.Sleep(MEEK_ROUND_TRIP_RETRY_DELAY)
 	}
-	if err != nil {
-		return nil, common.ContextError(err)
-	}
-	if response.StatusCode != http.StatusOK {
-		return nil, common.ContextError(fmt.Errorf("http request failed %d", response.StatusCode))
+
+	return receivedPayloadSize, nil
+}
+
+// Add additional headers to the HTTP request using the same method we use for adding
+// custom headers to HTTP proxy requests.
+func (meek *MeekConn) addAdditionalHeaders(request *http.Request) {
+	for name, value := range meek.additionalHeaders {
+		// hack around special case of "Host" header
+		// https://golang.org/src/net/http/request.go#L474
+		// using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915
+		if name == "Host" {
+			if len(value) > 0 {
+				if request.URL.Opaque == "" {
+					request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
+				}
+				request.Host = value[0]
+			}
+		} else {
+			request.Header[name] = value
+		}
 	}
-	// observe response cookies for meek session key token.
-	// Once found it must be used for all consecutive requests made to the server
-	for _, c := range response.Cookies() {
-		if meek.cookie.Name == c.Name {
-			meek.cookie.Value = c.Value
+}
+
+// readPayload reads the HTTP response in chunks, making the read buffer available
+// to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
+// flow back to the reader as soon as possible instead of buffering the entire payload.
+//
+// When readPayload returns an error, the totalSize output remains valid -- it's the
+// number of payload bytes successfully read and relayed.
+func (meek *MeekConn) readPayload(
+	receivedPayload io.ReadCloser) (totalSize int64, err error) {
+
+	defer receivedPayload.Close()
+	totalSize = 0
+	for {
+		reader := io.LimitReader(receivedPayload, READ_PAYLOAD_CHUNK_LENGTH)
+		// Block until there is capacity in the receive buffer
+		var receiveBuffer *bytes.Buffer
+		select {
+		case receiveBuffer = <-meek.emptyReceiveBuffer:
+		case receiveBuffer = <-meek.partialReceiveBuffer:
+		case <-meek.runContext.Done():
+			return 0, nil
+		}
+		// Note: receiveBuffer size may exceed FULL_RECEIVE_BUFFER_LENGTH by up to the size
+		// of one received payload. The FULL_RECEIVE_BUFFER_LENGTH value is just a guideline.
+		n, err := receiveBuffer.ReadFrom(reader)
+		meek.replaceReceiveBuffer(receiveBuffer)
+		totalSize += n
+		if err != nil {
+			return totalSize, common.ContextError(err)
+		}
+		if n == 0 {
 			break
 		}
 	}
-	return response.Body, nil
+	return totalSize, nil
 }
 
 type meekCookieData struct {

+ 202 - 35
psiphon/server/meek.go

@@ -27,9 +27,11 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"errors"
+	"hash/crc64"
 	"io"
 	"net"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -61,13 +63,16 @@ const (
 	// session ID on all subsequent requests for the remainder of the session.
 	MEEK_PROTOCOL_VERSION_2 = 2
 
-	MEEK_MAX_PAYLOAD_LENGTH           = 0x10000
-	MEEK_TURN_AROUND_TIMEOUT          = 20 * time.Millisecond
-	MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
-	MEEK_MAX_SESSION_STALENESS        = 45 * time.Second
-	MEEK_HTTP_CLIENT_IO_TIMEOUT       = 45 * time.Second
-	MEEK_MIN_SESSION_ID_LENGTH        = 8
-	MEEK_MAX_SESSION_ID_LENGTH        = 20
+	MEEK_MAX_REQUEST_PAYLOAD_LENGTH     = 65536
+	MEEK_TURN_AROUND_TIMEOUT            = 20 * time.Millisecond
+	MEEK_EXTENDED_TURN_AROUND_TIMEOUT   = 100 * time.Millisecond
+	MEEK_MAX_SESSION_STALENESS          = 45 * time.Second
+	MEEK_HTTP_CLIENT_IO_TIMEOUT         = 45 * time.Second
+	MEEK_MIN_SESSION_ID_LENGTH          = 8
+	MEEK_MAX_SESSION_ID_LENGTH          = 20
+	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
+	MEEK_DEFAULT_POOL_BUFFER_LENGTH     = 65536
+	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 2048
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -90,6 +95,8 @@ type MeekServer struct {
 	stopBroadcast <-chan struct{}
 	sessionsLock  sync.RWMutex
 	sessions      map[string]*meekSession
+	checksumTable *crc64.Table
+	bufferPool    *CachedResponseBufferPool
 }
 
 // NewMeekServer initializes a new meek server.
@@ -100,6 +107,13 @@ func NewMeekServer(
 	clientHandler func(clientConn net.Conn),
 	stopBroadcast <-chan struct{}) (*MeekServer, error) {
 
+	checksumTable := crc64.MakeTable(crc64.ECMA)
+
+	// TODO: configurable buffer parameters
+	bufferPool := NewCachedResponseBufferPool(
+		MEEK_DEFAULT_POOL_BUFFER_LENGTH,
+		MEEK_DEFAULT_POOL_BUFFER_COUNT)
+
 	meekServer := &MeekServer{
 		support:       support,
 		listener:      listener,
@@ -107,6 +121,8 @@ func NewMeekServer(
 		openConns:     new(common.Conns),
 		stopBroadcast: stopBroadcast,
 		sessions:      make(map[string]*meekSession),
+		checksumTable: checksumTable,
+		bufferPool:    bufferPool,
 	}
 
 	if useTLS {
@@ -240,15 +256,23 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// read the request body as upstream traffic.
 	// TODO: run pumpReads and pumpWrites concurrently?
 
+	// pumpReads checksums the request payload and skips relaying it when
+	// it matches the immediately previous request payload. This allows
+	// clients to resend request payloads, when retrying due to connection
+	// interruption, without knowing whether the server has received or
+	// relayed the data.
+
 	err = session.clientConn.pumpReads(request.Body)
 	if err != nil {
 		if err != io.EOF {
 			// Debug since errors such as "i/o timeout" occur during normal operation;
 			// also, golang network error messages may contain client IP.
-			log.WithContextFields(LogFields{"error": err}).Debug("pump reads failed")
+			log.WithContextFields(LogFields{"error": err}).Debug("read request failed")
 		}
 		server.terminateConnection(responseWriter, request)
-		server.closeSession(sessionID)
+
+		// Note: keep session open to allow client to retry
+
 		return
 	}
 
@@ -262,22 +286,114 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		session.sessionIDSent = true
 	}
 
-	// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
-	// write its downstream traffic through to the response body.
+	// When streaming data into the response body, a copy is
+	// retained in the cachedResponse buffer. This allows the
+	// client to retry and request that the response be resent
+	// when the HTTP connection is interrupted.
+	//
+	// If a Range header is present, the client is retrying,
+	// possibly after having received a partial response. In
+	// this case, use any cached response to attempt to resend
+	// the response, starting from the resend position the client
+	// indicates.
+	//
+	// When the resend position is not available -- because the
+	// cachedResponse buffer could not hold it -- the client session
+	// is closed, as there's no way to resume streaming the payload
+	// uninterrupted.
+	//
+	// The client may retry before a cached response is prepared,
+	// so a cached response is not always used when a Range header
+	// is present.
+	//
+	// TODO: invalid Range header is ignored; should it be otherwise?
+
+	position, isRetry := checkRangeHeader(request)
+
+	hasCachedResponse := session.cachedResponse.HasPosition(0)
+
+	// The client is not expected to send position > 0 when there is
+	// no cached response; let that case fall through to the next
+	// HasPosition check which will fail and close the session.
+
+	if isRetry && (hasCachedResponse || position > 0) {
+
+		if !session.cachedResponse.HasPosition(position) {
+			server.terminateConnection(responseWriter, request)
+			server.closeSession(sessionID)
+			return
+		}
+
+		responseWriter.WriteHeader(http.StatusPartialContent)
+
+		// TODO:
+		// - enforce a max extended buffer count per client, for
+		//   fairness? Throttling may make this unnecessary.
+		// - cachedResponse can now start releasing extended buffers,
+		//   as response bytes before "position" will never be requested
+		//   again?
+
+		err = session.cachedResponse.CopyFromPosition(position, responseWriter)
+
+		// The client may again fail to receive the payload and may again
+		// retry, so not yet releasing cachedReponse buffers.
+
+	} else {
+
+		// _Now_ we release buffers holding data from the previous
+		// response. And then immediately stream the new response into
+		// newly acquired buffers.
+		session.cachedResponse.Reset()
+
+		multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
 
-	err = session.clientConn.pumpWrites(responseWriter)
+		// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
+		// write its downstream traffic through to the response body.
+
+		err = session.clientConn.pumpWrites(multiWriter)
+	}
+
+	// err is the result of writing the body either from CopyFromPosition or pumpWrites
 	if err != nil {
 		if err != io.EOF {
 			// Debug since errors such as "i/o timeout" occur during normal operation;
 			// also, golang network error messages may contain client IP.
-			log.WithContextFields(LogFields{"error": err}).Debug("pump writes failed")
+			log.WithContextFields(LogFields{"error": err}).Debug("write response failed")
 		}
 		server.terminateConnection(responseWriter, request)
-		server.closeSession(sessionID)
+
+		// Note: keep session open to allow client to retry
+
 		return
 	}
 }
 
+func checkRangeHeader(request *http.Request) (int, bool) {
+	rangeHeader := request.Header.Get("Range")
+	if rangeHeader == "" {
+		return 0, false
+	}
+
+	prefix := "bytes="
+	suffix := "-"
+
+	if !strings.HasPrefix(rangeHeader, prefix) ||
+		!strings.HasSuffix(rangeHeader, suffix) {
+
+		return 0, false
+	}
+
+	rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
+	rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
+	position, err := strconv.Atoi(rangeHeader)
+
+	if err != nil {
+		return 0, false
+	}
+
+	return position, true
+}
+
 // getSession returns the meek client session corresponding the
 // meek cookie/session ID. If no session is found, the cookie is
 // treated as a meek cookie for a new session and its payload is
@@ -354,16 +470,22 @@ func (server *MeekServer) getSession(
 	// Assumes clientIP is a valid IP address; the port value is a stub
 	// and is expected to be ignored.
 	clientConn := newMeekConn(
+		server,
 		&net.TCPAddr{
 			IP:   net.ParseIP(clientIP),
 			Port: 0,
 		},
 		clientSessionData.MeekProtocolVersion)
 
+	// TODO: configurable buffer parameters
+	cachedResponse := NewCachedResponse(
+		MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH, server.bufferPool)
+
 	session = &meekSession{
 		clientConn:          clientConn,
 		meekProtocolVersion: clientSessionData.MeekProtocolVersion,
 		sessionIDSent:       false,
+		cachedResponse:      cachedResponse,
 	}
 	session.touch()
 
@@ -398,6 +520,10 @@ func (server *MeekServer) closeSessionHelper(
 
 	// TODO: close the persistent HTTP client connection, if one exists
 	session.clientConn.Close()
+
+	// Release all extended buffers back to the pool
+	session.cachedResponse.Reset()
+
 	// Note: assumes caller holds lock on sessionsLock
 	delete(server.sessions, sessionID)
 }
@@ -459,6 +585,7 @@ type meekSession struct {
 	clientConn          *meekConn
 	meekProtocolVersion int
 	sessionIDSent       bool
+	cachedResponse      *CachedResponse
 }
 
 func (session *meekSession) touch() {
@@ -629,10 +756,12 @@ func makeMeekSessionID() (string, error) {
 // meekConn bridges net/http request/response payload readers and writers
 // and goroutines calling Read()s and Write()s.
 type meekConn struct {
+	meekServer        *MeekServer
 	remoteAddr        net.Addr
 	protocolVersion   int
 	closeBroadcast    chan struct{}
 	closed            int32
+	lastReadChecksum  *uint64
 	readLock          sync.Mutex
 	emptyReadBuffer   chan *bytes.Buffer
 	partialReadBuffer chan *bytes.Buffer
@@ -642,8 +771,9 @@ type meekConn struct {
 	writeResult       chan error
 }
 
-func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
+func newMeekConn(meekServer *MeekServer, remoteAddr net.Addr, protocolVersion int) *meekConn {
 	conn := &meekConn{
+		meekServer:        meekServer,
 		remoteAddr:        remoteAddr,
 		protocolVersion:   protocolVersion,
 		closeBroadcast:    make(chan struct{}),
@@ -664,31 +794,68 @@ func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
 // pumpReads causes goroutines blocking on meekConn.Read() to read
 // from the specified reader. This function blocks until the reader
 // is fully consumed or the meekConn is closed. A read buffer allows
-// up to MEEK_MAX_PAYLOAD_LENGTH bytes to be read and buffered without
-// a Read() immediately consuming the bytes, but there's still a
-// possibility of a stall if no Read() calls are made after this
+// up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
+// without a Read() immediately consuming the bytes, but there's still
+// a possibility of a stall if no Read() calls are made after this
 // read buffer is full.
 // Note: assumes only one concurrent call to pumpReads
 func (conn *meekConn) pumpReads(reader io.Reader) error {
-	for {
 
-		var readBuffer *bytes.Buffer
-		select {
-		case readBuffer = <-conn.emptyReadBuffer:
-		case readBuffer = <-conn.partialReadBuffer:
-		case <-conn.closeBroadcast:
-			return io.EOF
-		}
+	// Wait for a full capacity empty buffer. This ensures we can read
+	// the maximum MEEK_MAX_REQUEST_PAYLOAD_LENGTH request payload and
+	// checksum before relaying.
+	//
+	// Note: previously, this code would select conn.partialReadBuffer
+	// and write to that, looping until the entire request payload was
+	// read. Now, the consumer, the Read() caller, must fully drain the
+	// read buffer first.
+
+	var readBuffer *bytes.Buffer
+	select {
+	case readBuffer = <-conn.emptyReadBuffer:
+	case <-conn.closeBroadcast:
+		return io.EOF
+	}
 
-		limitReader := io.LimitReader(reader, int64(MEEK_MAX_PAYLOAD_LENGTH-readBuffer.Len()))
-		n, err := readBuffer.ReadFrom(limitReader)
+	// +1 allows for an explicit check for request payloads that
+	// exceed the maximum permitted length.
+	limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
+	n, err := readBuffer.ReadFrom(limitReader)
+
+	if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
+		err = errors.New("invalid request payload length")
+	}
 
+	// If the request read fails, don't relay any data. This allows
+	// the client to retry and resend its request payload without
+	// interrupting/duplicating the payload flow.
+	if err != nil {
+		readBuffer.Reset()
 		conn.replaceReadBuffer(readBuffer)
+		return common.ContextError(err)
+	}
 
-		if n == 0 || err != nil {
-			return err
-		}
+	// Check if request payload checksum matches immediately
+	// previous payload. On match, assume this is a client retry
+	// sending payload that was already relayed and skip this
+	// payload. Payload is OSSH ciphertext and almost surely
+	// will not repeat. In the highly unlikely case that it does,
+	// the underlying SSH connection will fail and the client
+	// must reconnect.
+
+	checksum := crc64.Checksum(readBuffer.Bytes(), conn.meekServer.checksumTable)
+
+	if conn.lastReadChecksum == nil {
+		conn.lastReadChecksum = new(uint64)
+	} else if *conn.lastReadChecksum == checksum {
+		readBuffer.Reset()
 	}
+
+	*conn.lastReadChecksum = checksum
+
+	conn.replaceReadBuffer(readBuffer)
+
+	return nil
 }
 
 // Read reads from the meekConn into buffer. Read blocks until
@@ -716,7 +883,7 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 
 func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
 	switch readBuffer.Len() {
-	case MEEK_MAX_PAYLOAD_LENGTH:
+	case MEEK_MAX_REQUEST_PAYLOAD_LENGTH:
 		conn.fullReadBuffer <- readBuffer
 	case 0:
 		conn.emptyReadBuffer <- readBuffer
@@ -752,7 +919,7 @@ func (conn *meekConn) pumpWrites(writer io.Writer) error {
 
 			if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
 				// Pre-protocol version 1 clients expect at most
-				// MEEK_MAX_PAYLOAD_LENGTH response bodies
+				// MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
 				return nil
 			}
 			totalElapsedTime := monotime.Since(startTime) / time.Millisecond
@@ -783,12 +950,12 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
 
 	n := 0
 	for n < len(buffer) {
-		end := n + MEEK_MAX_PAYLOAD_LENGTH
+		end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
 		if end > len(buffer) {
 			end = len(buffer)
 		}
 
-		// Only write MEEK_MAX_PAYLOAD_LENGTH at a time,
+		// Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
 		// to ensure compatibility with v1 protocol.
 		chunk := buffer[n:end]
 

+ 296 - 0
psiphon/server/meekBuffer.go

@@ -0,0 +1,296 @@
+/*
+ * Copyright (c) 2017, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"errors"
+	"io"
+)
+
// CachedResponse is a data structure that supports meek
// protocol connection interruption resiliency: it stores
// payload data from the most recent response so that it
// may be resent if the client fails to receive it.
//
// The meek server maintains one CachedResponse for each
// meek client. Psiphon's variant of meek streams response
// data, so responses are not fixed size. To limit the memory
// overhead of response caching, each CachedResponse has a
// fixed-size buffer that operates as a ring buffer,
// discarding older response bytes when the buffer fills.
// A CachedResponse that has discarded data may still satisfy
// a client retry where the client has already received part
// of the response payload.
//
// A CachedResponse will also extend its capacity by
// borrowing buffers from a CachedResponseBufferPool, if
// available. When Reset is called, borrowed buffers are
// released back to the pool.
type CachedResponse struct {

	// buffers[0] is the fixed buffer allocated for the lifetime
	// of the CachedResponse; buffers[1:] are extended buffers
	// borrowed from extendedBufferPool and returned on Reset.
	buffers            [][]byte

	// readPosition is the position, within the overall response
	// payload, of the earliest byte still retained in the cache;
	// it advances once older data starts being overwritten.
	readPosition       int

	// readAvailable is the number of response bytes currently
	// retained, which may be less than the total buffer capacity.
	readAvailable      int

	// writeIndex and writeBufferIndex locate the next byte to
	// be written: the offset within buffers[writeBufferIndex].
	writeIndex         int
	writeBufferIndex   int

	// overwriting is set once the buffers have filled and the
	// write position has wrapped around, so each new write
	// discards the oldest retained data.
	overwriting        bool

	// extendedBufferPool is the shared pool from which extra
	// capacity is borrowed; may yield nil when exhausted.
	extendedBufferPool *CachedResponseBufferPool
}
+
+// NewCachedResponse creates a CachedResponse with a fixed buffer
+// of size bufferSize and borrowing buffers from extendedBufferPool.
+func NewCachedResponse(
+	bufferSize int,
+	extendedBufferPool *CachedResponseBufferPool) *CachedResponse {
+
+	return &CachedResponse{
+		buffers:            [][]byte{make([]byte, bufferSize)},
+		extendedBufferPool: extendedBufferPool,
+	}
+}
+
+// Reset reinitializes the CachedResponse state to have
+// no buffered response and releases all extended buffers
+// back to the pool.
+// Reset _must_ be called before discarding a CachedResponse
+// or extended buffers will not be released.
+func (response *CachedResponse) Reset() {
+	for i, buffer := range response.buffers {
+		if i > 0 {
+			response.extendedBufferPool.Put(buffer)
+		}
+	}
+	response.buffers = response.buffers[0:1]
+	response.readPosition = 0
+	response.readAvailable = 0
+	response.writeIndex = 0
+	response.writeBufferIndex = 0
+	response.overwriting = false
+}
+
// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
+
+// HasPosition checks if the CachedResponse has buffered
+// response data starting at or before the specified
+// position.
+func (response *CachedResponse) HasPosition(position int) bool {
+	return response.readAvailable > 0 && response.readPosition <= position
+}
+
+// CopyFromPosition writes the response data, starting at
+// the specified position, to writer. Any data before the
+// position is skipped. CopyFromPosition will return an error
+// if the specified position is not available.
+// CopyFromPosition can be called repeatedly to read the
+// same data -- it does not advance or modify the CachedResponse.
+func (response *CachedResponse) CopyFromPosition(
+	position int, writer io.Writer) error {
+
+	if response.readAvailable > 0 && response.readPosition > position {
+		return errors.New("position unavailable")
+	}
+
+	// Begin at the start of the response data, which may
+	// be midway through the buffer(s).
+
+	index := 0
+	bufferIndex := 0
+	if response.overwriting {
+		index = response.writeIndex
+		bufferIndex = response.writeBufferIndex
+		if index >= len(response.buffers[bufferIndex]) {
+			index = 0
+			bufferIndex = (bufferIndex + 1) % len(response.buffers)
+		}
+	}
+
+	// Iterate over all available data, skipping until at the
+	// requested position.
+
+	skip := position - response.readPosition
+	available := response.readAvailable
+
+	for available > 0 {
+
+		buffer := response.buffers[bufferIndex]
+
+		toCopy := min(len(buffer)-index, available)
+
+		if skip > 0 {
+			if toCopy > skip {
+				index += skip
+				toCopy -= skip
+				skip = 0
+			} else {
+				skip -= toCopy
+			}
+		}
+
+		if skip == 0 {
+			_, err := writer.Write(buffer[index : index+toCopy])
+			if err != nil {
+				return err
+			}
+		}
+
+		available -= toCopy
+
+		index = 0
+		bufferIndex = (bufferIndex + 1) % len(response.buffers)
+	}
+
+	return nil
+}
+
// Write appends data to the CachedResponse. All writes will
// succeed, but only the most recent bytes will be retained
// once the fixed buffer is full and no extended buffers are
// available.
//
// Write may be called multiple times to record a single
// response; Reset should be called between responses.
//
// Write conforms to the io.Writer interface. The returned
// error is always nil.
func (response *CachedResponse) Write(data []byte) (int, error) {

	dataIndex := 0

	for dataIndex < len(data) {

		// Write into available space in the current buffer

		buffer := response.buffers[response.writeBufferIndex]
		canWriteLen := len(buffer) - response.writeIndex
		needWriteLen := len(data) - dataIndex
		writeLen := min(canWriteLen, needWriteLen)

		if writeLen > 0 {
			copy(
				buffer[response.writeIndex:response.writeIndex+writeLen],
				data[dataIndex:dataIndex+writeLen])

			response.writeIndex += writeLen

			// readPosition tracks the earliest position in
			// the response that remains in the cached response.
			// Once the buffer is full (and cannot be extended),
			// older data is overwritten and readPosition advances.
			//
			// readAvailable is the amount of data in the cached
			// response, which may be less than the buffer capacity.

			if response.overwriting {
				response.readPosition += writeLen
			} else {
				response.readAvailable += writeLen
			}

			dataIndex += writeLen
		}

		if needWriteLen > canWriteLen {

			// Add an extended buffer to increase capacity

			// TODO: can extend whenever response.readIndex and response.readBufferIndex are 0?
			if response.writeBufferIndex == len(response.buffers)-1 &&
				!response.overwriting {

				// Get returns nil when the pool is exhausted; in
				// that case capacity is not extended and the write
				// proceeds to wrap around, overwriting old data.
				extendedBuffer := response.extendedBufferPool.Get()
				if extendedBuffer != nil {
					response.buffers = append(response.buffers, extendedBuffer)
				}
			}

			// Move to the next buffer, which may wrap around

			// This isn't a general ring buffer: Reset is called at
			// start of each response, so the initial data is always
			// at the beginning of the first buffer. It follows that
			// data is overwritten once the buffer wraps around back
			// to the beginning.

			response.writeBufferIndex++
			if response.writeBufferIndex >= len(response.buffers) {
				response.writeBufferIndex = 0
				response.overwriting = true
			}
			response.writeIndex = 0
		}
	}

	return len(data), nil
}
+
// CachedResponseBufferPool is a fixed-size pool of
// fixed-size buffers that are used to temporarily extend
// the capacity of CachedResponses.
type CachedResponseBufferPool struct {
	// bufferSize is the size, in bytes, of each buffer
	// allocated by the pool.
	bufferSize int
	// buffers is a channel holding available buffers (or
	// zero-length placeholders before first allocation);
	// its capacity bounds the total number of buffers the
	// pool will hand out.
	buffers    chan []byte
}
+
+// NewCachedResponseBufferPool creates a new CachedResponseBufferPool
+// with the specified number of buffers. Buffers are allocated on
+// demand and once allocated remain allocated.
+func NewCachedResponseBufferPool(
+	bufferSize, bufferCount int) *CachedResponseBufferPool {
+
+	buffers := make(chan []byte, bufferCount)
+	for i := 0; i < bufferCount; i++ {
+		buffers <- make([]byte, 0)
+	}
+
+	return &CachedResponseBufferPool{
+		bufferSize: bufferSize,
+		buffers:    buffers,
+	}
+}
+
+// Get returns a buffer, if one is available, or returns nil
+// when no buffer is available. Get does not block. Call Put
+// to release the buffer back to the pool.
+//
+// Note: currently, Buffers are not zeroed between use by
+// different CachedResponses owned by different clients.
+// A bug resulting in cross-client data transfer exposes
+// only OSSH ciphertext in the case of meek's use of
+// CachedResponses.
+func (pool *CachedResponseBufferPool) Get() []byte {
+	select {
+	case buffer := <-pool.buffers:
+		if len(buffer) == 0 {
+			buffer = make([]byte, pool.bufferSize)
+		}
+		return buffer
+	default:
+		return nil
+	}
+}
+
// Put releases a buffer back to the pool. The buffer must
// have been obtained from Get.
func (pool *CachedResponseBufferPool) Put(buffer []byte) {
	// Will not block provided buffer came from Get: the channel
	// capacity equals the total number of buffers handed out.
	pool.buffers <- buffer
}

+ 158 - 0
psiphon/server/meek_test.go

@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2017, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
// TestMeekResiliency is a placeholder for an end-to-end test
// of meek connection interruption resiliency (request resend
// detection and cached response replay).
func TestMeekResiliency(t *testing.T) {
	// TODO: implement
}
+
+func TestCachedResponse(t *testing.T) {
+
+	rand.Seed(time.Now().Unix())
+
+	KB := 1024
+	MB := KB * KB
+
+	testCases := []struct {
+		concurrentResponses int
+		responseSize        int
+		bufferSize          int
+		extendedBufferSize  int
+		extendedBufferCount int
+		minBytesPerWrite    int
+		maxBytesPerWrite    int
+		copyPosition        int
+		expectedSuccess     bool
+	}{
+		{1, 16, 16, 0, 0, 1, 1, 0, true},
+
+		{1, 31, 16, 0, 0, 1, 1, 15, true},
+
+		{1, 16, 2, 2, 7, 1, 1, 0, true},
+
+		{1, 31, 15, 3, 5, 1, 1, 1, true},
+
+		{1, 10 * MB, 64 * KB, 64 * KB, 158, 1, 32 * KB, 0, false},
+
+		{1, 10 * MB, 64 * KB, 64 * KB, 159, 1, 32 * KB, 0, true},
+
+		{1, 10 * MB, 64 * KB, 64 * KB, 160, 1, 32 * KB, 0, true},
+
+		{1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 64 * KB, true},
+
+		{1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 63 * KB, false},
+
+		{1, 200 * KB, 64 * KB, 0, 0, 1, 1 * KB, 136 * KB, true},
+
+		{10, 10 * MB, 64 * KB, 64 * KB, 1589, 1, 32 * KB, 0, false},
+
+		{10, 10 * MB, 64 * KB, 64 * KB, 1590, 1, 32 * KB, 0, true},
+	}
+
+	for _, testCase := range testCases {
+		description := fmt.Sprintf("test case: %+v", testCase)
+		t.Run(description, func(t *testing.T) {
+
+			pool := NewCachedResponseBufferPool(testCase.extendedBufferSize, testCase.extendedBufferCount)
+
+			responses := make([]*CachedResponse, testCase.concurrentResponses)
+			for i := 0; i < testCase.concurrentResponses; i++ {
+				responses[i] = NewCachedResponse(testCase.bufferSize, pool)
+			}
+
+			// Repeats exercise CachedResponse.Reset() and CachedResponseBufferPool replacement
+			for repeat := 0; repeat < 2; repeat++ {
+
+				t.Logf("repeat %d", repeat)
+
+				responseData := make([]byte, testCase.responseSize)
+				_, _ = rand.Read(responseData)
+
+				waitGroup := new(sync.WaitGroup)
+
+				// Goroutines exercise concurrent access to CachedResponseBufferPool
+				for _, response := range responses {
+					waitGroup.Add(1)
+					go func(response *CachedResponse) {
+						defer waitGroup.Done()
+
+						remainingSize := testCase.responseSize
+						for remainingSize > 0 {
+
+							writeSize := testCase.minBytesPerWrite
+							writeSize += rand.Intn(testCase.maxBytesPerWrite - testCase.minBytesPerWrite + 1)
+							if writeSize > remainingSize {
+								writeSize = remainingSize
+							}
+
+							offset := len(responseData) - remainingSize
+
+							response.Write(responseData[offset : offset+writeSize])
+
+							remainingSize -= writeSize
+						}
+					}(response)
+				}
+
+				waitGroup.Wait()
+
+				atLeastOneFailure := false
+
+				for i, response := range responses {
+
+					cachedResponseData := new(bytes.Buffer)
+
+					err := response.CopyFromPosition(testCase.copyPosition, cachedResponseData)
+
+					if testCase.expectedSuccess {
+
+						if err != nil {
+							t.Fatalf("CopyFromPosition unexpectedly failed for response %d: %s", i, err)
+						}
+
+						if bytes.Compare(responseData[testCase.copyPosition:], cachedResponseData.Bytes()) != 0 {
+
+							t.Fatalf("cached response data mismatch for response %d", i)
+						}
+					} else {
+						atLeastOneFailure = true
+					}
+				}
+
+				if !testCase.expectedSuccess && !atLeastOneFailure {
+					t.Fatalf("CopyFromPosition unexpectedly succeeded for all responses")
+				}
+
+				for _, response := range responses {
+					response.Reset()
+				}
+			}
+		})
+	}
+}