Просмотр исходного кода

Merge pull request #549 from rod-hynes/master

Metrics API requests changes
Rod Hynes 5 лет назад
Родитель
Сommit
7521054165

+ 13 - 0
psiphon/common/fragmentor/fragmentor.go

@@ -214,6 +214,19 @@ func (c *Conn) GetMetrics() common.LogFields {
 	return logFields
 	return logFields
 }
 }
 
 
+var upstreamMetricsNames = []string{
+	"upstream_bytes_fragmented",
+	"upstream_min_bytes_written",
+	"upstream_max_bytes_written",
+	"upstream_min_delayed",
+	"upstream_max_delayed",
+}
+
+// GetUpstreamMetricsNames returns the upstream metrics parameter names.
+func GetUpstreamMetricsNames() []string {
+	return upstreamMetricsNames
+}
+
 // SetPRNG sets the PRNG to be used by the fragmentor. Specifying a PRNG
 // SetPRNG sets the PRNG to be used by the fragmentor. Specifying a PRNG
 // allows for optional replay of a fragmentor sequence. SetPRNG is intended to
 // allows for optional replay of a fragmentor sequence. SetPRNG is intended to
 // be used with obfuscator.GetDerivedPRNG and allows for setting the PRNG
 // be used with obfuscator.GetDerivedPRNG and allows for setting the PRNG

+ 46 - 36
psiphon/controller.go

@@ -56,7 +56,6 @@ type Controller struct {
 	establishedOnce                         bool
 	establishedOnce                         bool
 	tunnels                                 []*Tunnel
 	tunnels                                 []*Tunnel
 	nextTunnel                              int
 	nextTunnel                              int
-	startedConnectedReporter                bool
 	isEstablishing                          bool
 	isEstablishing                          bool
 	protocolSelectionConstraints            *protocolSelectionConstraints
 	protocolSelectionConstraints            *protocolSelectionConstraints
 	concurrentEstablishTunnelsMutex         sync.Mutex
 	concurrentEstablishTunnelsMutex         sync.Mutex
@@ -111,13 +110,12 @@ func NewController(config *Config) (controller *Controller, err error) {
 		runWaitGroup: new(sync.WaitGroup),
 		runWaitGroup: new(sync.WaitGroup),
 		// connectedTunnels and failedTunnels buffer sizes are large enough to
 		// connectedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
 		// receive full pools of tunnels without blocking. Senders should not block.
-		connectedTunnels:         make(chan *Tunnel, config.TunnelPoolSize),
-		failedTunnels:            make(chan *Tunnel, config.TunnelPoolSize),
-		tunnels:                  make([]*Tunnel, 0),
-		establishedOnce:          false,
-		startedConnectedReporter: false,
-		isEstablishing:           false,
-		untunneledDialConfig:     untunneledDialConfig,
+		connectedTunnels:     make(chan *Tunnel, config.TunnelPoolSize),
+		failedTunnels:        make(chan *Tunnel, config.TunnelPoolSize),
+		tunnels:              make([]*Tunnel, 0),
+		establishedOnce:      false,
+		isEstablishing:       false,
+		untunneledDialConfig: untunneledDialConfig,
 		// TODO: Add a buffer of 1 so we don't miss a signal while receiver is
 		// TODO: Add a buffer of 1 so we don't miss a signal while receiver is
 		// starting? Trade-off is potential back-to-back fetch remotes. As-is,
 		// starting? Trade-off is potential back-to-back fetch remotes. As-is,
 		// establish will eventually signal another fetch remote.
 		// establish will eventually signal another fetch remote.
@@ -238,8 +236,8 @@ func (controller *Controller) Run(ctx context.Context) {
 		go controller.upgradeDownloader()
 		go controller.upgradeDownloader()
 	}
 	}
 
 
-	/// Note: the connected reporter isn't started until a tunnel is
-	// established
+	controller.runWaitGroup.Add(1)
+	go controller.connectedReporter()
 
 
 	controller.runWaitGroup.Add(1)
 	controller.runWaitGroup.Add(1)
 	go controller.runTunnels()
 	go controller.runTunnels()
@@ -444,18 +442,38 @@ func (controller *Controller) establishTunnelWatcher() {
 // connectedReporter sends periodic "connected" requests to the Psiphon API.
 // connectedReporter sends periodic "connected" requests to the Psiphon API.
 // These requests are for server-side unique user stats calculation. See the
 // These requests are for server-side unique user stats calculation. See the
 // comment in DoConnectedRequest for a description of the request mechanism.
 // comment in DoConnectedRequest for a description of the request mechanism.
-// To ensure we don't over- or under-count unique users, only one connected
-// request is made across all simultaneous multi-tunnels; and the connected
-// request is repeated periodically for very long-lived tunnels.
-// The signalReportConnected mechanism is used to trigger another connected
-// request immediately after a reconnect.
+//
+// To correctly count daily unique users, only one connected request is made
+// across all simultaneous multi-tunnels; and the connected request is
+// repeated every 24h.
+//
+// The signalReportConnected mechanism is used to trigger a connected request
+// immediately after a reconnect. While strictly only one connected request
+// per 24h is required in order to count daily unique users, the connected
+// request also delivers the establishment duration metric (which includes
+// time elapsed performing the handshake request) and additional fragmentation
+// metrics; these metrics are measured for each tunnel.
 func (controller *Controller) connectedReporter() {
 func (controller *Controller) connectedReporter() {
 	defer controller.runWaitGroup.Done()
 	defer controller.runWaitGroup.Done()
+
+	// session is nil when DisableApi is set
+	if controller.config.DisableApi {
+		return
+	}
+
 loop:
 loop:
 	for {
 	for {
 
 
-		// Pick any active tunnel and make the next connected request. No error
-		// is logged if there's no active tunnel, as that's not an unexpected condition.
+		select {
+		case <-controller.signalReportConnected:
+			// Make the initial connected request
+		case <-controller.runCtx.Done():
+			break loop
+		}
+
+		// Pick any active tunnel and make the next connected request. No error is
+		// logged if there's no active tunnel, as that's not an unexpected
+		// condition.
 		reported := false
 		reported := false
 		tunnel := controller.getNextActiveTunnel()
 		tunnel := controller.getNextActiveTunnel()
 		if tunnel != nil {
 		if tunnel != nil {
@@ -467,11 +485,9 @@ loop:
 			}
 			}
 		}
 		}
 
 
-		// Schedule the next connected request and wait.
-		// Note: this duration is not a dynamic ClientParameter as
-		// the daily unique user stats logic specifically requires
-		// a "connected" request no more or less often than every
-		// 24 hours.
+		// Schedule the next connected request and wait. This duration is not a
+		// dynamic ClientParameter as the daily unique user stats logic specifically
+		// requires a "connected" request no more or less often than every 24h.
 		var duration time.Duration
 		var duration time.Duration
 		if reported {
 		if reported {
 			duration = 24 * time.Hour
 			duration = 24 * time.Hour
@@ -497,23 +513,16 @@ loop:
 	NoticeInfo("exiting connected reporter")
 	NoticeInfo("exiting connected reporter")
 }
 }
 
 
-func (controller *Controller) startOrSignalConnectedReporter() {
+func (controller *Controller) signalConnectedReporter() {
+
 	// session is nil when DisableApi is set
 	// session is nil when DisableApi is set
 	if controller.config.DisableApi {
 	if controller.config.DisableApi {
 		return
 		return
 	}
 	}
 
 
-	// Start the connected reporter after the first tunnel is established.
-	// Concurrency note: only the runTunnels goroutine may access startedConnectedReporter.
-	if !controller.startedConnectedReporter {
-		controller.startedConnectedReporter = true
-		controller.runWaitGroup.Add(1)
-		go controller.connectedReporter()
-	} else {
-		select {
-		case controller.signalReportConnected <- struct{}{}:
-		default:
-		}
+	select {
+	case controller.signalReportConnected <- struct{}{}:
+	default:
 	}
 	}
 }
 }
 
 
@@ -758,7 +767,7 @@ loop:
 				// Signal a connected request on each 1st tunnel establishment. For
 				// Signal a connected request on each 1st tunnel establishment. For
 				// multi-tunnels, the session is connected as long as at least one
 				// multi-tunnels, the session is connected as long as at least one
 				// tunnel is established.
 				// tunnel is established.
-				controller.startOrSignalConnectedReporter()
+				controller.signalConnectedReporter()
 
 
 				// If the handshake indicated that a new client version is available,
 				// If the handshake indicated that a new client version is available,
 				// trigger an upgrade download.
 				// trigger an upgrade download.
@@ -1610,7 +1619,8 @@ func (controller *Controller) doFetchTactics(
 	}
 	}
 	defer meekConn.Close()
 	defer meekConn.Close()
 
 
-	apiParams := getBaseAPIParameters(controller.config, dialParams)
+	apiParams := getBaseAPIParameters(
+		baseParametersAll, controller.config, dialParams)
 
 
 	tacticsRecord, err := tactics.FetchTactics(
 	tacticsRecord, err := tactics.FetchTactics(
 		ctx,
 		ctx,

+ 162 - 132
psiphon/server/api.go

@@ -21,6 +21,7 @@ package server
 
 
 import (
 import (
 	"crypto/subtle"
 	"crypto/subtle"
+	"encoding/base64"
 	"encoding/json"
 	"encoding/json"
 	std_errors "errors"
 	std_errors "errors"
 	"net"
 	"net"
@@ -28,10 +29,12 @@ import (
 	"runtime/debug"
 	"runtime/debug"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
+	"time"
 	"unicode"
 	"unicode"
 
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
 )
 )
@@ -137,7 +140,7 @@ func dispatchAPIRequestHandler(
 		// applies here.
 		// applies here.
 		sessionID, err := getStringRequestParam(params, "client_session_id")
 		sessionID, err := getStringRequestParam(params, "client_session_id")
 		if err == nil {
 		if err == nil {
-			// Note: follows/duplicates baseRequestParams validation
+			// Note: follows/duplicates baseParams validation
 			if !isHexDigits(support.Config, sessionID) {
 			if !isHexDigits(support.Config, sessionID) {
 				err = std_errors.New("invalid param: client_session_id")
 				err = std_errors.New("invalid param: client_session_id")
 			}
 			}
@@ -174,12 +177,12 @@ func dispatchAPIRequestHandler(
 
 
 var handshakeRequestParams = append(
 var handshakeRequestParams = append(
 	append(
 	append(
-		// Note: legacy clients may not send "session_id" in handshake
 		[]requestParamSpec{
 		[]requestParamSpec{
+			// Legacy clients may not send "session_id" in handshake
 			{"session_id", isHexDigits, requestParamOptional},
 			{"session_id", isHexDigits, requestParamOptional},
-			{"missing_server_entry_signature", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool}},
+			{"missing_server_entry_signature", isBase64String, requestParamOptional}},
 		tacticsParams...),
 		tacticsParams...),
-	baseRequestParams...)
+	baseSessionAndDialParams...)
 
 
 // handshakeAPIRequestHandler implements the "handshake" API request.
 // handshakeAPIRequestHandler implements the "handshake" API request.
 // Clients make the handshake immediately after establishing a tunnel
 // Clients make the handshake immediately after establishing a tunnel
@@ -191,9 +194,9 @@ func handshakeAPIRequestHandler(
 	geoIPData GeoIPData,
 	geoIPData GeoIPData,
 	params common.APIParameters) ([]byte, error) {
 	params common.APIParameters) ([]byte, error) {
 
 
-	// Note: ignoring "known_servers" params
+	// Note: ignoring legacy "known_servers" params
 
 
-	err := validateRequestParams(support.Config, params, baseRequestParams)
+	err := validateRequestParams(support.Config, params, handshakeRequestParams)
 	if err != nil {
 	if err != nil {
 		return nil, errors.Trace(err)
 		return nil, errors.Trace(err)
 	}
 	}
@@ -233,7 +236,7 @@ func handshakeAPIRequestHandler(
 		handshakeState{
 		handshakeState{
 			completed:               true,
 			completed:               true,
 			apiProtocol:             apiProtocol,
 			apiProtocol:             apiProtocol,
-			apiParams:               copyBaseRequestParams(params),
+			apiParams:               copyBaseSessionAndDialParams(params),
 			expectDomainBytes:       len(httpsRequestRegexes) > 0,
 			expectDomainBytes:       len(httpsRequestRegexes) > 0,
 			establishedTunnelsCount: establishedTunnelsCount,
 			establishedTunnelsCount: establishedTunnelsCount,
 		},
 		},
@@ -291,7 +294,7 @@ func handshakeAPIRequestHandler(
 			geoIPData,
 			geoIPData,
 			handshakeStateInfo.authorizedAccessTypes,
 			handshakeStateInfo.authorizedAccessTypes,
 			params,
 			params,
-			baseRequestParams)).Debug("handshake")
+			handshakeRequestParams)).Debug("handshake")
 
 
 	pad_response, _ := getPaddingSizeRequestParam(params, "pad_response")
 	pad_response, _ := getPaddingSizeRequestParam(params, "pad_response")
 
 
@@ -339,31 +342,33 @@ func handshakeAPIRequestHandler(
 	return responsePayload, nil
 	return responsePayload, nil
 }
 }
 
 
+// uniqueUserParams are the connected request parameters which are logged for
+// unique_user events.
+var uniqueUserParams = append(
+	[]requestParamSpec{
+		{"last_connected", isLastConnected, 0}},
+	baseSessionParams...)
+
 var connectedRequestParams = append(
 var connectedRequestParams = append(
 	[]requestParamSpec{
 	[]requestParamSpec{
-		{"session_id", isHexDigits, 0},
-		{"last_connected", isLastConnected, 0},
 		{"establishment_duration", isIntString, requestParamOptional | requestParamLogStringAsInt}},
 		{"establishment_duration", isIntString, requestParamOptional | requestParamLogStringAsInt}},
-	baseRequestParams...)
+	uniqueUserParams...)
 
 
 // updateOnConnectedParamNames are connected request parameters which are
 // updateOnConnectedParamNames are connected request parameters which are
 // copied to update data logged with server_tunnel: these fields either only
 // copied to update data logged with server_tunnel: these fields either only
 // ship with or ship newer data with connected requests.
 // ship with or ship newer data with connected requests.
-var updateOnConnectedParamNames = []string{
-	"last_connected",
-	"establishment_duration",
-	"upstream_bytes_fragmented",
-	"upstream_min_bytes_written",
-	"upstream_max_bytes_written",
-	"upstream_min_delayed",
-	"upstream_max_delayed",
-}
-
-// connectedAPIRequestHandler implements the "connected" API request.
-// Clients make the connected request once a tunnel connection has been
-// established and at least once per day. The last_connected input value,
-// which should be a connected_timestamp output from a previous connected
-// response, is used to calculate unique user stats.
+var updateOnConnectedParamNames = append(
+	[]string{
+		"last_connected",
+		"establishment_duration",
+	},
+	fragmentor.GetUpstreamMetricsNames()...)
+
+// connectedAPIRequestHandler implements the "connected" API request. Clients
+// make the connected request once a tunnel connection has been established
+// and at least once per 24h for long-running tunnels. The last_connected
+// input value, which should be a connected_timestamp output from a previous
+// connected response, is used to calculate unique user stats.
 // connected_timestamp is truncated as a privacy measure.
 // connected_timestamp is truncated as a privacy measure.
 func connectedAPIRequestHandler(
 func connectedAPIRequestHandler(
 	support *SupportServices,
 	support *SupportServices,
@@ -376,21 +381,58 @@ func connectedAPIRequestHandler(
 		return nil, errors.Trace(err)
 		return nil, errors.Trace(err)
 	}
 	}
 
 
+	sessionID, _ := getStringRequestParam(params, "client_session_id")
+	lastConnected, _ := getStringRequestParam(params, "last_connected")
+
 	// Update, for server_tunnel logging, upstream fragmentor metrics, as the
 	// Update, for server_tunnel logging, upstream fragmentor metrics, as the
-	// client may have performed more upstream fragmentation since the
-	// previous metrics reported by the handshake request. Also, additional
-	// fields reported only in the connected request, are added to
-	// server_tunnel here.
+	// client may have performed more upstream fragmentation since the previous
+	// metrics reported by the handshake request. Also, additional fields that
+	// are reported only in the connected request are added to server_tunnel
+	// here.
 
 
 	// TODO: same session-ID-lookup TODO in handshakeAPIRequestHandler
 	// TODO: same session-ID-lookup TODO in handshakeAPIRequestHandler
 	// applies here.
 	// applies here.
-	sessionID, _ := getStringRequestParam(params, "client_session_id")
 	err = support.TunnelServer.UpdateClientAPIParameters(
 	err = support.TunnelServer.UpdateClientAPIParameters(
 		sessionID, copyUpdateOnConnectedParams(params))
 		sessionID, copyUpdateOnConnectedParams(params))
 	if err != nil {
 	if err != nil {
 		return nil, errors.Trace(err)
 		return nil, errors.Trace(err)
 	}
 	}
 
 
+	connectedTimestamp := common.TruncateTimestampToHour(common.GetCurrentTimestamp())
+
+	// The finest required granularity for unique users is daily. To save space,
+	// only record a "unique_user" log event when the client's last_connected is
+	// in the previous day relative to the new connected_timestamp.
+
+	logUniqueUser := false
+	if lastConnected == "None" {
+		logUniqueUser = true
+	} else {
+
+		t1, _ := time.Parse(time.RFC3339, lastConnected)
+		year, month, day := t1.Date()
+		d1 := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
+
+		t2, _ := time.Parse(time.RFC3339, connectedTimestamp)
+		year, month, day = t2.Date()
+		d2 := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
+
+		if t1.Before(t2) && d1 != d2 {
+			logUniqueUser = true
+		}
+	}
+
+	if logUniqueUser {
+		log.LogRawFieldsWithTimestamp(
+			getRequestLogFields(
+				"unique_user",
+				geoIPData,
+				authorizedAccessTypes,
+				params,
+				uniqueUserParams))
+	}
+
+	// TODO: retire the legacy "connected" log event
 	log.LogRawFieldsWithTimestamp(
 	log.LogRawFieldsWithTimestamp(
 		getRequestLogFields(
 		getRequestLogFields(
 			"connected",
 			"connected",
@@ -402,7 +444,7 @@ func connectedAPIRequestHandler(
 	pad_response, _ := getPaddingSizeRequestParam(params, "pad_response")
 	pad_response, _ := getPaddingSizeRequestParam(params, "pad_response")
 
 
 	connectedResponse := protocol.ConnectedResponse{
 	connectedResponse := protocol.ConnectedResponse{
-		ConnectedTimestamp: common.TruncateTimestampToHour(common.GetCurrentTimestamp()),
+		ConnectedTimestamp: connectedTimestamp,
 		Padding:            strings.Repeat(" ", pad_response),
 		Padding:            strings.Repeat(" ", pad_response),
 	}
 	}
 
 
@@ -414,26 +456,16 @@ func connectedAPIRequestHandler(
 	return responsePayload, nil
 	return responsePayload, nil
 }
 }
 
 
-var statusRequestParams = append(
-	[]requestParamSpec{
-		{"session_id", isHexDigits, 0},
-		{"connected", isBooleanFlag, requestParamLogFlagAsBool}},
-	baseRequestParams...)
+var statusRequestParams = baseSessionParams
 
 
-var remoteServerListStatParams = []requestParamSpec{
-	{"session_id", isHexDigits, 0},
-	{"propagation_channel_id", isHexDigits, 0},
-	{"sponsor_id", isHexDigits, 0},
-	{"client_version", isIntString, requestParamLogStringAsInt},
-	{"client_platform", isAnyString, 0},
-	{"client_build_rev", isAnyString, requestParamOptional},
-	{"device_region", isAnyString, requestParamOptional},
-	{"client_download_timestamp", isISO8601Date, 0},
-	{"tunneled", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
-	{"url", isAnyString, 0},
-	{"etag", isAnyString, 0},
-	{"authenticated", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
-}
+var remoteServerListStatParams = append(
+	[]requestParamSpec{
+		{"client_download_timestamp", isISO8601Date, 0},
+		{"tunneled", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+		{"url", isAnyString, 0},
+		{"etag", isAnyString, 0},
+		{"authenticated", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool}},
+	baseSessionParams...)
 
 
 // Backwards compatibility case: legacy clients do not include these fields in
 // Backwards compatibility case: legacy clients do not include these fields in
 // the remote_server_list_stats entries. Use the values from the outer status
 // the remote_server_list_stats entries. Use the values from the outer status
@@ -464,7 +496,7 @@ var failedTunnelStatParams = append(
 		{"bytes_up", isIntString, requestParamOptional},
 		{"bytes_up", isIntString, requestParamOptional},
 		{"bytes_down", isIntString, requestParamOptional},
 		{"bytes_down", isIntString, requestParamOptional},
 		{"tunnel_error", isAnyString, 0}},
 		{"tunnel_error", isAnyString, 0}},
-	baseRequestParams...)
+	baseSessionAndDialParams...)
 
 
 // statusAPIRequestHandler implements the "status" API request.
 // statusAPIRequestHandler implements the "status" API request.
 // Clients make periodic status requests which deliver client-side
 // Clients make periodic status requests which deliver client-side
@@ -552,6 +584,8 @@ func statusAPIRequestHandler(
 				}
 				}
 			}
 			}
 
 
+			remoteServerListStat["server_secret"] = params["server_secret"]
+
 			err := validateRequestParams(support.Config, remoteServerListStat, remoteServerListStatParams)
 			err := validateRequestParams(support.Config, remoteServerListStat, remoteServerListStatParams)
 			if err != nil {
 			if err != nil {
 				return nil, errors.Trace(err)
 				return nil, errors.Trace(err)
@@ -586,9 +620,8 @@ func statusAPIRequestHandler(
 		}
 		}
 		for _, failedTunnelStat := range failedTunnelStats {
 		for _, failedTunnelStat := range failedTunnelStats {
 
 
-			// failed_tunnel supplies a full set of common params, but the
-			// server secret must use the correct value from the outer
-			// statusRequestParams
+			// failed_tunnel supplies a full set of base params, but the server secret
+			// must use the correct value from the outer statusRequestParams.
 			failedTunnelStat["server_secret"] = params["server_secret"]
 			failedTunnelStat["server_secret"] = params["server_secret"]
 
 
 			err := validateRequestParams(support.Config, failedTunnelStat, failedTunnelStatParams)
 			err := validateRequestParams(support.Config, failedTunnelStat, failedTunnelStatParams)
@@ -684,10 +717,8 @@ var tacticsParams = []requestParamSpec{
 }
 }
 
 
 var tacticsRequestParams = append(
 var tacticsRequestParams = append(
-	append(
-		[]requestParamSpec{{"session_id", isHexDigits, 0}},
-		tacticsParams...),
-	baseRequestParams...)
+	append([]requestParamSpec(nil), tacticsParams...),
+	baseSessionAndDialParams...)
 
 
 func getTacticsAPIParameterValidator(config *Config) common.APIParameterValidator {
 func getTacticsAPIParameterValidator(config *Config) common.APIParameterValidator {
 	return func(params common.APIParameters) error {
 	return func(params common.APIParameters) error {
@@ -710,6 +741,9 @@ func getTacticsAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormat
 	}
 	}
 }
 }
 
 
+// requestParamSpec defines a request parameter. Each param is expected to be
+// a string, unless requestParamArray is specified, in which case an array of
+// strings is expected.
 type requestParamSpec struct {
 type requestParamSpec struct {
 	name      string
 	name      string
 	validator func(*Config, string) bool
 	validator func(*Config, string) bool
@@ -729,12 +763,9 @@ const (
 	requestParamNotLoggedForUnfrontedMeekNonTransformedHeader = 1 << 9
 	requestParamNotLoggedForUnfrontedMeekNonTransformedHeader = 1 << 9
 )
 )
 
 
-// baseRequestParams is the list of required and optional
-// request parameters; derived from COMMON_INPUTS and
-// OPTIONAL_COMMON_INPUTS in psi_web.
-// Each param is expected to be a string, unless requestParamArray
-// is specified, in which case an array of string is expected.
-var baseRequestParams = []requestParamSpec{
+// baseParams are the basic request parameters that are expected for all API
+// requests and log events.
+var baseParams = []requestParamSpec{
 	{"server_secret", isServerSecret, requestParamNotLogged},
 	{"server_secret", isServerSecret, requestParamNotLogged},
 	{"client_session_id", isHexDigits, requestParamNotLogged},
 	{"client_session_id", isHexDigits, requestParamNotLogged},
 	{"propagation_channel_id", isHexDigits, 0},
 	{"propagation_channel_id", isHexDigits, 0},
@@ -742,49 +773,64 @@ var baseRequestParams = []requestParamSpec{
 	{"client_version", isIntString, requestParamLogStringAsInt},
 	{"client_version", isIntString, requestParamLogStringAsInt},
 	{"client_platform", isClientPlatform, 0},
 	{"client_platform", isClientPlatform, 0},
 	{"client_build_rev", isHexDigits, requestParamOptional},
 	{"client_build_rev", isHexDigits, requestParamOptional},
-	{"relay_protocol", isRelayProtocol, 0},
 	{"tunnel_whole_device", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
 	{"tunnel_whole_device", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
 	{"device_region", isAnyString, requestParamOptional},
 	{"device_region", isAnyString, requestParamOptional},
-	{"ssh_client_version", isAnyString, requestParamOptional},
-	{"upstream_proxy_type", isUpstreamProxyType, requestParamOptional},
-	{"upstream_proxy_custom_header_names", isAnyString, requestParamOptional | requestParamArray},
-	{"fronting_provider_id", isAnyString, requestParamOptional},
-	{"meek_dial_address", isDialAddress, requestParamOptional | requestParamLogOnlyForFrontedMeek},
-	{"meek_resolved_ip_address", isIPAddress, requestParamOptional | requestParamLogOnlyForFrontedMeek},
-	{"meek_sni_server_name", isDomain, requestParamOptional},
-	{"meek_host_header", isHostHeader, requestParamOptional | requestParamNotLoggedForUnfrontedMeekNonTransformedHeader},
-	{"meek_transformed_host_name", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
-	{"user_agent", isAnyString, requestParamOptional},
-	{"tls_profile", isAnyString, requestParamOptional},
-	{"tls_version", isAnyString, requestParamOptional},
-	{"server_entry_region", isRegionCode, requestParamOptional},
-	{"server_entry_source", isServerEntrySource, requestParamOptional},
-	{"server_entry_timestamp", isISO8601Date, requestParamOptional},
-	{tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME, isAnyString, requestParamOptional},
-	{"dial_port_number", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"quic_version", isAnyString, requestParamOptional},
-	{"quic_dial_sni_address", isAnyString, requestParamOptional},
-	{"upstream_bytes_fragmented", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"upstream_min_bytes_written", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"upstream_max_bytes_written", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"upstream_min_delayed", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"upstream_max_delayed", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"padding", isAnyString, requestParamOptional | requestParamLogStringLengthAsInt},
-	{"pad_response", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"is_replay", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
-	{"egress_region", isRegionCode, requestParamOptional},
-	{"dial_duration", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"candidate_number", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"established_tunnels_count", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"upstream_ossh_padding", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"meek_cookie_size", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"meek_limit_request", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"meek_tls_padding", isIntString, requestParamOptional | requestParamLogStringAsInt},
-	{"network_latency_multiplier", isFloatString, requestParamOptional | requestParamLogStringAsFloat},
-	{"client_bpf", isAnyString, requestParamOptional},
-	{"network_type", isAnyString, requestParamOptional},
 }
 }
 
 
+// baseSessionParams adds the required session_id parameter. For all requests
+// except handshake, all existing clients are expected to send session_id.
+// Legacy clients may not send "session_id" in handshake.
+var baseSessionParams = append(
+	[]requestParamSpec{
+		{"session_id", isHexDigits, 0}},
+	baseParams...)
+
+// baseSessionAndDialParams adds the dial parameters, per-tunnel network
+// protocol and obfuscation metrics which are logged with server_tunnel,
+// failed_tunnel, and tactics.
+var baseSessionAndDialParams = append(
+	[]requestParamSpec{
+		{"relay_protocol", isRelayProtocol, 0},
+		{"ssh_client_version", isAnyString, requestParamOptional},
+		{"upstream_proxy_type", isUpstreamProxyType, requestParamOptional},
+		{"upstream_proxy_custom_header_names", isAnyString, requestParamOptional | requestParamArray},
+		{"fronting_provider_id", isAnyString, requestParamOptional},
+		{"meek_dial_address", isDialAddress, requestParamOptional | requestParamLogOnlyForFrontedMeek},
+		{"meek_resolved_ip_address", isIPAddress, requestParamOptional | requestParamLogOnlyForFrontedMeek},
+		{"meek_sni_server_name", isDomain, requestParamOptional},
+		{"meek_host_header", isHostHeader, requestParamOptional | requestParamNotLoggedForUnfrontedMeekNonTransformedHeader},
+		{"meek_transformed_host_name", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+		{"user_agent", isAnyString, requestParamOptional},
+		{"tls_profile", isAnyString, requestParamOptional},
+		{"tls_version", isAnyString, requestParamOptional},
+		{"server_entry_region", isRegionCode, requestParamOptional},
+		{"server_entry_source", isServerEntrySource, requestParamOptional},
+		{"server_entry_timestamp", isISO8601Date, requestParamOptional},
+		{tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME, isAnyString, requestParamOptional},
+		{"dial_port_number", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"quic_version", isAnyString, requestParamOptional},
+		{"quic_dial_sni_address", isAnyString, requestParamOptional},
+		{"upstream_bytes_fragmented", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"upstream_min_bytes_written", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"upstream_max_bytes_written", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"upstream_min_delayed", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"upstream_max_delayed", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"padding", isAnyString, requestParamOptional | requestParamLogStringLengthAsInt},
+		{"pad_response", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"is_replay", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+		{"egress_region", isRegionCode, requestParamOptional},
+		{"dial_duration", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"candidate_number", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"established_tunnels_count", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"upstream_ossh_padding", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"meek_cookie_size", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"meek_limit_request", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"meek_tls_padding", isIntString, requestParamOptional | requestParamLogStringAsInt},
+		{"network_latency_multiplier", isFloatString, requestParamOptional | requestParamLogStringAsFloat},
+		{"client_bpf", isAnyString, requestParamOptional},
+		{"network_type", isAnyString, requestParamOptional}},
+	baseSessionParams...)
+
 func validateRequestParams(
 func validateRequestParams(
 	config *Config,
 	config *Config,
 	params common.APIParameters,
 	params common.APIParameters,
@@ -822,14 +868,14 @@ func validateRequestParams(
 	return nil
 	return nil
 }
 }
 
 
-// copyBaseRequestParams makes a copy of the params which
-// includes only the baseRequestParams.
-func copyBaseRequestParams(params common.APIParameters) common.APIParameters {
+// copyBaseSessionAndDialParams makes a copy of the params which includes only
+// the baseSessionAndDialParams.
+func copyBaseSessionAndDialParams(params common.APIParameters) common.APIParameters {
 
 
-	// Note: not a deep copy; assumes baseRequestParams values
-	// are all scalar types (int, string, etc.)
+	// Note: not a deep copy; assumes baseSessionAndDialParams values are all
+	// scalar types (int, string, etc.)
 	paramsCopy := make(common.APIParameters)
 	paramsCopy := make(common.APIParameters)
-	for _, baseParam := range baseRequestParams {
+	for _, baseParam := range baseSessionAndDialParams {
 		value := params[baseParam.name]
 		value := params[baseParam.name]
 		if value == nil {
 		if value == nil {
 			continue
 			continue
@@ -989,27 +1035,6 @@ func getRequestLogFields(
 				// the field in this case.
 				// the field in this case.
 
 
 			default:
 			default:
-
-				// Add a distinct app ID field when the value is present in
-				// client_platform.
-				if expectedParam.name == "client_platform" {
-					index := -1
-					clientPlatform := strValue
-					if strings.HasPrefix(clientPlatform, "iOS") {
-						index = 3
-					} else if strings.HasPrefix(clientPlatform, "Android") {
-						index = 2
-						clientPlatform = strings.TrimSuffix(clientPlatform, "_playstore")
-						clientPlatform = strings.TrimSuffix(clientPlatform, "_rooted")
-					}
-					if index > 0 {
-						components := strings.Split(clientPlatform, "_")
-						if index < len(components) {
-							logFields["client_app_id"] = components[index]
-						}
-					}
-				}
-
 				if expectedParam.flags&requestParamLogStringAsInt != 0 {
 				if expectedParam.flags&requestParamLogStringAsInt != 0 {
 					intValue, _ := strconv.Atoi(strValue)
 					intValue, _ := strconv.Atoi(strValue)
 					logFields[expectedParam.name] = intValue
 					logFields[expectedParam.name] = intValue
@@ -1252,6 +1277,11 @@ func isHexDigits(_ *Config, value string) bool {
 	})
 	})
 }
 }
 
 
+func isBase64String(_ *Config, value string) bool {
+	_, err := base64.StdEncoding.DecodeString(value)
+	return err == nil
+}
+
 func isDigits(_ *Config, value string) bool {
 func isDigits(_ *Config, value string) bool {
 	return -1 == strings.IndexFunc(value, func(c rune) bool {
 	return -1 == strings.IndexFunc(value, func(c rune) bool {
 		return c < '0' || c > '9'
 		return c < '0' || c > '9'
@@ -1369,5 +1399,5 @@ func isISO8601Date(_ *Config, value string) bool {
 }
 }
 
 
 func isLastConnected(_ *Config, value string) bool {
 func isLastConnected(_ *Config, value string) bool {
-	return value == "None" || value == "Unknown" || isISO8601Date(nil, value)
+	return value == "None" || isISO8601Date(nil, value)
 }
 }

+ 70 - 24
psiphon/server/server_test.go

@@ -544,11 +544,14 @@ var (
 	testSSHClientVersions = []string{"SSH-2.0-A", "SSH-2.0-B", "SSH-2.0-C"}
 	testSSHClientVersions = []string{"SSH-2.0-A", "SSH-2.0-B", "SSH-2.0-C"}
 	testUserAgents        = []string{"ua1", "ua2", "ua3"}
 	testUserAgents        = []string{"ua1", "ua2", "ua3"}
 	testNetworkType       = "WIFI"
 	testNetworkType       = "WIFI"
-	testAppID             = "com.test.app"
 )
 )
 
 
+var serverRuns = 0
+
 func runServer(t *testing.T, runConfig *runServerConfig) {
 func runServer(t *testing.T, runConfig *runServerConfig) {
 
 
+	serverRuns += 1
+
 	// configure authorized access
 	// configure authorized access
 
 
 	accessType := "test-access-type"
 	accessType := "test-access-type"
@@ -693,8 +696,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 
 	serverConfigJSON, _ = json.Marshal(serverConfig)
 	serverConfigJSON, _ = json.Marshal(serverConfig)
 
 
-	serverConnectedLog := make(chan map[string]interface{}, 1)
 	serverTunnelLog := make(chan map[string]interface{}, 1)
 	serverTunnelLog := make(chan map[string]interface{}, 1)
+	uniqueUserLog := make(chan map[string]interface{}, 1)
 
 
 	setLogCallback(func(log []byte) {
 	setLogCallback(func(log []byte) {
 
 
@@ -710,9 +713,9 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		}
 		}
 
 
 		switch logFields["event_name"].(string) {
 		switch logFields["event_name"].(string) {
-		case "connected":
+		case "unique_user":
 			select {
 			select {
-			case serverConnectedLog <- logFields:
+			case uniqueUserLog <- logFields:
 			default:
 			default:
 			}
 			}
 		case "server_tunnel":
 		case "server_tunnel":
@@ -811,9 +814,9 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	localSOCKSProxyPort := 1081
 	localSOCKSProxyPort := 1081
 	localHTTPProxyPort := 8081
 	localHTTPProxyPort := 8081
 
 
-	// Use a distinct suffix for network ID for each test run to
-	// ensure tactics from different runs don't apply; this is
-	// a workaround for the singleton datastore.
+	// Use a distinct suffix for network ID for each test run to ensure tactics
+	// from different runs don't apply; this is a workaround for the singleton
+	// datastore.
 	jsonNetworkID := fmt.Sprintf(`,"NetworkID" : "WIFI-%s"`, time.Now().String())
 	jsonNetworkID := fmt.Sprintf(`,"NetworkID" : "WIFI-%s"`, time.Now().String())
 
 
 	jsonLimitTLSProfiles := ""
 	jsonLimitTLSProfiles := ""
@@ -823,7 +826,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 
 	clientConfigJSON := fmt.Sprintf(`
 	clientConfigJSON := fmt.Sprintf(`
     {
     {
-        "ClientPlatform" : "Android_10_%s",
+        "ClientPlatform" : "Android_10_com.test.app",
         "ClientVersion" : "0",
         "ClientVersion" : "0",
         "SponsorId" : "0",
         "SponsorId" : "0",
         "PropagationChannelId" : "0",
         "PropagationChannelId" : "0",
@@ -835,7 +838,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
         "LimitTunnelProtocols" : ["%s"]
         "LimitTunnelProtocols" : ["%s"]
         %s
         %s
         %s
         %s
-    }`, testAppID, numTunnels, runConfig.tunnelProtocol, jsonLimitTLSProfiles, jsonNetworkID)
+    }`, numTunnels, runConfig.tunnelProtocol, jsonLimitTLSProfiles, jsonNetworkID)
 
 
 	clientConfig, err := psiphon.LoadConfig([]byte(clientConfigJSON))
 	clientConfig, err := psiphon.LoadConfig([]byte(clientConfigJSON))
 	if err != nil {
 	if err != nil {
@@ -920,6 +923,24 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	}
 	}
 	defer psiphon.CloseDataStore()
 	defer psiphon.CloseDataStore()
 
 
+	// Test unique user counting cases.
+	var expectUniqueUser bool
+	switch serverRuns % 3 {
+	case 0:
+		// Mock no last_connected.
+		psiphon.SetKeyValue("lastConnected", "")
+		expectUniqueUser = true
+	case 1:
+		// Mock previous day last_connected.
+		psiphon.SetKeyValue(
+			"lastConnected",
+			time.Now().UTC().AddDate(0, 0, -1).Truncate(1*time.Hour).Format(time.RFC3339))
+		expectUniqueUser = true
+	case 2:
+		// Leave previous last_connected.
+		expectUniqueUser = false
+	}
+
 	// Clear SLOKs from previous test runs.
 	// Clear SLOKs from previous test runs.
 	psiphon.DeleteSLOKs()
 	psiphon.DeleteSLOKs()
 
 
@@ -1131,22 +1152,13 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// without this delay.
 	// without this delay.
 	time.Sleep(100 * time.Millisecond)
 	time.Sleep(100 * time.Millisecond)
 
 
-	select {
-	case logFields := <-serverConnectedLog:
-		err := checkExpectedLogFields(runConfig, false, false, logFields)
-		if err != nil {
-			t.Fatalf("invalid server connected log fields: %s", err)
-		}
-	default:
-		t.Fatalf("missing server connected log")
-	}
-
 	expectClientBPFField := psiphon.ClientBPFEnabled() && doClientTactics
 	expectClientBPFField := psiphon.ClientBPFEnabled() && doClientTactics
 	expectServerBPFField := ServerBPFEnabled() && doServerTactics
 	expectServerBPFField := ServerBPFEnabled() && doServerTactics
 
 
 	select {
 	select {
 	case logFields := <-serverTunnelLog:
 	case logFields := <-serverTunnelLog:
-		err := checkExpectedLogFields(runConfig, expectClientBPFField, expectServerBPFField, logFields)
+		err := checkExpectedServerTunnelLogFields(
+			runConfig, expectClientBPFField, expectServerBPFField, logFields)
 		if err != nil {
 		if err != nil {
 			t.Fatalf("invalid server tunnel log fields: %s", err)
 			t.Fatalf("invalid server tunnel log fields: %s", err)
 		}
 		}
@@ -1154,11 +1166,29 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		t.Fatalf("missing server tunnel log")
 		t.Fatalf("missing server tunnel log")
 	}
 	}
 
 
+	if expectUniqueUser {
+		select {
+		case logFields := <-uniqueUserLog:
+			err := checkExpectedUniqueUserLogFields(runConfig, logFields)
+			if err != nil {
+				t.Fatalf("invalid unique user log fields: %s", err)
+			}
+		default:
+			t.Fatalf("missing unique user log")
+		}
+	} else {
+		select {
+		case <-uniqueUserLog:
+			t.Fatalf("unexpected unique user log")
+		default:
+		}
+	}
+
 	// Check that datastore had retained/pruned server entries as expected.
 	// Check that datastore had retained/pruned server entries as expected.
 	checkPruneServerEntriesTest(t, runConfig, testDataDirName, pruneServerEntryTestCases)
 	checkPruneServerEntriesTest(t, runConfig, testDataDirName, pruneServerEntryTestCases)
 }
 }
 
 
-func checkExpectedLogFields(
+func checkExpectedServerTunnelLogFields(
 	runConfig *runServerConfig,
 	runConfig *runServerConfig,
 	expectClientBPFField bool,
 	expectClientBPFField bool,
 	expectServerBPFField bool,
 	expectServerBPFField bool,
@@ -1191,7 +1221,6 @@ func checkExpectedLogFields(
 		"established_tunnels_count",
 		"established_tunnels_count",
 		"network_latency_multiplier",
 		"network_latency_multiplier",
 		"network_type",
 		"network_type",
-		"client_app_id",
 	} {
 	} {
 		if fields[name] == nil || fmt.Sprintf("%s", fields[name]) == "" {
 		if fields[name] == nil || fmt.Sprintf("%s", fields[name]) == "" {
 			return fmt.Errorf("missing expected field '%s'", name)
 			return fmt.Errorf("missing expected field '%s'", name)
@@ -1343,8 +1372,25 @@ func checkExpectedLogFields(
 		return fmt.Errorf("unexpected network_type '%s'", fields["network_type"])
 		return fmt.Errorf("unexpected network_type '%s'", fields["network_type"])
 	}
 	}
 
 
-	if fields["client_app_id"].(string) != testAppID {
-		return fmt.Errorf("unexpected client_app_id '%s'", fields["client_app_id"])
+	return nil
+}
+
+func checkExpectedUniqueUserLogFields(
+	runConfig *runServerConfig,
+	fields map[string]interface{}) error {
+
+	for _, name := range []string{
+		"session_id",
+		"last_connected",
+		"propagation_channel_id",
+		"sponsor_id",
+		"client_platform",
+		"tunnel_whole_device",
+		"device_region",
+	} {
+		if fields[name] == nil || fmt.Sprintf("%s", fields[name]) == "" {
+			return fmt.Errorf("missing expected field '%s'", name)
+		}
 	}
 	}
 
 
 	return nil
 	return nil

+ 1 - 1
psiphon/server/trafficRules.go

@@ -370,7 +370,7 @@ func (set *TrafficRulesSet) Validate() error {
 
 
 		for paramName := range filteredRule.Filter.HandshakeParameters {
 		for paramName := range filteredRule.Filter.HandshakeParameters {
 			validParamName := false
 			validParamName := false
-			for _, paramSpec := range baseRequestParams {
+			for _, paramSpec := range handshakeRequestParams {
 				if paramSpec.name == paramName {
 				if paramSpec.name == paramName {
 					validParamName = true
 					validParamName = true
 					break
 					break

+ 1 - 1
psiphon/server/tunnelServer.go

@@ -2226,7 +2226,7 @@ var serverTunnelStatParams = append(
 	[]requestParamSpec{
 	[]requestParamSpec{
 		{"last_connected", isLastConnected, requestParamOptional},
 		{"last_connected", isLastConnected, requestParamOptional},
 		{"establishment_duration", isIntString, requestParamOptional}},
 		{"establishment_duration", isIntString, requestParamOptional}},
-	baseRequestParams...)
+	baseSessionAndDialParams...)
 
 
 func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 
 

+ 1 - 1
psiphon/server/webServer.go

@@ -178,7 +178,7 @@ func convertHTTPRequestToAPIRequest(
 
 
 			// TODO: faster lookup?
 			// TODO: faster lookup?
 			isArray := false
 			isArray := false
-			for _, paramSpec := range baseRequestParams {
+			for _, paramSpec := range baseSessionAndDialParams {
 				if paramSpec.name == name {
 				if paramSpec.name == name {
 					isArray = (paramSpec.flags&requestParamArray != 0)
 					isArray = (paramSpec.flags&requestParamArray != 0)
 					break
 					break

+ 166 - 129
psiphon/serverApi.go

@@ -37,6 +37,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/buildinfo"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/buildinfo"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
@@ -114,7 +115,7 @@ func NewServerContext(tunnel *Tunnel) (*ServerContext, error) {
 func (serverContext *ServerContext) doHandshakeRequest(
 func (serverContext *ServerContext) doHandshakeRequest(
 	ignoreStatsRegexps bool) error {
 	ignoreStatsRegexps bool) error {
 
 
-	params := serverContext.getBaseAPIParameters()
+	params := serverContext.getBaseAPIParameters(baseParametersAll)
 
 
 	// The server will return a signed copy of its own server entry when the
 	// The server will return a signed copy of its own server entry when the
 	// client specifies this 'missing_server_entry_signature' parameter.
 	// client specifies this 'missing_server_entry_signature' parameter.
@@ -332,14 +333,45 @@ func (serverContext *ServerContext) doHandshakeRequest(
 }
 }
 
 
 // DoConnectedRequest performs the "connected" API request. This request is
 // DoConnectedRequest performs the "connected" API request. This request is
-// used for statistics. The server returns a last_connected token for
-// the client to store and send next time it connects. This token is
-// a timestamp (using the server clock, and should be rounded to the
-// nearest hour) which is used to determine when a connection represents
-// a unique user for a time period.
+// used for statistics, including unique user counting; reporting the full
+// tunnel establishment duration including the handshake request; and updated
+// fragmentor metrics.
+//
+// Users are not assigned identifiers. Instead, daily unique users are
+// calculated by having clients submit their last connected timestamp
+// (truncated to an hour, as a privacy measure). As client clocks are
+// unreliable, the server returns new last_connected values for the client to
+// store and send next time it connects.
 func (serverContext *ServerContext) DoConnectedRequest() error {
 func (serverContext *ServerContext) DoConnectedRequest() error {
 
 
-	params := serverContext.getBaseAPIParameters()
+	// Limitation: as currently implemented, the last_connected exchange isn't a
+	// distributed, atomic operation. When clients send the connected request,
+	// the server may receive the request, count a unique user based on the
+	// client's last_connected, and then the tunnel fails before the client
+	// receives the response, so the client will not update its last_connected
+	// value and submit the same one again, resulting in an inflated unique user
+	// count.
+	//
+	// The SetInFlightConnectedRequest mechanism mitigates one class of connected
+	// request interruption, a commanded shutdown in the middle of a connected
+	// request, by allowing some time for the request to complete before
+	// terminating the tunnel.
+	//
+	// TODO: consider extending the connected request protocol with additional
+	// "acknowledgment" messages so that the server does not commit its unique
+	// user count until after the client has acknowledged receipt and durable
+	// storage of the new last_connected value.
+
+	requestDone := make(chan struct{})
+	defer close(requestDone)
+
+	if !serverContext.tunnel.SetInFlightConnectedRequest(requestDone) {
+		return errors.TraceNew("tunnel is closing")
+	}
+	defer serverContext.tunnel.SetInFlightConnectedRequest(nil)
+
+	params := serverContext.getBaseAPIParameters(
+		baseParametersOnlyUpstreamFragmentorDialParameters)
 
 
 	lastConnected, err := getLastConnected()
 	lastConnected, err := getLastConnected()
 	if err != nil {
 	if err != nil {
@@ -411,7 +443,7 @@ func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
 // DoStatusRequest makes a "status" API request to the server, sending session stats.
 // DoStatusRequest makes a "status" API request to the server, sending session stats.
 func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
 func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
 
 
-	params := serverContext.getStatusParams(true)
+	params := serverContext.getBaseAPIParameters(baseParametersNoDialParameters)
 
 
 	// Note: ensure putBackStatusRequestPayload is called, to replace
 	// Note: ensure putBackStatusRequestPayload is called, to replace
 	// payload for future attempt, in all failure cases.
 	// payload for future attempt, in all failure cases.
@@ -478,28 +510,6 @@ func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error {
 	return nil
 	return nil
 }
 }
 
 
-func (serverContext *ServerContext) getStatusParams(
-	isTunneled bool) common.APIParameters {
-
-	params := serverContext.getBaseAPIParameters()
-
-	// Legacy clients set "connected" to "0" when disconnecting, and this value
-	// is used to calculate session duration estimates. This is now superseded
-	// by explicit tunnel stats duration reporting.
-	// The legacy method of reconstructing session durations is not compatible
-	// with this client's connected request retries and asynchronous final
-	// status request attempts. So we simply set this "connected" flag to reflect
-	// whether the request is sent tunneled or not.
-
-	connected := "1"
-	if !isTunneled {
-		connected = "0"
-	}
-	params["connected"] = connected
-
-	return params
-}
-
 // statusRequestPayloadInfo is a temporary structure for data used to
 // statusRequestPayloadInfo is a temporary structure for data used to
 // either "clear" or "put back" status request payload data depending
 // either "clear" or "put back" status request payload data depending
 // on whether or not the request succeeded.
 // on whether or not the request succeeded.
@@ -680,7 +690,7 @@ func RecordFailedTunnelStat(
 		return errors.Trace(err)
 		return errors.Trace(err)
 	}
 	}
 
 
-	params := getBaseAPIParameters(config, dialParams)
+	params := getBaseAPIParameters(baseParametersAll, config, dialParams)
 
 
 	delete(params, "server_secret")
 	delete(params, "server_secret")
 	params["server_entry_tag"] = dialParams.ServerEntry.Tag
 	params["server_entry_tag"] = dialParams.ServerEntry.Tag
@@ -782,9 +792,19 @@ func (serverContext *ServerContext) makeSSHAPIRequestPayload(
 	return jsonPayload, nil
 	return jsonPayload, nil
 }
 }
 
 
-func (serverContext *ServerContext) getBaseAPIParameters() common.APIParameters {
+type baseParametersFilter int
+
+const (
+	baseParametersAll baseParametersFilter = iota
+	baseParametersOnlyUpstreamFragmentorDialParameters
+	baseParametersNoDialParameters
+)
+
+func (serverContext *ServerContext) getBaseAPIParameters(
+	filter baseParametersFilter) common.APIParameters {
 
 
 	params := getBaseAPIParameters(
 	params := getBaseAPIParameters(
+		filter,
 		serverContext.tunnel.config,
 		serverContext.tunnel.config,
 		serverContext.tunnel.dialParams)
 		serverContext.tunnel.dialParams)
 
 
@@ -815,6 +835,7 @@ func (serverContext *ServerContext) getBaseAPIParameters() common.APIParameters
 // included with each Psiphon API request. These common parameters are used
 // included with each Psiphon API request. These common parameters are used
 // for metrics.
 // for metrics.
 func getBaseAPIParameters(
 func getBaseAPIParameters(
+	filter baseParametersFilter,
 	config *Config,
 	config *Config,
 	dialParams *DialParameters) common.APIParameters {
 	dialParams *DialParameters) common.APIParameters {
 
 
@@ -826,140 +847,156 @@ func getBaseAPIParameters(
 	params["propagation_channel_id"] = config.PropagationChannelId
 	params["propagation_channel_id"] = config.PropagationChannelId
 	params["sponsor_id"] = config.GetSponsorID()
 	params["sponsor_id"] = config.GetSponsorID()
 	params["client_version"] = config.ClientVersion
 	params["client_version"] = config.ClientVersion
-	params["relay_protocol"] = dialParams.TunnelProtocol
 	params["client_platform"] = config.ClientPlatform
 	params["client_platform"] = config.ClientPlatform
 	params["client_build_rev"] = buildinfo.GetBuildInfo().BuildRev
 	params["client_build_rev"] = buildinfo.GetBuildInfo().BuildRev
 	params["tunnel_whole_device"] = strconv.Itoa(config.TunnelWholeDevice)
 	params["tunnel_whole_device"] = strconv.Itoa(config.TunnelWholeDevice)
-	params["network_type"] = dialParams.GetNetworkType()
 
 
-	// The following parameters may be blank and must
-	// not be sent to the server if blank.
+	// Blank parameters must be omitted.
 
 
 	if config.DeviceRegion != "" {
 	if config.DeviceRegion != "" {
 		params["device_region"] = config.DeviceRegion
 		params["device_region"] = config.DeviceRegion
 	}
 	}
 
 
-	if dialParams.BPFProgramName != "" {
-		params["client_bpf"] = dialParams.BPFProgramName
-	}
+	if filter == baseParametersAll {
 
 
-	if dialParams.SelectedSSHClientVersion {
-		params["ssh_client_version"] = dialParams.SSHClientVersion
-	}
+		params["relay_protocol"] = dialParams.TunnelProtocol
+		params["network_type"] = dialParams.GetNetworkType()
 
 
-	if dialParams.UpstreamProxyType != "" {
-		params["upstream_proxy_type"] = dialParams.UpstreamProxyType
-	}
+		if dialParams.BPFProgramName != "" {
+			params["client_bpf"] = dialParams.BPFProgramName
+		}
 
 
-	if dialParams.UpstreamProxyCustomHeaderNames != nil {
-		params["upstream_proxy_custom_header_names"] = dialParams.UpstreamProxyCustomHeaderNames
-	}
+		if dialParams.SelectedSSHClientVersion {
+			params["ssh_client_version"] = dialParams.SSHClientVersion
+		}
 
 
-	if dialParams.FrontingProviderID != "" {
-		params["fronting_provider_id"] = dialParams.FrontingProviderID
-	}
+		if dialParams.UpstreamProxyType != "" {
+			params["upstream_proxy_type"] = dialParams.UpstreamProxyType
+		}
 
 
-	if dialParams.MeekDialAddress != "" {
-		params["meek_dial_address"] = dialParams.MeekDialAddress
-	}
+		if dialParams.UpstreamProxyCustomHeaderNames != nil {
+			params["upstream_proxy_custom_header_names"] = dialParams.UpstreamProxyCustomHeaderNames
+		}
 
 
-	meekResolvedIPAddress := dialParams.MeekResolvedIPAddress.Load().(string)
-	if meekResolvedIPAddress != "" {
-		params["meek_resolved_ip_address"] = meekResolvedIPAddress
-	}
+		if dialParams.FrontingProviderID != "" {
+			params["fronting_provider_id"] = dialParams.FrontingProviderID
+		}
 
 
-	if dialParams.MeekSNIServerName != "" {
-		params["meek_sni_server_name"] = dialParams.MeekSNIServerName
-	}
+		if dialParams.MeekDialAddress != "" {
+			params["meek_dial_address"] = dialParams.MeekDialAddress
+		}
 
 
-	if dialParams.MeekHostHeader != "" {
-		params["meek_host_header"] = dialParams.MeekHostHeader
-	}
+		meekResolvedIPAddress := dialParams.MeekResolvedIPAddress.Load().(string)
+		if meekResolvedIPAddress != "" {
+			params["meek_resolved_ip_address"] = meekResolvedIPAddress
+		}
 
 
-	// MeekTransformedHostName is meaningful when meek is used, which is when MeekDialAddress != ""
-	if dialParams.MeekDialAddress != "" {
-		transformedHostName := "0"
-		if dialParams.MeekTransformedHostName {
-			transformedHostName = "1"
+		if dialParams.MeekSNIServerName != "" {
+			params["meek_sni_server_name"] = dialParams.MeekSNIServerName
 		}
 		}
-		params["meek_transformed_host_name"] = transformedHostName
-	}
 
 
-	if dialParams.SelectedUserAgent {
-		params["user_agent"] = dialParams.UserAgent
-	}
+		if dialParams.MeekHostHeader != "" {
+			params["meek_host_header"] = dialParams.MeekHostHeader
+		}
 
 
-	if dialParams.SelectedTLSProfile {
-		params["tls_profile"] = dialParams.TLSProfile
-		params["tls_version"] = dialParams.GetTLSVersionForMetrics()
-	}
+		// MeekTransformedHostName is meaningful when meek is used, which is when
+		// MeekDialAddress != ""
+		if dialParams.MeekDialAddress != "" {
+			transformedHostName := "0"
+			if dialParams.MeekTransformedHostName {
+				transformedHostName = "1"
+			}
+			params["meek_transformed_host_name"] = transformedHostName
+		}
 
 
-	if dialParams.ServerEntry.Region != "" {
-		params["server_entry_region"] = dialParams.ServerEntry.Region
-	}
+		if dialParams.SelectedUserAgent {
+			params["user_agent"] = dialParams.UserAgent
+		}
 
 
-	if dialParams.ServerEntry.LocalSource != "" {
-		params["server_entry_source"] = dialParams.ServerEntry.LocalSource
-	}
+		if dialParams.SelectedTLSProfile {
+			params["tls_profile"] = dialParams.TLSProfile
+			params["tls_version"] = dialParams.GetTLSVersionForMetrics()
+		}
 
 
-	// As with last_connected, this timestamp stat, which may be
-	// a precise handshake request server timestamp, is truncated
-	// to hour granularity to avoid introducing a reconstructable
-	// cross-session user trace into server logs.
-	localServerEntryTimestamp := common.TruncateTimestampToHour(
-		dialParams.ServerEntry.LocalTimestamp)
-	if localServerEntryTimestamp != "" {
-		params["server_entry_timestamp"] = localServerEntryTimestamp
-	}
+		if dialParams.ServerEntry.Region != "" {
+			params["server_entry_region"] = dialParams.ServerEntry.Region
+		}
 
 
-	params[tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME] =
-		config.GetClientParameters().Get().Tag()
+		if dialParams.ServerEntry.LocalSource != "" {
+			params["server_entry_source"] = dialParams.ServerEntry.LocalSource
+		}
 
 
-	if dialParams.DialPortNumber != "" {
-		params["dial_port_number"] = dialParams.DialPortNumber
-	}
+		// As with last_connected, this timestamp stat, which may be a precise
+		// handshake request server timestamp, is truncated to hour granularity to
+		// avoid introducing a reconstructable cross-session user trace into server
+		// logs.
+		localServerEntryTimestamp := common.TruncateTimestampToHour(
+			dialParams.ServerEntry.LocalTimestamp)
+		if localServerEntryTimestamp != "" {
+			params["server_entry_timestamp"] = localServerEntryTimestamp
+		}
 
 
-	if dialParams.QUICVersion != "" {
-		params["quic_version"] = dialParams.QUICVersion
-	}
+		params[tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME] =
+			config.GetClientParameters().Get().Tag()
 
 
-	if dialParams.QUICDialSNIAddress != "" {
-		params["quic_dial_sni_address"] = dialParams.QUICDialSNIAddress
-	}
+		if dialParams.DialPortNumber != "" {
+			params["dial_port_number"] = dialParams.DialPortNumber
+		}
 
 
-	isReplay := "0"
-	if dialParams.IsReplay {
-		isReplay = "1"
-	}
-	params["is_replay"] = isReplay
+		if dialParams.QUICVersion != "" {
+			params["quic_version"] = dialParams.QUICVersion
+		}
 
 
-	if config.EgressRegion != "" {
-		params["egress_region"] = config.EgressRegion
-	}
+		if dialParams.QUICDialSNIAddress != "" {
+			params["quic_dial_sni_address"] = dialParams.QUICDialSNIAddress
+		}
 
 
-	// dialParams.DialDuration is nanoseconds; divide to get to milliseconds
-	params["dial_duration"] = fmt.Sprintf("%d", dialParams.DialDuration/1000000)
+		isReplay := "0"
+		if dialParams.IsReplay {
+			isReplay = "1"
+		}
+		params["is_replay"] = isReplay
 
 
-	params["candidate_number"] = strconv.Itoa(dialParams.CandidateNumber)
+		if config.EgressRegion != "" {
+			params["egress_region"] = config.EgressRegion
+		}
 
 
-	params["established_tunnels_count"] = strconv.Itoa(dialParams.EstablishedTunnelsCount)
+		// dialParams.DialDuration is nanoseconds; divide to get to milliseconds
+		params["dial_duration"] = fmt.Sprintf("%d", dialParams.DialDuration/1000000)
 
 
-	if dialParams.NetworkLatencyMultiplier != 0.0 {
-		params["network_latency_multiplier"] =
-			fmt.Sprintf("%f", dialParams.NetworkLatencyMultiplier)
-	}
+		params["candidate_number"] = strconv.Itoa(dialParams.CandidateNumber)
+
+		params["established_tunnels_count"] = strconv.Itoa(dialParams.EstablishedTunnelsCount)
 
 
-	if dialParams.DialConnMetrics != nil {
-		metrics := dialParams.DialConnMetrics.GetMetrics()
-		for name, value := range metrics {
-			params[name] = fmt.Sprintf("%v", value)
+		if dialParams.NetworkLatencyMultiplier != 0.0 {
+			params["network_latency_multiplier"] =
+				fmt.Sprintf("%f", dialParams.NetworkLatencyMultiplier)
 		}
 		}
-	}
 
 
-	if dialParams.ObfuscatedSSHConnMetrics != nil {
-		metrics := dialParams.ObfuscatedSSHConnMetrics.GetMetrics()
-		for name, value := range metrics {
-			params[name] = fmt.Sprintf("%v", value)
+		if dialParams.DialConnMetrics != nil {
+			metrics := dialParams.DialConnMetrics.GetMetrics()
+			for name, value := range metrics {
+				params[name] = fmt.Sprintf("%v", value)
+			}
+		}
+
+		if dialParams.ObfuscatedSSHConnMetrics != nil {
+			metrics := dialParams.ObfuscatedSSHConnMetrics.GetMetrics()
+			for name, value := range metrics {
+				params[name] = fmt.Sprintf("%v", value)
+			}
+		}
+
+	} else if filter == baseParametersOnlyUpstreamFragmentorDialParameters {
+
+		if dialParams.DialConnMetrics != nil {
+			names := fragmentor.GetUpstreamMetricsNames()
+			metrics := dialParams.DialConnMetrics.GetMetrics()
+			for name, value := range metrics {
+				if common.Contains(names, name) {
+					params[name] = fmt.Sprintf("%v", value)
+				}
+			}
 		}
 		}
 	}
 	}
 
 

+ 91 - 25
psiphon/tunnel.go

@@ -82,26 +82,27 @@ type TunnelOwner interface {
 // tunnel includes a network connection to the specified server
 // tunnel includes a network connection to the specified server
 // and an SSH session built on top of that transport.
 // and an SSH session built on top of that transport.
 type Tunnel struct {
 type Tunnel struct {
-	mutex                      *sync.Mutex
-	config                     *Config
-	isActivated                bool
-	isDiscarded                bool
-	isClosed                   bool
-	dialParams                 *DialParameters
-	livenessTestMetrics        *livenessTestMetrics
-	serverContext              *ServerContext
-	conn                       *common.ActivityMonitoredConn
-	sshClient                  *ssh.Client
-	sshServerRequests          <-chan *ssh.Request
-	operateWaitGroup           *sync.WaitGroup
-	operateCtx                 context.Context
-	stopOperate                context.CancelFunc
-	signalPortForwardFailure   chan struct{}
-	totalPortForwardFailures   int
-	adjustedEstablishStartTime time.Time
-	establishDuration          time.Duration
-	establishedTime            time.Time
-	handledSSHKeepAliveFailure int32
+	mutex                          *sync.Mutex
+	config                         *Config
+	isActivated                    bool
+	isDiscarded                    bool
+	isClosed                       bool
+	dialParams                     *DialParameters
+	livenessTestMetrics            *livenessTestMetrics
+	serverContext                  *ServerContext
+	conn                           *common.ActivityMonitoredConn
+	sshClient                      *ssh.Client
+	sshServerRequests              <-chan *ssh.Request
+	operateWaitGroup               *sync.WaitGroup
+	operateCtx                     context.Context
+	stopOperate                    context.CancelFunc
+	signalPortForwardFailure       chan struct{}
+	totalPortForwardFailures       int
+	adjustedEstablishStartTime     time.Time
+	establishDuration              time.Duration
+	establishedTime                time.Time
+	handledSSHKeepAliveFailure     int32
+	inFlightConnectedRequestSignal chan struct{}
 }
 }
 
 
 // getCustomClientParameters helpers wrap the verbose function call chain
 // getCustomClientParameters helpers wrap the verbose function call chain
@@ -335,6 +336,55 @@ func (tunnel *Tunnel) Close(isDiscarded bool) {
 	}
 	}
 }
 }
 
 
+// SetInFlightConnectedRequest checks if a connected request can begin and
+// sets the channel used to signal that the request is complete.
+//
+// The caller must not initiate a connected request when
+// SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
+// returns true, the caller must call SetInFlightConnectedRequest(nil) when
+// the connected request completes.
+func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
+	tunnel.mutex.Lock()
+	defer tunnel.mutex.Unlock()
+
+	// If already closing, don't start a connected request: the
+	// TunnelOperateShutdownTimeout period may be nearly expired.
+	if tunnel.isClosed {
+		return false
+	}
+
+	if requestSignal == nil {
+		// Not already in-flight (not expected)
+		if tunnel.inFlightConnectedRequestSignal == nil {
+			return false
+		}
+	} else {
+		// Already in-flight (not expected)
+		if tunnel.inFlightConnectedRequestSignal != nil {
+			return false
+		}
+	}
+
+	tunnel.inFlightConnectedRequestSignal = requestSignal
+
+	return true
+}
+
+// AwaitInFlightConnectedRequest waits for the signal that any in-flight
+// connected request is complete.
+//
+// AwaitInFlightConnectedRequest may block until the connected request is
+// aborted by terminating the tunnel.
+func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
+	tunnel.mutex.Lock()
+	requestSignal := tunnel.inFlightConnectedRequestSignal
+	tunnel.mutex.Unlock()
+
+	if requestSignal != nil {
+		<-requestSignal
+	}
+}
+
 // IsActivated returns the tunnel's activated flag.
 // IsActivated returns the tunnel's activated flag.
 func (tunnel *Tunnel) IsActivated() bool {
 func (tunnel *Tunnel) IsActivated() bool {
 	tunnel.mutex.Lock()
 	tunnel.mutex.Lock()
@@ -1273,18 +1323,34 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 		tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
 		tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
 
 
 	if err == nil {
 	if err == nil {
+
 		NoticeInfo("shutdown operate tunnel")
 		NoticeInfo("shutdown operate tunnel")
 
 
-		// Send a final status request in order to report any outstanding
-		// domain bytes transferred stats as well as to report session stats
-		// as soon as possible.
-		// This request will be interrupted when the tunnel is closed after
-		// an operate shutdown timeout.
+		// This commanded shutdown case is initiated by Tunnel.Close, which will
+		// wait up to parameters.TunnelOperateShutdownTimeout to allow the following
+		// requests to complete.
+
+		// Send a final status request in order to report any outstanding persistent
+		// stats and domain bytes transferred as soon as possible.
+
 		sendStats(tunnel)
 		sendStats(tunnel)
 
 
+		// The controller connectedReporter may have initiated a connected request
+		// concurrent to this commanded shutdown. SetInFlightConnectedRequest
+		// ensures that a connected request doesn't start after the commanded
+		// shutdown. AwaitInFlightConnectedRequest blocks until any in flight
+		// request completes or is aborted after TunnelOperateShutdownTimeout.
+		//
+		// As any connected request is performed by a concurrent goroutine,
+		// sendStats is called first and AwaitInFlightConnectedRequest second.
+
+		tunnel.AwaitInFlightConnectedRequest()
+
 	} else {
 	} else {
+
 		NoticeWarning("operate tunnel error for %s: %s",
 		NoticeWarning("operate tunnel error for %s: %s",
 			tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
 			tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
+
 		tunnelOwner.SignalTunnelFailure(tunnel)
 		tunnelOwner.SignalTunnelFailure(tunnel)
 	}
 	}
 }
 }