Просмотр исходного кода

Merge pull request #243 from rod-hynes/master

Client-side Tunnel Stats improvements
Rod Hynes 9 лет назад
Родитель
Commit
4f38befaa9

+ 1 - 0
README.md

@@ -90,6 +90,7 @@ Psiphon Tunnel Core uses:
 * [OpenSSL Bindings for Go](https://github.com/spacemonkeygo/openssl)
 * [goptlib](https://github.com/Yawning/goptlib)
 * [goregen](https://github.com/zach-klippenstein/goregen)
+* [monotime](https://github.com/aristanetworks/goarista)
 
 Licensing
 --------------------------------------------------------------------------------

+ 5 - 0
psiphon/TCPConn.go

@@ -135,9 +135,14 @@ func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
 	// Wait until Dial completes (or times out) or until interrupt
 	err := <-conn.dialResult
 	if err != nil {
+		if config.PendingConns != nil {
+			config.PendingConns.Remove(conn)
+		}
 		return nil, common.ContextError(err)
 	}
 
+	// TODO: now allow conn.dialResult to be garbage collected?
+
 	return conn, nil
 }
 

+ 199 - 0
psiphon/common/net.go

@@ -20,8 +20,13 @@
 package common
 
 import (
+	"container/list"
 	"net"
 	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/Psiphon-Inc/goarista/monotime"
 )
 
 // Conns is a synchronized list of Conns that is used to coordinate
@@ -83,3 +88,197 @@ func IPAddressFromAddr(addr net.Addr) string {
 	}
 	return ipAddress
 }
+
+// LRUConns is a concurrency-safe list of net.Conns ordered
+// by recent activity. Its purpose is to facilitate closing
+// the oldest connection in a set of connections.
+//
+// New connections added are referenced by a LRUConnsEntry,
+// which is used to Touch() active connections, which
+// promotes them to the front of the order and to Remove()
+// connections that are no longer LRU candidates.
+//
+// CloseOldest() will remove the oldest connection from the
+// list and call net.Conn.Close() on the connection.
+//
+// After an entry has been removed, LRUConnsEntry Touch()
+// and Remove() will have no effect.
+type LRUConns struct {
+	mutex sync.Mutex
+	list  *list.List
+}
+
+// NewLRUConns initializes a new LRUConns.
+func NewLRUConns() *LRUConns {
+	return &LRUConns{list: list.New()}
+}
+
+// Add inserts a net.Conn as the freshest connection
+// in a LRUConns and returns an LRUConnsEntry to be
+// used to freshen the connection or remove the connection
+// from the LRU list.
+func (conns *LRUConns) Add(conn net.Conn) *LRUConnsEntry {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	return &LRUConnsEntry{
+		lruConns: conns,
+		element:  conns.list.PushFront(conn),
+	}
+}
+
+// CloseOldest closes the oldest connection in a
+// LRUConns. It calls net.Conn.Close() on the
+// connection.
+func (conns *LRUConns) CloseOldest() {
+	conns.mutex.Lock()
+	oldest := conns.list.Back()
+	if oldest != nil {
+		conns.list.Remove(oldest)
+	}
+	// Release mutex before closing conn
+	conns.mutex.Unlock()
+	if oldest != nil {
+		oldest.Value.(net.Conn).Close()
+	}
+}
+
+// LRUConnsEntry is an entry in a LRUConns list.
+type LRUConnsEntry struct {
+	lruConns *LRUConns
+	element  *list.Element
+}
+
+// Remove deletes the connection referenced by the
+// LRUConnsEntry from the associated LRUConns.
+// Has no effect if the entry was not initialized
+// or previously removed.
+func (entry *LRUConnsEntry) Remove() {
+	if entry.lruConns == nil || entry.element == nil {
+		return
+	}
+	entry.lruConns.mutex.Lock()
+	defer entry.lruConns.mutex.Unlock()
+	entry.lruConns.list.Remove(entry.element)
+}
+
+// Touch promotes the connection referenced by the
+// LRUConnsEntry to the front of the associated LRUConns.
+// Has no effect if the entry was not initialized
+// or previously removed.
+func (entry *LRUConnsEntry) Touch() {
+	if entry.lruConns == nil || entry.element == nil {
+		return
+	}
+	entry.lruConns.mutex.Lock()
+	defer entry.lruConns.mutex.Unlock()
+	entry.lruConns.list.MoveToFront(entry.element)
+}
+
+// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
+// events triggered by I/O activity.
+//
+// When an inactivity timeout is specified, the network I/O will
+// timeout after the specified period of read inactivity. Optionally,
+// for the purpose of inactivity only, ActivityMonitoredConn will also
+// consider the connection active when data is written to it.
+//
+// When a LRUConnsEntry is specified, then the LRU entry is promoted on
+// either a successful read or write.
+//
+type ActivityMonitoredConn struct {
+	// Note: 64-bit ints used with atomic operations are placed
+	// at the start of the struct to ensure 64-bit alignment.
+	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
+	monotonicStartTime   int64
+	lastReadActivityTime int64
+	realStartTime        time.Time
+	net.Conn
+	inactivityTimeout time.Duration
+	activeOnWrite     bool
+	lruEntry          *LRUConnsEntry
+}
+
+func NewActivityMonitoredConn(
+	conn net.Conn,
+	inactivityTimeout time.Duration,
+	activeOnWrite bool,
+	lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {
+
+	if inactivityTimeout > 0 {
+		err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
+		if err != nil {
+			return nil, ContextError(err)
+		}
+	}
+
+	now := int64(monotime.Now())
+
+	return &ActivityMonitoredConn{
+		Conn:                 conn,
+		inactivityTimeout:    inactivityTimeout,
+		activeOnWrite:        activeOnWrite,
+		realStartTime:        time.Now(),
+		monotonicStartTime:   now,
+		lastReadActivityTime: now,
+		lruEntry:             lruEntry,
+	}, nil
+}
+
+// GetStartTime gets the time when the ActivityMonitoredConn was
+// initialized.
+func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
+	return conn.realStartTime
+}
+
+// GetActiveDuration returns the time elapsed between the initialization
+// of the ActivityMonitoredConn and the last Read. Only reads are used
+// for this calculation since writes may succeed locally due to buffering.
+func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
+	return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
+}
+
+// GetLastActivityMonotime returns the arbitrary monotonic time of the last Read.
+func (conn *ActivityMonitoredConn) GetLastActivityMonotime() monotime.Time {
+	return monotime.Time(atomic.LoadInt64(&conn.lastReadActivityTime))
+}
+
+func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
+	n, err := conn.Conn.Read(buffer)
+	if err == nil {
+
+		if conn.inactivityTimeout > 0 {
+			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
+			if err != nil {
+				return n, ContextError(err)
+			}
+		}
+		if conn.lruEntry != nil {
+			conn.lruEntry.Touch()
+		}
+
+		atomic.StoreInt64(&conn.lastReadActivityTime, int64(monotime.Now()))
+
+	}
+	// Note: no context error to preserve error type
+	return n, err
+}
+
+func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
+	n, err := conn.Conn.Write(buffer)
+	if err == nil && conn.activeOnWrite {
+
+		if conn.inactivityTimeout > 0 {
+			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
+			if err != nil {
+				return n, ContextError(err)
+			}
+		}
+
+		if conn.lruEntry != nil {
+			conn.lruEntry.Touch()
+		}
+
+	}
+	// Note: no context error to preserve error type
+	return n, err
+}

+ 274 - 0
psiphon/common/net_test.go

@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2016, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package common
+
+import (
+	"net"
+	"sync/atomic"
+	"testing"
+	"testing/iotest"
+	"time"
+
+	"github.com/Psiphon-Inc/goarista/monotime"
+)
+
+type dummyConn struct {
+	t        *testing.T
+	timeout  *time.Timer
+	isClosed int32
+}
+
+func (c *dummyConn) Read(b []byte) (n int, err error) {
+	if c.timeout != nil {
+		select {
+		case <-c.timeout.C:
+			return 0, iotest.ErrTimeout
+		default:
+		}
+	}
+	return len(b), nil
+}
+
+func (c *dummyConn) Write(b []byte) (n int, err error) {
+	if c.timeout != nil {
+		select {
+		case <-c.timeout.C:
+			return 0, iotest.ErrTimeout
+		default:
+		}
+	}
+	return len(b), nil
+}
+
+func (c *dummyConn) Close() error {
+	atomic.StoreInt32(&c.isClosed, 1)
+	return nil
+}
+
+func (c *dummyConn) IsClosed() bool {
+	return atomic.LoadInt32(&c.isClosed) == 1
+}
+
+func (c *dummyConn) LocalAddr() net.Addr {
+	c.t.Fatal("LocalAddr not implemented")
+	return nil
+}
+
+func (c *dummyConn) RemoteAddr() net.Addr {
+	c.t.Fatal("RemoteAddr not implemented")
+	return nil
+}
+
+func (c *dummyConn) SetDeadline(t time.Time) error {
+	duration := t.Sub(time.Now())
+	if c.timeout == nil {
+		c.timeout = time.NewTimer(duration)
+	} else {
+		if !c.timeout.Stop() {
+			<-c.timeout.C
+		}
+		c.timeout.Reset(duration)
+	}
+	return nil
+}
+
+func (c *dummyConn) SetReadDeadline(t time.Time) error {
+	c.t.Fatal("SetReadDeadline not implemented")
+	return nil
+}
+
+func (c *dummyConn) SetWriteDeadline(t time.Time) error {
+	c.t.Fatal("SetWriteDeadline not implemented")
+	return nil
+}
+
+func TestActivityMonitoredConn(t *testing.T) {
+	buffer := make([]byte, 1024)
+
+	conn, err := NewActivityMonitoredConn(
+		&dummyConn{},
+		200*time.Millisecond,
+		true,
+		nil)
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	realStartTime := time.Now()
+
+	monotonicStartTime := monotime.Now()
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("read before initial timeout failed")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Write(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != nil {
+		t.Fatalf("previous write failed to extend timeout")
+	}
+
+	lastSuccessfulReadTime := monotime.Now()
+
+	time.Sleep(100 * time.Millisecond)
+
+	_, err = conn.Write(buffer)
+	if err != nil {
+		t.Fatalf("previous read failed to extend timeout")
+	}
+
+	time.Sleep(300 * time.Millisecond)
+
+	_, err = conn.Read(buffer)
+	if err != iotest.ErrTimeout {
+		t.Fatalf("failed to timeout")
+	}
+
+	if realStartTime.Round(time.Millisecond) != conn.GetStartTime().Round(time.Millisecond) {
+		t.Fatalf("unexpected GetStartTime")
+	}
+
+	if int64(lastSuccessfulReadTime)/int64(time.Millisecond) !=
+		int64(conn.GetLastActivityMonotime())/int64(time.Millisecond) {
+		t.Fatalf("unexpected GetLastActivityTime")
+	}
+
+	diff := lastSuccessfulReadTime.Sub(monotonicStartTime).Nanoseconds() - conn.GetActiveDuration().Nanoseconds()
+	if diff < 0 {
+		diff = -diff
+	}
+	if diff > (1 * time.Millisecond).Nanoseconds() {
+		t.Fatalf("unexpected GetActiveDuration")
+	}
+}
+
+func TestActivityMonitoredLRUConns(t *testing.T) {
+
+	lruConns := NewLRUConns()
+
+	dummy1 := &dummyConn{}
+	conn1, err := NewActivityMonitoredConn(dummy1, 0, true, lruConns.Add(dummy1))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	dummy2 := &dummyConn{}
+	conn2, err := NewActivityMonitoredConn(dummy2, 0, true, lruConns.Add(dummy2))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	dummy3 := &dummyConn{}
+	conn3, err := NewActivityMonitoredConn(dummy3, 0, true, lruConns.Add(dummy3))
+	if err != nil {
+		t.Fatalf("NewActivityMonitoredConn failed")
+	}
+
+	buffer := make([]byte, 1024)
+
+	conn1.Read(buffer)
+	conn2.Read(buffer)
+	conn3.Read(buffer)
+
+	conn3.Write(buffer)
+	conn2.Write(buffer)
+	conn1.Write(buffer)
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if !dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+}
+
+func TestLRUConns(t *testing.T) {
+	lruConns := NewLRUConns()
+
+	dummy1 := &dummyConn{}
+	entry1 := lruConns.Add(dummy1)
+
+	dummy2 := &dummyConn{}
+	entry2 := lruConns.Add(dummy2)
+
+	dummy3 := &dummyConn{}
+	entry3 := lruConns.Add(dummy3)
+
+	entry3.Touch()
+	entry2.Touch()
+	entry1.Touch()
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+
+	entry1.Remove()
+
+	lruConns.CloseOldest()
+
+	if dummy1.IsClosed() || !dummy2.IsClosed() || !dummy3.IsClosed() {
+		t.Fatalf("unexpected IsClosed state")
+	}
+}

+ 15 - 10
psiphon/common/throttled_test.go

@@ -28,6 +28,8 @@ import (
 	"net/http"
 	"testing"
 	"time"
+
+	"github.com/Psiphon-Inc/goarista/monotime"
 )
 
 const (
@@ -65,12 +67,15 @@ func TestThrottledConn(t *testing.T) {
 		UpstreamBytesPerSecond:   1024 * 1024,
 	})
 
-	run(t, RateLimits{
-		DownstreamUnlimitedBytes: 0,
-		DownstreamBytesPerSecond: 1024 * 1024 / 8,
-		UpstreamUnlimitedBytes:   0,
-		UpstreamBytesPerSecond:   1024 * 1024 / 8,
-	})
+	// This test takes > 1 min to run, so disabled for now
+	/*
+		run(t, RateLimits{
+			DownstreamUnlimitedBytes: 0,
+			DownstreamBytesPerSecond: 1024 * 1024 / 8,
+			UpstreamUnlimitedBytes:   0,
+			UpstreamBytesPerSecond:   1024 * 1024 / 8,
+		})
+	*/
 }
 
 func run(t *testing.T, rateLimits RateLimits) {
@@ -117,7 +122,7 @@ func run(t *testing.T, rateLimits RateLimits) {
 	testData, _ := MakeSecureRandomBytes(testDataSize)
 	requestBody := bytes.NewReader(testData)
 
-	startTime := time.Now()
+	startTime := monotime.Now()
 
 	response, err := client.Post("http://"+serverAddress, "application/octet-stream", requestBody)
 	if err == nil && response.StatusCode != http.StatusOK {
@@ -131,9 +136,9 @@ func run(t *testing.T, rateLimits RateLimits) {
 
 	// Test: elapsed upload time must reflect rate limit
 
-	checkElapsedTime(t, testDataSize, rateLimits.UpstreamBytesPerSecond, time.Now().Sub(startTime))
+	checkElapsedTime(t, testDataSize, rateLimits.UpstreamBytesPerSecond, monotime.Since(startTime))
 
-	startTime = time.Now()
+	startTime = monotime.Now()
 
 	body, err := ioutil.ReadAll(response.Body)
 	if err != nil {
@@ -145,7 +150,7 @@ func run(t *testing.T, rateLimits RateLimits) {
 
 	// Test: elapsed download time must reflect rate limit
 
-	checkElapsedTime(t, testDataSize, rateLimits.DownstreamBytesPerSecond, time.Now().Sub(startTime))
+	checkElapsedTime(t, testDataSize, rateLimits.DownstreamBytesPerSecond, monotime.Since(startTime))
 }
 
 func checkElapsedTime(t *testing.T, dataSize int, rateLimit int64, duration time.Duration) {

+ 39 - 12
psiphon/controller.go

@@ -30,6 +30,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
 
@@ -66,8 +67,9 @@ type Controller struct {
 }
 
 type candidateServerEntry struct {
-	serverEntry               *ServerEntry
-	isServerAffinityCandidate bool
+	serverEntry                *ServerEntry
+	isServerAffinityCandidate  bool
+	adjustedEstablishStartTime monotime.Time
 }
 
 // NewController initializes a new controller.
@@ -287,7 +289,7 @@ func (controller *Controller) remoteServerListFetcher() {
 		return
 	}
 
-	var lastFetchTime time.Time
+	var lastFetchTime monotime.Time
 
 fetcherLoop:
 	for {
@@ -300,7 +302,8 @@ fetcherLoop:
 
 		// Skip fetch entirely (i.e., send no request at all, even when ETag would save
 		// on response size) when a recent fetch was successful
-		if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) {
+		if lastFetchTime != 0 &&
+			lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD).After(monotime.Now()) {
 			continue
 		}
 
@@ -324,7 +327,7 @@ fetcherLoop:
 				controller.untunneledDialConfig)
 
 			if err == nil {
-				lastFetchTime = time.Now()
+				lastFetchTime = monotime.Now()
 				break retryLoop
 			}
 
@@ -452,7 +455,7 @@ func (controller *Controller) startOrSignalConnectedReporter() {
 func (controller *Controller) upgradeDownloader() {
 	defer controller.runWaitGroup.Done()
 
-	var lastDownloadTime time.Time
+	var lastDownloadTime monotime.Time
 
 downloadLoop:
 	for {
@@ -467,7 +470,8 @@ downloadLoop:
 		// Unless handshake is explicitly advertizing a new version, skip
 		// checking entirely when a recent download was successful.
 		if handshakeVersion == "" &&
-			time.Now().Before(lastDownloadTime.Add(DOWNLOAD_UPGRADE_STALE_PERIOD)) {
+			lastDownloadTime != 0 &&
+			lastDownloadTime.Add(DOWNLOAD_UPGRADE_STALE_PERIOD).After(monotime.Now()) {
 			continue
 		}
 
@@ -492,7 +496,7 @@ downloadLoop:
 				controller.untunneledDialConfig)
 
 			if err == nil {
-				lastDownloadTime = time.Now()
+				lastDownloadTime = monotime.Now()
 				break retryLoop
 			}
 
@@ -668,7 +672,7 @@ loop:
 //
 // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
 func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
-	if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
+	if failedTunnel.establishedTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(monotime.Now()) {
 		controller.impairedProtocolClassification[failedTunnel.protocol] += 1
 	} else {
 		controller.impairedProtocolClassification[failedTunnel.protocol] = 0
@@ -913,6 +917,7 @@ func (controller *Controller) startEstablishing() {
 		return
 	}
 	NoticeInfo("start establishing")
+
 	controller.isEstablishing = true
 	controller.establishWaitGroup = new(sync.WaitGroup)
 	controller.stopEstablishingBroadcast = make(chan struct{})
@@ -986,6 +991,15 @@ func (controller *Controller) establishCandidateGenerator(impairedProtocols []st
 	defer controller.establishWaitGroup.Done()
 	defer close(controller.candidateServerEntries)
 
+	// establishStartTime is used to calculate and report the
+	// client's tunnel establishment duration.
+	//
+	// networkWaitDuration is the elapsed time spent waiting
+	// for network connectivity. This duration will be excluded
+	// from reported tunnel establishment duration.
+	establishStartTime := monotime.Now()
+	var networkWaitDuration time.Duration
+
 	iterator, err := NewServerEntryIterator(controller.config)
 	if err != nil {
 		NoticeAlert("failed to iterate over candidates: %s", err)
@@ -1006,6 +1020,8 @@ loop:
 	// Repeat until stopped
 	for i := 0; ; i++ {
 
+		networkWaitStartTime := monotime.Now()
+
 		if !WaitForNetworkConnectivity(
 			controller.config.NetworkConnectivityChecker,
 			controller.stopEstablishingBroadcast,
@@ -1013,8 +1029,10 @@ loop:
 			break loop
 		}
 
+		networkWaitDuration += monotime.Since(networkWaitStartTime)
+
 		// Send each iterator server entry to the establish workers
-		startTime := time.Now()
+		startTime := monotime.Now()
 		for {
 			serverEntry, err := iterator.Next()
 			if err != nil {
@@ -1047,9 +1065,17 @@ loop:
 				}
 			}
 
+			// adjustedEstablishStartTime is establishStartTime shifted
+			// to exclude time spent waiting for network connectivity.
+
+			candidate := &candidateServerEntry{
+				serverEntry:                serverEntry,
+				isServerAffinityCandidate:  isServerAffinityCandidate,
+				adjustedEstablishStartTime: establishStartTime.Add(networkWaitDuration),
+			}
+
 			// Note: there must be only one server affinity candidate, as it
 			// closes the serverAffinityDoneBroadcast channel.
-			candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate}
 			isServerAffinityCandidate = false
 
 			// TODO: here we could generate multiple candidates from the
@@ -1063,7 +1089,7 @@ loop:
 				break loop
 			}
 
-			if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME)) {
+			if startTime.Add(ESTABLISH_TUNNEL_WORK_TIME).Before(monotime.Now()) {
 				// Start over, after a brief pause, with a new shuffle of the server
 				// entries, and potentially some newly fetched server entries.
 				break
@@ -1137,6 +1163,7 @@ loop:
 			controller.sessionId,
 			controller.establishPendingConns,
 			candidateServerEntry.serverEntry,
+			candidateServerEntry.adjustedEstablishStartTime,
 			controller) // TunnelOwner
 		if err != nil {
 

+ 3 - 2
psiphon/controller_test.go

@@ -34,6 +34,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	socks "github.com/Psiphon-Inc/goptlib"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/elazarl/goproxy"
@@ -692,14 +693,14 @@ func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
 		// ensure failed tunnel detection, and ultimately hitting
 		// impaired protocol checks.
 
-		startTime := time.Now()
+		startTime := monotime.Now()
 
 		for {
 
 			time.Sleep(1 * time.Second)
 			useTunnel(t, httpProxyPort)
 
-			if startTime.Add(runConfig.runDuration).Before(time.Now()) {
+			if startTime.Add(runConfig.runDuration).Before(monotime.Now()) {
 				break
 			}
 		}

+ 3 - 2
psiphon/meekConn.go

@@ -34,6 +34,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
 	"golang.org/x/crypto/nacl/box"
@@ -571,7 +572,7 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	// through the first hop. In addition, this will require additional support for timely shutdown.
 
 	retries := uint(0)
-	retryDeadline := time.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
+	retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
 
 	var response *http.Response
 	for {
@@ -603,7 +604,7 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 			break
 		}
 
-		if retries >= 1 && time.Now().After(retryDeadline) {
+		if retries >= 1 && monotime.Now().After(retryDeadline) {
 			break
 		}
 		retries += 1

+ 4 - 3
psiphon/server/dns.go

@@ -28,6 +28,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
 
@@ -71,7 +72,7 @@ type DNSResolver struct {
 func NewDNSResolver(defaultResolver string) (*DNSResolver, error) {
 
 	dns := &DNSResolver{
-		lastReloadTime: time.Now().Unix(),
+		lastReloadTime: int64(monotime.Now()),
 	}
 
 	dns.ReloadableFile = common.NewReloadableFile(
@@ -130,8 +131,8 @@ func (dns *DNSResolver) Get() net.IP {
 	// write lock, we only incur write lock blocking when "/etc/resolv.conf"
 	// has actually changed.
 
-	lastReloadTime := atomic.LoadInt64(&dns.lastReloadTime)
-	stale := time.Unix(lastReloadTime, 0).Add(DNS_SYSTEM_CONFIG_RELOAD_PERIOD).Before(time.Now())
+	lastReloadTime := monotime.Time(atomic.LoadInt64(&dns.lastReloadTime))
+	stale := monotime.Now().After(lastReloadTime.Add(DNS_SYSTEM_CONFIG_RELOAD_PERIOD))
 
 	if stale {
 

+ 6 - 5
psiphon/server/meek.go

@@ -33,6 +33,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"golang.org/x/crypto/nacl/box"
@@ -453,12 +454,12 @@ type meekSession struct {
 }
 
 func (session *meekSession) touch() {
-	atomic.StoreInt64(&session.lastActivity, time.Now().UnixNano())
+	atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
 }
 
 func (session *meekSession) expired() bool {
-	lastActivity := atomic.LoadInt64(&session.lastActivity)
-	return time.Since(time.Unix(0, lastActivity)) > MEEK_MAX_SESSION_STALENESS
+	lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
+	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
 }
 
 // makeMeekTLSConfig creates a TLS config for a meek HTTPS listener.
@@ -675,7 +676,7 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 // Note: channel scheme assumes only one concurrent call to pumpWrites
 func (conn *meekConn) pumpWrites(writer io.Writer) error {
 
-	startTime := time.Now()
+	startTime := monotime.Now()
 	timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
 	defer timeout.Stop()
 
@@ -698,7 +699,7 @@ func (conn *meekConn) pumpWrites(writer io.Writer) error {
 				// MEEK_MAX_PAYLOAD_LENGTH response bodies
 				return nil
 			}
-			totalElapsedTime := time.Now().Sub(startTime) / time.Millisecond
+			totalElapsedTime := monotime.Since(startTime) / time.Millisecond
 			if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
 				return nil
 			}

+ 0 - 193
psiphon/server/net.go

@@ -51,205 +51,12 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 package server
 
 import (
-	"container/list"
 	"crypto/tls"
 	"net"
 	"net/http"
-	"sync"
-	"sync/atomic"
 	"time"
-
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
 
-// LRUConns is a concurrency-safe list of net.Conns ordered
-// by recent activity. Its purpose is to facilitate closing
-// the oldest connection in a set of connections.
-//
-// New connections added are referenced by a LRUConnsEntry,
-// which is used to Touch() active connections, which
-// promotes them to the front of the order and to Remove()
-// connections that are no longer LRU candidates.
-//
-// CloseOldest() will remove the oldest connection from the
-// list and call net.Conn.Close() on the connection.
-//
-// After an entry has been removed, LRUConnsEntry Touch()
-// and Remove() will have no effect.
-type LRUConns struct {
-	mutex sync.Mutex
-	list  *list.List
-}
-
-// NewLRUConns initializes a new LRUConns.
-func NewLRUConns() *LRUConns {
-	return &LRUConns{list: list.New()}
-}
-
-// Add inserts a net.Conn as the freshest connection
-// in a LRUConns and returns an LRUConnsEntry to be
-// used to freshen the connection or remove the connection
-// from the LRU list.
-func (conns *LRUConns) Add(conn net.Conn) *LRUConnsEntry {
-	conns.mutex.Lock()
-	defer conns.mutex.Unlock()
-	return &LRUConnsEntry{
-		lruConns: conns,
-		element:  conns.list.PushFront(conn),
-	}
-}
-
-// CloseOldest closes the oldest connection in a
-// LRUConns. It calls net.Conn.Close() on the
-// connection.
-func (conns *LRUConns) CloseOldest() {
-	conns.mutex.Lock()
-	oldest := conns.list.Back()
-	conn, ok := oldest.Value.(net.Conn)
-	if oldest != nil {
-		conns.list.Remove(oldest)
-	}
-	// Release mutex before closing conn
-	conns.mutex.Unlock()
-	if ok {
-		conn.Close()
-	}
-}
-
-// LRUConnsEntry is an entry in a LRUConns list.
-type LRUConnsEntry struct {
-	lruConns *LRUConns
-	element  *list.Element
-}
-
-// Remove deletes the connection referenced by the
-// LRUConnsEntry from the associated LRUConns.
-// Has no effect if the entry was not initialized
-// or previously removed.
-func (entry *LRUConnsEntry) Remove() {
-	if entry.lruConns == nil || entry.element == nil {
-		return
-	}
-	entry.lruConns.mutex.Lock()
-	defer entry.lruConns.mutex.Unlock()
-	entry.lruConns.list.Remove(entry.element)
-}
-
-// Touch promotes the connection referenced by the
-// LRUConnsEntry to the front of the associated LRUConns.
-// Has no effect if the entry was not initialized
-// or previously removed.
-func (entry *LRUConnsEntry) Touch() {
-	if entry.lruConns == nil || entry.element == nil {
-		return
-	}
-	entry.lruConns.mutex.Lock()
-	defer entry.lruConns.mutex.Unlock()
-	entry.lruConns.list.MoveToFront(entry.element)
-}
-
-// ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
-// events triggered by I/O activity.
-//
-// When an inactivity timeout is specified, the network I/O will
-// timeout after the specified period of read inactivity. Optionally,
-// ActivityMonitoredConn will also consider the connection active when
-// data is written to it.
-//
-// When a LRUConnsEntry is specified, then the LRU entry is promoted on
-// either a successful read or write.
-//
-type ActivityMonitoredConn struct {
-	// Note: 64-bit ints used with atomic operations are at placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	startTime            int64
-	lastReadActivityTime int64
-	net.Conn
-	inactivityTimeout time.Duration
-	activeOnWrite     bool
-	lruEntry          *LRUConnsEntry
-}
-
-func NewActivityMonitoredConn(
-	conn net.Conn,
-	inactivityTimeout time.Duration,
-	activeOnWrite bool,
-	lruEntry *LRUConnsEntry) (*ActivityMonitoredConn, error) {
-
-	if inactivityTimeout > 0 {
-		err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
-		if err != nil {
-			return nil, common.ContextError(err)
-		}
-	}
-
-	now := time.Now().UnixNano()
-
-	return &ActivityMonitoredConn{
-		Conn:                 conn,
-		inactivityTimeout:    inactivityTimeout,
-		activeOnWrite:        activeOnWrite,
-		startTime:            now,
-		lastReadActivityTime: now,
-		lruEntry:             lruEntry,
-	}, nil
-}
-
-// GetStartTime gets the time when the ActivityMonitoredConn was
-// initialized.
-func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
-	return time.Unix(0, conn.startTime)
-}
-
-// GetActiveDuration returns the time elapsed between the initialization
-// of the ActivityMonitoredConn and the last Read. Only reads are used
-// for this calculation since writes may succeed locally due to buffering.
-func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
-	return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.startTime)
-}
-
-func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
-	n, err := conn.Conn.Read(buffer)
-	if err == nil {
-
-		if conn.inactivityTimeout > 0 {
-			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
-			if err != nil {
-				return n, common.ContextError(err)
-			}
-		}
-		if conn.lruEntry != nil {
-			conn.lruEntry.Touch()
-		}
-
-		atomic.StoreInt64(&conn.lastReadActivityTime, time.Now().UnixNano())
-
-	}
-	// Note: no context error to preserve error type
-	return n, err
-}
-
-func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
-	n, err := conn.Conn.Write(buffer)
-	if err == nil && conn.activeOnWrite {
-
-		if conn.inactivityTimeout > 0 {
-			err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
-			if err != nil {
-				return n, common.ContextError(err)
-			}
-		}
-
-		if conn.lruEntry != nil {
-			conn.lruEntry.Touch()
-		}
-
-	}
-	// Note: no context error to preserve error type
-	return n, err
-}
-
 // HTTPSServer is a wrapper around http.Server which adds the
 // ServeTLS function.
 type HTTPSServer struct {

+ 5 - 5
psiphon/server/tunnelServer.go

@@ -436,7 +436,7 @@ func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.C
 	// the connection active. Writes are not considered reliable activity indicators
 	// due to buffering.
 
-	activityConn, err := NewActivityMonitoredConn(
+	activityConn, err := common.NewActivityMonitoredConn(
 		clientConn,
 		SSH_CONNECTION_READ_DEADLINE,
 		false,
@@ -550,7 +550,7 @@ type sshClient struct {
 	sshServer               *sshServer
 	tunnelProtocol          string
 	sshConn                 ssh.Conn
-	activityConn            *ActivityMonitoredConn
+	activityConn            *common.ActivityMonitoredConn
 	geoIPData               GeoIPData
 	psiphonSessionID        string
 	udpChannel              ssh.Channel
@@ -558,7 +558,7 @@ type sshClient struct {
 	tcpTrafficState         *trafficState
 	udpTrafficState         *trafficState
 	channelHandlerWaitGroup *sync.WaitGroup
-	tcpPortForwardLRU       *LRUConns
+	tcpPortForwardLRU       *common.LRUConns
 	stopBroadcast           chan struct{}
 }
 
@@ -583,7 +583,7 @@ func newSshClient(
 		tcpTrafficState:         &trafficState{},
 		udpTrafficState:         &trafficState{},
 		channelHandlerWaitGroup: new(sync.WaitGroup),
-		tcpPortForwardLRU:       NewLRUConns(),
+		tcpPortForwardLRU:       common.NewLRUConns(),
 		stopBroadcast:           make(chan struct{}),
 	}
 }
@@ -1013,7 +1013,7 @@ func (sshClient *sshClient) handleTCPChannel(
 	lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
 	defer lruEntry.Remove()
 
-	fwdConn, err = NewActivityMonitoredConn(
+	fwdConn, err = common.NewActivityMonitoredConn(
 		fwdConn,
 		time.Duration(sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds)*time.Millisecond,
 		true,

+ 4 - 4
psiphon/server/udp.go

@@ -74,7 +74,7 @@ func (sshClient *sshClient) handleUDPChannel(newChannel ssh.NewChannel) {
 		sshClient:      sshClient,
 		sshChannel:     sshChannel,
 		portForwards:   make(map[uint16]*udpPortForward),
-		portForwardLRU: NewLRUConns(),
+		portForwardLRU: common.NewLRUConns(),
 		relayWaitGroup: new(sync.WaitGroup),
 	}
 	multiplexer.run()
@@ -85,7 +85,7 @@ type udpPortForwardMultiplexer struct {
 	sshChannel        ssh.Channel
 	portForwardsMutex sync.Mutex
 	portForwards      map[uint16]*udpPortForward
-	portForwardLRU    *LRUConns
+	portForwardLRU    *common.LRUConns
 	relayWaitGroup    *sync.WaitGroup
 }
 
@@ -215,7 +215,7 @@ func (mux *udpPortForwardMultiplexer) run() {
 
 			lruEntry := mux.portForwardLRU.Add(udpConn)
 
-			conn, err := NewActivityMonitoredConn(
+			conn, err := common.NewActivityMonitoredConn(
 				udpConn,
 				time.Duration(mux.sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds)*time.Millisecond,
 				true,
@@ -290,7 +290,7 @@ type udpPortForward struct {
 	remoteIP     []byte
 	remotePort   uint16
 	conn         net.Conn
-	lruEntry     *LRUConnsEntry
+	lruEntry     *common.LRUConnsEntry
 	mux          *udpPortForwardMultiplexer
 }
 

+ 8 - 3
psiphon/serverApi.go

@@ -604,13 +604,17 @@ func RecordTunnelStats(
 	sessionId string,
 	tunnelNumber int64,
 	tunnelServerIpAddress string,
-	serverHandshakeTimestamp, duration string,
-	totalBytesSent, totalBytesReceived int64) error {
+	establishmentDuration string,
+	serverHandshakeTimestamp string,
+	tunnelDuration string,
+	totalBytesSent int64,
+	totalBytesReceived int64) error {
 
 	tunnelStats := struct {
 		SessionId                string `json:"session_id"`
 		TunnelNumber             int64  `json:"tunnel_number"`
 		TunnelServerIpAddress    string `json:"tunnel_server_ip_address"`
+		EstablishmentDuration    string `json:"establishment_duration"`
 		ServerHandshakeTimestamp string `json:"server_handshake_timestamp"`
 		Duration                 string `json:"duration"`
 		TotalBytesSent           int64  `json:"total_bytes_sent"`
@@ -619,8 +623,9 @@ func RecordTunnelStats(
 		sessionId,
 		tunnelNumber,
 		tunnelServerIpAddress,
+		establishmentDuration,
 		serverHandshakeTimestamp,
-		duration,
+		tunnelDuration,
 		totalBytesSent,
 		totalBytesReceived,
 	}

+ 4 - 3
psiphon/splitTunnel.go

@@ -35,6 +35,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/Psiphon-Inc/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 )
 
@@ -83,7 +84,7 @@ type SplitTunnelClassifier struct {
 
 type classification struct {
 	isUntunneled bool
-	expiry       time.Time
+	expiry       monotime.Time
 }
 
 func NewSplitTunnelClassifier(config *Config, tunneler Tunneler) *SplitTunnelClassifier {
@@ -161,7 +162,7 @@ func (classifier *SplitTunnelClassifier) IsUntunneled(targetAddress string) bool
 	classifier.mutex.RLock()
 	cachedClassification, ok := classifier.cache[targetAddress]
 	classifier.mutex.RUnlock()
-	if ok && cachedClassification.expiry.After(time.Now()) {
+	if ok && cachedClassification.expiry.After(monotime.Now()) {
 		return cachedClassification.isUntunneled
 	}
 
@@ -171,7 +172,7 @@ func (classifier *SplitTunnelClassifier) IsUntunneled(targetAddress string) bool
 		NoticeAlert("failed to resolve address for split tunnel classification: %s", err)
 		return false
 	}
-	expiry := time.Now().Add(ttl)
+	expiry := monotime.Now().Add(ttl)
 
 	isUntunneled := classifier.ipAddressInRoutes(ipAddr)
 

+ 93 - 35
psiphon/tunnel.go

@@ -33,6 +33,7 @@ import (
 	"time"
 
 	"github.com/Psiphon-Inc/crypto/ssh"
+	"github.com/Psiphon-Inc/goarista/monotime"
 	regen "github.com/Psiphon-Inc/goregen"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
@@ -73,13 +74,14 @@ type Tunnel struct {
 	serverEntry                  *ServerEntry
 	serverContext                *ServerContext
 	protocol                     string
-	conn                         net.Conn
+	conn                         *common.ActivityMonitoredConn
 	sshClient                    *ssh.Client
 	operateWaitGroup             *sync.WaitGroup
 	shutdownOperateBroadcast     chan struct{}
 	signalPortForwardFailure     chan struct{}
 	totalPortForwardFailures     int
-	startTime                    time.Time
+	establishDuration            time.Duration
+	establishedTime              monotime.Time
 	dialStats                    *TunnelDialStats
 	newClientVerificationPayload chan string
 }
@@ -116,6 +118,7 @@ func EstablishTunnel(
 	sessionId string,
 	pendingConns *common.Conns,
 	serverEntry *ServerEntry,
+	adjustedEstablishStartTime monotime.Time,
 	tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
 
 	selectedProtocol, err := selectProtocol(config, serverEntry)
@@ -123,8 +126,9 @@ func EstablishTunnel(
 		return nil, common.ContextError(err)
 	}
 
-	// Build transport layers and establish SSH connection
-	conn, sshClient, dialStats, err := dialSsh(
+	// Build transport layers and establish SSH connection. Note that
+	// dialConn and monitoredConn are the same network connection.
+	dialConn, monitoredConn, sshClient, dialStats, err := dialSsh(
 		config, pendingConns, serverEntry, selectedProtocol, sessionId)
 	if err != nil {
 		return nil, common.ContextError(err)
@@ -134,7 +138,8 @@ func EstablishTunnel(
 	defer func() {
 		if err != nil {
 			sshClient.Close()
-			conn.Close()
+			monitoredConn.Close()
+			pendingConns.Remove(dialConn)
 		}
 	}()
 
@@ -146,7 +151,7 @@ func EstablishTunnel(
 		isClosed:                 false,
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
-		conn:                     conn,
+		conn:                     monitoredConn,
 		sshClient:                sshClient,
 		operateWaitGroup:         new(sync.WaitGroup),
 		shutdownOperateBroadcast: make(chan struct{}),
@@ -172,10 +177,20 @@ func EstablishTunnel(
 		}
 	}
 
-	tunnel.startTime = time.Now()
+	// establishDuration is the elapsed time between the controller starting tunnel
+	// establishment and this tunnel being established. The reported value represents
+	// how long the user waited between starting the client and having a usable tunnel;
+	// or how long between the client detecting an unexpected tunnel disconnect and
+	// completing automatic reestablishment.
+	//
+	// This time period may include time spent unsuccessfully connecting to other
+	// servers. Time spent waiting for network connectivity is excluded.
+	tunnel.establishDuration = monotime.Since(adjustedEstablishStartTime)
+
+	tunnel.establishedTime = monotime.Now()
 
 	// Now that network operations are complete, cancel interruptibility
-	pendingConns.Remove(conn)
+	pendingConns.Remove(dialConn)
 
 	// Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
 	tunnel.operateWaitGroup.Add(1)
@@ -210,7 +225,7 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 		tunnel.operateWaitGroup.Wait()
 		timer.Stop()
 		tunnel.sshClient.Close()
-		// tunnel.conn.Close() may get called twice, which is allowed.
+		// tunnel.conn.Close() may get called multiple times, which is allowed.
 		tunnel.conn.Close()
 	}
 }
@@ -523,12 +538,17 @@ func initMeekConfig(
 
 // dialSsh is a helper that builds the transport layers and establishes the SSH connection.
 // When additional dial configuration is used, DialStats are recorded and returned.
+//
+// The net.Conn return value is the value to be removed from pendingConns; additional
+// layering (ThrottledConn, ActivityMonitoredConn) is applied, but this return value is the
+// base dial conn. The *ActivityMonitoredConn return value is the layered conn passed into
+// the ssh.Client.
 func dialSsh(
 	config *Config,
 	pendingConns *common.Conns,
 	serverEntry *ServerEntry,
 	selectedProtocol,
-	sessionId string) (net.Conn, *ssh.Client, *TunnelDialStats, error) {
+	sessionId string) (net.Conn, *common.ActivityMonitoredConn, *ssh.Client, *TunnelDialStats, error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -550,7 +570,7 @@ func dialSsh(
 		useObfuscatedSsh = true
 		meekConfig, err = initMeekConfig(config, serverEntry, selectedProtocol, sessionId)
 		if err != nil {
-			return nil, nil, nil, common.ContextError(err)
+			return nil, nil, nil, nil, common.ContextError(err)
 		}
 	}
 
@@ -590,12 +610,12 @@ func dialSsh(
 	if meekConfig != nil {
 		dialConn, err = DialMeek(meekConfig, dialConfig)
 		if err != nil {
-			return nil, nil, nil, common.ContextError(err)
+			return nil, nil, nil, nil, common.ContextError(err)
 		}
 	} else {
 		dialConn, err = DialTCP(directTCPDialAddress, dialConfig)
 		if err != nil {
-			return nil, nil, nil, common.ContextError(err)
+			return nil, nil, nil, nil, common.ContextError(err)
 		}
 	}
 
@@ -604,11 +624,18 @@ func dialSsh(
 		// Cleanup on error
 		if cleanupConn != nil {
 			cleanupConn.Close()
+			pendingConns.Remove(cleanupConn)
 		}
 	}()
 
+	// Activity monitoring is used to measure tunnel duration
+	monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil)
+	if err != nil {
+		return nil, nil, nil, nil, common.ContextError(err)
+	}
+
 	// Apply throttling (if configured)
-	throttledConn := common.NewThrottledConn(dialConn, config.RateLimits)
+	throttledConn := common.NewThrottledConn(monitoredConn, config.RateLimits)
 
 	// Add obfuscated SSH layer
 	var sshConn net.Conn = throttledConn
@@ -616,14 +643,14 @@ func dialSsh(
 		sshConn, err = NewObfuscatedSshConn(
 			OBFUSCATION_CONN_MODE_CLIENT, throttledConn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, nil, nil, common.ContextError(err)
+			return nil, nil, nil, nil, common.ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the conn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, nil, nil, common.ContextError(err)
+		return nil, nil, nil, nil, common.ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -639,7 +666,7 @@ func dialSsh(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, nil, nil, common.ContextError(err)
+		return nil, nil, nil, nil, common.ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -685,7 +712,7 @@ func dialSsh(
 
 	result := <-resultChannel
 	if result.err != nil {
-		return nil, nil, nil, common.ContextError(result.err)
+		return nil, nil, nil, nil, common.ContextError(result.err)
 	}
 
 	var dialStats *TunnelDialStats
@@ -724,7 +751,7 @@ func dialSsh(
 	// but should not be used to perform I/O as that would interfere with SSH
 	// (and also bypasses throttling).
 
-	return dialConn, result.sshClient, dialStats, nil
+	return dialConn, monitoredConn, result.sshClient, dialStats, nil
 }
 
 func makeRandomPeriod(min, max time.Duration) time.Duration {
@@ -786,9 +813,9 @@ func makeRandomPeriod(min, max time.Duration) time.Duration {
 func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	defer tunnel.operateWaitGroup.Done()
 
-	lastBytesReceivedTime := time.Now()
+	lastBytesReceivedTime := monotime.Now()
 
-	lastTotalBytesTransferedTime := time.Now()
+	lastTotalBytesTransferedTime := monotime.Now()
 	totalSent := int64(0)
 	totalReceived := int64(0)
 
@@ -915,15 +942,15 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 				tunnel.serverEntry.IpAddress)
 
 			if received > 0 {
-				lastBytesReceivedTime = time.Now()
+				lastBytesReceivedTime = monotime.Now()
 			}
 
 			totalSent += sent
 			totalReceived += received
 
-			if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(time.Now()) {
+			if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(monotime.Now()) {
 				NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
-				lastTotalBytesTransferedTime = time.Now()
+				lastTotalBytesTransferedTime = monotime.Now()
 			}
 
 			// Only emit the frequent BytesTransferred notice when tunnel is not idle.
@@ -939,7 +966,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 			statsTimer.Reset(nextStatusRequestPeriod())
 
 		case <-sshKeepAliveTimer.C:
-			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(time.Now()) {
+			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(monotime.Now()) {
 				select {
 				case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAlivePeriodicTimeoutSeconds) * time.Second:
 				default:
@@ -953,7 +980,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 			NoticeInfo("port forward failures for %s: %d",
 				tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
 
-			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(time.Now()) {
+			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(monotime.Now()) {
 				select {
 				case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
 				default:
@@ -983,20 +1010,48 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	// Always emit a final NoticeTotalBytesTransferred
 	NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
 
-	// The stats for this tunnel will be reported via the next successful
-	// status request.
-	// Note: Since client clocks are unreliable, we use the server's reported
-	// timestamp in the handshake response as the tunnel start time. This time
-	// will be slightly earlier than the actual tunnel activation time, as the
-	// client has to receive and parse the response and activate the tunnel.
 	// Tunnel does not have a serverContext when DisableApi is set.
 	if tunnel.serverContext != nil && !tunnel.IsDiscarded() {
+
+		// The stats for this tunnel will be reported via the next successful
+		// status request.
+
+		// Since client clocks are unreliable, we report the server's timestamp from
+		// the handshake response as the absolute tunnel start time. This time
+		// will be slightly earlier than the actual tunnel activation time, as the
+		// client has to receive and parse the response and activate the tunnel.
+
+		tunnelStartTime := tunnel.serverContext.serverHandshakeTimestamp
+
+		// For the tunnel duration calculation, we use the local clock. The start time
+		// is tunnel.establishedTime as recorded when the tunnel was established. For the
+		// end time, we do not use the current time as we may now be long past the
+		// actual termination time of the tunnel. For example, the host or device may
+		// have resumed after a long sleep (it's not clear whether the monotonic clock
+		// service used to measure elapsed time stops during device sleep). Instead,
+		// we use the last data received time as the estimated tunnel end time.
+		//
+		// One potential issue with using the last received time is receiving data
+		// after an extended sleep because the device sleep occurred with data still in
+		// the OS socket read buffer. This is not expected to happen on Android, as the
+		// OS will wake a process when it has TCP data available to read. (For this reason,
+		// the actual long sleep issue is only with an idle tunnel; in this case the client
+		// is responsible for sending SSH keep alives but a device sleep will delay the
+		// golang SSH keep alive timer.)
+		//
+		// Idle tunnels will only read data when a SSH keep alive is sent. As a result,
+		// the last-received-time scheme can undercount tunnel durations by up to
+		// TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX for idle tunnels.
+
+		tunnelDuration := tunnel.conn.GetLastActivityMonotime().Sub(tunnel.establishedTime)
+
 		err := RecordTunnelStats(
 			tunnel.serverContext.sessionId,
 			tunnel.serverContext.tunnelNumber,
 			tunnel.serverEntry.IpAddress,
-			tunnel.serverContext.serverHandshakeTimestamp,
-			fmt.Sprintf("%d", time.Now().Sub(tunnel.startTime)),
+			fmt.Sprintf("%d", tunnel.establishDuration),
+			tunnelStartTime,
+			fmt.Sprintf("%d", tunnelDuration),
 			totalSent,
 			totalReceived)
 		if err != nil {
@@ -1044,7 +1099,8 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 
 // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
 // on the specified SSH connections and returns true of the request succeeds
-// within a specified timeout.
+// within a specified timeout. If the request fails, the associated conn is
+// closed, which will terminate the associated tunnel.
 func sendSshKeepAlive(
 	sshClient *ssh.Client, conn net.Conn, timeout time.Duration) error {
 
@@ -1063,6 +1119,8 @@ func sendSshKeepAlive(
 			// Proceed without random padding
 			randomPadding = make([]byte, 0)
 		}
+		// Note: reading a reply is important for last-received-time tunnel
+		// duration calculation.
 		_, _, err = sshClient.SendRequest("keepalive@openssh.com", true, randomPadding)
 		errChannel <- err
 	}()