Просмотр исходного кода

Add client-side tunnel data transfer burst monitoring

Rod Hynes 5 лет назад
Родитель
Сommit
5636dafdbf

+ 36 - 18
psiphon/common/burst.go

@@ -118,13 +118,15 @@ type burstHistory struct {
 
 func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
 
+	if conn.upstreamDeadline <= 0 || conn.upstreamThresholdBytes <= 0 {
+		return conn.Conn.Read(buffer)
+	}
+
 	start := time.Now()
 	n, err := conn.Conn.Read(buffer)
 	end := time.Now()
 
-	if n > 0 &&
-		conn.upstreamDeadline > 0 && conn.upstreamThresholdBytes > 0 {
-
+	if n > 0 {
 		conn.readMutex.Lock()
 		conn.updateBurst(
 			start,
@@ -143,13 +145,15 @@ func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
 
 func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
 
+	if conn.downstreamDeadline <= 0 || conn.downstreamThresholdBytes <= 0 {
+		return conn.Conn.Write(buffer)
+	}
+
 	start := time.Now()
 	n, err := conn.Conn.Write(buffer)
 	end := time.Now()
 
-	if n > 0 &&
-		conn.downstreamDeadline > 0 && conn.downstreamThresholdBytes > 0 {
-
+	if n > 0 {
 		conn.writeMutex.Lock()
 		conn.updateBurst(
 			start,
@@ -169,24 +173,38 @@ func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
 func (conn *BurstMonitoredConn) Close() error {
 	err := conn.Conn.Close()
 
-	conn.readMutex.Lock()
-	conn.endBurst(
-		conn.upstreamThresholdBytes,
-		&conn.currentUpstreamBurst,
-		&conn.upstreamBursts)
-	conn.readMutex.Unlock()
+	if conn.upstreamDeadline > 0 && conn.upstreamThresholdBytes > 0 {
+		conn.readMutex.Lock()
+		conn.endBurst(
+			conn.upstreamThresholdBytes,
+			&conn.currentUpstreamBurst,
+			&conn.upstreamBursts)
+		conn.readMutex.Unlock()
+	}
 
-	conn.writeMutex.Lock()
-	conn.endBurst(
-		conn.downstreamThresholdBytes,
-		&conn.currentDownstreamBurst,
-		&conn.downstreamBursts)
-	conn.writeMutex.Unlock()
+	if conn.downstreamDeadline > 0 && conn.downstreamThresholdBytes > 0 {
+		conn.writeMutex.Lock()
+		conn.endBurst(
+			conn.downstreamThresholdBytes,
+			&conn.currentDownstreamBurst,
+			&conn.downstreamBursts)
+		conn.writeMutex.Unlock()
+	}
 
 	// Note: no context error to preserve error type
 	return err
 }
 
+// IsClosed implements the Closer iterface. The return value indicates whether
+// the underlying conn has been closed.
+func (conn *BurstMonitoredConn) IsClosed() bool {
+	closer, ok := conn.Conn.(Closer)
+	if !ok {
+		return false
+	}
+	return closer.IsClosed()
+}
+
 // GetMetrics returns log fields with burst metrics for the first, last, min
 // (by rate), and max bursts for this conn. Time/duration values are reported
 // in milliseconds.

+ 8 - 0
psiphon/common/parameters/parameters.go

@@ -262,6 +262,10 @@ const (
 	ServerBurstUpstreamThresholdBytes                = "ServerBurstUpstreamThresholdBytes"
 	ServerBurstDownstreamDeadline                    = "ServerBurstDownstreamDeadline"
 	ServerBurstDownstreamThresholdBytes              = "ServerBurstDownstreamThresholdBytes"
+	ClientBurstUpstreamDeadline                      = "ClientBurstUpstreamDeadline"
+	ClientBurstUpstreamThresholdBytes                = "ClientBurstUpstreamThresholdBytes"
+	ClientBurstDownstreamDeadline                    = "ClientBurstDownstreamDeadline"
+	ClientBurstDownstreamThresholdBytes              = "ClientBurstDownstreamThresholdBytes"
 )
 
 const (
@@ -544,6 +548,10 @@ var defaultParameters = map[string]struct {
 	ServerBurstUpstreamThresholdBytes:   {value: 0, minimum: 0, flags: serverSideOnly},
 	ServerBurstDownstreamDeadline:       {value: time.Duration(0), minimum: time.Duration(0), flags: serverSideOnly},
 	ServerBurstDownstreamThresholdBytes: {value: 0, minimum: 0, flags: serverSideOnly},
+	ClientBurstUpstreamDeadline:         {value: time.Duration(0), minimum: time.Duration(0)},
+	ClientBurstUpstreamThresholdBytes:   {value: 0, minimum: 0},
+	ClientBurstDownstreamDeadline:       {value: time.Duration(0), minimum: time.Duration(0)},
+	ClientBurstDownstreamThresholdBytes: {value: 0, minimum: 0},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used

+ 7 - 0
psiphon/notice.go

@@ -879,6 +879,13 @@ func NoticeServerAlert(alert protocol.AlertRequest) {
 		"ServerAlert", 0, "reason", alert.Reason, "subject", alert.Subject)
 }
 
+// NoticeBursts reports tunnel data transfer burst metrics.
+func NoticeBursts(diagnosticID string, burstMetrics common.LogFields) {
+	singletonNoticeLogger.outputNotice(
+		"Bursts", noticeIsDiagnostic,
+		append([]interface{}{"diagnosticID", diagnosticID}, listCommonFields(burstMetrics)...)...)
+}
+
 type repetitiveNoticeState struct {
 	message string
 	repeats int

+ 4 - 0
psiphon/server/server_test.go

@@ -2101,6 +2101,10 @@ func paveTacticsConfigFile(
           "ServerBurstUpstreamThresholdBytes" : 1000,
           "ServerBurstDownstreamDeadline" : "100ms",
           "ServerBurstDownstreamThresholdBytes" : 100000,
+          "ClientBurstUpstreamDeadline" : "100ms",
+          "ClientBurstUpstreamThresholdBytes" : 1000,
+          "ClientBurstDownstreamDeadline" : "100ms",
+          "ClientBurstDownstreamThresholdBytes" : 100000,
 	`
 	}
 

+ 28 - 12
psiphon/tunnel.go

@@ -90,7 +90,8 @@ type Tunnel struct {
 	dialParams                     *DialParameters
 	livenessTestMetrics            *livenessTestMetrics
 	serverContext                  *ServerContext
-	conn                           *common.ActivityMonitoredConn
+	monitoringStartTime            time.Time
+	conn                           *common.BurstMonitoredConn
 	sshClient                      *ssh.Client
 	sshServerRequests              <-chan *ssh.Request
 	operateWaitGroup               *sync.WaitGroup
@@ -157,6 +158,7 @@ func ConnectTunnel(
 		config:              config,
 		dialParams:          dialParams,
 		livenessTestMetrics: dialResult.livenessTestMetrics,
+		monitoringStartTime: dialResult.monitoringStartTime,
 		conn:                dialResult.monitoredConn,
 		sshClient:           dialResult.sshClient,
 		sshServerRequests:   dialResult.sshRequests,
@@ -334,6 +336,17 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 			NoticeWarning("close tunnel ssh error: %s", err)
 		}
 	}
+
+	// Log burst metrics now that the BurstMonitoredConn is closed.
+	// Metrics will be empty when burst monitoring is disabled.
+	if !isDiscarded && isActivated {
+		burstMetrics := tunnel.conn.GetMetrics(tunnel.monitoringStartTime)
+		if len(burstMetrics) > 0 {
+			NoticeBursts(
+				tunnel.dialParams.ServerEntry.GetDiagnosticID(),
+				burstMetrics)
+		}
+	}
 }
 
 // SetInFlightConnectedRequest checks if a connected request can begin and
@@ -600,7 +613,8 @@ func (conn *TunneledConn) Close() error {
 
 type dialResult struct {
 	dialConn            net.Conn
-	monitoredConn       *common.ActivityMonitoredConn
+	monitoringStartTime time.Time
+	monitoredConn       *common.BurstMonitoredConn
 	sshClient           *ssh.Client
 	sshRequests         <-chan *ssh.Request
 	livenessTestMetrics *livenessTestMetrics
@@ -609,11 +623,6 @@ type dialResult struct {
 // dialTunnel is a helper that builds the transport layers and establishes the
 // SSH connection. When additional dial configuration is used, dial metrics
 // are recorded and returned.
-//
-// The net.Conn return value is the value to be removed from pendingConns;
-// additional layering (ThrottledConn, ActivityMonitoredConn) is applied, but
-// this return value is the base dial conn. The *ActivityMonitoredConn return
-// value is the layered conn passed into the ssh.Client.
 func dialTunnel(
 	ctx context.Context,
 	config *Config,
@@ -635,6 +644,10 @@ func dialTunnel(
 	livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
 	livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
 	livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
+	burstUpstreamDeadline := p.Duration(parameters.ClientBurstUpstreamDeadline)
+	burstUpstreamThresholdBytes := int64(p.Int(parameters.ClientBurstUpstreamThresholdBytes))
+	burstDownstreamDeadline := p.Duration(parameters.ClientBurstDownstreamDeadline)
+	burstDownstreamThresholdBytes := int64(p.Int(parameters.ClientBurstDownstreamThresholdBytes))
 	p.Close()
 
 	// Ensure that, unless the base context is cancelled, any replayed dial
@@ -774,11 +787,13 @@ func dialTunnel(
 		}
 	}()
 
-	// Activity monitoring is used to measure tunnel duration
-	monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil, nil)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
+	monitoringStartTime := time.Now()
+	monitoredConn := common.NewBurstMonitoredConn(
+		dialConn,
+		burstUpstreamDeadline,
+		burstUpstreamThresholdBytes,
+		burstDownstreamDeadline,
+		burstDownstreamThresholdBytes)
 
 	// Apply throttling (if configured)
 	throttledConn := common.NewThrottledConn(
@@ -977,6 +992,7 @@ func dialTunnel(
 
 	return &dialResult{
 			dialConn:            dialConn,
+			monitoringStartTime: monitoringStartTime,
 			monitoredConn:       monitoredConn,
 			sshClient:           result.sshClient,
 			sshRequests:         result.sshRequests,