Procházet zdrojové kódy

Add support for liveness test

Rod Hynes před 7 roky
rodič
revize
a5d7dce41e

+ 9 - 0
psiphon/common/parameters/clientParameters.go

@@ -177,6 +177,10 @@ const (
 	MeekRoundTripTimeout                       = "MeekRoundTripTimeout"
 	TransformHostNameProbability               = "TransformHostNameProbability"
 	PickUserAgentProbability                   = "PickUserAgentProbability"
+	LivenessTestMinUpstreamBytes               = "LivenessTestMinUpstreamBytes"
+	LivenessTestMaxUpstreamBytes               = "LivenessTestMaxUpstreamBytes"
+	LivenessTestMinDownstreamBytes             = "LivenessTestMinDownstreamBytes"
+	LivenessTestMaxDownstreamBytes             = "LivenessTestMaxDownstreamBytes"
 )
 
 const (
@@ -355,6 +359,11 @@ var defaultClientParameters = map[string]struct {
 
 	TransformHostNameProbability: {value: 0.5, minimum: 0.0},
 	PickUserAgentProbability:     {value: 0.5, minimum: 0.0},
+
+	LivenessTestMinUpstreamBytes:   {value: 0, minimum: 0},
+	LivenessTestMaxUpstreamBytes:   {value: 0, minimum: 0},
+	LivenessTestMinDownstreamBytes: {value: 0, minimum: 0},
+	LivenessTestMaxDownstreamBytes: {value: 0, minimum: 0},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used

+ 6 - 0
psiphon/common/protocol/protocol.go

@@ -64,6 +64,7 @@ const (
 	PSIPHON_WEB_API_PROTOCOL = "web"
 
 	PACKET_TUNNEL_CHANNEL_TYPE = "tun@psiphon.ca"
+	RANDOM_STREAM_CHANNEL_TYPE = "random@psiphon.ca"
 
 	PSIPHON_API_HANDSHAKE_AUTHORIZATIONS = "authorizations"
 )
@@ -297,3 +298,8 @@ type MeekCookieData struct {
 	ClientTunnelProtocol string `json:"t"`
 	EndPoint             string `json:"e"`
 }
+
+type RandomStreamRequest struct {
+	UpstreamBytes   int `json:"u"`
+	DownstreamBytes int `json:"d"`
+}

+ 4 - 1
psiphon/common/utils.go

@@ -183,8 +183,11 @@ func MakeSecureRandomBytes(length int) ([]byte, error) {
 }
 
 // MakeSecureRandomRange selects a random int in [min, max].
-// If max < min, min is returned.
+// If min < 0, min is set to 0. If max < min, min is returned.
 func MakeSecureRandomRange(min, max int) (int, error) {
+	if min < 0 {
+		min = 0
+	}
 	if max < min {
 		return min, nil
 	}

+ 23 - 0
psiphon/config.go

@@ -494,6 +494,13 @@ type Config struct {
 	ObfuscatedSSHMinPadding *int
 	ObfuscatedSSHMaxPadding *int
 
+	// LivenessTestMinUpstreamBytes and other LivenessTest fields are for
+	// testing purposes.
+	LivenessTestMinUpstreamBytes   *int
+	LivenessTestMaxUpstreamBytes   *int
+	LivenessTestMinDownstreamBytes *int
+	LivenessTestMaxDownstreamBytes *int
+
 	// clientParameters is the active ClientParameters with defaults, config
 	// values, and, optionally, tactics applied.
 	//
@@ -946,6 +953,22 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.ObfuscatedSSHMaxPadding] = *config.ObfuscatedSSHMaxPadding
 	}
 
+	if config.LivenessTestMinUpstreamBytes != nil {
+		applyParameters[parameters.LivenessTestMinUpstreamBytes] = *config.LivenessTestMinUpstreamBytes
+	}
+
+	if config.LivenessTestMaxUpstreamBytes != nil {
+		applyParameters[parameters.LivenessTestMaxUpstreamBytes] = *config.LivenessTestMaxUpstreamBytes
+	}
+
+	if config.LivenessTestMinDownstreamBytes != nil {
+		applyParameters[parameters.LivenessTestMinDownstreamBytes] = *config.LivenessTestMinDownstreamBytes
+	}
+
+	if config.LivenessTestMaxDownstreamBytes != nil {
+		applyParameters[parameters.LivenessTestMaxDownstreamBytes] = *config.LivenessTestMaxDownstreamBytes
+	}
+
 	return applyParameters
 }
 

+ 8 - 0
psiphon/notice.go

@@ -758,6 +758,14 @@ func NoticeNetworkID(networkID string) {
 		"NetworkID", 0, "ID", networkID)
 }
 
+func NoticeLivenessTest(ipAddress string, livenessTestMetrics livenessTestMetrics, success bool) {
+	singletonNoticeLogger.outputNotice(
+		"LivenessTest", noticeIsDiagnostic,
+		"ipAddress", ipAddress,
+		"metrics", livenessTestMetrics,
+		"success", success)
+}
+
 type repetitiveNoticeState struct {
 	message string
 	repeats int

+ 84 - 19
psiphon/server/server_test.go

@@ -130,6 +130,7 @@ func TestSSH(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -146,6 +147,7 @@ func TestOSSH(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -162,6 +164,7 @@ func TestFragmentedOSSH(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     true,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -178,6 +181,7 @@ func TestUnfrontedMeek(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -195,6 +199,7 @@ func TestUnfrontedMeekHTTPS(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -212,6 +217,7 @@ func TestUnfrontedMeekHTTPSTLS13(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -229,6 +235,7 @@ func TestUnfrontedMeekSessionTicket(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -246,6 +253,7 @@ func TestUnfrontedMeekSessionTicketTLS13(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -262,6 +270,7 @@ func TestQUICOSSH(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -281,6 +290,7 @@ func TestMarionetteOSSH(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -297,6 +307,7 @@ func TestWebTransportAPIRequests(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -313,6 +324,7 @@ func TestHotReload(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -329,6 +341,7 @@ func TestDefaultSessionID(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -345,6 +358,7 @@ func TestDenyTrafficRules(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -361,6 +375,7 @@ func TestOmitAuthorization(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -377,6 +392,7 @@ func TestNoAuthorization(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -393,6 +409,7 @@ func TestUnusedAuthorization(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -409,6 +426,7 @@ func TestTCPOnlySLOK(t *testing.T) {
 			doTunneledWebRequest: true,
 			doTunneledNTPRequest: false,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
 		})
 }
 
@@ -425,6 +443,24 @@ func TestUDPOnlySLOK(t *testing.T) {
 			doTunneledWebRequest: false,
 			doTunneledNTPRequest: true,
 			forceFragmenting:     false,
+			forceLivenessTest:    false,
+		})
+}
+
+func TestLivenessTest(t *testing.T) {
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:       "OSSH",
+			enableSSHAPIRequests: true,
+			doHotReload:          false,
+			doDefaultSponsorID:   false,
+			denyTrafficRules:     false,
+			requireAuthorization: true,
+			omitAuthorization:    false,
+			doTunneledWebRequest: true,
+			doTunneledNTPRequest: true,
+			forceFragmenting:     false,
+			forceLivenessTest:    true,
 		})
 }
 
@@ -440,6 +476,7 @@ type runServerConfig struct {
 	doTunneledWebRequest bool
 	doTunneledNTPRequest bool
 	forceFragmenting     bool
+	forceLivenessTest    bool
 }
 
 func runServer(t *testing.T, runConfig *runServerConfig) {
@@ -483,6 +520,11 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		t.Fatalf("error generating tactics keys: %s", err)
 	}
 
+	livenessTestSize := 0
+	if doClientTactics || runConfig.forceLivenessTest {
+		livenessTestSize = 1048576
+	}
+
 	// create a server
 
 	psiphonServerIPAddress := serverIPAddress
@@ -531,7 +573,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	trafficRulesFilename := filepath.Join(testDataDirName, "traffic_rules.json")
 	paveTrafficRulesFile(
 		t, trafficRulesFilename, propagationChannelID, accessType,
-		runConfig.requireAuthorization, runConfig.denyTrafficRules)
+		runConfig.requireAuthorization, runConfig.denyTrafficRules, livenessTestSize)
 
 	var tacticsConfigFilename string
 
@@ -543,7 +585,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			t, tacticsConfigFilename,
 			tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
 			runConfig.tunnelProtocol,
-			propagationChannelID)
+			propagationChannelID,
+			livenessTestSize)
 	}
 
 	var serverConfig map[string]interface{}
@@ -620,7 +663,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 		paveTrafficRulesFile(
 			t, trafficRulesFilename, propagationChannelID, accessType,
-			runConfig.requireAuthorization, runConfig.denyTrafficRules)
+			runConfig.requireAuthorization, runConfig.denyTrafficRules,
+			livenessTestSize)
 
 		p, _ := os.FindProcess(os.Getpid())
 		p.Signal(syscall.SIGUSR1)
@@ -711,19 +755,29 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			t.Fatalf("SetClientParameters failed: %s", err)
 		}
 
-	} else if runConfig.forceFragmenting {
+	} else {
+
 		// Directly apply same parameters that would've come from tactics.
 
 		applyParameters := make(map[string]interface{})
 
-		applyParameters[parameters.FragmentorLimitProtocols] = protocol.TunnelProtocols{runConfig.tunnelProtocol}
-		applyParameters[parameters.FragmentorProbability] = 1.0
-		applyParameters[parameters.FragmentorMinTotalBytes] = 1000
-		applyParameters[parameters.FragmentorMaxTotalBytes] = 2000
-		applyParameters[parameters.FragmentorMinWriteBytes] = 1
-		applyParameters[parameters.FragmentorMaxWriteBytes] = 100
-		applyParameters[parameters.FragmentorMinDelay] = 1 * time.Millisecond
-		applyParameters[parameters.FragmentorMaxDelay] = 10 * time.Millisecond
+		if runConfig.forceFragmenting {
+			applyParameters[parameters.FragmentorLimitProtocols] = protocol.TunnelProtocols{runConfig.tunnelProtocol}
+			applyParameters[parameters.FragmentorProbability] = 1.0
+			applyParameters[parameters.FragmentorMinTotalBytes] = 1000
+			applyParameters[parameters.FragmentorMaxTotalBytes] = 2000
+			applyParameters[parameters.FragmentorMinWriteBytes] = 1
+			applyParameters[parameters.FragmentorMaxWriteBytes] = 100
+			applyParameters[parameters.FragmentorMinDelay] = 1 * time.Millisecond
+			applyParameters[parameters.FragmentorMaxDelay] = 10 * time.Millisecond
+		}
+
+		if runConfig.forceLivenessTest {
+			applyParameters[parameters.LivenessTestMinUpstreamBytes] = livenessTestSize
+			applyParameters[parameters.LivenessTestMaxUpstreamBytes] = livenessTestSize
+			applyParameters[parameters.LivenessTestMinDownstreamBytes] = livenessTestSize
+			applyParameters[parameters.LivenessTestMaxDownstreamBytes] = livenessTestSize
+		}
 
 		err = clientConfig.SetClientParameters("", true, applyParameters)
 		if err != nil {
@@ -1153,7 +1207,8 @@ func pavePsinetDatabaseFile(
 
 func paveTrafficRulesFile(
 	t *testing.T, trafficRulesFilename, propagationChannelID, accessType string,
-	requireAuthorization, deny bool) {
+	requireAuthorization, deny bool,
+	livenessTestSize int) {
 
 	allowTCPPorts := fmt.Sprintf("%d", mockWebServerPort)
 	allowUDPPorts := "53, 123"
@@ -1177,7 +1232,9 @@ func paveTrafficRulesFile(
         "DefaultRules" :  {
             "RateLimits" : {
                 "ReadBytesPerSecond": 16384,
-                "WriteBytesPerSecond": 16384
+                "WriteBytesPerSecond": 16384,
+                "ReadUnthrottledBytes": %d,
+                "WriteUnthrottledBytes": %d
             },
             "AllowTCPPorts" : [0],
             "AllowUDPPorts" : [0],
@@ -1196,8 +1253,8 @@ func paveTrafficRulesFile(
                 },
                 "Rules" : {
                     "RateLimits" : {
-                        "ReadUnthrottledBytes": 132352,
-                        "WriteUnthrottledBytes": 132352
+                        "ReadBytesPerSecond": 2097152,
+                        "WriteBytesPerSecond": 2097152
                     },
                     "AllowTCPPorts" : [%s],
                     "AllowUDPPorts" : [%s]
@@ -1208,7 +1265,9 @@ func paveTrafficRulesFile(
     `
 
 	trafficRulesJSON := fmt.Sprintf(
-		trafficRulesJSONFormat, propagationChannelID, authorizationFilter, allowTCPPorts, allowUDPPorts)
+		trafficRulesJSONFormat,
+		livenessTestSize, livenessTestSize,
+		propagationChannelID, authorizationFilter, allowTCPPorts, allowUDPPorts)
 
 	err := ioutil.WriteFile(trafficRulesFilename, []byte(trafficRulesJSON), 0600)
 	if err != nil {
@@ -1312,7 +1371,8 @@ func paveTacticsConfigFile(
 	t *testing.T, tacticsConfigFilename string,
 	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey string,
 	tunnelProtocol string,
-	propagationChannelID string) {
+	propagationChannelID string,
+	livenessTestSize int) {
 
 	// Setting LimitTunnelProtocols passively exercises the
 	// server-side LimitTunnelProtocols enforcement.
@@ -1343,7 +1403,11 @@ func paveTacticsConfigFile(
           "FragmentorDownstreamMinWriteBytes" : 1,
           "FragmentorDownstreamMaxWriteBytes" : 100,
           "FragmentorDownstreamMinDelay" : "1ms",
-          "FragmentorDownstreamMaxDelay" : "10ms"
+          "FragmentorDownstreamMaxDelay" : "10ms",
+          "LivenessTestMinUpstreamBytes" : %d,
+          "LivenessTestMaxUpstreamBytes" : %d,
+          "LivenessTestMinDownstreamBytes" : %d,
+          "LivenessTestMaxDownstreamBytes" : %d
         }
       },
       "FilteredTactics" : [
@@ -1372,6 +1436,7 @@ func paveTacticsConfigFile(
 		tunnelProtocol,
 		tunnelProtocol,
 		tunnelProtocol,
+		livenessTestSize, livenessTestSize, livenessTestSize, livenessTestSize,
 		propagationChannelID)
 
 	err := ioutil.WriteFile(tacticsConfigFilename, []byte(tacticsConfigJSON), 0600)

+ 400 - 235
psiphon/server/tunnelServer.go

@@ -21,12 +21,14 @@ package server
 
 import (
 	"context"
+	"crypto/rand"
 	"crypto/subtle"
 	"encoding/base64"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"strconv"
 	"sync"
@@ -63,6 +65,8 @@ const (
 	SSH_SEND_OSL_RETRY_FACTOR             = 2
 	OSL_SESSION_CACHE_TTL                 = 5 * time.Minute
 	MAX_AUTHORIZATIONS                    = 16
+	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
+	RANDOM_STREAM_MAX_BYTES               = 10485760
 )
 
 // TunnelServer is the main server that accepts Psiphon client
@@ -963,6 +967,8 @@ type sshClient struct {
 	tcpPortForwardDialingAvailableSignal context.CancelFunc
 	releaseAuthorizations                func()
 	stopTimer                            *time.Timer
+	preHandshakeRandomStreamMetrics      randomStreamMetrics
+	postHandshakeRandomStreamMetrics     randomStreamMetrics
 }
 
 type trafficState struct {
@@ -976,6 +982,14 @@ type trafficState struct {
 	availablePortForwardCond              *sync.Cond
 }
 
+type randomStreamMetrics struct {
+	count                 int
+	upstreamBytes         int
+	receivedUpstreamBytes int
+	downstreamBytes       int
+	sentDownstreamBytes   int
+}
+
 // qualityMetrics records upstream TCP dial attempts and
 // elapsed time. Elapsed time includes the full TCP handshake
 // and, in aggregate, is a measure of the quality of the
@@ -1356,7 +1370,8 @@ func (sshClient *sshClient) stop() {
 // When the SSH client connection closes, both the channels and requests channels
 // will close and runTunnel will exit.
 func (sshClient *sshClient) runTunnel(
-	channels <-chan ssh.NewChannel, requests <-chan *ssh.Request) {
+	channels <-chan ssh.NewChannel,
+	requests <-chan *ssh.Request) {
 
 	waitGroup := new(sync.WaitGroup)
 
@@ -1365,64 +1380,135 @@ func (sshClient *sshClient) runTunnel(
 	waitGroup.Add(1)
 	go func() {
 		defer waitGroup.Done()
+		sshClient.handleSSHRequests(requests)
+	}()
 
-		for request := range requests {
+	// Start OSL sender
 
-			// Requests are processed serially; API responses must be sent in request order.
+	if sshClient.supportsServerRequests {
+		waitGroup.Add(1)
+		go func() {
+			defer waitGroup.Done()
+			sshClient.runOSLSender()
+		}()
+	}
 
-			var responsePayload []byte
-			var err error
+	// Start the TCP port forward manager
 
-			if request.Type == "keepalive@openssh.com" {
+	// The queue size is set to the traffic rules (MaxTCPPortForwardCount +
+	// MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
+	// limits per client; when that value is not set, a default is used.
+	// A limitation: this queue size is set once and doesn't change, for this client,
+	// when traffic rules are reloaded.
+	queueSize := sshClient.getTCPPortForwardQueueSize()
+	if queueSize == 0 {
+		queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
+	}
+	newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
 
-				// SSH keep alive round trips are used as speed test samples.
-				responsePayload, err = tactics.MakeSpeedTestResponse(
-					SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
+	waitGroup.Add(1)
+	go func() {
+		defer waitGroup.Done()
+		sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
+	}()
 
-			} else {
+	// Handle new channel (port forward) requests from the client.
 
-				// All other requests are assumed to be API requests.
+	for newChannel := range channels {
+		switch newChannel.ChannelType() {
+		case protocol.RANDOM_STREAM_CHANNEL_TYPE:
+			sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
+		case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
+			sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
+		case "direct-tcpip":
+			sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, newTCPPortForwards)
+		default:
+			sshClient.rejectNewChannel(newChannel,
+				fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
+		}
+	}
 
-				sshClient.Lock()
-				authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
-				sshClient.Unlock()
+	// The channel loop is interrupted by a client
+	// disconnect or by calling sshClient.stop().
 
-				// Note: unlock before use is only safe as long as referenced sshClient data,
-				// such as slices in handshakeState, is read-only after initially set.
+	// Stop the TCP port forward manager
+	close(newTCPPortForwards)
 
-				responsePayload, err = sshAPIRequestHandler(
-					sshClient.sshServer.support,
-					sshClient.geoIPData,
-					authorizedAccessTypes,
-					request.Type,
-					request.Payload)
-			}
+	// Stop all other worker goroutines
+	sshClient.stopRunning()
 
-			if err == nil {
-				err = request.Reply(true, responsePayload)
-			} else {
-				log.WithContextFields(LogFields{"error": err}).Warning("request failed")
-				err = request.Reply(false, nil)
-			}
-			if err != nil {
-				if !isExpectedTunnelIOError(err) {
-					log.WithContextFields(LogFields{"error": err}).Warning("response failed")
-				}
-			}
+	if sshClient.sshServer.support.Config.RunPacketTunnel {
+		// PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
+		sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
+			sshClient.sessionID)
+	}
+
+	waitGroup.Wait()
+
+	sshClient.cleanupAuthorizations()
+}
+
+func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
+
+	for request := range requests {
+
+		// Requests are processed serially; API responses must be sent in request order.
+
+		var responsePayload []byte
+		var err error
+
+		if request.Type == "keepalive@openssh.com" {
+
+			// SSH keep alive round trips are used as speed test samples.
+			responsePayload, err = tactics.MakeSpeedTestResponse(
+				SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
+
+		} else {
+
+			// All other requests are assumed to be API requests.
+
+			sshClient.Lock()
+			authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
+			sshClient.Unlock()
+
+			// Note: unlock before use is only safe as long as referenced sshClient data,
+			// such as slices in handshakeState, is read-only after initially set.
 
+			responsePayload, err = sshAPIRequestHandler(
+				sshClient.sshServer.support,
+				sshClient.geoIPData,
+				authorizedAccessTypes,
+				request.Type,
+				request.Payload)
 		}
-	}()
 
-	// Start OSL sender
+		if err == nil {
+			err = request.Reply(true, responsePayload)
+		} else {
+			log.WithContextFields(LogFields{"error": err}).Warning("request failed")
+			err = request.Reply(false, nil)
+		}
+		if err != nil {
+			if !isExpectedTunnelIOError(err) {
+				log.WithContextFields(LogFields{"error": err}).Warning("response failed")
+			}
+		}
 
-	if sshClient.supportsServerRequests {
-		waitGroup.Add(1)
-		go func() {
-			defer waitGroup.Done()
-			sshClient.runOSLSender()
-		}()
 	}
 
+}
+
+type newTCPPortForward struct {
+	enqueueTime   monotime.Time
+	hostToConnect string
+	portToConnect int
+	newChannel    ssh.NewChannel
+}
+
+func (sshClient *sshClient) handleTCPPortForwards(
+	waitGroup *sync.WaitGroup,
+	newTCPPortForwards chan *newTCPPortForward) {
+
 	// Lifecycle of a TCP port forward:
 	//
 	// 1. A "direct-tcpip" SSH request is received from the client.
@@ -1484,249 +1570,316 @@ func (sshClient *sshClient) runTunnel(
 	//    f. Call closedPortForward() which decrements concurrentPortForwardCount and
 	//       records bytes transferred.
 
-	// Start the TCP port forward manager
+	for newPortForward := range newTCPPortForwards {
 
-	type newTCPPortForward struct {
-		enqueueTime   monotime.Time
-		hostToConnect string
-		portToConnect int
-		newChannel    ssh.NewChannel
-	}
+		remainingDialTimeout :=
+			time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
+				monotime.Since(newPortForward.enqueueTime)
 
-	// The queue size is set to the traffic rules (MaxTCPPortForwardCount +
-	// MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
-	// limits per client; when that value is not set, a default is used.
-	// A limitation: this queue size is set once and doesn't change, for this client,
-	// when traffic rules are reloaded.
-	queueSize := sshClient.getTCPPortForwardQueueSize()
-	if queueSize == 0 {
-		queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
-	}
-	newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
-
-	waitGroup.Add(1)
-	go func() {
-		defer waitGroup.Done()
-		for newPortForward := range newTCPPortForwards {
+		if remainingDialTimeout <= 0 {
+			sshClient.updateQualityMetricsWithRejectedDialingLimit()
+			sshClient.rejectNewChannel(
+				newPortForward.newChannel, "TCP port forward timed out in queue")
+			continue
+		}
 
-			remainingDialTimeout :=
-				time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
-					monotime.Since(newPortForward.enqueueTime)
+		// Reserve a TCP dialing slot.
+		//
+		// TOCTOU note: important to increment counts _before_ checking limits; otherwise,
+		// the client could potentially consume excess resources by initiating many port
+		// forwards concurrently.
+
+		sshClient.dialingTCPPortForward()
+
+		// When max dials are in progress, wait up to remainingDialTimeout for dialing
+		// to become available. This blocks all dequeuing.
+
+		if sshClient.isTCPDialingPortForwardLimitExceeded() {
+			blockStartTime := monotime.Now()
+			ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
+			sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
+			<-ctx.Done()
+			sshClient.setTCPPortForwardDialingAvailableSignal(nil)
+			cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
+			remainingDialTimeout -= monotime.Since(blockStartTime)
+		}
 
-			if remainingDialTimeout <= 0 {
-				sshClient.updateQualityMetricsWithRejectedDialingLimit()
-				sshClient.rejectNewChannel(
-					newPortForward.newChannel, "TCP port forward timed out in queue")
-				continue
-			}
+		if remainingDialTimeout <= 0 {
 
-			// Reserve a TCP dialing slot.
-			//
-			// TOCTOU note: important to increment counts _before_ checking limits; otherwise,
-			// the client could potentially consume excess resources by initiating many port
-			// forwards concurrently.
-
-			sshClient.dialingTCPPortForward()
-
-			// When max dials are in progress, wait up to remainingDialTimeout for dialing
-			// to become available. This blocks all dequeing.
-
-			if sshClient.isTCPDialingPortForwardLimitExceeded() {
-				blockStartTime := monotime.Now()
-				ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
-				sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
-				<-ctx.Done()
-				sshClient.setTCPPortForwardDialingAvailableSignal(nil)
-				cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
-				remainingDialTimeout -= monotime.Since(blockStartTime)
-			}
+			// Release the dialing slot here since handleTCPChannel() won't be called.
+			sshClient.abortedTCPPortForward()
 
-			if remainingDialTimeout <= 0 {
+			sshClient.updateQualityMetricsWithRejectedDialingLimit()
+			sshClient.rejectNewChannel(
+				newPortForward.newChannel, "TCP port forward timed out before dialing")
+			continue
+		}
 
-				// Release the dialing slot here since handleTCPChannel() won't be called.
-				sshClient.abortedTCPPortForward()
+		// Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
+		// handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
+		// will deal with remainingDialTimeout <= 0.
 
-				sshClient.updateQualityMetricsWithRejectedDialingLimit()
-				sshClient.rejectNewChannel(
-					newPortForward.newChannel, "TCP port forward timed out before dialing")
-				continue
-			}
+		waitGroup.Add(1)
+		go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
+			defer waitGroup.Done()
+			sshClient.handleTCPChannel(
+				remainingDialTimeout,
+				newPortForward.hostToConnect,
+				newPortForward.portToConnect,
+				newPortForward.newChannel)
+		}(remainingDialTimeout, newPortForward)
+	}
+}
 
-			// Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
-			// handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
-			// will deal with remainingDialTimeout <= 0.
-
-			waitGroup.Add(1)
-			go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
-				defer waitGroup.Done()
-				sshClient.handleTCPChannel(
-					remainingDialTimeout,
-					newPortForward.hostToConnect,
-					newPortForward.portToConnect,
-					newPortForward.newChannel)
-			}(remainingDialTimeout, newPortForward)
-		}
-	}()
+func (sshClient *sshClient) handleNewRandomStreamChannel(
+	waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
 
-	// Handle new channel (port forward) requests from the client.
+	// A random stream channel returns the requested number of bytes -- random
+	// bytes -- to the client while also consuming and discarding bytes sent
+	// by the client.
 	//
-	// packet tunnel channels are handled by the packet tunnel server
-	// component. Each client may have at most one packet tunnel channel.
+	// One use case for the random stream channel is a liveness test that the
+	// client performs to confirm that the tunnel is live. As the liveness
+	// test is performed in the concurrent establishment phase, before
+	// selecting a single candidate for handshake, the random stream channel
+	// is available pre-handshake, albeit with additional restrictions.
 	//
-	// udpgw client connections are dispatched immediately (clients use this for
-	// DNS, so it's essential to not block; and only one udpgw connection is
-	// retained at a time).
+	// The random stream is subject to throttling in traffic rules; for
+	// unthrottled liveness tests, set initial Read/WriteUnthrottledBytes as
+	// required. The random stream maximum count and response size cap
+	// mitigate clients abusing the facility to waste server resources.
 	//
-	// All other TCP port forwards are dispatched via the TCP port forward
-	// manager queue.
+	// Like all other channels, this channel type is handled asynchronously,
+	// so it's possible to run at any point in the tunnel lifecycle.
+	//
+	// Up/downstream byte counts don't include SSH packet and request
+	// marshalling overhead.
 
-	for newChannel := range channels {
+	var request protocol.RandomStreamRequest
+	err := json.Unmarshal(newChannel.ExtraData(), &request)
+	if err != nil {
+		sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
+		return
+	}
 
-		if newChannel.ChannelType() == protocol.PACKET_TUNNEL_CHANNEL_TYPE {
+	if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
+		sshClient.rejectNewChannel(newChannel,
+			fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
+		return
+	}
 
-			if !sshClient.sshServer.support.Config.RunPacketTunnel {
-				sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
-				continue
-			}
+	if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
+		sshClient.rejectNewChannel(newChannel,
+			fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
+		return
+	}
 
-			// Accept this channel immediately. This channel will replace any
-			// previously existing packet tunnel channel for this client.
+	var metrics *randomStreamMetrics
 
-			packetTunnelChannel, requests, err := newChannel.Accept()
-			if err != nil {
-				if !isExpectedTunnelIOError(err) {
-					log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
-				}
-				continue
-			}
-			go ssh.DiscardRequests(requests)
+	sshClient.Lock()
 
-			sshClient.setPacketTunnelChannel(packetTunnelChannel)
+	if !sshClient.handshakeState.completed {
+		metrics = &sshClient.preHandshakeRandomStreamMetrics
+	} else {
+		metrics = &sshClient.postHandshakeRandomStreamMetrics
+	}
 
-			// PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
-			// will stop packet tunnel workers for any previous packet tunnel channel.
+	countOk := true
+	if !sshClient.handshakeState.completed &&
+		metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
+		countOk = false
+	} else {
+		metrics.count++
+	}
 
-			checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
-				return sshClient.isPortForwardPermitted(portForwardTypeTCP, false, upstreamIPAddress, port)
-			}
+	sshClient.Unlock()
 
-			checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
-				return sshClient.isPortForwardPermitted(portForwardTypeUDP, false, upstreamIPAddress, port)
-			}
+	if !countOk {
+		sshClient.rejectNewChannel(newChannel, "max count exceeded")
+		return
+	}
 
-			flowActivityUpdaterMaker := func(
-				upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
+	channel, requests, err := newChannel.Accept()
+	if err != nil {
+		if !isExpectedTunnelIOError(err) {
+			log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
+		}
+		return
+	}
+	go ssh.DiscardRequests(requests)
 
-				var updaters []tun.FlowActivityUpdater
-				oslUpdater := sshClient.newClientSeedPortForward(upstreamIPAddress)
-				if oslUpdater != nil {
-					updaters = append(updaters, oslUpdater)
-				}
-				return updaters
-			}
+	waitGroup.Add(1)
+	go func() {
+		defer waitGroup.Done()
 
-			metricUpdater := func(
-				TCPApplicationBytesUp, TCPApplicationBytesDown,
-				UDPApplicationBytesUp, UDPApplicationBytesDown int64) {
+		received := 0
+		sent := 0
 
-				sshClient.Lock()
-				sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
-				sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
-				sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
-				sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
-				sshClient.Unlock()
+		if request.UpstreamBytes > 0 {
+			n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
+			received = int(n)
+			if err != nil {
+				if !isExpectedTunnelIOError(err) {
+					log.WithContextFields(LogFields{"error": err}).Warning("receive failed")
+				}
+				// Fall through and record any bytes received...
 			}
+		}
 
-			err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
-				sshClient.sessionID,
-				packetTunnelChannel,
-				checkAllowedTCPPortFunc,
-				checkAllowedUDPPortFunc,
-				flowActivityUpdaterMaker,
-				metricUpdater)
+		if request.DownstreamBytes > 0 {
+			n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
+			sent = int(n)
 			if err != nil {
-				log.WithContextFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
-				sshClient.setPacketTunnelChannel(nil)
+				if !isExpectedTunnelIOError(err) {
+					log.WithContextFields(LogFields{"error": err}).Warning("send failed")
+				}
 			}
-
-			continue
 		}
 
-		if newChannel.ChannelType() != "direct-tcpip" {
-			sshClient.rejectNewChannel(newChannel, "unknown or unsupported channel type")
-			continue
-		}
+		sshClient.Lock()
+		metrics.upstreamBytes += request.UpstreamBytes
+		metrics.receivedUpstreamBytes += received
+		metrics.downstreamBytes += request.DownstreamBytes
+		metrics.sentDownstreamBytes += sent
+		sshClient.Unlock()
 
-		// http://tools.ietf.org/html/rfc4254#section-7.2
-		var directTcpipExtraData struct {
-			HostToConnect       string
-			PortToConnect       uint32
-			OriginatorIPAddress string
-			OriginatorPort      uint32
-		}
+		channel.Close()
+	}()
+}
 
-		err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
-		if err != nil {
-			sshClient.rejectNewChannel(newChannel, "invalid extra data")
-			continue
-		}
+func (sshClient *sshClient) handleNewPacketTunnelChannel(
+	waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
+
+	// packet tunnel channels are handled by the packet tunnel server
+	// component. Each client may have at most one packet tunnel channel.
 
-		// Intercept TCP port forwards to a specified udpgw server and handle directly.
-		// TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
-		isUDPChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
-			sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
-				net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
+	if !sshClient.sshServer.support.Config.RunPacketTunnel {
+		sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
+		return
+	}
 
-		if isUDPChannel {
+	// Accept this channel immediately. This channel will replace any
+	// previously existing packet tunnel channel for this client.
 
-			// Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
-			// own worker goroutine.
+	packetTunnelChannel, requests, err := newChannel.Accept()
+	if err != nil {
+		if !isExpectedTunnelIOError(err) {
+			log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
+		}
+		return
+	}
+	go ssh.DiscardRequests(requests)
 
-			waitGroup.Add(1)
-			go func(channel ssh.NewChannel) {
-				defer waitGroup.Done()
-				sshClient.handleUDPChannel(channel)
-			}(newChannel)
+	sshClient.setPacketTunnelChannel(packetTunnelChannel)
 
-		} else {
+	// PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
+	// will stop packet tunnel workers for any previous packet tunnel channel.
 
-			// Dispatch via TCP port forward manager. When the queue is full, the channel
-			// is immediately rejected.
+	checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
+		return sshClient.isPortForwardPermitted(portForwardTypeTCP, false, upstreamIPAddress, port)
+	}
 
-			tcpPortForward := &newTCPPortForward{
-				enqueueTime:   monotime.Now(),
-				hostToConnect: directTcpipExtraData.HostToConnect,
-				portToConnect: int(directTcpipExtraData.PortToConnect),
-				newChannel:    newChannel,
-			}
+	checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
+		return sshClient.isPortForwardPermitted(portForwardTypeUDP, false, upstreamIPAddress, port)
+	}
 
-			select {
-			case newTCPPortForwards <- tcpPortForward:
-			default:
-				sshClient.updateQualityMetricsWithRejectedDialingLimit()
-				sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
-			}
+	flowActivityUpdaterMaker := func(
+		upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
+
+		var updaters []tun.FlowActivityUpdater
+		oslUpdater := sshClient.newClientSeedPortForward(upstreamIPAddress)
+		if oslUpdater != nil {
+			updaters = append(updaters, oslUpdater)
 		}
+		return updaters
 	}
 
-	// The channel loop is interrupted by a client
-	// disconnect or by calling sshClient.stop().
+	metricUpdater := func(
+		TCPApplicationBytesUp, TCPApplicationBytesDown,
+		UDPApplicationBytesUp, UDPApplicationBytesDown int64) {
 
-	// Stop the TCP port forward manager
-	close(newTCPPortForwards)
+		sshClient.Lock()
+		sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
+		sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
+		sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
+		sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
+		sshClient.Unlock()
+	}
 
-	// Stop all other worker goroutines
-	sshClient.stopRunning()
+	err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
+		sshClient.sessionID,
+		packetTunnelChannel,
+		checkAllowedTCPPortFunc,
+		checkAllowedUDPPortFunc,
+		flowActivityUpdaterMaker,
+		metricUpdater)
+	if err != nil {
+		log.WithContextFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
+		sshClient.setPacketTunnelChannel(nil)
+	}
+}
 
-	if sshClient.sshServer.support.Config.RunPacketTunnel {
-		// PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
-		sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
-			sshClient.sessionID)
+func (sshClient *sshClient) handleNewTCPPortForwardChannel(
+	waitGroup *sync.WaitGroup, newChannel ssh.NewChannel,
+	newTCPPortForwards chan *newTCPPortForward) {
+
+	// udpgw client connections are dispatched immediately (clients use this for
+	// DNS, so it's essential to not block; and only one udpgw connection is
+	// retained at a time).
+	//
+	// All other TCP port forwards are dispatched via the TCP port forward
+	// manager queue.
+
+	// http://tools.ietf.org/html/rfc4254#section-7.2
+	var directTcpipExtraData struct {
+		HostToConnect       string
+		PortToConnect       uint32
+		OriginatorIPAddress string
+		OriginatorPort      uint32
 	}
 
-	waitGroup.Wait()
+	err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
+	if err != nil {
+		sshClient.rejectNewChannel(newChannel, "invalid extra data")
+		return
+	}
 
-	sshClient.cleanupAuthorizations()
+	// Intercept TCP port forwards to a specified udpgw server and handle directly.
+	// TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
+	isUDPChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
+		sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
+			net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
+
+	if isUDPChannel {
+
+		// Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
+		// own worker goroutine.
+
+		waitGroup.Add(1)
+		go func(channel ssh.NewChannel) {
+			defer waitGroup.Done()
+			sshClient.handleUDPChannel(channel)
+		}(newChannel)
+
+	} else {
+
+		// Dispatch via TCP port forward manager. When the queue is full, the channel
+		// is immediately rejected.
+
+		tcpPortForward := &newTCPPortForward{
+			enqueueTime:   monotime.Now(),
+			hostToConnect: directTcpipExtraData.HostToConnect,
+			portToConnect: int(directTcpipExtraData.PortToConnect),
+			newChannel:    newChannel,
+		}
+
+		select {
+		case newTCPPortForwards <- tcpPortForward:
+		default:
+			sshClient.updateQualityMetricsWithRejectedDialingLimit()
+			sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
+		}
+	}
 }
 
 func (sshClient *sshClient) cleanupAuthorizations() {
@@ -1802,6 +1955,17 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 	// sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
 	logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
 	logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
+
+	logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
+	logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
+	logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
+	logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
+	logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
+	logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
+	logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
+	logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
+	logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
+	logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
 
 	// Pre-calculate a total-tunneled-bytes field. This total is used
 	// extensively in analytics and is more performant when pre-calculated.
@@ -2280,7 +2445,7 @@ func (sshClient *sshClient) isPortForwardPermitted(
 
 	// Disallow connection to loopback. This is a failsafe. The server
 	// should be run on a host with correctly configured firewall rules.
-	// An exception is made in the case of tranparent DNS forwarding,
+	// An exception is made in the case of transparent DNS forwarding,
 	// where the remoteIP has been rewritten.
 	if !isTransparentDNSForwarding && remoteIP.IsLoopback() {
 		return false

+ 97 - 0
psiphon/tunnel.go

@@ -22,11 +22,13 @@ package psiphon
 import (
 	"bytes"
 	"context"
+	"crypto/rand"
 	"encoding/base64"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"net/url"
 	"sync"
@@ -820,6 +822,10 @@ func dialSsh(
 	rateLimits := p.RateLimits(parameters.TunnelRateLimits)
 	obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
 	obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
+	livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
+	livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
+	livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
+	livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
 	p = nil
 
 	var cancelFunc context.CancelFunc
@@ -1102,7 +1108,29 @@ func dialSsh(
 			close(noRequests)
 
 			sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
+
+			if livenessTestMinUpstreamBytes > 0 || livenessTestMinDownstreamBytes > 0 {
+
+				// When configured, perform a liveness test which sends and
+				// receives bytes through the tunnel to ensure the tunnel had
+				// not been blocked upon or shortly after connecting. This
+				// test is performed concurrently for each establishment
+				// candidate before selecting a successful tunnel.
+				//
+				// Note that the liveness test is subject to the
+				// TunnelConnectTimeout, which should be adjusted
+				// accordingly.
+
+				var metrics livenessTestMetrics
+				metrics, err = performLivenessTest(
+					sshClient,
+					livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
+					livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes)
+
+				NoticeLivenessTest(serverEntry.IpAddress, metrics, err == nil)
+			}
 		}
+
 		resultChannel <- sshNewClientResult{sshClient, sshRequests, err}
 	}()
 
@@ -1142,6 +1170,75 @@ func dialSsh(
 		nil
 }
 
+type livenessTestMetrics struct {
+	duration                time.Duration
+	upstreamBytes           int
+	sentUpstreamBytes       int
+	downstreamBytes         int
+	receivedDownstreamBytes int
+}
+
+func performLivenessTest(
+	sshClient *ssh.Client,
+	minUpstreamBytes, maxUpstreamBytes int,
+	minDownstreamBytes, maxDownstreamBytes int) (livenessTestMetrics, error) {
+
+	var err error
+	var metrics livenessTestMetrics
+	defer func(startTime monotime.Time) {
+		metrics.duration = monotime.Since(startTime)
+	}(monotime.Now())
+
+	metrics.upstreamBytes, err = common.MakeSecureRandomRange(
+		minUpstreamBytes, maxUpstreamBytes)
+	if err != nil {
+		return metrics, common.ContextError(err)
+	}
+
+	metrics.downstreamBytes, err = common.MakeSecureRandomRange(
+		minDownstreamBytes, maxDownstreamBytes)
+	if err != nil {
+		return metrics, common.ContextError(err)
+	}
+
+	request := &protocol.RandomStreamRequest{
+		UpstreamBytes:   metrics.upstreamBytes,
+		DownstreamBytes: metrics.downstreamBytes,
+	}
+
+	extraData, err := json.Marshal(request)
+	if err != nil {
+		return metrics, common.ContextError(err)
+	}
+
+	channel, requests, err := sshClient.OpenChannel(
+		protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
+	if err != nil {
+		return metrics, common.ContextError(err)
+	}
+	defer channel.Close()
+
+	go ssh.DiscardRequests(requests)
+
+	if metrics.upstreamBytes > 0 {
+		n, err := io.CopyN(channel, rand.Reader, int64(metrics.upstreamBytes))
+		metrics.sentUpstreamBytes = int(n)
+		if err != nil {
+			return metrics, common.ContextError(err)
+		}
+	}
+
+	if metrics.downstreamBytes > 0 {
+		n, err := io.CopyN(ioutil.Discard, channel, int64(metrics.downstreamBytes))
+		metrics.receivedDownstreamBytes = int(n)
+		if err != nil {
+			return metrics, common.ContextError(err)
+		}
+	}
+
+	return metrics, nil
+}
+
 func selectQUICVersion(
 	clientParameters *parameters.ClientParameters) string {