Browse Source

Add meek resiliency metrics

Rod Hynes 8 years ago
parent
commit
e994a17c41

+ 8 - 0
psiphon/server/log.go

@@ -33,6 +33,14 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
 
+// MetricsSource is an object that provides metrics to be logged
+type MetricsSource interface {
+
+	// GetMetrics returns a LogFields populated with
+	// metrics from the MetricsSource
+	GetMetrics() LogFields
+}
+
 // ContextLogger adds context logging functionality to the
 // underlying logging packages.
 type ContextLogger struct {

+ 85 - 39
psiphon/server/meek.go

@@ -282,7 +282,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// sending data to the cached response, but if that buffer fills, the
 	// session will be lost.
 
-	requestNumber := atomic.AddInt32(&session.requestCount, 1)
+	requestNumber := atomic.AddInt64(&session.requestCount, 1)
 
 	// Wait for the existing request to complete.
 	session.lock.Lock()
@@ -290,7 +290,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 	// If a newer request has arrived while waiting, discard this one.
 	// Do not delay processing the newest request.
-	if atomic.LoadInt32(&session.requestCount) > requestNumber {
+	if atomic.LoadInt64(&session.requestCount) > requestNumber {
 		server.terminateConnection(responseWriter, request)
 		return
 	}
@@ -352,16 +352,23 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// TODO: invalid Range header is ignored; should it be otherwise?
 
 	position, isRetry := checkRangeHeader(request)
+	if isRetry {
+		atomic.AddInt64(&session.metricClientRetries, 1)
+	}
 
-	hasCachedResponse := session.cachedResponse.HasPosition(0)
+	hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
 
 	// The client is not expected to send position > 0 when there is
 	// no cached response; let that case fall through to the next
 	// HasPosition check which will fail and close the session.
 
-	if isRetry && (hasCachedResponse || position > 0) {
+	var responseSize int
+	var responseError error
+
+	if isRetry && (hasCompleteCachedResponse || position > 0) {
 
 		if !session.cachedResponse.HasPosition(position) {
+			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
 			server.terminateConnection(responseWriter, request)
 			server.closeSession(sessionID)
 			return
@@ -376,7 +383,8 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		//   as response bytes before "position" will never be requested
 		//   again?
 
-		err = session.cachedResponse.CopyFromPosition(position, responseWriter)
+		responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter)
+		greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize))
 
 		// The client may again fail to receive the payload and may again
 		// retry, so not yet releasing cachedReponse buffers.
@@ -403,15 +411,17 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
 		// write its downstream traffic through to the response body.
 
-		err = session.clientConn.pumpWrites(multiWriter)
+		responseSize, responseError = session.clientConn.pumpWrites(multiWriter)
+		greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize))
+		greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available()))
 	}
 
-	// err is the result of writing the body either from CopyFromPosition or pumpWrites
-	if err != nil {
-		if err != io.EOF {
+	// responseError is the result of writing the body either from CopyFromPosition or pumpWrites
+	if responseError != nil {
+		if responseError != io.EOF {
 			// Debug since errors such as "i/o timeout" occur during normal operation;
 			// also, golang network error messages may contain client IP.
-			log.WithContextFields(LogFields{"error": err}).Debug("write response failed")
+			log.WithContextFields(LogFields{"error": responseError}).Debug("write response failed")
 		}
 		server.terminateConnection(responseWriter, request)
 
@@ -510,6 +520,22 @@ func (server *MeekServer) getSession(
 		}
 	}
 
+	// Create a new session
+
+	bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
+	if server.support.Config.MeekCachedResponseBufferSize != 0 {
+		bufferLength = server.support.Config.MeekCachedResponseBufferSize
+	}
+	cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
+
+	session = &meekSession{
+		meekProtocolVersion: clientSessionData.MeekProtocolVersion,
+		sessionIDSent:       false,
+		cachedResponse:      cachedResponse,
+	}
+
+	session.touch()
+
 	// Create a new meek conn that will relay the payload
 	// between meek request/responses and the tunnel server client
 	// handler. The client IP is also used to initialize the
@@ -520,26 +546,14 @@ func (server *MeekServer) getSession(
 	// and is expected to be ignored.
 	clientConn := newMeekConn(
 		server,
+		session,
 		&net.TCPAddr{
 			IP:   net.ParseIP(clientIP),
 			Port: 0,
 		},
 		clientSessionData.MeekProtocolVersion)
 
-	bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
-	if server.support.Config.MeekCachedResponseBufferSize != 0 {
-		bufferLength = server.support.Config.MeekCachedResponseBufferSize
-	}
-
-	cachedResponse := NewCachedResponse(bufferLength, server.bufferPool)
-
-	session = &meekSession{
-		clientConn:          clientConn,
-		meekProtocolVersion: clientSessionData.MeekProtocolVersion,
-		sessionIDSent:       false,
-		cachedResponse:      cachedResponse,
-	}
-	session.touch()
+	session.clientConn = clientConn
 
 	// Note: MEEK_PROTOCOL_VERSION_1 doesn't support changing the
 	// meek cookie to a session ID; v1 clients always send the
@@ -633,13 +647,18 @@ type meekSession struct {
 	// Note: 64-bit ints used with atomic operations are at placed
 	// at the start of struct to ensure 64-bit alignment.
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastActivity        int64
-	requestCount        int32
-	lock                sync.Mutex
-	clientConn          *meekConn
-	meekProtocolVersion int
-	sessionIDSent       bool
-	cachedResponse      *CachedResponse
+	lastActivity                     int64
+	requestCount                     int64
+	metricClientRetries              int64
+	metricPeakResponseSize           int64
+	metricPeakCachedResponseSize     int64
+	metricPeakCachedResponseHitSize  int64
+	metricCachedResponseMissPosition int64
+	lock                             sync.Mutex
+	clientConn                       *meekConn
+	meekProtocolVersion              int
+	sessionIDSent                    bool
+	cachedResponse                   *CachedResponse
 }
 
 func (session *meekSession) touch() {
@@ -651,6 +670,17 @@ func (session *meekSession) expired() bool {
 	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
 }
 
+// GetMetrics implements the MetricsSource interface
+func (session *meekSession) GetMetrics() LogFields {
+	logFields := make(LogFields)
+	logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
+	logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
+	logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
+	logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
+	logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
+	return logFields
+}
+
 // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
 // Currently, this config is optimized for fronted meek where the nature
 // of the connection is non-circumvention; it's optimized for performance
@@ -811,6 +841,7 @@ func makeMeekSessionID() (string, error) {
 // and goroutines calling Read()s and Write()s.
 type meekConn struct {
 	meekServer        *MeekServer
+	meekSession       *meekSession
 	remoteAddr        net.Addr
 	protocolVersion   int
 	closeBroadcast    chan struct{}
@@ -825,9 +856,15 @@ type meekConn struct {
 	writeResult       chan error
 }
 
-func newMeekConn(meekServer *MeekServer, remoteAddr net.Addr, protocolVersion int) *meekConn {
+func newMeekConn(
+	meekServer *MeekServer,
+	meekSession *meekSession,
+	remoteAddr net.Addr,
+	protocolVersion int) *meekConn {
+
 	conn := &meekConn{
 		meekServer:        meekServer,
+		meekSession:       meekSession,
 		remoteAddr:        remoteAddr,
 		protocolVersion:   protocolVersion,
 		closeBroadcast:    make(chan struct{}),
@@ -965,39 +1002,41 @@ func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
 // body limits (size for protocol v1, turn around time for protocol v2+)
 // are met, or the meekConn is closed.
 // Note: channel scheme assumes only one concurrent call to pumpWrites
-func (conn *meekConn) pumpWrites(writer io.Writer) error {
+func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
 
 	startTime := monotime.Now()
 	timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
 	defer timeout.Stop()
 
+	n := 0
 	for {
 		select {
 		case buffer := <-conn.nextWriteBuffer:
-			_, err := writer.Write(buffer)
+			written, err := writer.Write(buffer)
+			n += written
 			// Assumes that writeResult won't block.
 			// Note: always send the err to writeResult,
 			// as the Write() caller is blocking on this.
 			conn.writeResult <- err
 
 			if err != nil {
-				return err
+				return n, err
 			}
 
 			if conn.protocolVersion < MEEK_PROTOCOL_VERSION_1 {
 				// Pre-protocol version 1 clients expect at most
 				// MEEK_MAX_REQUEST_PAYLOAD_LENGTH response bodies
-				return nil
+				return n, nil
 			}
 			totalElapsedTime := monotime.Since(startTime) / time.Millisecond
 			if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
-				return nil
+				return n, nil
 			}
 			timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
 		case <-timeout.C:
-			return nil
+			return n, nil
 		case <-conn.closeBroadcast:
-			return common.ContextError(errMeekConnectionHasClosed)
+			return n, common.ContextError(errMeekConnectionHasClosed)
 		}
 	}
 }
@@ -1103,3 +1142,10 @@ func (conn *meekConn) SetReadDeadline(t time.Time) error {
 func (conn *meekConn) SetWriteDeadline(t time.Time) error {
 	return common.ContextError(errors.New("not supported"))
 }
+
+// GetMetrics implements the MetricsSource interface. The metrics are maintained
+// in the meek session type; but logTunnel, which calls MetricsSource.GetMetrics,
+// has a pointer only to this conn, so it calls through to the session.
+func (conn *meekConn) GetMetrics() LogFields {
+	return conn.meekSession.GetMetrics()
+}

+ 12 - 11
psiphon/server/meekBuffer.go

@@ -84,11 +84,9 @@ func (response *CachedResponse) Reset() {
 	response.overwriting = false
 }
 
-func min(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
+// Available returns the size of the buffered response data.
+func (response *CachedResponse) Available() int {
+	return response.readAvailable
 }
 
 // HasPosition checks if the CachedResponse has buffered
@@ -107,15 +105,15 @@ func (response *CachedResponse) HasPosition(position int) bool {
 // CopyFromPosition can be called repeatedly to read the
 // same data -- it does not advance or modify the CachedResponse.
 func (response *CachedResponse) CopyFromPosition(
-	position int, writer io.Writer) error {
+	position int, writer io.Writer) (int, error) {
 
 	if response.readAvailable > 0 && response.readPosition > position {
-		return errors.New("position unavailable")
+		return 0, errors.New("position unavailable")
 	}
 
 	// Special case: position is end of available data
 	if position == response.readPosition+response.readAvailable {
-		return nil
+		return 0, nil
 	}
 
 	// Begin at the start of the response data, which may
@@ -135,6 +133,8 @@ func (response *CachedResponse) CopyFromPosition(
 	// Iterate over all available data, skipping until at the
 	// requested position.
 
+	n := 0
+
 	skip := position - response.readPosition
 	available := response.readAvailable
 
@@ -157,9 +157,10 @@ func (response *CachedResponse) CopyFromPosition(
 		}
 
 		if skip == 0 {
-			_, err := writer.Write(buffer[index : index+toCopy])
+			written, err := writer.Write(buffer[index : index+toCopy])
+			n += written
 			if err != nil {
-				return err
+				return n, err
 			}
 		}
 
@@ -167,7 +168,7 @@ func (response *CachedResponse) CopyFromPosition(
 		bufferIndex = (bufferIndex + 1) % len(response.buffers)
 	}
 
-	return nil
+	return n, nil
 }
 
 // Write appends data to the CachedResponse. All writes will

+ 4 - 1
psiphon/server/meek_test.go

@@ -134,12 +134,15 @@ func TestCachedResponse(t *testing.T) {
 
 					cachedResponseData := new(bytes.Buffer)
 
-					err := response.CopyFromPosition(testCase.copyPosition, cachedResponseData)
+					n, err := response.CopyFromPosition(testCase.copyPosition, cachedResponseData)
 
 					if testCase.expectedSuccess {
 						if err != nil {
 							t.Fatalf("CopyFromPosition unexpectedly failed for response %d: %s", i, err)
 						}
+						if n != cachedResponseData.Len() || n != response.Available() {
+							t.Fatalf("cached response size mismatch for response %d", i)
+						}
 						if bytes.Compare(responseData[testCase.copyPosition:], cachedResponseData.Bytes()) != 0 {
 							t.Fatalf("cached response data mismatch for response %d", i)
 						}

+ 19 - 2
psiphon/server/tunnelServer.go

@@ -790,6 +790,9 @@ func newSshClient(
 
 func (sshClient *sshClient) run(clientConn net.Conn) {
 
+	// Some conns report additional metrics
+	metricsSource, isMetricsSource := clientConn.(MetricsSource)
+
 	// Set initial traffic rules, pre-handshake, based on currently known info.
 	sshClient.setTrafficRules()
 
@@ -910,7 +913,11 @@ func (sshClient *sshClient) run(clientConn net.Conn) {
 
 	sshClient.sshServer.unregisterEstablishedClient(sshClient)
 
-	sshClient.logTunnel()
+	var additionalMetrics LogFields
+	if isMetricsSource {
+		additionalMetrics = metricsSource.GetMetrics()
+	}
+	sshClient.logTunnel(additionalMetrics)
 
 	// Transfer OSL seed state -- the OSL progress -- from the closing
 	// client to the session cache so the client can resume its progress
@@ -1330,7 +1337,7 @@ func (sshClient *sshClient) runTunnel(
 	waitGroup.Wait()
 }
 
-func (sshClient *sshClient) logTunnel() {
+func (sshClient *sshClient) logTunnel(additionalMetrics LogFields) {
 
 	// Note: reporting duration based on last confirmed data transfer, which
 	// is reads for sshClient.activityConn.GetActiveDuration(), and not
@@ -1364,6 +1371,16 @@ func (sshClient *sshClient) logTunnel() {
 	logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
 	logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
 
+	// Merge in additional metrics from the optional metrics source
+	if additionalMetrics != nil {
+		for name, value := range additionalMetrics {
+			// Don't overwrite any basic fields
+			if logFields[name] == nil {
+				logFields[name] = value
+			}
+		}
+	}
+
 	sshClient.Unlock()
 
 	log.LogRawFieldsWithTimestamp(logFields)

+ 16 - 0
psiphon/server/utils.go

@@ -29,6 +29,7 @@ import (
 	"fmt"
 	"io"
 	"math/big"
+	"sync/atomic"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -174,3 +175,18 @@ func (w *PanickingLogWriter) Write(p []byte) (n int, err error) {
 	}
 	return
 }
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func greaterThanSwapInt64(addr *int64, new int64) bool {
+	old := atomic.LoadInt64(addr)
+	if new > old {
+		return atomic.CompareAndSwapInt64(addr, old, new)
+	}
+	return false
+}