|
|
@@ -27,9 +27,11 @@ import (
|
|
|
"encoding/hex"
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
+ "hash/crc64"
|
|
|
"io"
|
|
|
"net"
|
|
|
"net/http"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
@@ -38,6 +40,7 @@ import (
|
|
|
"github.com/Psiphon-Inc/crypto/nacl/box"
|
|
|
"github.com/Psiphon-Inc/goarista/monotime"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
|
|
|
+ "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tls"
|
|
|
)
|
|
|
|
|
|
@@ -54,20 +57,27 @@ const (
|
|
|
// report no version number and expect at most 64K response bodies.
|
|
|
MEEK_PROTOCOL_VERSION_1 = 1
|
|
|
|
|
|
- // Protocol version 2 clients initiate a session by sending a encrypted and obfuscated meek
|
|
|
+ // Protocol version 2 clients initiate a session by sending an encrypted and obfuscated meek
|
|
|
// cookie with their initial HTTP request. Connection information is contained within the
|
|
|
// encrypted cookie payload. The server inspects the cookie and establishes a new session and
|
|
|
// returns a new random session ID back to client via Set-Cookie header. The client uses this
|
|
|
// session ID on all subsequent requests for the remainder of the session.
|
|
|
MEEK_PROTOCOL_VERSION_2 = 2
|
|
|
|
|
|
- MEEK_MAX_PAYLOAD_LENGTH = 0x10000
|
|
|
- MEEK_TURN_AROUND_TIMEOUT = 20 * time.Millisecond
|
|
|
- MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
|
|
|
- MEEK_MAX_SESSION_STALENESS = 45 * time.Second
|
|
|
- MEEK_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
|
|
|
- MEEK_MIN_SESSION_ID_LENGTH = 8
|
|
|
- MEEK_MAX_SESSION_ID_LENGTH = 20
|
|
|
+ // Protocol version 3 clients include resiliency enhancements and will add a Range header
|
|
|
+ // when retrying a request for a partially downloaded response payload.
|
|
|
+ MEEK_PROTOCOL_VERSION_3 = 3
|
|
|
+
|
|
|
+ MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
|
|
|
+ MEEK_TURN_AROUND_TIMEOUT = 20 * time.Millisecond
|
|
|
+ MEEK_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
|
|
|
+ MEEK_MAX_SESSION_STALENESS = 45 * time.Second
|
|
|
+ MEEK_HTTP_CLIENT_IO_TIMEOUT = 45 * time.Second
|
|
|
+ MEEK_MIN_SESSION_ID_LENGTH = 8
|
|
|
+ MEEK_MAX_SESSION_ID_LENGTH = 20
|
|
|
+ MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
|
|
|
+ MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536
|
|
|
+ MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048
|
|
|
)
|
|
|
|
|
|
// MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
|
|
|
@@ -85,11 +95,13 @@ type MeekServer struct {
|
|
|
support *SupportServices
|
|
|
listener net.Listener
|
|
|
tlsConfig *tls.Config
|
|
|
- clientHandler func(clientConn net.Conn)
|
|
|
+ clientHandler func(clientTunnelProtocol string, clientConn net.Conn)
|
|
|
openConns *common.Conns
|
|
|
stopBroadcast <-chan struct{}
|
|
|
sessionsLock sync.RWMutex
|
|
|
sessions map[string]*meekSession
|
|
|
+ checksumTable *crc64.Table
|
|
|
+ bufferPool *CachedResponseBufferPool
|
|
|
}
|
|
|
|
|
|
// NewMeekServer initializes a new meek server.
|
|
|
@@ -97,9 +109,23 @@ func NewMeekServer(
|
|
|
support *SupportServices,
|
|
|
listener net.Listener,
|
|
|
useTLS, useObfuscatedSessionTickets bool,
|
|
|
- clientHandler func(clientConn net.Conn),
|
|
|
+ clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
|
|
|
stopBroadcast <-chan struct{}) (*MeekServer, error) {
|
|
|
|
|
|
+ checksumTable := crc64.MakeTable(crc64.ECMA)
|
|
|
+
|
|
|
+ bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
|
|
|
+ if support.Config.MeekCachedResponsePoolBufferSize != 0 {
|
|
|
+ bufferLength = support.Config.MeekCachedResponsePoolBufferSize
|
|
|
+ }
|
|
|
+
|
|
|
+ bufferCount := MEEK_DEFAULT_POOL_BUFFER_COUNT
|
|
|
+ if support.Config.MeekCachedResponsePoolBufferCount != 0 {
|
|
|
+ bufferCount = support.Config.MeekCachedResponsePoolBufferCount
|
|
|
+ }
|
|
|
+
|
|
|
+ bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
|
|
|
+
|
|
|
meekServer := &MeekServer{
|
|
|
support: support,
|
|
|
listener: listener,
|
|
|
@@ -107,6 +133,8 @@ func NewMeekServer(
|
|
|
openConns: new(common.Conns),
|
|
|
stopBroadcast: stopBroadcast,
|
|
|
sessions: make(map[string]*meekSession),
|
|
|
+ checksumTable: checksumTable,
|
|
|
+ bufferPool: bufferPool,
|
|
|
}
|
|
|
|
|
|
if useTLS {
|
|
|
@@ -172,7 +200,7 @@ func (server *MeekServer) Run() error {
|
|
|
// Note: Serve() will be interrupted by listener.Close() call
|
|
|
var err error
|
|
|
if server.tlsConfig != nil {
|
|
|
- httpsServer := HTTPSServer{Server: *httpServer}
|
|
|
+ httpsServer := HTTPSServer{Server: httpServer}
|
|
|
err = httpsServer.ServeTLS(server.listener, server.tlsConfig)
|
|
|
} else {
|
|
|
err = httpServer.Serve(server.listener)
|
|
|
@@ -236,19 +264,58 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ // Ensure that there's only one concurrent request handler per client
|
|
|
+ // session. Depending on the nature of a network disruption, it can
|
|
|
+ // happen that a client detects a failure and retries while the server
|
|
|
+ // is still streaming response in the handler for the _previous_ client
|
|
|
+ // request.
|
|
|
+ //
|
|
|
+ // Even if the session.cachedResponse were safe for concurrent
|
|
|
+ // use (it is not), concurrent handling could lead to loss of session
|
|
|
+ // since upstream data read by the first request may not reach the
|
|
|
+ // cached response before the second request reads the cached data.
|
|
|
+ //
|
|
|
+ // The existing handler will stream response data, holding the lock,
|
|
|
+ // for no more than MEEK_EXTENDED_TURN_AROUND_TIMEOUT.
|
|
|
+ //
|
|
|
+ // TODO: interrupt an existing handler? The existing handler will be
|
|
|
+ // sending data to the cached response, but if that buffer fills, the
|
|
|
+ // session will be lost.
|
|
|
+
|
|
|
+ requestNumber := atomic.AddInt64(&session.requestCount, 1)
|
|
|
+
|
|
|
+ // Wait for the existing request to complete.
|
|
|
+ session.lock.Lock()
|
|
|
+ defer session.lock.Unlock()
|
|
|
+
|
|
|
+ // If a newer request has arrived while waiting, discard this one.
|
|
|
+ // Do not delay processing the newest request.
|
|
|
+ if atomic.LoadInt64(&session.requestCount) > requestNumber {
|
|
|
+ server.terminateConnection(responseWriter, request)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
// pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
|
|
|
// read the request body as upstream traffic.
|
|
|
// TODO: run pumpReads and pumpWrites concurrently?
|
|
|
|
|
|
+ // pumpReads checksums the request payload and skips relaying it when
|
|
|
+ // it matches the immediately previous request payload. This allows
|
|
|
+ // clients to resend request payloads, when retrying due to connection
|
|
|
+ // interruption, without knowing whether the server has received or
|
|
|
+ // relayed the data.
|
|
|
+
|
|
|
err = session.clientConn.pumpReads(request.Body)
|
|
|
if err != nil {
|
|
|
if err != io.EOF {
|
|
|
// Debug since errors such as "i/o timeout" occur during normal operation;
|
|
|
// also, golang network error messages may contain client IP.
|
|
|
- log.WithContextFields(LogFields{"error": err}).Debug("pump reads failed")
|
|
|
+ log.WithContextFields(LogFields{"error": err}).Debug("read request failed")
|
|
|
}
|
|
|
server.terminateConnection(responseWriter, request)
|
|
|
- server.closeSession(sessionID)
|
|
|
+
|
|
|
+ // Note: keep session open to allow client to retry
|
|
|
+
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -262,22 +329,134 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
|
|
|
session.sessionIDSent = true
|
|
|
}
|
|
|
|
|
|
- // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
|
|
|
- // write its downstream traffic through to the response body.
|
|
|
+ // When streaming data into the response body, a copy is
|
|
|
+ // retained in the cachedResponse buffer. This allows the
|
|
|
+ // client to retry and request that the response be resent
|
|
|
+ // when the HTTP connection is interrupted.
|
|
|
+ //
|
|
|
+ // If a Range header is present, the client is retrying,
|
|
|
+ // possibly after having received a partial response. In
|
|
|
+ // this case, use any cached response to attempt to resend
|
|
|
+ // the response, starting from the resend position the client
|
|
|
+ // indicates.
|
|
|
+ //
|
|
|
+ // When the resend position is not available -- because the
|
|
|
+ // cachedResponse buffer could not hold it -- the client session
|
|
|
+ // is closed, as there's no way to resume streaming the payload
|
|
|
+ // uninterrupted.
|
|
|
+ //
|
|
|
+ // The client may retry before a cached response is prepared,
|
|
|
+ // so a cached response is not always used when a Range header
|
|
|
+ // is present.
|
|
|
+ //
|
|
|
+ // TODO: invalid Range header is ignored; should it be otherwise?
|
|
|
+
|
|
|
+ position, isRetry := checkRangeHeader(request)
|
|
|
+ if isRetry {
|
|
|
+ atomic.AddInt64(&session.metricClientRetries, 1)
|
|
|
+ }
|
|
|
|
|
|
- err = session.clientConn.pumpWrites(responseWriter)
|
|
|
- if err != nil {
|
|
|
- if err != io.EOF {
|
|
|
+ hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
|
|
|
+
|
|
|
+ // The client is not expected to send position > 0 when there is
|
|
|
+ // no cached response; let that case fall through to the next
|
|
|
+ // HasPosition check which will fail and close the session.
|
|
|
+
|
|
|
+ var responseSize int
|
|
|
+ var responseError error
|
|
|
+
|
|
|
+ if isRetry && (hasCompleteCachedResponse || position > 0) {
|
|
|
+
|
|
|
+ if !session.cachedResponse.HasPosition(position) {
|
|
|
+ greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
|
|
|
+ server.terminateConnection(responseWriter, request)
|
|
|
+ server.closeSession(sessionID)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ responseWriter.WriteHeader(http.StatusPartialContent)
|
|
|
+
|
|
|
+ // TODO:
|
|
|
+ // - enforce a max extended buffer count per client, for
|
|
|
+ // fairness? Throttling may make this unnecessary.
|
|
|
+ // - cachedResponse can now start releasing extended buffers,
|
|
|
+ // as response bytes before "position" will never be requested
|
|
|
+ // again?
|
|
|
+
|
|
|
+ responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
|
|
|
+ greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
|
|
|
+
|
|
|
+ // The client may again fail to receive the payload and may again
|
|
|
+ // retry, so not yet releasing cachedReponse buffers.
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // _Now_ we release buffers holding data from the previous
|
|
|
+ // response. And then immediately stream the new response into
|
|
|
+ // newly acquired buffers.
|
|
|
+ session.cachedResponse.Reset()
|
|
|
+
|
|
|
+ // Note: this code depends on an implementation detail of
|
|
|
+ // io.MultiWriter: a Write() to the MultiWriter writes first
|
|
|
+ // to the cache, and then to the response writer. So if the
|
|
|
+ // write to the reponse writer fails, the payload is cached.
|
|
|
+ multiWriter := io.MultiWriter(session.cachedResponse, responseWriter)
|
|
|
+
|
|
|
+ // The client expects 206, not 200, whenever it sets a Range header,
|
|
|
+ // which it may do even when no cached response is prepared.
|
|
|
+ if isRetry {
|
|
|
+ responseWriter.WriteHeader(http.StatusPartialContent)
|
|
|
+ }
|
|
|
+
|
|
|
+ // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
|
|
|
+ // write its downstream traffic through to the response body.
|
|
|
+
|
|
|
+ responseSize, responseError = session.clientConn.pumpWrites(multiWriter)
|
|
|
+ greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
|
|
|
+ greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
|
|
|
+ }
|
|
|
+
|
|
|
+ // responseError is the result of writing the body either from CopyFromPosition or pumpWrites
|
|
|
+ if responseError != nil {
|
|
|
+ if responseError != io.EOF {
|
|
|
// Debug since errors such as "i/o timeout" occur during normal operation;
|
|
|
// also, golang network error messages may contain client IP.
|
|
|
- log.WithContextFields(LogFields{"error": err}).Debug("pump writes failed")
|
|
|
+ log.WithContextFields(LogFields{"error": responseError}).Debug("write response failed")
|
|
|
}
|
|
|
server.terminateConnection(responseWriter, request)
|
|
|
- server.closeSession(sessionID)
|
|
|
+
|
|
|
+ // Note: keep session open to allow client to retry
|
|
|
+
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func checkRangeHeader(request *http.Request) (int, bool) {
|
|
|
+ rangeHeader := request.Header.Get("Range")
|
|
|
+ if rangeHeader == "" {
|
|
|
+ return 0, false
|
|
|
+ }
|
|
|
+
|
|
|
+ prefix := "bytes="
|
|
|
+ suffix := "-"
|
|
|
+
|
|
|
+ if !strings.HasPrefix(rangeHeader, prefix) ||
|
|
|
+ !strings.HasSuffix(rangeHeader, suffix) {
|
|
|
+
|
|
|
+ return 0, false
|
|
|
+ }
|
|
|
+
|
|
|
+ rangeHeader = strings.TrimPrefix(rangeHeader, prefix)
|
|
|
+ rangeHeader = strings.TrimSuffix(rangeHeader, suffix)
|
|
|
+ position, err := strconv.Atoi(rangeHeader)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return 0, false
|
|
|
+ }
|
|
|
+
|
|
|
+ return position, true
|
|
|
+}
|
|
|
+
|
|
|
// getSession returns the meek client session corresponding the
|
|
|
// meek cookie/session ID. If no session is found, the cookie is
|
|
|
// treated as a meek cookie for a new session and its payload is
|
|
|
@@ -307,13 +486,9 @@ func (server *MeekServer) getSession(
|
|
|
return "", nil, common.ContextError(err)
|
|
|
}
|
|
|
|
|
|
- // Note: this meek server ignores all but Version MeekProtocolVersion;
|
|
|
- // the other values are legacy or currently unused.
|
|
|
- var clientSessionData struct {
|
|
|
- MeekProtocolVersion int `json:"v"`
|
|
|
- PsiphonClientSessionId string `json:"s"`
|
|
|
- PsiphonServerAddress string `json:"p"`
|
|
|
- }
|
|
|
+ // Note: this meek server ignores legacy values PsiphonClientSessionId
|
|
|
+ // and PsiphonServerAddress.
|
|
|
+ var clientSessionData protocol.MeekCookieData
|
|
|
|
|
|
err = json.Unmarshal(payloadJSON, &clientSessionData)
|
|
|
if err != nil {
|
|
|
@@ -345,6 +520,22 @@ func (server *MeekServer) getSession(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Create a new session
|
|
|
+
|
|
|
+ bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
|
|
|
+ if server.support.Config.MeekCachedResponseBufferSize != 0 {
|
|
|
+ bufferLength = server.support.Config.MeekCachedResponseBufferSize
|
|
|
+ }
|
|
|
+ cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
|
|
|
+
|
|
|
+ session = &meekSession{
|
|
|
+ meekProtocolVersion: clientSessionData.MeekProtocolVersion,
|
|
|
+ sessionIDSent: false,
|
|
|
+ cachedResponse: cachedResponse,
|
|
|
+ }
|
|
|
+
|
|
|
+ session.touch()
|
|
|
+
|
|
|
// Create a new meek conn that will relay the payload
|
|
|
// between meek request/responses and the tunnel server client
|
|
|
// handler. The client IP is also used to initialize the
|
|
|
@@ -354,18 +545,15 @@ func (server *MeekServer) getSession(
|
|
|
// Assumes clientIP is a valid IP address; the port value is a stub
|
|
|
// and is expected to be ignored.
|
|
|
clientConn := newMeekConn(
|
|
|
+ server,
|
|
|
+ session,
|
|
|
&net.TCPAddr{
|
|
|
IP: net.ParseIP(clientIP),
|
|
|
Port: 0,
|
|
|
},
|
|
|
clientSessionData.MeekProtocolVersion)
|
|
|
|
|
|
- session = &meekSession{
|
|
|
- clientConn: clientConn,
|
|
|
- meekProtocolVersion: clientSessionData.MeekProtocolVersion,
|
|
|
- sessionIDSent: false,
|
|
|
- }
|
|
|
- session.touch()
|
|
|
+ session.clientConn = clientConn
|
|
|
|
|
|
// Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
|
|
|
// meek cookie to a session ID; v1 clients always send the
|
|
|
@@ -388,7 +576,7 @@ func (server *MeekServer) getSession(
|
|
|
|
|
|
// Note: from the tunnel server's perspective, this client connection
|
|
|
// will close when closeSessionHelper calls Close() on the meekConn.
|
|
|
- server.clientHandler(session.clientConn)
|
|
|
+ server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
|
|
|
|
|
|
return sessionID, session, nil
|
|
|
}
|
|
|
@@ -398,6 +586,10 @@ func (server *MeekServer) closeSessionHelper(
|
|
|
|
|
|
// TODO: close the persistent HTTP client connection, if one exists
|
|
|
session.clientConn.Close()
|
|
|
+
|
|
|
+ // Release all extended buffers back to the pool
|
|
|
+ session.cachedResponse.Reset()
|
|
|
+
|
|
|
// Note: assumes caller holds lock on sessionsLock
|
|
|
delete(server.sessions, sessionID)
|
|
|
}
|
|
|
@@ -455,10 +647,18 @@ type meekSession struct {
|
|
|
// Note: 64-bit ints used with atomic operations are at placed
|
|
|
// at the start of struct to ensure 64-bit alignment.
|
|
|
// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
|
|
|
- lastActivity int64
|
|
|
- clientConn *meekConn
|
|
|
- meekProtocolVersion int
|
|
|
- sessionIDSent bool
|
|
|
+ lastActivity int64
|
|
|
+ requestCount int64
|
|
|
+ metricClientRetries int64
|
|
|
+ metricPeakResponseSize int64
|
|
|
+ metricPeakCachedResponseSize int64
|
|
|
+ metricPeakCachedResponseHitSize int64
|
|
|
+ metricCachedResponseMissPosition int64
|
|
|
+ lock sync.Mutex
|
|
|
+ clientConn *meekConn
|
|
|
+ meekProtocolVersion int
|
|
|
+ sessionIDSent bool
|
|
|
+ cachedResponse *CachedResponse
|
|
|
}
|
|
|
|
|
|
func (session *meekSession) touch() {
|
|
|
@@ -470,6 +670,17 @@ func (session *meekSession) expired() bool {
|
|
|
return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
|
|
|
}
|
|
|
|
|
|
+// GetMetrics implements the MetricsSource interface
|
|
|
+func (session *meekSession) GetMetrics() LogFields {
|
|
|
+ logFields := make(LogFields)
|
|
|
+ logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
|
|
|
+ logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
|
|
|
+ logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
|
|
|
+ logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
|
|
|
+ logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
|
|
|
+ return logFields
|
|
|
+}
|
|
|
+
|
|
|
// makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
|
|
|
// Currently, this config is optimized for fronted meek where the nature
|
|
|
// of the connection is non-circumvention; it's optimized for performance
|
|
|
@@ -629,10 +840,13 @@ func makeMeekSessionID() (string, error) {
|
|
|
// meekConn bridges net/http request/response payload readers and writers
|
|
|
// and goroutines calling Read()s and Write()s.
|
|
|
type meekConn struct {
|
|
|
+ meekServer *MeekServer
|
|
|
+ meekSession *meekSession
|
|
|
remoteAddr net.Addr
|
|
|
protocolVersion int
|
|
|
closeBroadcast chan struct{}
|
|
|
closed int32
|
|
|
+ lastReadChecksum *uint64
|
|
|
readLock sync.Mutex
|
|
|
emptyReadBuffer chan *bytes.Buffer
|
|
|
partialReadBuffer chan *bytes.Buffer
|
|
|
@@ -642,8 +856,15 @@ type meekConn struct {
|
|
|
writeResult chan error
|
|
|
}
|
|
|
|
|
|
-func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
|
|
|
+func newMeekConn(
|
|
|
+ meekServer *MeekServer,
|
|
|
+ meekSession *meekSession,
|
|
|
+ remoteAddr net.Addr,
|
|
|
+ protocolVersion int) *meekConn {
|
|
|
+
|
|
|
conn := &meekConn{
|
|
|
+ meekServer: meekServer,
|
|
|
+ meekSession: meekSession,
|
|
|
remoteAddr: remoteAddr,
|
|
|
protocolVersion: protocolVersion,
|
|
|
closeBroadcast: make(chan struct{}),
|
|
|
@@ -664,33 +885,84 @@ func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
|
|
|
// pumpReads causes goroutines blocking on meekConn.Read() to read
|
|
|
// from the specified reader. This function blocks until the reader
|
|
|
// is fully consumed or the meekConn is closed. A read buffer allows
|
|
|
-// up to MEEK_MAX_PAYLOAD_LENGTH bytes to be read and buffered without
|
|
|
-// a Read() immediately consuming the bytes, but there's still a
|
|
|
-// possibility of a stall if no Read() calls are made after this
|
|
|
+// up to MEEK_MAX_REQUEST_PAYLOAD_LENGTH bytes to be read and buffered
|
|
|
+// without a Read() immediately consuming the bytes, but there's still
|
|
|
+// a possibility of a stall if no Read() calls are made after this
|
|
|
// read buffer is full.
|
|
|
// Note: assumes only one concurrent call to pumpReads
|
|
|
func (conn *meekConn) pumpReads(reader io.Reader) error {
|
|
|
- for {
|
|
|
|
|
|
- var readBuffer *bytes.Buffer
|
|
|
- select {
|
|
|
- case readBuffer = <-conn.emptyReadBuffer:
|
|
|
- case readBuffer = <-conn.partialReadBuffer:
|
|
|
- case <-conn.closeBroadcast:
|
|
|
- return io.EOF
|
|
|
- }
|
|
|
+ // Wait for a full capacity empty buffer. This ensures we can read
|
|
|
+ // the maximum MEEK_MAX_REQUEST_PAYLOAD_LENGTH request payload and
|
|
|
+ // checksum before relaying.
|
|
|
+ //
|
|
|
+ // Note: previously, this code would select conn.partialReadBuffer
|
|
|
+ // and write to that, looping until the entire request payload was
|
|
|
+ // read. Now, the consumer, the Read() caller, must fully drain the
|
|
|
+ // read buffer first.
|
|
|
|
|
|
- limitReader := io.LimitReader(reader, int64(MEEK_MAX_PAYLOAD_LENGTH-readBuffer.Len()))
|
|
|
- n, err := readBuffer.ReadFrom(limitReader)
|
|
|
+ // Use either an empty or partial buffer. By using a partial
|
|
|
+ // buffer, pumpReads will not block if the Read() caller has
|
|
|
+ // not fully drained the read buffer.
|
|
|
|
|
|
+ var readBuffer *bytes.Buffer
|
|
|
+ select {
|
|
|
+ case readBuffer = <-conn.emptyReadBuffer:
|
|
|
+ case readBuffer = <-conn.partialReadBuffer:
|
|
|
+ case <-conn.closeBroadcast:
|
|
|
+ return io.EOF
|
|
|
+ }
|
|
|
+
|
|
|
+ newDataOffset := readBuffer.Len()
|
|
|
+
|
|
|
+ // Since we need to read the full request payload in order to
|
|
|
+ // take its checksum before relaying it, the read buffer can
|
|
|
+ // grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
|
|
|
+
|
|
|
+ // +1 allows for an explict check for request payloads that
|
|
|
+ // exceed the maximum permitted length.
|
|
|
+ limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
|
|
|
+ n, err := readBuffer.ReadFrom(limitReader)
|
|
|
+
|
|
|
+ if err == nil && n == MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1 {
|
|
|
+ err = errors.New("invalid request payload length")
|
|
|
+ }
|
|
|
+
|
|
|
+ // If the request read fails, don't relay the new data. This allows
|
|
|
+ // the client to retry and resend its request payload without
|
|
|
+ // interrupting/duplicating the payload flow.
|
|
|
+ if err != nil {
|
|
|
+ readBuffer.Truncate(newDataOffset)
|
|
|
conn.replaceReadBuffer(readBuffer)
|
|
|
+ return common.ContextError(err)
|
|
|
+ }
|
|
|
|
|
|
- if n == 0 || err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ // Check if request payload checksum matches immediately
|
|
|
+ // previous payload. On match, assume this is a client retry
|
|
|
+ // sending payload that was already relayed and skip this
|
|
|
+ // payload. Payload is OSSH ciphertext and almost surely
|
|
|
+ // will not repeat. In the highly unlikely case that it does,
|
|
|
+ // the underlying SSH connection will fail and the client
|
|
|
+ // must reconnect.
|
|
|
+
|
|
|
+ checksum := crc64.Checksum(
|
|
|
+ readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
|
|
|
+
|
|
|
+ if conn.lastReadChecksum == nil {
|
|
|
+ conn.lastReadChecksum = new(uint64)
|
|
|
+ } else if *conn.lastReadChecksum == checksum {
|
|
|
+ readBuffer.Truncate(newDataOffset)
|
|
|
}
|
|
|
+
|
|
|
+ *conn.lastReadChecksum = checksum
|
|
|
+
|
|
|
+ conn.replaceReadBuffer(readBuffer)
|
|
|
+
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
+var errMeekConnectionHasClosed = errors.New("meek connection has closed")
|
|
|
+
|
|
|
// Read reads from the meekConn into buffer. Read blocks until
|
|
|
// some data is read or the meekConn closes. Under the hood, it
|
|
|
// waits for pumpReads to submit a reader to read from.
|
|
|
@@ -704,7 +976,7 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
|
|
|
case readBuffer = <-conn.partialReadBuffer:
|
|
|
case readBuffer = <-conn.fullReadBuffer:
|
|
|
case <-conn.closeBroadcast:
|
|
|
- return 0, io.EOF
|
|
|
+ return 0, common.ContextError(errMeekConnectionHasClosed)
|
|
|
}
|
|
|
|
|
|
n, err := readBuffer.Read(buffer)
|
|
|
@@ -715,12 +987,12 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
|
|
|
}
|
|
|
|
|
|
func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
|
|
|
- switch readBuffer.Len() {
|
|
|
- case MEEK_MAX_PAYLOAD_LENGTH:
|
|
|
+ length := readBuffer.Len()
|
|
|
+ if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
|
|
|
conn.fullReadBuffer <- readBuffer
|
|
|
- case 0:
|
|
|
+ } else if length == 0 {
|
|
|
conn.emptyReadBuffer <- readBuffer
|
|
|
- default:
|
|
|
+ } else {
|
|
|
conn.partialReadBuffer <- readBuffer
|
|
|
}
|
|
|
}
|
|
|
@@ -730,40 +1002,41 @@ func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
|
|
|
// body limits (size for protocol v1, turn around time for protocol v2+)
|
|
|
// are met, or the meekConn is closed.
|
|
|
// Note: channel scheme assumes only one concurrent call to pumpWrites
|
|
|
-func (conn *meekConn) pumpWrites(writer io.Writer) error {
|
|
|
+func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
|
|
|
|
|
|
startTime := monotime.Now()
|
|
|
timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
|
|
|
defer timeout.Stop()
|
|
|
|
|
|
+ n := 0
|
|
|
for {
|
|
|
select {
|
|
|
case buffer := <-conn.nextWriteBuffer:
|
|
|
- _, err := writer.Write(buffer)
|
|
|
-
|
|
|
+ written, err := writer.Write(buffer)
|
|
|
+ n += written
|
|
|
// Assumes that writeResult won't block.
|
|
|
// Note: always send the err to writeResult,
|
|
|
// as the Write() caller is blocking on this.
|
|
|
conn.writeResult <- err
|
|
|
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return n, err
|
|
|
}
|
|
|
|
|
|
if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
|
|
|
// Pre-protocol version 1 clients expect at most
|
|
|
- // MEEK_MAX_PAYLOAD_LENGTH response bodies
|
|
|
- return nil
|
|
|
+ // MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
|
|
|
+ return n, nil
|
|
|
}
|
|
|
totalElapsedTime := monotime.Since(startTime) / time.Millisecond
|
|
|
if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
|
|
|
- return nil
|
|
|
+ return n, nil
|
|
|
}
|
|
|
timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
|
|
|
case <-timeout.C:
|
|
|
- return nil
|
|
|
+ return n, nil
|
|
|
case <-conn.closeBroadcast:
|
|
|
- return io.EOF
|
|
|
+ return n, common.ContextError(errMeekConnectionHasClosed)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -783,29 +1056,40 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
|
|
|
|
|
|
n := 0
|
|
|
for n < len(buffer) {
|
|
|
- end := n + MEEK_MAX_PAYLOAD_LENGTH
|
|
|
+ end := n + MEEK_MAX_REQUEST_PAYLOAD_LENGTH
|
|
|
if end > len(buffer) {
|
|
|
end = len(buffer)
|
|
|
}
|
|
|
|
|
|
- // Only write MEEK_MAX_PAYLOAD_LENGTH at a time,
|
|
|
+ // Only write MEEK_MAX_REQUEST_PAYLOAD_LENGTH at a time,
|
|
|
// to ensure compatibility with v1 protocol.
|
|
|
chunk := buffer[n:end]
|
|
|
|
|
|
select {
|
|
|
case conn.nextWriteBuffer <- chunk:
|
|
|
case <-conn.closeBroadcast:
|
|
|
- return n, io.EOF
|
|
|
+ return n, common.ContextError(errMeekConnectionHasClosed)
|
|
|
}
|
|
|
|
|
|
// Wait for the buffer to be processed.
|
|
|
select {
|
|
|
- case err := <-conn.writeResult:
|
|
|
- if err != nil {
|
|
|
- return n, err
|
|
|
- }
|
|
|
+ case _ = <-conn.writeResult:
|
|
|
+ // The err from conn.writeResult comes from the
|
|
|
+ // io.MultiWriter used in pumpWrites, which writes
|
|
|
+ // to both the cached response and the HTTP response.
|
|
|
+ //
|
|
|
+ // Don't stop on error here, since only writing
|
|
|
+ // to the HTTP response will fail, and the client
|
|
|
+ // may retry and use the cached response.
|
|
|
+ //
|
|
|
+ // It's possible that the cached response buffer
|
|
|
+ // is too small for the client to successfully
|
|
|
+ // retry, but that cannot be determined. In this
|
|
|
+ // case, the meek connection will eventually fail.
|
|
|
+ //
|
|
|
+ // err is already logged in ServeHTTP.
|
|
|
case <-conn.closeBroadcast:
|
|
|
- return n, io.EOF
|
|
|
+ return n, common.ContextError(errMeekConnectionHasClosed)
|
|
|
}
|
|
|
n += len(chunk)
|
|
|
}
|
|
|
@@ -838,7 +1122,7 @@ func (conn *meekConn) RemoteAddr() net.Addr {
|
|
|
// SetDeadline is not a true implementation of net.Conn.SetDeadline. It
|
|
|
// merely checks that the requested timeout exceeds the MEEK_MAX_SESSION_STALENESS
|
|
|
// period. When it does, and the session is idle, the meekConn Read/Write will
|
|
|
-// be interrupted and return io.EOF (not a timeout error) before the deadline.
|
|
|
+// be interrupted and return an error (not a timeout error) before the deadline.
|
|
|
// In other words, this conn will approximate the desired functionality of
|
|
|
// timing out on idle on or before the requested deadline.
|
|
|
func (conn *meekConn) SetDeadline(t time.Time) error {
|
|
|
@@ -858,3 +1142,10 @@ func (conn *meekConn) SetReadDeadline(t time.Time) error {
|
|
|
func (conn *meekConn) SetWriteDeadline(t time.Time) error {
|
|
|
return common.ContextError(errors.New("not supported"))
|
|
|
}
|
|
|
+
|
|
|
+// GetMetrics implements the MetricsSource interface. The metrics are maintained
|
|
|
+// in the meek session type; but logTunnel, which calls MetricsSource.GetMetrics,
|
|
|
+// has a pointer only to this conn, so it calls through to the session.
|
|
|
+func (conn *meekConn) GetMetrics() LogFields {
|
|
|
+ return conn.meekSession.GetMetrics()
|
|
|
+}
|