Просмотр исходного кода

Add liveness test metrics and bytes transferred to failed_tunnel

Rod Hynes 6 лет назад
Родитель
Сommit
4e89694f7c
4 измененных файлов с 90 добавлено и 31 удалено
  1. 6 0
      psiphon/server/api.go
  2. 1 1
      psiphon/server/server_test.go
  3. 14 1
      psiphon/serverApi.go
  4. 69 29
      psiphon/tunnel.go

+ 6 - 0
psiphon/server/api.go

@@ -433,6 +433,12 @@ var failedTunnelStatParams = append(
 		{"session_id", isHexDigits, 0},
 		{"last_connected", isLastConnected, 0},
 		{"client_failed_timestamp", isISO8601Date, 0},
+		{"liveness_test_upstream_bytes", isIntString, requestParamOptional},
+		{"liveness_test_sent_upstream_bytes", isIntString, requestParamOptional},
+		{"liveness_test_downstream_bytes", isIntString, requestParamOptional},
+		{"liveness_test_received_downstream_bytes", isIntString, requestParamOptional},
+		{"bytes_up", isIntString, requestParamOptional},
+		{"bytes_down", isIntString, requestParamOptional},
 		{"tunnel_error", isAnyString, 0}},
 	baseRequestParams...)
 

+ 1 - 1
psiphon/server/server_test.go

@@ -2115,7 +2115,7 @@ func storePruneServerEntriesTest(
 		}
 
 		err = psiphon.RecordFailedTunnelStat(
-			clientConfig, dialParams, errors.New("test error"))
+			clientConfig, dialParams, nil, 0, 0, errors.New("test error"))
 		if err != nil {
 			t.Fatalf("RecordFailedTunnelStat failed: %s", err)
 		}

+ 14 - 1
psiphon/serverApi.go

@@ -651,7 +651,12 @@ func RecordRemoteServerListStat(
 // This uses the same reporting facility, with the same caveats, as
 // RecordRemoteServerListStat.
 func RecordFailedTunnelStat(
-	config *Config, dialParams *DialParameters, tunnelErr error) error {
+	config *Config,
+	dialParams *DialParameters,
+	livenessTestMetrics *livenessTestMetrics,
+	bytesUp int64,
+	bytesDown int64,
+	tunnelErr error) error {
 
 	if !config.GetClientParameters().Get().WeightedCoinFlip(
 		parameters.RecordFailedTunnelPersistentStatsProbability) {
@@ -669,6 +674,14 @@ func RecordFailedTunnelStat(
 	params["server_entry_tag"] = dialParams.ServerEntry.Tag
 	params["last_connected"] = lastConnected
 	params["client_failed_timestamp"] = common.TruncateTimestampToHour(common.GetCurrentTimestamp())
+	if livenessTestMetrics != nil {
+		params["liveness_test_upstream_bytes"] = strconv.Itoa(livenessTestMetrics.UpstreamBytes)
+		params["liveness_test_sent_upstream_bytes"] = strconv.Itoa(livenessTestMetrics.SentUpstreamBytes)
+		params["liveness_test_downstream_bytes"] = strconv.Itoa(livenessTestMetrics.DownstreamBytes)
+		params["liveness_test_received_downstream_bytes"] = strconv.Itoa(livenessTestMetrics.ReceivedDownstreamBytes)
+	}
+	params["bytes_up"] = fmt.Sprintf("%d", bytesUp)
+	params["bytes_down"] = fmt.Sprintf("%d", bytesDown)
 
 	// Ensure direct server IPs are not exposed in logs. The "net" package, and
 	// possibly other 3rd party packages, will include destination addresses in

+ 69 - 29
psiphon/tunnel.go

@@ -30,6 +30,7 @@ import (
 	"io/ioutil"
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -87,6 +88,7 @@ type Tunnel struct {
 	isDiscarded                bool
 	isClosed                   bool
 	dialParams                 *DialParameters
+	livenessTestMetrics        *livenessTestMetrics
 	serverContext              *ServerContext
 	conn                       *common.ActivityMonitoredConn
 	sshClient                  *ssh.Client
@@ -149,12 +151,13 @@ func ConnectTunnel(
 
 	// The tunnel is now connected
 	return &Tunnel{
-		mutex:             new(sync.Mutex),
-		config:            config,
-		dialParams:        dialParams,
-		conn:              dialResult.monitoredConn,
-		sshClient:         dialResult.sshClient,
-		sshServerRequests: dialResult.sshRequests,
+		mutex:               new(sync.Mutex),
+		config:              config,
+		dialParams:          dialParams,
+		livenessTestMetrics: dialResult.livenessTestMetrics,
+		conn:                dialResult.monitoredConn,
+		sshClient:           dialResult.sshClient,
+		sshServerRequests:   dialResult.sshRequests,
 		// A buffer allows at least one signal to be sent even when the receiver is
 		// not listening. Senders should not block.
 		signalPortForwardFailure:   make(chan struct{}, 1),
@@ -176,7 +179,13 @@ func (tunnel *Tunnel) Activate(
 	defer func() {
 		if !activationSucceeded && baseCtx.Err() == nil {
 			tunnel.dialParams.Failed(tunnel.config)
-			_ = RecordFailedTunnelStat(tunnel.config, tunnel.dialParams, retErr)
+			_ = RecordFailedTunnelStat(
+				tunnel.config,
+				tunnel.dialParams,
+				tunnel.livenessTestMetrics,
+				0,
+				0,
+				retErr)
 		}
 	}()
 
@@ -515,10 +524,11 @@ func (conn *TunneledConn) Close() error {
 }
 
 type dialResult struct {
-	dialConn      net.Conn
-	monitoredConn *common.ActivityMonitoredConn
-	sshClient     *ssh.Client
-	sshRequests   <-chan *ssh.Request
+	dialConn            net.Conn
+	monitoredConn       *common.ActivityMonitoredConn
+	sshClient           *ssh.Client
+	sshRequests         <-chan *ssh.Request
+	livenessTestMetrics *livenessTestMetrics
 }
 
 // dialTunnel is a helper that builds the transport layers and establishes the
@@ -561,10 +571,17 @@ func dialTunnel(
 	// logic.
 	dialSucceeded := false
 	baseCtx := ctx
+	var failedTunnelLivenessTestMetrics *livenessTestMetrics
 	defer func() {
 		if !dialSucceeded && baseCtx.Err() == nil {
 			dialParams.Failed(config)
-			_ = RecordFailedTunnelStat(config, dialParams, retErr)
+			_ = RecordFailedTunnelStat(
+				config,
+				dialParams,
+				failedTunnelLivenessTestMetrics,
+				0,
+				0,
+				retErr)
 		}
 	}()
 
@@ -785,9 +802,10 @@ func dialTunnel(
 	// in operate tunnel.
 
 	type sshNewClientResult struct {
-		sshClient   *ssh.Client
-		sshRequests <-chan *ssh.Request
-		err         error
+		sshClient           *ssh.Client
+		sshRequests         <-chan *ssh.Request
+		livenessTestMetrics *livenessTestMetrics
+		err                 error
 	}
 
 	resultChannel := make(chan sshNewClientResult)
@@ -803,7 +821,10 @@ func dialTunnel(
 		sshAddress := ""
 		sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
 			sshConn, sshAddress, sshClientConfig)
+
 		var sshClient *ssh.Client
+		var metrics *livenessTestMetrics
+
 		if err == nil {
 
 			// sshRequests is handled by operateTunnel.
@@ -829,7 +850,6 @@ func dialTunnel(
 				// TunnelConnectTimeout, which should be adjusted
 				// accordinging.
 
-				var metrics *livenessTestMetrics
 				metrics, err = performLivenessTest(
 					sshClient,
 					livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
@@ -844,7 +864,7 @@ func dialTunnel(
 			}
 		}
 
-		resultChannel <- sshNewClientResult{sshClient, sshRequests, err}
+		resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
 	}()
 
 	var result sshNewClientResult
@@ -866,6 +886,7 @@ func dialTunnel(
 	}
 
 	if result.err != nil {
+		failedTunnelLivenessTestMetrics = result.livenessTestMetrics
 		return nil, errors.Trace(result.err)
 	}
 
@@ -1076,7 +1097,10 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 		defer requestsWaitGroup.Done()
 		isFirstPeriodicKeepAlive := true
 		for timeout := range signalPeriodicSshKeepAlive {
-			err := tunnel.sendSshKeepAlive(isFirstPeriodicKeepAlive, timeout)
+			bytesUp := atomic.LoadInt64(&totalSent)
+			bytesDown := atomic.LoadInt64(&totalReceived)
+			err := tunnel.sendSshKeepAlive(
+				isFirstPeriodicKeepAlive, timeout, bytesUp, bytesDown)
 			if err != nil {
 				select {
 				case sshKeepAliveError <- err:
@@ -1096,7 +1120,10 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	go func() {
 		defer requestsWaitGroup.Done()
 		for timeout := range signalProbeSshKeepAlive {
-			err := tunnel.sendSshKeepAlive(false, timeout)
+			bytesUp := atomic.LoadInt64(&totalSent)
+			bytesDown := atomic.LoadInt64(&totalReceived)
+			err := tunnel.sendSshKeepAlive(
+				false, timeout, bytesUp, bytesDown)
 			if err != nil {
 				select {
 				case sshKeepAliveError <- err:
@@ -1118,8 +1145,8 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 				lastBytesReceivedTime = time.Now()
 			}
 
-			totalSent += sent
-			totalReceived += received
+			bytesUp := atomic.AddInt64(&totalSent, sent)
+			bytesDown := atomic.AddInt64(&totalReceived, received)
 
 			p := tunnel.getCustomClientParameters()
 			noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
@@ -1129,7 +1156,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 
 			if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
 				NoticeTotalBytesTransferred(
-					tunnel.dialParams.ServerEntry.GetDiagnosticID(), totalSent, totalReceived)
+					tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
 				lastTotalBytesTransferedTime = time.Now()
 			}
 
@@ -1149,8 +1176,8 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 			// the same reason the granularity of ReplayTargetTunnelDuration is
 			// seconds.
 			if !setDialParamsSucceeded &&
-				totalSent >= int64(replayTargetUpstreamBytes) &&
-				totalReceived >= int64(replayTargetDownstreamBytes) &&
+				bytesUp >= int64(replayTargetUpstreamBytes) &&
+				bytesDown >= int64(replayTargetDownstreamBytes) &&
 				time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
 
 				tunnel.dialParams.Succeeded()
@@ -1232,12 +1259,12 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	// Capture bytes transferred since the last noticeBytesTransferredTicker tick
 	sent, received := transferstats.ReportRecentBytesTransferredForServer(
 		tunnel.dialParams.ServerEntry.IpAddress)
-	totalSent += sent
-	totalReceived += received
+	bytesUp := atomic.AddInt64(&totalSent, sent)
+	bytesDown := atomic.AddInt64(&totalReceived, received)
 
 	// Always emit a final NoticeTotalBytesTransferred
 	NoticeTotalBytesTransferred(
-		tunnel.dialParams.ServerEntry.GetDiagnosticID(), totalSent, totalReceived)
+		tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
 
 	if err == nil {
 		NoticeInfo("shutdown operate tunnel")
@@ -1260,7 +1287,11 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 // on the specified SSH connections and returns true of the request succeeds
 // within a specified timeout. If the request fails, the associated conn is
 // closed, which will terminate the associated tunnel.
-func (tunnel *Tunnel) sendSshKeepAlive(isFirstPeriodicKeepAlive bool, timeout time.Duration) error {
+func (tunnel *Tunnel) sendSshKeepAlive(
+	isFirstPeriodicKeepAlive bool,
+	timeout time.Duration,
+	bytesUp int64,
+	bytesDown int64) error {
 
 	p := tunnel.getCustomClientParameters()
 
@@ -1343,6 +1374,9 @@ func (tunnel *Tunnel) sendSshKeepAlive(isFirstPeriodicKeepAlive bool, timeout ti
 	//
 	// For platforms that don't provide a NetworkConnectivityChecker, it is
 	// assumed that there is network connectivity.
+	//
+	// The approximate number of tunneled bytes successfully sent and received is
+	// recorded in the failed tunnel event as a quality indicator.
 
 	ticker := time.NewTicker(networkConnectivityPollPeriod)
 	defer ticker.Stop()
@@ -1373,7 +1407,13 @@ loop:
 		tunnel.conn.Close()
 
 		if continuousNetworkConnectivity {
-			_ = RecordFailedTunnelStat(tunnel.config, tunnel.dialParams, err)
+			_ = RecordFailedTunnelStat(
+				tunnel.config,
+				tunnel.dialParams,
+				tunnel.livenessTestMetrics,
+				bytesUp,
+				bytesDown,
+				err)
 		}
 	}