Rod Hynes 5 лет назад
Родитель
Сommit
ddcd96d577

+ 177 - 0
psiphon/common/activity.go

@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package common
+
+import (
+	"net"
+	"sync/atomic"
+	"time"
+
+	"github.com/Psiphon-Labs/goarista/monotime"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+)
+
+// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with events
+// triggered by I/O activity.
+//
+// ActivityMonitoredConn uses lock-free concurrency synronization, avoiding an
+// additional mutex resource, making it suitable for wrapping many net.Conns
+// (e.g, each Psiphon port forward).
+//
+// When an inactivity timeout is specified, the network I/O will timeout after
+// the specified period of read inactivity. Optionally, for the purpose of
+// inactivity only, ActivityMonitoredConn will also consider the connection
+// active when data is written to it.
+//
+// When a LRUConnsEntry is specified, then the LRU entry is promoted on either
+// a successful read or write.
+//
+// When an ActivityUpdater is set, then its UpdateActivity method is called on
+// each read and write with the number of bytes transferred. The
+// durationNanoseconds, which is the time since the last read, is reported
+// only on reads.
+type ActivityMonitoredConn struct {
+	// Note: 64-bit ints used with atomic operations are placed
+	// at the start of struct to ensure 64-bit alignment.
+	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
+	monotonicStartTime   int64
+	lastReadActivityTime int64
+	realStartTime        time.Time
+	net.Conn
+	inactivityTimeout time.Duration
+	activeOnWrite     bool
+	activityUpdater   ActivityUpdater
+	lruEntry          *LRUConnsEntry
+}
+
+// ActivityUpdater defines an interface for receiving updates for
+// ActivityMonitoredConn activity. Values passed to UpdateProgress are bytes
+// transferred and conn duration since the previous UpdateProgress.
+type ActivityUpdater interface {
+	UpdateProgress(bytesRead, bytesWritten int64, durationNanoseconds int64)
+}
+
+// NewActivityMonitoredConn creates a new ActivityMonitoredConn.
+func NewActivityMonitoredConn(
+	conn net.Conn,
+	inactivityTimeout time.Duration,
+	activeOnWrite bool,
+	activityUpdater ActivityUpdater,
+	lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {
+
+	if inactivityTimeout > 0 {
+		err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+
+	// The "monotime" package is still used here as its time value is an int64,
+	// which is compatible with atomic operations.
+
+	now := int64(monotime.Now())
+
+	return &ActivityMonitoredConn{
+		Conn:                 conn,
+		inactivityTimeout:    inactivityTimeout,
+		activeOnWrite:        activeOnWrite,
+		realStartTime:        time.Now(),
+		monotonicStartTime:   now,
+		lastReadActivityTime: now,
+		activityUpdater:      activityUpdater,
+		lruEntry:             lruEntry,
+	}, nil
+}
+
+// GetStartTime gets the time when the ActivityMonitoredConn was initialized.
+// Reported time is UTC.
+func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
+	return conn.realStartTime.UTC()
+}
+
+// GetActiveDuration returns the time elapsed between the initialization of
+// the ActivityMonitoredConn and the last Read. Only reads are used for this
+// calculation since writes may succeed locally due to buffering.
+func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
+	return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
+}
+
+func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
+	n, err := conn.Conn.Read(buffer)
+	if n > 0 {
+
+		if conn.inactivityTimeout > 0 {
+			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
+			if err != nil {
+				return n, errors.Trace(err)
+			}
+		}
+
+		lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
+		readActivityTime := int64(monotime.Now())
+
+		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
+
+		if conn.activityUpdater != nil {
+			conn.activityUpdater.UpdateProgress(
+				int64(n), 0, readActivityTime-lastReadActivityTime)
+		}
+
+		if conn.lruEntry != nil {
+			conn.lruEntry.Touch()
+		}
+	}
+	// Note: no context error to preserve error type
+	return n, err
+}
+
+func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
+	n, err := conn.Conn.Write(buffer)
+	if n > 0 && conn.activeOnWrite {
+
+		if conn.inactivityTimeout > 0 {
+			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
+			if err != nil {
+				return n, errors.Trace(err)
+			}
+		}
+
+		if conn.activityUpdater != nil {
+			conn.activityUpdater.UpdateProgress(0, int64(n), 0)
+		}
+
+		if conn.lruEntry != nil {
+			conn.lruEntry.Touch()
+		}
+
+	}
+	// Note: no context error to preserve error type
+	return n, err
+}
+
+// IsClosed implements the Closer iterface. The return value indicates whether
+// the underlying conn has been closed.
+func (conn *ActivityMonitoredConn) IsClosed() bool {
+	closer, ok := conn.Conn.(Closer)
+	if !ok {
+		return false
+	}
+	return closer.IsClosed()
+}

+ 157 - 0
psiphon/common/activity_test.go

@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package common
+
+import (
+	"testing"
+	"testing/iotest"
+	"time"
+
+	"github.com/Psiphon-Labs/goarista/monotime"
+)
+
+func TestActivityMonitoredConn(t *testing.T) {
+	buffer := make([]byte, 1024)
+
+	conn, err := NewActivityMonitoredConn(
+		&dummyConn{},
+		200*time.Millisecond,
+		true,
+		nil,
+		nil)
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	realStartTime := time.Now().UTC()
+
+	monotonicStartTime := monotime.Now()
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("read before initial timeout failed")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Write(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("previous write failed to extend timeout")
+	}
+
+	lastSuccessfulReadTime := monotime.Now()
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Write(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(300 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != iotest.ErrTimeout {
+		t.Fatalf("failed to timeout")
+	}
+
+	if realStartTime.Round(time.Millisecond) != conn.GetStartTime().Round(time.Millisecond) {
+		t.Fatalf("unexpected GetStartTime")
+	}
+
+	diff := lastSuccessfulReadTime.Sub(monotonicStartTime).Nanoseconds() - conn.GetActiveDuration().Nanoseconds()
+	if diff < 0 {
+		diff = -diff
+	}
+	if diff > (1 * time.Millisecond).Nanoseconds() {
+		t.Fatalf("unexpected GetActiveDuration")
+	}
+}
+
+func TestActivityMonitoredLRUConns(t *testing.T) {
+
+	lruConns := NewLRUConns()
+
+	dummy1 := &dummyConn{}
+	conn1, err := NewActivityMonitoredConn(dummy1, 0, true, nil, lruConns.Add(dummy1))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	dummy2 := &dummyConn{}
+	conn2, err := NewActivityMonitoredConn(dummy2, 0, true, nil, lruConns.Add(dummy2))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	dummy3 := &dummyConn{}
+	conn3, err := NewActivityMonitoredConn(dummy3, 0, true, nil, lruConns.Add(dummy3))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	buffer := make([]byte, 1024)
+
+	conn1.Read(buffer)
+	conn2.Read(buffer)
+	conn3.Read(buffer)
+
+	conn3.Write(buffer)
+	conn2.Write(buffer)
+	conn1.Write(buffer)
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if !dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+}

+ 284 - 0
psiphon/common/burst.go

@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package common
+
+import (
+	"net"
+	"sync"
+	"time"
+)
+
+// BurstMonitoredConn wraps a net.Conn and monitors for data transfer bursts.
+// Upstream (read) and downstream (write) bursts are tracked independently.
+//
+// A burst is defined as a transfer of at least "threshold" bytes, across
+// multiple I/O operations where the delay between operations does not exceed
+// "deadline". Both a non-zero deadline and theshold must be set to enable
+// monitoring. Four bursts are reported: the first, the last, the min (by
+// rate) and max.
+//
+// The reported rates will be more accurate for larger data transfers,
+// especially for higher transfer rates. Tune the deadline/threshold as
+// required. The threshold should be set to account for buffering (e.g, the
+// local host socket send/receive buffer) but this is not enforced by
+// BurstMonitoredConn.
+//
+// Close must be called to complete any outstanding bursts. For complete
+// results, call GetMetrics only after Close is called.
+//
+// Overhead: BurstMonitoredConn adds mutexes but does not use timers.
+type BurstMonitoredConn struct {
+	net.Conn
+	upstreamDeadline         time.Duration
+	upstreamThresholdBytes   int64
+	downstreamDeadline       time.Duration
+	downstreamThresholdBytes int64
+
+	readMutex            sync.Mutex
+	currentUpstreamBurst burst
+	upstreamBursts       burstHistory
+
+	writeMutex             sync.Mutex
+	currentDownstreamBurst burst
+	downstreamBursts       burstHistory
+}
+
+// NewBurstMonitoredConn creates a new BurstMonitoredConn.
+func NewBurstMonitoredConn(
+	conn net.Conn,
+	upstreamDeadline time.Duration,
+	upstreamThresholdBytes int64,
+	downstreamDeadline time.Duration,
+	downstreamThresholdBytes int64) *BurstMonitoredConn {
+
+	return &BurstMonitoredConn{
+		Conn:                     conn,
+		upstreamDeadline:         upstreamDeadline,
+		upstreamThresholdBytes:   upstreamThresholdBytes,
+		downstreamDeadline:       downstreamDeadline,
+		downstreamThresholdBytes: downstreamThresholdBytes,
+	}
+}
+
+type burst struct {
+	startTime    time.Time
+	lastByteTime time.Time
+	bytes        int64
+}
+
+func (b *burst) isZero() bool {
+	return b.startTime.IsZero()
+}
+
+func (b *burst) offset(baseTime time.Time) time.Duration {
+	offset := b.startTime.Sub(baseTime)
+	if offset <= 0 {
+		return 0
+	}
+	return offset
+}
+
+func (b *burst) duration() time.Duration {
+	duration := b.lastByteTime.Sub(b.startTime)
+	if duration <= 0 {
+		return 0
+	}
+	return duration
+}
+
+func (b *burst) rate() int64 {
+	return int64(
+		(float64(b.bytes) * float64(time.Second)) /
+			float64(b.duration()))
+}
+
+type burstHistory struct {
+	first burst
+	last  burst
+	min   burst
+	max   burst
+}
+
+func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
+
+	start := time.Now()
+	n, err := conn.Conn.Read(buffer)
+	end := time.Now()
+
+	if n > 0 &&
+		conn.upstreamDeadline > 0 && conn.upstreamThresholdBytes > 0 {
+
+		conn.readMutex.Lock()
+		conn.updateBurst(
+			start,
+			end,
+			int64(n),
+			conn.upstreamDeadline,
+			conn.upstreamThresholdBytes,
+			&conn.currentUpstreamBurst,
+			&conn.upstreamBursts)
+		conn.readMutex.Unlock()
+	}
+
+	// Note: no context error to preserve error type
+	return n, err
+}
+
+func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
+
+	start := time.Now()
+	n, err := conn.Conn.Write(buffer)
+	end := time.Now()
+
+	if n > 0 &&
+		conn.downstreamDeadline > 0 && conn.downstreamThresholdBytes > 0 {
+
+		conn.writeMutex.Lock()
+		conn.updateBurst(
+			start,
+			end,
+			int64(n),
+			conn.downstreamDeadline,
+			conn.downstreamThresholdBytes,
+			&conn.currentDownstreamBurst,
+			&conn.downstreamBursts)
+		conn.writeMutex.Unlock()
+	}
+
+	// Note: no context error to preserve error type
+	return n, err
+}
+
+func (conn *BurstMonitoredConn) Close() error {
+	err := conn.Conn.Close()
+
+	conn.readMutex.Lock()
+	conn.endBurst(
+		conn.upstreamThresholdBytes,
+		&conn.currentUpstreamBurst,
+		&conn.upstreamBursts)
+	conn.readMutex.Unlock()
+
+	conn.writeMutex.Lock()
+	conn.endBurst(
+		conn.downstreamThresholdBytes,
+		&conn.currentDownstreamBurst,
+		&conn.downstreamBursts)
+	conn.writeMutex.Unlock()
+
+	// Note: no context error to preserve error type
+	return err
+}
+
+// GetMetrics returns log fields with burst metrics for the first, last, min
+// (by rate), and max bursts for this conn. Time/duration values are reported
+// in milliseconds.
+func (conn *BurstMonitoredConn) GetMetrics(baseTime time.Time) LogFields {
+	logFields := make(LogFields)
+
+	addFields := func(prefix string, burst *burst) {
+		if burst.isZero() {
+			return
+		}
+		logFields[prefix+"offset"] = int64(burst.offset(baseTime) / time.Millisecond)
+		logFields[prefix+"duration"] = int64(burst.duration() / time.Millisecond)
+		logFields[prefix+"bytes"] = burst.bytes
+		logFields[prefix+"rate"] = burst.rate()
+	}
+
+	addHistory := func(prefix string, history *burstHistory) {
+		addFields(prefix+"first_", &history.first)
+		addFields(prefix+"last_", &history.last)
+		addFields(prefix+"min_", &history.min)
+		addFields(prefix+"max_", &history.max)
+	}
+
+	addHistory("burst_upstream_", &conn.upstreamBursts)
+	addHistory("burst_downstream_", &conn.downstreamBursts)
+
+	return logFields
+}
+
+func (conn *BurstMonitoredConn) updateBurst(
+	operationStart time.Time,
+	operationEnd time.Time,
+	operationBytes int64,
+	deadline time.Duration,
+	thresholdBytes int64,
+	currentBurst *burst,
+	history *burstHistory) {
+
+	// Assumes the associated mutex is locked.
+
+	if currentBurst.isZero() {
+		currentBurst.startTime = operationStart
+		currentBurst.lastByteTime = operationEnd
+		currentBurst.bytes = operationBytes
+
+	} else {
+
+		if operationStart.Sub(currentBurst.lastByteTime) >
+			deadline {
+
+			conn.endBurst(thresholdBytes, currentBurst, history)
+			currentBurst.startTime = operationStart
+		}
+
+		currentBurst.lastByteTime = operationEnd
+		currentBurst.bytes += operationBytes
+	}
+
+}
+
+func (conn *BurstMonitoredConn) endBurst(
+	thresholdBytes int64,
+	currentBurst *burst,
+	history *burstHistory) {
+
+	// Assumes the associated mutex is locked.
+
+	if currentBurst.isZero() {
+		return
+	}
+
+	burst := *currentBurst
+
+	currentBurst.startTime = time.Time{}
+	currentBurst.lastByteTime = time.Time{}
+	currentBurst.bytes = 0
+
+	if burst.bytes < thresholdBytes {
+		return
+	}
+
+	if history.first.isZero() {
+		history.first = burst
+	}
+
+	history.last = burst
+
+	if history.min.isZero() || history.min.rate() > burst.rate() {
+		history.min = burst
+	}
+
+	if history.max.isZero() || history.max.rate() < burst.rate() {
+		history.max = burst
+	}
+}

+ 186 - 0
psiphon/common/burst_test.go

@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package common
+
+import (
+	"testing"
+	"time"
+)
+
+func TestBurstMonitoredConn(t *testing.T) {
+
+	burstDeadline := 100 * time.Millisecond
+	upstreamThresholdBytes := int64(100000)
+	downstreamThresholdBytes := int64(1000000)
+
+	baseTime := time.Now()
+
+	dummy := &dummyConn{}
+
+	conn := NewBurstMonitoredConn(
+		dummy,
+		burstDeadline,
+		upstreamThresholdBytes,
+		burstDeadline,
+		downstreamThresholdBytes)
+
+	// Simulate 128KB/s up, 1MB/s down; transmit >= min bytes in segments; sets "first" and "min"
+
+	dummy.SetRateLimits(131072, 1048576)
+
+	segments := 10
+
+	b := make([]byte, int(upstreamThresholdBytes)/segments)
+	firstReadStart := time.Now()
+	for i := 0; i < segments; i++ {
+		conn.Read(b)
+	}
+	firstReadEnd := time.Now()
+
+	b = make([]byte, int(downstreamThresholdBytes)/segments)
+	firstWriteStart := time.Now()
+	for i := 0; i < segments; i++ {
+		conn.Write(b)
+	}
+	firstWriteEnd := time.Now()
+
+	time.Sleep(burstDeadline * 2)
+
+	// Simulate 1MB/s up, 10MB/s down; repeatedly transmit < min bytes before deadline; ignored
+
+	dummy.SetRateLimits(1048576, 10485760)
+
+	b = make([]byte, 1)
+	segments = 1000
+	for i := 0; i < segments; i++ {
+		conn.Read(b)
+	}
+	for i := 0; i < segments; i++ {
+		conn.Write(b)
+	}
+
+	time.Sleep(burstDeadline * 2)
+
+	// Simulate 512Kb/s up, 5MB/s down; transmit >= min bytes; sets "max"
+
+	dummy.SetRateLimits(524288, 5242880)
+
+	maxReadStart := time.Now()
+	conn.Read(make([]byte, upstreamThresholdBytes))
+	maxReadEnd := time.Now()
+
+	maxWriteStart := time.Now()
+	conn.Write(make([]byte, downstreamThresholdBytes))
+	maxWriteEnd := time.Now()
+
+	time.Sleep(burstDeadline * 2)
+
+	// Simulate 256Kb/s up, 2MB/s down;, transmit >= min bytes; sets "last"
+
+	dummy.SetRateLimits(262144, 2097152)
+
+	lastReadStart := time.Now()
+	conn.Read(make([]byte, upstreamThresholdBytes))
+	lastReadEnd := time.Now()
+
+	lastWriteStart := time.Now()
+	conn.Write(make([]byte, downstreamThresholdBytes))
+	lastWriteEnd := time.Now()
+
+	time.Sleep(burstDeadline * 2)
+
+	conn.Close()
+
+	t.Logf("upstream first:    %d bytes in %s; %d bytes/s",
+		conn.upstreamBursts.first.bytes, conn.upstreamBursts.first.duration(), conn.upstreamBursts.first.rate())
+	t.Logf("upstream last:     %d bytes in %s; %d bytes/s",
+		conn.upstreamBursts.last.bytes, conn.upstreamBursts.last.duration(), conn.upstreamBursts.last.rate())
+	t.Logf("upstream min:      %d bytes in %s; %d bytes/s",
+		conn.upstreamBursts.min.bytes, conn.upstreamBursts.min.duration(), conn.upstreamBursts.min.rate())
+	t.Logf("upstream max:      %d bytes in %s; %d bytes/s",
+		conn.upstreamBursts.max.bytes, conn.upstreamBursts.max.duration(), conn.upstreamBursts.max.rate())
+	t.Logf("downstream first:  %d bytes in %s; %d bytes/s",
+		conn.downstreamBursts.first.bytes, conn.downstreamBursts.first.duration(), conn.downstreamBursts.first.rate())
+	t.Logf("downstream last:   %d bytes in %s; %d bytes/s",
+		conn.downstreamBursts.last.bytes, conn.downstreamBursts.last.duration(), conn.downstreamBursts.last.rate())
+	t.Logf("downstream min:    %d bytes in %s; %d bytes/s",
+		conn.downstreamBursts.min.bytes, conn.downstreamBursts.min.duration(), conn.downstreamBursts.min.rate())
+	t.Logf("downstream max:    %d bytes in %s; %d bytes/s",
+		conn.downstreamBursts.max.bytes, conn.downstreamBursts.max.duration(), conn.downstreamBursts.max.rate())
+
+	logFields := conn.GetMetrics(baseTime)
+
+	if len(logFields) != 32 {
+		t.Errorf("unexpected metric count: %d", len(logFields))
+	}
+
+	for name, expectedValue := range map[string]int64{
+		"burst_upstream_first_offset":     int64(firstReadStart.Sub(baseTime) / time.Millisecond),
+		"burst_upstream_first_duration":   int64(firstReadEnd.Sub(firstReadStart) / time.Millisecond),
+		"burst_upstream_first_bytes":      upstreamThresholdBytes,
+		"burst_upstream_first_rate":       131072,
+		"burst_upstream_last_offset":      int64(lastReadStart.Sub(baseTime) / time.Millisecond),
+		"burst_upstream_last_duration":    int64(lastReadEnd.Sub(lastReadStart) / time.Millisecond),
+		"burst_upstream_last_bytes":       upstreamThresholdBytes,
+		"burst_upstream_last_rate":        262144,
+		"burst_upstream_min_offset":       int64(firstReadStart.Sub(baseTime) / time.Millisecond),
+		"burst_upstream_min_duration":     int64(firstReadEnd.Sub(firstReadStart) / time.Millisecond),
+		"burst_upstream_min_bytes":        upstreamThresholdBytes,
+		"burst_upstream_min_rate":         131072,
+		"burst_upstream_max_offset":       int64(maxReadStart.Sub(baseTime) / time.Millisecond),
+		"burst_upstream_max_duration":     int64(maxReadEnd.Sub(maxReadStart) / time.Millisecond),
+		"burst_upstream_max_bytes":        upstreamThresholdBytes,
+		"burst_upstream_max_rate":         524288,
+		"burst_downstream_first_offset":   int64(firstWriteStart.Sub(baseTime) / time.Millisecond),
+		"burst_downstream_first_duration": int64(firstWriteEnd.Sub(firstWriteStart) / time.Millisecond),
+		"burst_downstream_first_bytes":    downstreamThresholdBytes,
+		"burst_downstream_first_rate":     1048576,
+		"burst_downstream_last_offset":    int64(lastWriteStart.Sub(baseTime) / time.Millisecond),
+		"burst_downstream_last_duration":  int64(lastWriteEnd.Sub(lastWriteStart) / time.Millisecond),
+		"burst_downstream_last_bytes":     downstreamThresholdBytes,
+		"burst_downstream_last_rate":      2097152,
+		"burst_downstream_min_offset":     int64(firstWriteStart.Sub(baseTime) / time.Millisecond),
+		"burst_downstream_min_duration":   int64(firstWriteEnd.Sub(firstWriteStart) / time.Millisecond),
+		"burst_downstream_min_bytes":      downstreamThresholdBytes,
+		"burst_downstream_min_rate":       1048576,
+		"burst_downstream_max_offset":     int64(maxWriteStart.Sub(baseTime) / time.Millisecond),
+		"burst_downstream_max_duration":   int64(maxWriteEnd.Sub(maxWriteStart) / time.Millisecond),
+		"burst_downstream_max_bytes":      downstreamThresholdBytes,
+		"burst_downstream_max_rate":       5242880,
+	} {
+		value, ok := logFields[name]
+		if !ok {
+			t.Errorf("missing expected metric: %s", name)
+			continue
+		}
+		valueInt64, ok := value.(int64)
+		if !ok {
+			t.Errorf("missing expected metric type: %s (%T)", name, value)
+			continue
+		}
+		minAcceptable := int64(float64(expectedValue) * 0.95)
+		maxAcceptable := int64(float64(expectedValue) * 1.05)
+		if valueInt64 < minAcceptable || valueInt64 > maxAcceptable {
+			t.Errorf("unexpected metric value: %s (%v <= %v <= %v)",
+				name, minAcceptable, valueInt64, maxAcceptable)
+			continue
+		}
+	}
+}

+ 0 - 409
psiphon/common/net.go

@@ -26,10 +26,7 @@ import (
 	"net/http"
 	"strconv"
 	"sync"
-	"sync/atomic"
-	"time"
 
-	"github.com/Psiphon-Labs/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/miekg/dns"
@@ -261,412 +258,6 @@ func (entry *LRUConnsEntry) Touch() {
 	entry.lruConns.list.MoveToFront(entry.element)
 }
 
-// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with events
-// triggered by I/O activity.
-//
-// ActivityMonitoredConn uses lock-free concurrency synronization, avoiding an
-// additional mutex resource, making it suitable for wrapping many net.Conns
-// (e.g, each Psiphon port forward).
-//
-// When an inactivity timeout is specified, the network I/O will timeout after
-// the specified period of read inactivity. Optionally, for the purpose of
-// inactivity only, ActivityMonitoredConn will also consider the connection
-// active when data is written to it.
-//
-// When a LRUConnsEntry is specified, then the LRU entry is promoted on either
-// a successful read or write.
-//
-// When an ActivityUpdater is set, then its UpdateActivity method is called on
-// each read and write with the number of bytes transferred. The
-// durationNanoseconds, which is the time since the last read, is reported
-// only on reads.
-type ActivityMonitoredConn struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	monotonicStartTime   int64
-	lastReadActivityTime int64
-	realStartTime        time.Time
-	net.Conn
-	inactivityTimeout time.Duration
-	activeOnWrite     bool
-	activityUpdater   ActivityUpdater
-	lruEntry          *LRUConnsEntry
-}
-
-// ActivityUpdater defines an interface for receiving updates for
-// ActivityMonitoredConn activity. Values passed to UpdateProgress are bytes
-// transferred and conn duration since the previous UpdateProgress.
-type ActivityUpdater interface {
-	UpdateProgress(bytesRead, bytesWritten int64, durationNanoseconds int64)
-}
-
-// NewActivityMonitoredConn creates a new ActivityMonitoredConn.
-func NewActivityMonitoredConn(
-	conn net.Conn,
-	inactivityTimeout time.Duration,
-	activeOnWrite bool,
-	activityUpdater ActivityUpdater,
-	lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {
-
-	if inactivityTimeout > 0 {
-		err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-	}
-
-	// The "monotime" package is still used here as its time value is an int64,
-	// which is compatible with atomic operations.
-
-	now := int64(monotime.Now())
-
-	return &ActivityMonitoredConn{
-		Conn:                 conn,
-		inactivityTimeout:    inactivityTimeout,
-		activeOnWrite:        activeOnWrite,
-		realStartTime:        time.Now(),
-		monotonicStartTime:   now,
-		lastReadActivityTime: now,
-		activityUpdater:      activityUpdater,
-		lruEntry:             lruEntry,
-	}, nil
-}
-
-// GetStartTime gets the time when the ActivityMonitoredConn was initialized.
-// Reported time is UTC.
-func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
-	return conn.realStartTime.UTC()
-}
-
-// GetActiveDuration returns the time elapsed between the initialization of
-// the ActivityMonitoredConn and the last Read. Only reads are used for this
-// calculation since writes may succeed locally due to buffering.
-func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
-	return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
-}
-
-func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
-	n, err := conn.Conn.Read(buffer)
-	if n > 0 {
-
-		if conn.inactivityTimeout > 0 {
-			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
-			if err != nil {
-				return n, errors.Trace(err)
-			}
-		}
-
-		lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
-		readActivityTime := int64(monotime.Now())
-
-		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
-
-		if conn.activityUpdater != nil {
-			conn.activityUpdater.UpdateProgress(
-				int64(n), 0, readActivityTime-lastReadActivityTime)
-		}
-
-		if conn.lruEntry != nil {
-			conn.lruEntry.Touch()
-		}
-	}
-	// Note: no context error to preserve error type
-	return n, err
-}
-
-func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
-	n, err := conn.Conn.Write(buffer)
-	if n > 0 && conn.activeOnWrite {
-
-		if conn.inactivityTimeout > 0 {
-			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
-			if err != nil {
-				return n, errors.Trace(err)
-			}
-		}
-
-		if conn.activityUpdater != nil {
-			conn.activityUpdater.UpdateProgress(0, int64(n), 0)
-		}
-
-		if conn.lruEntry != nil {
-			conn.lruEntry.Touch()
-		}
-
-	}
-	// Note: no context error to preserve error type
-	return n, err
-}
-
-// IsClosed implements the Closer iterface. The return value indicates whether
-// the underlying conn has been closed.
-func (conn *ActivityMonitoredConn) IsClosed() bool {
-	closer, ok := conn.Conn.(Closer)
-	if !ok {
-		return false
-	}
-	return closer.IsClosed()
-}
-
-// BurstMonitoredConn wraps a net.Conn and monitors for data transfer bursts.
-// Upstream (read) and downstream (write) bursts are tracked independently.
-//
-// A burst is defined as a transfer of at least "threshold" bytes, across
-// multiple I/O operations where the delay between operations does not exceed
-// "deadline". Both a non-zero deadline and theshold must be set to enable
-// monitoring. Four bursts are reported: the first, the last, the min (by
-// rate) and max.
-//
-// The reported rates will be more accurate for larger data transfers,
-// especially for higher transfer rates. Tune the deadline/threshold as
-// required. The threshold should be set to account for buffering (e.g, the
-// local host socket send/receive buffer) but this is not enforced by
-// BurstMonitoredConn.
-//
-// Close must be called to complete any outstanding bursts. For complete
-// results, call GetMetrics only after Close is called.
-//
-// Overhead: BurstMonitoredConn adds mutexes but does not use timers.
-type BurstMonitoredConn struct {
-	net.Conn
-	upstreamDeadline         time.Duration
-	upstreamThresholdBytes   int64
-	downstreamDeadline       time.Duration
-	downstreamThresholdBytes int64
-
-	readMutex            sync.Mutex
-	currentUpstreamBurst burst
-	upstreamBursts       burstHistory
-
-	writeMutex             sync.Mutex
-	currentDownstreamBurst burst
-	downstreamBursts       burstHistory
-}
-
-// NewBurstMonitoredConn creates a new BurstMonitoredConn.
-func NewBurstMonitoredConn(
-	conn net.Conn,
-	upstreamDeadline time.Duration,
-	upstreamThresholdBytes int64,
-	downstreamDeadline time.Duration,
-	downstreamThresholdBytes int64) *BurstMonitoredConn {
-
-	return &BurstMonitoredConn{
-		Conn:                     conn,
-		upstreamDeadline:         upstreamDeadline,
-		upstreamThresholdBytes:   upstreamThresholdBytes,
-		downstreamDeadline:       downstreamDeadline,
-		downstreamThresholdBytes: downstreamThresholdBytes,
-	}
-}
-
-type burst struct {
-	startTime    time.Time
-	lastByteTime time.Time
-	bytes        int64
-}
-
-func (b *burst) isZero() bool {
-	return b.startTime.IsZero()
-}
-
-func (b *burst) offset(baseTime time.Time) time.Duration {
-	offset := b.startTime.Sub(baseTime)
-	if offset <= 0 {
-		return 0
-	}
-	return offset
-}
-
-func (b *burst) duration() time.Duration {
-	duration := b.lastByteTime.Sub(b.startTime)
-	if duration <= 0 {
-		return 0
-	}
-	return duration
-}
-
-func (b *burst) rate() int64 {
-	return int64(
-		(float64(b.bytes) * float64(time.Second)) /
-			float64(b.duration()))
-}
-
-type burstHistory struct {
-	first burst
-	last  burst
-	min   burst
-	max   burst
-}
-
-func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
-
-	start := time.Now()
-	n, err := conn.Conn.Read(buffer)
-	end := time.Now()
-
-	if n > 0 &&
-		conn.upstreamDeadline > 0 && conn.upstreamThresholdBytes > 0 {
-
-		conn.readMutex.Lock()
-		conn.updateBurst(
-			start,
-			end,
-			int64(n),
-			conn.upstreamDeadline,
-			conn.upstreamThresholdBytes,
-			&conn.currentUpstreamBurst,
-			&conn.upstreamBursts)
-		conn.readMutex.Unlock()
-	}
-
-	// Note: no context error to preserve error type
-	return n, err
-}
-
-func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
-
-	start := time.Now()
-	n, err := conn.Conn.Write(buffer)
-	end := time.Now()
-
-	if n > 0 &&
-		conn.downstreamDeadline > 0 && conn.downstreamThresholdBytes > 0 {
-
-		conn.writeMutex.Lock()
-		conn.updateBurst(
-			start,
-			end,
-			int64(n),
-			conn.downstreamDeadline,
-			conn.downstreamThresholdBytes,
-			&conn.currentDownstreamBurst,
-			&conn.downstreamBursts)
-		conn.writeMutex.Unlock()
-	}
-
-	// Note: no context error to preserve error type
-	return n, err
-}
-
-func (conn *BurstMonitoredConn) Close() error {
-	err := conn.Conn.Close()
-
-	conn.readMutex.Lock()
-	conn.endBurst(
-		conn.upstreamThresholdBytes,
-		&conn.currentUpstreamBurst,
-		&conn.upstreamBursts)
-	conn.readMutex.Unlock()
-
-	conn.writeMutex.Lock()
-	conn.endBurst(
-		conn.downstreamThresholdBytes,
-		&conn.currentDownstreamBurst,
-		&conn.downstreamBursts)
-	conn.writeMutex.Unlock()
-
-	// Note: no context error to preserve error type
-	return err
-}
-
-// GetMetrics returns log fields with burst metrics for the first, last, min
-// (by rate), and max bursts for this conn. Time/duration values are reported
-// in milliseconds.
-func (conn *BurstMonitoredConn) GetMetrics(baseTime time.Time) LogFields {
-	logFields := make(LogFields)
-
-	addFields := func(prefix string, burst *burst) {
-		if burst.isZero() {
-			return
-		}
-		logFields[prefix+"offset"] = int64(burst.offset(baseTime) / time.Millisecond)
-		logFields[prefix+"duration"] = int64(burst.duration() / time.Millisecond)
-		logFields[prefix+"bytes"] = burst.bytes
-		logFields[prefix+"rate"] = burst.rate()
-	}
-
-	addHistory := func(prefix string, history *burstHistory) {
-		addFields(prefix+"first_", &history.first)
-		addFields(prefix+"last_", &history.last)
-		addFields(prefix+"min_", &history.min)
-		addFields(prefix+"max_", &history.max)
-	}
-
-	addHistory("burst_upstream_", &conn.upstreamBursts)
-	addHistory("burst_downstream_", &conn.downstreamBursts)
-
-	return logFields
-}
-
-func (conn *BurstMonitoredConn) updateBurst(
-	operationStart time.Time,
-	operationEnd time.Time,
-	operationBytes int64,
-	deadline time.Duration,
-	thresholdBytes int64,
-	currentBurst *burst,
-	history *burstHistory) {
-
-	// Assumes the associated mutex is locked.
-
-	if currentBurst.isZero() {
-		currentBurst.startTime = operationStart
-		currentBurst.lastByteTime = operationEnd
-		currentBurst.bytes = operationBytes
-
-	} else {
-
-		if operationStart.Sub(currentBurst.lastByteTime) >
-			deadline {
-
-			conn.endBurst(thresholdBytes, currentBurst, history)
-			currentBurst.startTime = operationStart
-		}
-
-		currentBurst.lastByteTime = operationEnd
-		currentBurst.bytes += operationBytes
-	}
-
-}
-
-func (conn *BurstMonitoredConn) endBurst(
-	thresholdBytes int64,
-	currentBurst *burst,
-	history *burstHistory) {
-
-	// Assumes the associated mutex is locked.
-
-	if currentBurst.isZero() {
-		return
-	}
-
-	burst := *currentBurst
-
-	currentBurst.startTime = time.Time{}
-	currentBurst.lastByteTime = time.Time{}
-	currentBurst.bytes = 0
-
-	if burst.bytes < thresholdBytes {
-		return
-	}
-
-	if history.first.isZero() {
-		history.first = burst
-	}
-
-	history.last = burst
-
-	if history.min.isZero() || history.min.rate() > burst.rate() {
-		history.min = burst
-	}
-
-	if history.max.isZero() || history.max.rate() < burst.rate() {
-		history.max = burst
-	}
-}
-
 // IsBogon checks if the specified IP is a bogon (loopback, private addresses,
 // link-local addresses, etc.)
 func IsBogon(IP net.IP) bool {

+ 0 - 291
psiphon/common/net_test.go

@@ -26,139 +26,9 @@ import (
 	"testing/iotest"
 	"time"
 
-	"github.com/Psiphon-Labs/goarista/monotime"
 	"github.com/miekg/dns"
 )
 
-func TestActivityMonitoredConn(t *testing.T) {
-	buffer := make([]byte, 1024)
-
-	conn, err := NewActivityMonitoredConn(
-		&dummyConn{},
-		200*time.Millisecond,
-		true,
-		nil,
-		nil)
-	if err != nil {
-		t.Fatalf("NewActivityMonitoredConn failed")
-	}
-
-	realStartTime := time.Now().UTC()
-
-	monotonicStartTime := monotime.Now()
-
-	time.Sleep(100 * time.Millisecond)
-
-	_, err = conn.Read(buffer)
-	if err != nil {
-		t.Fatalf("read before initial timeout failed")
-	}
-
-	time.Sleep(100 * time.Millisecond)
-
-	_, err = conn.Read(buffer)
-	if err != nil {
-		t.Fatalf("previous read failed to extend timeout")
-	}
-
-	time.Sleep(100 * time.Millisecond)
-
-	_, err = conn.Write(buffer)
-	if err != nil {
-		t.Fatalf("previous read failed to extend timeout")
-	}
-
-	time.Sleep(100 * time.Millisecond)
-
-	_, err = conn.Read(buffer)
-	if err != nil {
-		t.Fatalf("previous write failed to extend timeout")
-	}
-
-	lastSuccessfulReadTime := monotime.Now()
-
-	time.Sleep(100 * time.Millisecond)
-
-	_, err = conn.Write(buffer)
-	if err != nil {
-		t.Fatalf("previous read failed to extend timeout")
-	}
-
-	time.Sleep(300 * time.Millisecond)
-
-	_, err = conn.Read(buffer)
-	if err != iotest.ErrTimeout {
-		t.Fatalf("failed to timeout")
-	}
-
-	if realStartTime.Round(time.Millisecond) != conn.GetStartTime().Round(time.Millisecond) {
-		t.Fatalf("unexpected GetStartTime")
-	}
-
-	diff := lastSuccessfulReadTime.Sub(monotonicStartTime).Nanoseconds() - conn.GetActiveDuration().Nanoseconds()
-	if diff < 0 {
-		diff = -diff
-	}
-	if diff > (1 * time.Millisecond).Nanoseconds() {
-		t.Fatalf("unexpected GetActiveDuration")
-	}
-}
-
-func TestActivityMonitoredLRUConns(t *testing.T) {
-
-	lruConns := NewLRUConns()
-
-	dummy1 := &dummyConn{}
-	conn1, err := NewActivityMonitoredConn(dummy1, 0, true, nil, lruConns.Add(dummy1))
-	if err != nil {
-		t.Fatalf("NewActivityMonitoredConn failed")
-	}
-
-	dummy2 := &dummyConn{}
-	conn2, err := NewActivityMonitoredConn(dummy2, 0, true, nil, lruConns.Add(dummy2))
-	if err != nil {
-		t.Fatalf("NewActivityMonitoredConn failed")
-	}
-
-	dummy3 := &dummyConn{}
-	conn3, err := NewActivityMonitoredConn(dummy3, 0, true, nil, lruConns.Add(dummy3))
-	if err != nil {
-		t.Fatalf("NewActivityMonitoredConn failed")
-	}
-
-	buffer := make([]byte, 1024)
-
-	conn1.Read(buffer)
-	conn2.Read(buffer)
-	conn3.Read(buffer)
-
-	conn3.Write(buffer)
-	conn2.Write(buffer)
-	conn1.Write(buffer)
-
-	if dummy1.IsClosed() || dummy2.IsClosed() || dummy3.IsClosed() {
-		t.Fatalf("unexpected IsClosed state")
-	}
-
-	lruConns.CloseOldest()
-
-	if dummy1.IsClosed() || dummy2.IsClosed() || !dummy3.IsClosed() {
-		t.Fatalf("unexpected IsClosed state")
-	}
-
-	lruConns.CloseOldest()
-
-	if dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
-		t.Fatalf("unexpected IsClosed state")
-	}
-
-	lruConns.CloseOldest()
-
-	if !dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
-		t.Fatalf("unexpected IsClosed state")
-	}
-}
-
 func TestLRUConns(t *testing.T) {
 	lruConns := NewLRUConns()
 
@@ -200,167 +70,6 @@ func TestLRUConns(t *testing.T) {
 	}
 }
 
-func TestBurstMonitoredConn(t *testing.T) {
-
-	burstDeadline := 100 * time.Millisecond
-	upstreamThresholdBytes := int64(100000)
-	downstreamThresholdBytes := int64(1000000)
-
-	baseTime := time.Now()
-
-	dummy := &dummyConn{}
-
-	conn := NewBurstMonitoredConn(
-		dummy,
-		burstDeadline,
-		upstreamThresholdBytes,
-		burstDeadline,
-		downstreamThresholdBytes)
-
-	// Simulate 128KB/s up, 1MB/s down; transmit >= min bytes in segments; sets "first" and "min"
-
-	dummy.SetRateLimits(131072, 1048576)
-
-	segments := 10
-
-	b := make([]byte, int(upstreamThresholdBytes)/segments)
-	firstReadStart := time.Now()
-	for i := 0; i < segments; i++ {
-		conn.Read(b)
-	}
-	firstReadEnd := time.Now()
-
-	b = make([]byte, int(downstreamThresholdBytes)/segments)
-	firstWriteStart := time.Now()
-	for i := 0; i < segments; i++ {
-		conn.Write(b)
-	}
-	firstWriteEnd := time.Now()
-
-	time.Sleep(burstDeadline * 2)
-
-	// Simulate 1MB/s up, 10MB/s down; repeatedly transmit < min bytes before deadline; ignored
-
-	dummy.SetRateLimits(1048576, 10485760)
-
-	b = make([]byte, 1)
-	segments = 1000
-	for i := 0; i < segments; i++ {
-		conn.Read(b)
-	}
-	for i := 0; i < segments; i++ {
-		conn.Write(b)
-	}
-
-	time.Sleep(burstDeadline * 2)
-
-	// Simulate 512Kb/s up, 5MB/s down; transmit >= min bytes; sets "max"
-
-	dummy.SetRateLimits(524288, 5242880)
-
-	maxReadStart := time.Now()
-	conn.Read(make([]byte, upstreamThresholdBytes))
-	maxReadEnd := time.Now()
-
-	maxWriteStart := time.Now()
-	conn.Write(make([]byte, downstreamThresholdBytes))
-	maxWriteEnd := time.Now()
-
-	time.Sleep(burstDeadline * 2)
-
-	// Simulate 256Kb/s up, 2MB/s down;, transmit >= min bytes; sets "last"
-
-	dummy.SetRateLimits(262144, 2097152)
-
-	lastReadStart := time.Now()
-	conn.Read(make([]byte, upstreamThresholdBytes))
-	lastReadEnd := time.Now()
-
-	lastWriteStart := time.Now()
-	conn.Write(make([]byte, downstreamThresholdBytes))
-	lastWriteEnd := time.Now()
-
-	time.Sleep(burstDeadline * 2)
-
-	conn.Close()
-
-	t.Logf("upstream first:    %d bytes in %s; %d bytes/s",
-		conn.upstreamBursts.first.bytes, conn.upstreamBursts.first.duration(), conn.upstreamBursts.first.rate())
-	t.Logf("upstream last:     %d bytes in %s; %d bytes/s",
-		conn.upstreamBursts.last.bytes, conn.upstreamBursts.last.duration(), conn.upstreamBursts.last.rate())
-	t.Logf("upstream min:      %d bytes in %s; %d bytes/s",
-		conn.upstreamBursts.min.bytes, conn.upstreamBursts.min.duration(), conn.upstreamBursts.min.rate())
-	t.Logf("upstream max:      %d bytes in %s; %d bytes/s",
-		conn.upstreamBursts.max.bytes, conn.upstreamBursts.max.duration(), conn.upstreamBursts.max.rate())
-	t.Logf("downstream first:  %d bytes in %s; %d bytes/s",
-		conn.downstreamBursts.first.bytes, conn.downstreamBursts.first.duration(), conn.downstreamBursts.first.rate())
-	t.Logf("downstream last:   %d bytes in %s; %d bytes/s",
-		conn.downstreamBursts.last.bytes, conn.downstreamBursts.last.duration(), conn.downstreamBursts.last.rate())
-	t.Logf("downstream min:    %d bytes in %s; %d bytes/s",
-		conn.downstreamBursts.min.bytes, conn.downstreamBursts.min.duration(), conn.downstreamBursts.min.rate())
-	t.Logf("downstream max:    %d bytes in %s; %d bytes/s",
-		conn.downstreamBursts.max.bytes, conn.downstreamBursts.max.duration(), conn.downstreamBursts.max.rate())
-
-	logFields := conn.GetMetrics(baseTime)
-
-	if len(logFields) != 32 {
-		t.Errorf("unexpected metric count: %d", len(logFields))
-	}
-
-	for name, expectedValue := range map[string]int64{
-		"burst_upstream_first_offset":     int64(firstReadStart.Sub(baseTime) / time.Millisecond),
-		"burst_upstream_first_duration":   int64(firstReadEnd.Sub(firstReadStart) / time.Millisecond),
-		"burst_upstream_first_bytes":      upstreamThresholdBytes,
-		"burst_upstream_first_rate":       131072,
-		"burst_upstream_last_offset":      int64(lastReadStart.Sub(baseTime) / time.Millisecond),
-		"burst_upstream_last_duration":    int64(lastReadEnd.Sub(lastReadStart) / time.Millisecond),
-		"burst_upstream_last_bytes":       upstreamThresholdBytes,
-		"burst_upstream_last_rate":        262144,
-		"burst_upstream_min_offset":       int64(firstReadStart.Sub(baseTime) / time.Millisecond),
-		"burst_upstream_min_duration":     int64(firstReadEnd.Sub(firstReadStart) / time.Millisecond),
-		"burst_upstream_min_bytes":        upstreamThresholdBytes,
-		"burst_upstream_min_rate":         131072,
-		"burst_upstream_max_offset":       int64(maxReadStart.Sub(baseTime) / time.Millisecond),
-		"burst_upstream_max_duration":     int64(maxReadEnd.Sub(maxReadStart) / time.Millisecond),
-		"burst_upstream_max_bytes":        upstreamThresholdBytes,
-		"burst_upstream_max_rate":         524288,
-		"burst_downstream_first_offset":   int64(firstWriteStart.Sub(baseTime) / time.Millisecond),
-		"burst_downstream_first_duration": int64(firstWriteEnd.Sub(firstWriteStart) / time.Millisecond),
-		"burst_downstream_first_bytes":    downstreamThresholdBytes,
-		"burst_downstream_first_rate":     1048576,
-		"burst_downstream_last_offset":    int64(lastWriteStart.Sub(baseTime) / time.Millisecond),
-		"burst_downstream_last_duration":  int64(lastWriteEnd.Sub(lastWriteStart) / time.Millisecond),
-		"burst_downstream_last_bytes":     downstreamThresholdBytes,
-		"burst_downstream_last_rate":      2097152,
-		"burst_downstream_min_offset":     int64(firstWriteStart.Sub(baseTime) / time.Millisecond),
-		"burst_downstream_min_duration":   int64(firstWriteEnd.Sub(firstWriteStart) / time.Millisecond),
-		"burst_downstream_min_bytes":      downstreamThresholdBytes,
-		"burst_downstream_min_rate":       1048576,
-		"burst_downstream_max_offset":     int64(maxWriteStart.Sub(baseTime) / time.Millisecond),
-		"burst_downstream_max_duration":   int64(maxWriteEnd.Sub(maxWriteStart) / time.Millisecond),
-		"burst_downstream_max_bytes":      downstreamThresholdBytes,
-		"burst_downstream_max_rate":       5242880,
-	} {
-		value, ok := logFields[name]
-		if !ok {
-			t.Errorf("missing expected metric: %s", name)
-			continue
-		}
-		valueInt64, ok := value.(int64)
-		if !ok {
-			t.Errorf("missing expected metric type: %s (%T)", name, value)
-			continue
-		}
-		minAcceptable := int64(float64(expectedValue) * 0.95)
-		maxAcceptable := int64(float64(expectedValue) * 1.05)
-		if valueInt64 < minAcceptable || valueInt64 > maxAcceptable {
-			t.Errorf("unexpected metric value: %s (%v <= %v <= %v)",
-				name, minAcceptable, valueInt64, maxAcceptable)
-			continue
-		}
-	}
-}
-
 func TestIsBogon(t *testing.T) {
 	if IsBogon(net.ParseIP("8.8.8.8")) {
 		t.Errorf("unexpected bogon")