Просмотр исходного кода

New scheme to spread out proxy announcements, and other fixes

- Spread out proxy announcments to avoid CDN rate limits and to better load
  balance the broker announcment queue.
- Record metrics for WebRTC selected ICE candidate attributes.
- In the broker matching loop, don't keep scanning queue after the best common
  match.
- Fix a close/shutdown hang by wiring up WebRTC DataChannel.SetReadDeadline,
  which is used by quic-go to interrupt blocked UDP reads on close.
- In resetBrokerClientOnRoundTripperFailed, check for and skip resets
  triggered by late calls for a previous broker client instance.
- Add more information to the broker matcher debug log.
- Log more information for broker round trip failures.
Rod Hynes 1 год назад
Родитель
Сommit
613c929365

+ 22 - 16
psiphon/common/inproxy/broker.go

@@ -443,6 +443,7 @@ func (b *Broker) handleProxyAnnounce(
 	var newTacticsTag string
 	var clientOffer *MatchOffer
 	var timedOut bool
+	var limitedErr error
 
 	// As a future enhancement, a broker could initiate its own test
 	// connection to the proxy to verify its effectiveness, including
@@ -498,6 +499,8 @@ func (b *Broker) handleProxyAnnounce(
 		}
 		if retErr != nil {
 			logFields["error"] = retErr.Error()
+		} else if limitedErr != nil {
+			logFields["error"] = limitedErr.Error()
 		}
 		logFields.Add(transportLogFields)
 		b.config.Logger.LogMetric(brokerMetricName, logFields)
@@ -606,12 +609,14 @@ func (b *Broker) handleProxyAnnounce(
 			return nil, errors.Trace(err)
 		}
 
+		// A no-match response is sent in the case of a timeout awaiting a
+		// match. The faster-failing rate or entry limiting case also results
+		// in a no-match response, rather than an error return from
+		// handleProxyAnnounce, so that the proxy doesn't receive a 404 and
+		// flag its BrokerClient as having failed.
+
 		if timeout {
 
-			// Time out awaiting match. Still send a no-match response, as this is
-			// not an unexpected outcome and the proxy should not incorrectly
-			// flag its BrokerClient as having failed.
-			//
 			// Note: the respective proxy and broker timeouts,
 			// InproxyBrokerProxyAnnounceTimeout and
 			// InproxyProxyAnnounceRequestTimeout in tactics, should be
@@ -622,11 +627,9 @@ func (b *Broker) handleProxyAnnounce(
 
 		} else if limited {
 
-			// The limit error case also returns a no-match response, so the
-			// proxy doesn't receive a 404 flag its BrokerClient as having failed.
+			// Record the specific limit error in the proxy-announce broker event.
 
-			b.config.Logger.WithTraceFields(
-				common.LogFields{"err": err.Error()}).Info("announcement limited")
+			limitedErr = err
 		}
 
 		responsePayload, err := MarshalProxyAnnounceResponse(
@@ -702,6 +705,7 @@ func (b *Broker) handleClientOffer(
 	var proxyMatchAnnouncement *MatchAnnouncement
 	var proxyAnswer *MatchAnswer
 	var timedOut bool
+	var limitedErr error
 
 	// Always log the outcome.
 	defer func() {
@@ -735,6 +739,8 @@ func (b *Broker) handleClientOffer(
 		}
 		if retErr != nil {
 			logFields["error"] = retErr.Error()
+		} else if limitedErr != nil {
+			logFields["error"] = limitedErr.Error()
 		}
 		logFields.Add(transportLogFields)
 		b.config.Logger.LogMetric(brokerMetricName, logFields)
@@ -828,12 +834,14 @@ func (b *Broker) handleClientOffer(
 			return nil, errors.Trace(err)
 		}
 
+		// A no-match response is sent in the case of a timeout awaiting a
+		// match. The faster-failing rate or entry limiting case also results
+		// in a no-match response, rather than an error return from
+		// handleClientOffer, so that the client doesn't receive a 404 and
+		// flag its BrokerClient as having failed.
+
 		if timeout {
 
-			// Time out awaiting match. Still send a no-match response, as this is
-			// not an unexpected outcome and the client should not incorrectly
-			// flag its BrokerClient as having failed.
-			//
 			// Note: the respective client and broker timeouts,
 			// InproxyBrokerClientOfferTimeout and
 			// InproxyClientOfferRequestTimeout in tactics, should be configured
@@ -844,11 +852,9 @@ func (b *Broker) handleClientOffer(
 
 		} else if limited {
 
-			// The limit error case also returns a no-match response, so the
-			// client doesn't receive a 404 flag its BrokerClient as having failed.
+			// Record the specific limit error in the client-offer broker event.
 
-			b.config.Logger.WithTraceFields(
-				common.LogFields{"err": err.Error()}).Info("offer limited")
+			limitedErr = err
 		}
 
 		responsePayload, err := MarshalClientOfferResponse(

+ 35 - 8
psiphon/common/inproxy/brokerClient.go

@@ -102,6 +102,7 @@ func (b *BrokerClient) GetBrokerDialCoordinator() BrokerDialCoordinator {
 // ProxyAnnounce sends a ProxyAnnounce request and returns the response.
 func (b *BrokerClient) ProxyAnnounce(
 	ctx context.Context,
+	requestDelay time.Duration,
 	request *ProxyAnnounceRequest) (*ProxyAnnounceResponse, error) {
 
 	requestPayload, err := MarshalProxyAnnounceRequest(request)
@@ -109,13 +110,18 @@ func (b *BrokerClient) ProxyAnnounce(
 		return nil, errors.Trace(err)
 	}
 
-	requestCtx, requestCancelFunc := context.WithTimeout(
-		ctx, common.ValueOrDefault(
-			b.coordinator.AnnounceRequestTimeout(),
-			proxyAnnounceRequestTimeout))
+	timeout := common.ValueOrDefault(
+		b.coordinator.AnnounceRequestTimeout(),
+		proxyAnnounceRequestTimeout)
+
+	// Increase the timeout to account for requestDelay, which is applied
+	// before the actual network round trip.
+	timeout += requestDelay
+
+	requestCtx, requestCancelFunc := context.WithTimeout(ctx, timeout)
 	defer requestCancelFunc()
 
-	responsePayload, err := b.roundTrip(requestCtx, requestPayload)
+	responsePayload, err := b.roundTrip(requestCtx, requestDelay, requestPayload)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -144,7 +150,7 @@ func (b *BrokerClient) ClientOffer(
 			clientOfferRequestTimeout))
 	defer requestCancelFunc()
 
-	responsePayload, err := b.roundTrip(requestCtx, requestPayload)
+	responsePayload, err := b.roundTrip(requestCtx, 0, requestPayload)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -173,7 +179,7 @@ func (b *BrokerClient) ProxyAnswer(
 			proxyAnswerRequestTimeout))
 	defer requestCancelFunc()
 
-	responsePayload, err := b.roundTrip(requestCtx, requestPayload)
+	responsePayload, err := b.roundTrip(requestCtx, 0, requestPayload)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -203,7 +209,7 @@ func (b *BrokerClient) ClientRelayedPacket(
 			clientRelayedPacketRequestTimeout))
 	defer requestCancelFunc()
 
-	responsePayload, err := b.roundTrip(requestCtx, requestPayload)
+	responsePayload, err := b.roundTrip(requestCtx, 0, requestPayload)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -218,6 +224,7 @@ func (b *BrokerClient) ClientRelayedPacket(
 
 func (b *BrokerClient) roundTrip(
 	ctx context.Context,
+	requestDelay time.Duration,
 	request []byte) ([]byte, error) {
 
 	// The round tripper may need to establish a transport-level connection;
@@ -252,11 +259,31 @@ func (b *BrokerClient) roundTrip(
 	// response are tagged with a RoundTripID which is checked to ensure the
 	// association is maintained.
 
+	var preRoundTrip func(context.Context)
+	if requestDelay > 0 {
+
+		// Use the pre-round trip callback apply the requestDelay _after_ any
+		// waitToShareSession delay, otherwise any waitToShareSession may
+		// collapse staggered requests back together.
+		//
+		// The context passed to preRoundTrip should cancel the delay both in
+		// the case where the request is canceled and and in the case where
+		// the round tripper is closed.
+		//
+		// It's assumed that the caller has adjusted the ctx deadline to
+		// account for requestDelay.
+
+		preRoundTrip = func(ctx context.Context) {
+			common.SleepWithContext(ctx, requestDelay)
+		}
+	}
+
 	waitToShareSession := true
 
 	response, err := b.sessions.RoundTrip(
 		ctx,
 		roundTripper,
+		preRoundTrip,
 		b.coordinator.BrokerPublicKey(),
 		b.coordinator.BrokerRootObfuscationSecret(),
 		waitToShareSession,

+ 12 - 3
psiphon/common/inproxy/coordinator.go

@@ -30,9 +30,18 @@ import (
 // fronted HTTPS. RoundTripper is used by clients and proxies to make
 // requests to brokers.
 type RoundTripper interface {
-	RoundTrip(ctx context.Context, requestPayload []byte) (responsePayload []byte, err error)
+	RoundTrip(
+		ctx context.Context,
+		preRoundTrip PreRoundTripCallback,
+		requestPayload []byte) (responsePayload []byte, err error)
 }
 
+// PreRoundTripCallback is a callback that is invoked by the RoundTripper
+// immediately before the network round trip, and which takes a context that
+// will be canceled both in the case the request is canceled and in case the
+// round tripper is closed.
+type PreRoundTripCallback func(context.Context)
+
 // RoundTripperFailedError is an error type that should be returned from
 // RoundTripper.RoundTrip when the round trip transport has permanently
 // failed. When RoundTrip returns an error of type RoundTripperFailedError to
@@ -155,8 +164,8 @@ type BrokerDialCoordinator interface {
 	BrokerClientRoundTripperFailed(roundTripper RoundTripper)
 
 	AnnounceRequestTimeout() time.Duration
-	AnnounceRetryDelay() time.Duration
-	AnnounceRetryJitter() float64
+	AnnounceDelay() time.Duration
+	AnnounceDelayJitter() float64
 	AnswerRequestTimeout() time.Duration
 	OfferRequestTimeout() time.Duration
 	OfferRetryDelay() time.Duration

+ 6 - 6
psiphon/common/inproxy/coordinator_test.go

@@ -46,8 +46,8 @@ type testBrokerDialCoordinator struct {
 	brokerClientRoundTripperSucceeded func(RoundTripper)
 	brokerClientRoundTripperFailed    func(RoundTripper)
 	announceRequestTimeout            time.Duration
-	announceRetryDelay                time.Duration
-	announceRetryJitter               float64
+	announceDelay                     time.Duration
+	announceDelayJitter               float64
 	answerRequestTimeout              time.Duration
 	offerRequestTimeout               time.Duration
 	offerRetryDelay                   time.Duration
@@ -121,16 +121,16 @@ func (t *testBrokerDialCoordinator) AnnounceRequestTimeout() time.Duration {
 	return t.announceRequestTimeout
 }
 
-func (t *testBrokerDialCoordinator) AnnounceRetryDelay() time.Duration {
+func (t *testBrokerDialCoordinator) AnnounceDelay() time.Duration {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
-	return t.announceRetryDelay
+	return t.announceDelay
 }
 
-func (t *testBrokerDialCoordinator) AnnounceRetryJitter() float64 {
+func (t *testBrokerDialCoordinator) AnnounceDelayJitter() float64 {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
-	return t.announceRetryJitter
+	return t.announceDelayJitter
 }
 
 func (t *testBrokerDialCoordinator) AnswerRequestTimeout() time.Duration {

+ 7 - 1
psiphon/common/inproxy/inproxy_test.go

@@ -860,7 +860,13 @@ func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
 }
 
 func (r *httpRoundTripper) RoundTrip(
-	ctx context.Context, requestPayload []byte) ([]byte, error) {
+	ctx context.Context,
+	preRoundTrip PreRoundTripCallback,
+	requestPayload []byte) ([]byte, error) {
+
+	if preRoundTrip != nil {
+		preRoundTrip(ctx)
+	}
 
 	url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
 

+ 32 - 10
psiphon/common/inproxy/matcher.go

@@ -570,16 +570,8 @@ func (m *Matcher) matchAllOffers() {
 			continue
 		}
 
-		if m.config.Logger.IsLogLevelDebug() {
-			m.config.Logger.WithTraceFields(common.LogFields{
-				"match_index":             j,
-				"offer_queue_size":        m.offerQueue.Len(),
-				"announcement_queue_size": m.announcementQueue.Len(),
-			}).Debug("match metrics")
-		}
-
 		// Remove the matched announcement from the queue. Send the offer to
-		// the announcment entry's offerChan, which will deliver it to the
+		// the announcement entry's offerChan, which will deliver it to the
 		// blocked Announce call. Add a pending answers entry to await the
 		// proxy's follow up Answer call. The TTL for the pending answer
 		// entry is set to the matched Offer call's ctx, as the answer is
@@ -587,6 +579,28 @@ func (m *Matcher) matchAllOffers() {
 
 		announcementEntry := m.announcementQueue.At(j)
 
+		if m.config.Logger.IsLogLevelDebug() {
+
+			announcementProxyID :=
+				announcementEntry.announcement.ProxyID
+			announcementConnectionID :=
+				announcementEntry.announcement.ConnectionID
+			announcementCommonCompartmentIDs :=
+				announcementEntry.announcement.Properties.CommonCompartmentIDs
+			offerCommonCompartmentIDs :=
+				offerEntry.offer.Properties.CommonCompartmentIDs
+
+			m.config.Logger.WithTraceFields(common.LogFields{
+				"announcement_proxy_id":               announcementProxyID,
+				"announcement_connection_id":          announcementConnectionID,
+				"announcement_common_compartment_ids": announcementCommonCompartmentIDs,
+				"offer_common_compartment_ids":        offerCommonCompartmentIDs,
+				"match_index":                         j,
+				"announcement_queue_size":             m.announcementQueue.Len(),
+				"offer_queue_size":                    m.offerQueue.Len(),
+			}).Debug("match metrics")
+		}
+
 		expiry := lrucache.DefaultExpiration
 		deadline, ok := offerEntry.ctx.Deadline()
 		if ok {
@@ -737,7 +751,9 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
 		// Stop as soon as we have the best possible match.
 
 		if (bestMatchNAT || !existsPreferredNATMatch) &&
-			(matchPersonalCompartment || m.announcementsPersonalCompartmentalizedCount == 0) {
+			(matchPersonalCompartment ||
+				m.announcementsPersonalCompartmentalizedCount == 0 ||
+				len(offerProperties.PersonalCompartmentIDs) == 0) {
 			break
 		}
 	}
@@ -815,6 +831,12 @@ func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) e
 	}
 
 	if limitEntryCount > 0 {
+
+		// Limitation: non-limited proxy ID entries are counted in
+		// entryCountByIP. If both a limited and non-limited proxy ingress
+		// from the same limitIP, then the non-limited entries will count
+		// against the limited proxy's limitEntryCount.
+
 		entryCount, ok := entryCountByIP[limitIP]
 		if ok && entryCount >= limitEntryCount {
 			return errors.Trace(

+ 9 - 2
psiphon/common/inproxy/nat.go

@@ -333,23 +333,30 @@ func (t PortMappingTypes) IsValid() bool {
 // ICECandidateType is an ICE candidate type: host for public addresses, port
 // mapping for when a port mapping protocol was used to establish a public
 // address, or server reflexive when STUN hole punching was used to create a
-// public address.
+// public address. Peer reflexive candidates emerge during the ICE
+// negotiation process and are not SDP entries.
 type ICECandidateType int32
 
 const (
-	ICECandidateHost ICECandidateType = iota
+	ICECandidateUnknown ICECandidateType = iota
+	ICECandidateHost
 	ICECandidatePortMapping
 	ICECandidateServerReflexive
+	ICECandidatePeerReflexive
 )
 
 func (t ICECandidateType) String() string {
 	switch t {
+	case ICECandidateUnknown:
+		return "Unknown"
 	case ICECandidateHost:
 		return "Host"
 	case ICECandidatePortMapping:
 		return "PortMapping"
 	case ICECandidateServerReflexive:
 		return "ServerReflexive"
+	case ICECandidatePeerReflexive:
+		return "PeerReflexive"
 	}
 	return ""
 }

+ 95 - 47
psiphon/common/inproxy/proxy.go

@@ -29,16 +29,17 @@ import (
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/pion/webrtc/v3"
 )
 
 const (
-	proxyAnnounceRetryDelay     = 2 * time.Second
-	proxyAnnounceRetryJitter    = 0.3
-	proxyAnnounceMaxRetryDelay  = 6 * time.Hour
-	proxyWebRTCAnswerTimeout    = 20 * time.Second
-	proxyDestinationDialTimeout = 20 * time.Second
+	proxyAnnounceDelay           = 1 * time.Second
+	proxyAnnounceDelayJitter     = 0.5
+	proxyAnnounceMaxBackoffDelay = 1 * time.Hour
+	proxyWebRTCAnswerTimeout     = 20 * time.Second
+	proxyDestinationDialTimeout  = 20 * time.Second
 )
 
 // Proxy is the in-proxy proxying component, which relays traffic from a
@@ -60,6 +61,9 @@ type Proxy struct {
 	networkDiscoveryMutex     sync.Mutex
 	networkDiscoveryRunOnce   bool
 	networkDiscoveryNetworkID string
+
+	nextAnnounceMutex sync.Mutex
+	nextAnnounceTime  time.Time
 }
 
 // TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
@@ -187,21 +191,43 @@ func (p *Proxy) Run(ctx context.Context) {
 
 	proxyWaitGroup := new(sync.WaitGroup)
 
-	for i := 0; i < p.config.MaxClients; i++ {
+	// Launch the first proxy worker, passing a signal to be triggered once
+	// the very first announcement round trip is complete. The first round
+	// trip is awaited so that:
+	//
+	// - The first announce response will arrive with any new tactics,
+	//   avoiding a start up case where MaxClients initial, concurrent
+	//   announces all return with no-match and a tactics payload.
+	//
+	// - The first worker gets no announcement delay and is also guaranteed to
+	//   be the shared session establisher. Since the announcement delays are
+	//   applied _after_ waitToShareSession, it would otherwise be possible,
+	//   with a race of MaxClient initial, concurrent announces, for the
+	//   session establisher to be a different worker than the no-delay worker.
 
-		// Give the very first announcement a head start, by delaying the
-		// others, so that the first announcement request can obtain and
-		// apply any new tactics first, avoiding all MaxClients initial
-		// announcement requests returning with potentially no match and new
-		// tactics responses. After this initial launch point, we assume
-		// proxy announcement requests are somewhat staggered.
-		delayFirstAnnounce := i > 0
+	signalFirstAnnounceCtx, signalFirstAnnounceDone :=
+		context.WithCancel(context.Background())
 
+	proxyWaitGroup.Add(1)
+	go func() {
+		defer proxyWaitGroup.Done()
+		p.proxyClients(ctx, signalFirstAnnounceDone)
+	}()
+
+	select {
+	case <-signalFirstAnnounceCtx.Done():
+	case <-ctx.Done():
+		return
+	}
+
+	// Launch the remaining workers.
+
+	for i := 0; i < p.config.MaxClients-1; i++ {
 		proxyWaitGroup.Add(1)
-		go func(delayFirstAnnounce bool) {
+		go func() {
 			defer proxyWaitGroup.Done()
-			p.proxyClients(ctx, delayFirstAnnounce)
-		}(delayFirstAnnounce)
+			p.proxyClients(ctx, nil)
+		}()
 	}
 
 	// Capture activity updates every second, which is the required frequency
@@ -235,11 +261,11 @@ loop:
 func (p *Proxy) getAnnounceDelayParameters() (time.Duration, float64) {
 	brokerClient, err := p.config.GetBrokerClient()
 	if err != nil {
-		return proxyAnnounceRetryDelay, proxyAnnounceRetryJitter
+		return proxyAnnounceDelay, proxyAnnounceDelayJitter
 	}
 	brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
-	return common.ValueOrDefault(brokerCoordinator.AnnounceRetryDelay(), proxyAnnounceRetryDelay),
-		common.ValueOrDefault(brokerCoordinator.AnnounceRetryJitter(), proxyAnnounceRetryJitter)
+	return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
+		common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
 
 }
 
@@ -281,7 +307,8 @@ func greaterThanSwapInt64(addr *int64, new int64) bool {
 	return false
 }
 
-func (p *Proxy) proxyClients(ctx context.Context, delayFirstAnnounce bool) {
+func (p *Proxy) proxyClients(
+	ctx context.Context, signalAnnounceDone func()) {
 
 	// Proxy one client, repeating until ctx is done.
 	//
@@ -313,15 +340,7 @@ func (p *Proxy) proxyClients(ctx context.Context, delayFirstAnnounce bool) {
 			break
 		}
 
-		// When delayFirstAnnounce is true, the very first proxyOneClient
-		// proxy announcement is delayed to give another, concurrent
-		// proxyClients proxy announcment a head start in order to fetch and
-		// apply any new tactics.
-		//
-		// This delay is distinct from the post-failure delay, although both
-		// use the same delay parameter settings.
-
-		backOff, err := p.proxyOneClient(ctx, delayFirstAnnounce && i == 0)
+		backOff, err := p.proxyOneClient(ctx, signalAnnounceDone)
 
 		if err != nil && ctx.Err() == nil {
 
@@ -330,7 +349,7 @@ func (p *Proxy) proxyClients(ctx context.Context, delayFirstAnnounce bool) {
 					"error": err.Error(),
 				}).Error("proxy client failed")
 
-			// Apply a simple exponential backoff base on whether
+			// Apply a simple exponential backoff based on whether
 			// proxyOneClient either relayed client traffic or got no match,
 			// or encountered a failure.
 			//
@@ -349,8 +368,8 @@ func (p *Proxy) proxyClients(ctx context.Context, delayFirstAnnounce bool) {
 				failureDelayFactor = 1
 			}
 			delay = delay * failureDelayFactor
-			if delay > proxyAnnounceMaxRetryDelay {
-				delay = proxyAnnounceMaxRetryDelay
+			if delay > proxyAnnounceMaxBackoffDelay {
+				delay = proxyAnnounceMaxBackoffDelay
 			}
 			if failureDelayFactor < 1<<20 {
 				failureDelayFactor *= 2
@@ -425,7 +444,8 @@ func (p *Proxy) doNetworkDiscovery(
 	p.networkDiscoveryNetworkID = networkID
 }
 
-func (p *Proxy) proxyOneClient(ctx context.Context, delayAnnounce bool) (bool, error) {
+func (p *Proxy) proxyOneClient(
+	ctx context.Context, signalAnnounceDone func()) (bool, error) {
 
 	// Do not trigger back-off unless the proxy successfully announces and
 	// only then performs poorly.
@@ -456,20 +476,6 @@ func (p *Proxy) proxyOneClient(ctx context.Context, delayAnnounce bool) (bool, e
 	// per network.
 	p.doNetworkDiscovery(ctx, webRTCCoordinator)
 
-	// delayAnnounce delays the proxy announcement request in order to give a
-	// concurrent request a head start. See comments in Run and proxyClients.
-	// This head start delay is applied here, after doNetworkDiscovery, as
-	// otherwise the delay might be negated if the head-start proxyOneClient
-	// blocks on doNetworkDiscovery and subsequently this proxyOneClient
-	// quickly finds cached results in doNetworkDiscovery.
-	if delayAnnounce {
-		announceRetryDelay, announceRetryJitter := p.getAnnounceDelayParameters()
-		common.SleepWithJitter(
-			ctx,
-			common.ValueOrDefault(announceRetryDelay, proxyAnnounceRetryDelay),
-			common.ValueOrDefault(announceRetryJitter, proxyAnnounceRetryJitter))
-	}
-
 	// Send the announce request
 
 	// At this point, no NAT traversal operations have been performed by the
@@ -518,6 +524,40 @@ func (p *Proxy) proxyOneClient(ctx context.Context, delayAnnounce bool) (bool, e
 		return backOff, errors.Trace(err)
 	}
 
+	// Set a delay before announcing, to stagger the announce request times.
+	// The delay helps to avoid triggering rate limits or similar errors from
+	// any intermediate CDN between the proxy and the broker; and provides a
+	// nudge towards better load balancing across multiple large MaxClients
+	// proxies, as the broker primarily matches enqueued announces in FIFO
+	// order, since older announces expire earlier.
+	//
+	// The delay is intended to be applied after doNetworkDiscovery, which has
+	// no reason to be delayed; and also after any waitToShareSession delay,
+	// as delaying before waitToShareSession can result in the announce
+	// request times collapsing back together. Delaying after
+	// waitToShareSession is handled by brokerClient.ProxyAnnounce, which
+	// will also extend the base request timeout, as required, to account for
+	// any deliberate delay.
+
+	announceRequestDelay := time.Duration(0)
+	announceDelay, announceDelayJitter := p.getAnnounceDelayParameters()
+	p.nextAnnounceMutex.Lock()
+	delay := prng.JitterDuration(announceDelay, announceDelayJitter)
+	if p.nextAnnounceTime.IsZero() {
+		// No delay for the very first announce request.
+		p.nextAnnounceTime = time.Now().Add(delay)
+
+	} else {
+		announceRequestDelay = time.Until(p.nextAnnounceTime)
+		if announceRequestDelay < 0 {
+			p.nextAnnounceTime = time.Now().Add(delay)
+			announceRequestDelay = 0
+		} else {
+			p.nextAnnounceTime = p.nextAnnounceTime.Add(delay)
+		}
+	}
+	p.nextAnnounceMutex.Unlock()
+
 	// A proxy ID is implicitly sent with requests; it's the proxy's session
 	// public key.
 	//
@@ -525,6 +565,7 @@ func (p *Proxy) proxyOneClient(ctx context.Context, delayAnnounce bool) (bool, e
 	// long-polling.
 	announceResponse, err := brokerClient.ProxyAnnounce(
 		ctx,
+		announceRequestDelay,
 		&ProxyAnnounceRequest{
 			PersonalCompartmentIDs: brokerCoordinator.PersonalCompartmentIDs(),
 			Metrics:                metrics,
@@ -551,6 +592,13 @@ func (p *Proxy) proxyOneClient(ctx context.Context, delayAnnounce bool) (bool, e
 		}
 	}
 
+	// Signal that the announce round trip is complete. At this point, the
+	// broker Noise session should be established and any fresh tactics
+	// applied.
+	if signalAnnounceDone != nil {
+		signalAnnounceDone()
+	}
+
 	if announceResponse.NoMatch {
 		return backOff, errors.TraceNew("no match")
 	}

+ 4 - 3
psiphon/common/inproxy/session.go

@@ -280,6 +280,7 @@ func NewInitiatorSessions(
 func (s *InitiatorSessions) RoundTrip(
 	ctx context.Context,
 	roundTripper RoundTripper,
+	preRoundTrip PreRoundTripCallback,
 	responderPublicKey SessionPublicKey,
 	responderRootObfuscationSecret ObfuscationSecret,
 	waitToShareSession bool,
@@ -307,7 +308,7 @@ func (s *InitiatorSessions) RoundTrip(
 			}
 			return response, nil
 		}
-		in, err = roundTripper.RoundTrip(ctx, out)
+		in, err = roundTripper.RoundTrip(ctx, preRoundTrip, out)
 		if err != nil {
 
 			// There are no explicit retries here. Retrying in the case where
@@ -315,8 +316,8 @@ func (s *InitiatorSessions) RoundTrip(
 			// the reset session token logic in InitiatorRoundTrip. Higher
 			// levels implicitly provide additional retries to cover other
 			// cases; Psiphon client tunnel establishment will retry in-proxy
-			// dials; the proxy will retry its announce request if it
-			// fails -- after an appropriate delay.
+			// dials; the proxy will retry its announce requests if they
+			// fail.
 
 			// If this round trip owns its session and there are any
 			// waitToShareSession initiators awaiting the session, signal them

+ 27 - 1
psiphon/common/inproxy/session_test.go

@@ -25,6 +25,7 @@ import (
 	"fmt"
 	"math"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -77,6 +78,11 @@ func runTestSessions() error {
 		return errors.Trace(err)
 	}
 
+	var preRoundTripCalls atomic.Int64
+	preRoundTrip := func(_ context.Context) {
+		preRoundTripCalls.Add(1)
+	}
+
 	initiatorSessions := NewInitiatorSessions(initiatorPrivateKey)
 
 	roundTripper := newTestSessionRoundTripper(responderSessions, &initiatorPublicKey)
@@ -88,6 +94,7 @@ func runTestSessions() error {
 	response, err := initiatorSessions.RoundTrip(
 		context.Background(),
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -111,6 +118,7 @@ func runTestSessions() error {
 	response, err = initiatorSessions.RoundTrip(
 		context.Background(),
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -131,6 +139,7 @@ func runTestSessions() error {
 		_, err = initiatorSessions.RoundTrip(
 			context.Background(),
 			roundTripper,
+			preRoundTrip,
 			responderPublicKey,
 			responderRootObfuscationSecret,
 			waitToShareSession,
@@ -147,6 +156,7 @@ func runTestSessions() error {
 	response, err = initiatorSessions.RoundTrip(
 		context.Background(),
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -179,6 +189,7 @@ func runTestSessions() error {
 			_, err := initiatorSessions.RoundTrip(
 				context.Background(),
 				failingRoundTripper,
+				preRoundTrip,
 				responderPublicKey,
 				responderRootObfuscationSecret,
 				waitToShareSession,
@@ -220,6 +231,7 @@ func runTestSessions() error {
 	response, err = initiatorSessions.RoundTrip(
 		context.Background(),
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -253,6 +265,7 @@ func runTestSessions() error {
 	response, err = initiatorSessions.RoundTrip(
 		context.Background(),
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -305,6 +318,7 @@ func runTestSessions() error {
 	response, err = unknownInitiatorSessions.RoundTrip(
 		ctx,
 		roundTripper,
+		preRoundTrip,
 		responderPublicKey,
 		responderRootObfuscationSecret,
 		waitToShareSession,
@@ -365,6 +379,7 @@ func runTestSessions() error {
 						response, err := initiatorSessions.RoundTrip(
 							context.Background(),
 							roundTripper,
+							preRoundTrip,
 							responderPublicKey,
 							responderRootObfuscationSecret,
 							waitToShareSession,
@@ -403,6 +418,10 @@ func runTestSessions() error {
 		}
 	}
 
+	if preRoundTripCalls.Load() < int64(clientCount*requestCount) {
+		return errors.TraceNew("unexpected pre-round trip call count")
+	}
+
 	return nil
 }
 
@@ -434,7 +453,10 @@ func (t *testSessionRoundTripper) ExpectedResponse(requestPayload []byte) []byte
 	return responsePayload
 }
 
-func (t *testSessionRoundTripper) RoundTrip(ctx context.Context, requestPayload []byte) ([]byte, error) {
+func (t *testSessionRoundTripper) RoundTrip(
+	ctx context.Context,
+	preRoundTrip PreRoundTripCallback,
+	requestPayload []byte) ([]byte, error) {
 
 	err := ctx.Err()
 	if err != nil {
@@ -445,6 +467,10 @@ func (t *testSessionRoundTripper) RoundTrip(ctx context.Context, requestPayload
 		return nil, errors.TraceNew("closed")
 	}
 
+	if preRoundTrip != nil {
+		preRoundTrip(ctx)
+	}
+
 	unwrappedRequestHandler := func(initiatorID ID, unwrappedRequest []byte) ([]byte, error) {
 
 		if t.expectedPeerPublicKey != nil {

+ 122 - 10
psiphon/common/inproxy/webrtc.go

@@ -88,6 +88,7 @@ type WebRTCConn struct {
 	dataChannelOpenedOnce        sync.Once
 	dataChannelWriteBufferSignal chan struct{}
 	decoyDone                    bool
+	iceCandidatePairMetrics      common.LogFields
 
 	readMutex       sync.Mutex
 	readBuffer      []byte
@@ -762,11 +763,13 @@ func (conn *WebRTCConn) AwaitInitialDataChannel(ctx context.Context) error {
 	case <-conn.dataChannelOpenedSignal:
 
 		// The data channel is connected.
-		//
-		// TODO: for metrics, determine which end was the network connection
-		// initiator; and determine which type of ICE candidate was
-		// successful (note that peer-reflexive candidates aren't in either
-		// SDP and emerge only during ICE negotiation).
+
+		err := conn.recordSelectedICECandidateStats()
+		if err != nil {
+			conn.config.Logger.WithTraceFields(common.LogFields{
+				"error": err.Error()}).Warning("recordCandidateStats failed")
+			// Continue without log
+		}
 
 	case <-ctx.Done():
 		return errors.Trace(ctx.Err())
@@ -776,6 +779,106 @@ func (conn *WebRTCConn) AwaitInitialDataChannel(ctx context.Context) error {
 	return nil
 }
 
+func (conn *WebRTCConn) recordSelectedICECandidateStats() error {
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
+
+	statsReport := conn.peerConnection.GetStats()
+	foundNominatedPair := false
+	for key, stats := range statsReport {
+
+		// Uses the pion StatsReport key formats "candidate:<ID>"
+		// and "candidate:<ID>-candidate:<ID>"
+
+		key, found := strings.CutPrefix(key, "candidate:")
+		if !found {
+			continue
+		}
+		candidateIDs := strings.Split(key, "-candidate:")
+		if len(candidateIDs) != 2 {
+			continue
+		}
+
+		candidatePairStats, ok := stats.(webrtc.ICECandidatePairStats)
+		if !ok ||
+			candidatePairStats.State != webrtc.StatsICECandidatePairStateSucceeded ||
+			!candidatePairStats.Nominated {
+			continue
+		}
+
+		localKey := fmt.Sprintf("candidate:%s", candidateIDs[0])
+		stats, ok := statsReport[localKey]
+		if !ok {
+			return errors.TraceNew("missing local ICECandidateStats")
+		}
+		localCandidateStats, ok := stats.(webrtc.ICECandidateStats)
+		if !ok {
+			return errors.TraceNew("unexpected local ICECandidateStats")
+		}
+
+		remoteKey := fmt.Sprintf("candidate:%s", candidateIDs[1])
+		stats, ok = statsReport[remoteKey]
+		if !ok {
+			return errors.TraceNew("missing remote ICECandidateStats")
+		}
+		remoteCandidateStats, ok := stats.(webrtc.ICECandidateStats)
+		if !ok {
+			return errors.TraceNew("unexpected remote ICECandidateStats")
+		}
+
+		// Use the same ICE candidate type names as logged in broker logs.
+		logCandidateType := func(
+			iceCandidateType webrtc.ICECandidateType) string {
+			logType := ICECandidateUnknown
+			switch iceCandidateType {
+			case webrtc.ICECandidateTypeHost:
+				logType = ICECandidateHost
+			case webrtc.ICECandidateTypeSrflx:
+				logType = ICECandidateServerReflexive
+			case webrtc.ICECandidateTypePrflx:
+				logType = ICECandidatePeerReflexive
+			}
+			return logType.String()
+		}
+
+		conn.iceCandidatePairMetrics = common.LogFields{}
+
+		// TODO: log which of local/remote candidate is initiator
+
+		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_type"] =
+			logCandidateType(localCandidateStats.CandidateType)
+		localIP := net.ParseIP(localCandidateStats.IP)
+		isIPv6 := "0"
+		if localIP != nil && localIP.To4() == nil {
+			isIPv6 = "1"
+		}
+		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_is_IPv6"] =
+			isIPv6
+		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_port"] =
+			localCandidateStats.Port
+
+		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_type"] =
+			logCandidateType(remoteCandidateStats.CandidateType)
+		remoteIP := net.ParseIP(remoteCandidateStats.IP)
+		isIPv6 = "0"
+		if remoteIP != nil && remoteIP.To4() == nil {
+			isIPv6 = "1"
+		}
+		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_is_IPv6"] =
+			isIPv6
+		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_port"] =
+			remoteCandidateStats.Port
+
+		foundNominatedPair = true
+		break
+	}
+	if !foundNominatedPair {
+		return errors.TraceNew("missing nominated ICECandidateStatsPair")
+	}
+
+	return nil
+}
+
 func (conn *WebRTCConn) Close() error {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
@@ -1173,7 +1276,16 @@ func (conn *WebRTCConn) SetReadDeadline(t time.Time) error {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 
-	return errors.TraceNew("not supported")
+	if conn.isClosed {
+		return errors.TraceNew("closed")
+	}
+
+	readDeadliner, ok := conn.dataChannelConn.(datachannel.ReadDeadliner)
+	if !ok {
+		return errors.TraceNew("no data channel")
+	}
+
+	return readDeadliner.SetReadDeadline(t)
 }
 
 func (conn *WebRTCConn) SetWriteDeadline(t time.Time) error {
@@ -1186,13 +1298,13 @@ func (conn *WebRTCConn) SetWriteDeadline(t time.Time) error {
 // GetMetrics implements the common.MetricsSource interface and returns log
 // fields detailing the WebRTC dial parameters.
 func (conn *WebRTCConn) GetMetrics() common.LogFields {
-
-	// TODO: determine which WebRTC ICE candidate was chosen, and log its
-	// type (host, server reflexive, etc.), port number(s), and whether it's
-	// IPv6.
+	conn.mutex.Lock()
+	defer conn.mutex.Unlock()
 
 	logFields := make(common.LogFields)
 
+	logFields.Add(conn.iceCandidatePairMetrics)
+
 	randomizeDTLS := "0"
 	if conn.config.DoDTLSRandomization {
 		randomizeDTLS = "1"

+ 4 - 4
psiphon/common/parameters/parameters.go

@@ -395,8 +395,8 @@ const (
 	InproxyBrokerClientOfferTimeout                    = "InproxyBrokerClientOfferTimeout"
 	InproxyBrokerPendingServerRequestsTTL              = "InproxyBrokerPendingServerRequestsTTL"
 	InproxyProxyAnnounceRequestTimeout                 = "InproxyProxyAnnounceRequestTimeout"
-	InproxyProxyAnnounceRetryDelay                     = "InproxyProxyAnnounceRetryDelay"
-	InproxyProxyAnnounceRetryJitter                    = "InproxyProxyAnnounceRetryJitter"
+	InproxyProxyAnnounceDelay                          = "InproxyProxyAnnounceDelay"
+	InproxyProxyAnnounceDelayJitter                    = "InproxyProxyAnnounceDelayJitter"
 	InproxyProxyAnswerRequestTimeout                   = "InproxyProxyAnswerRequestTimeout"
 	InproxyClientOfferRequestTimeout                   = "InproxyClientOfferRequestTimeout"
 	InproxyClientOfferRetryDelay                       = "InproxyClientOfferRetryDelay"
@@ -876,8 +876,8 @@ var defaultParameters = map[string]struct {
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerPendingServerRequestsTTL:              {value: 60 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyProxyAnnounceRequestTimeout:                 {value: 2*time.Minute + 10*time.Second, minimum: time.Duration(0)},
-	InproxyProxyAnnounceRetryDelay:                     {value: 2 * time.Second, minimum: time.Duration(0)},
-	InproxyProxyAnnounceRetryJitter:                    {value: 0.3, minimum: 0.0},
+	InproxyProxyAnnounceDelay:                          {value: 1 * time.Second, minimum: time.Duration(0)},
+	InproxyProxyAnnounceDelayJitter:                    {value: 0.5, minimum: 0.0},
 	InproxyProxyAnswerRequestTimeout:                   {value: 10*time.Second + 10*time.Second, minimum: time.Duration(0)},
 	InproxyClientOfferRequestTimeout:                   {value: 10*time.Second + 10*time.Second, minimum: time.Duration(0)},
 	InproxyClientOfferRetryDelay:                       {value: 1 * time.Second, minimum: time.Duration(0)},

+ 40 - 31
psiphon/common/protocol/packed.go

@@ -350,7 +350,7 @@ func unpackInt(v interface{}) (interface{}, error) {
 	case uint64:
 		return strconv.FormatUint(i, 10), nil
 	default:
-		return "", errors.TraceNew(
+		return nil, errors.TraceNew(
 			"expected int, int64, or uint64 type")
 	}
 }
@@ -374,7 +374,7 @@ func packFloat(v interface{}) (interface{}, error) {
 func unpackFloat(v interface{}) (interface{}, error) {
 	f, ok := v.(float64)
 	if !ok {
-		return "", errors.TraceNew("expected int type")
+		return nil, errors.TraceNew("expected int type")
 	}
 	return fmt.Sprintf("%f", f), nil
 }
@@ -396,7 +396,7 @@ func packHex(v interface{}) (interface{}, error) {
 func unpackHexLower(v interface{}) (interface{}, error) {
 	b, ok := v.([]byte)
 	if !ok {
-		return "", errors.TraceNew("expected []byte type")
+		return nil, errors.TraceNew("expected []byte type")
 	}
 	return hex.EncodeToString(b), nil
 }
@@ -404,7 +404,7 @@ func unpackHexLower(v interface{}) (interface{}, error) {
 func unpackHexUpper(v interface{}) (interface{}, error) {
 	s, err := unpackHexLower(v)
 	if err != nil {
-		return "", errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
 	return strings.ToUpper(s.(string)), nil
 }
@@ -426,7 +426,7 @@ func packBase64(v interface{}) (interface{}, error) {
 func unpackBase64(v interface{}) (interface{}, error) {
 	b, ok := v.([]byte)
 	if !ok {
-		return "", errors.TraceNew("expected []byte type")
+		return nil, errors.TraceNew("expected []byte type")
 	}
 	return base64.StdEncoding.EncodeToString(b), nil
 }
@@ -449,7 +449,7 @@ func packUnpaddedBase64(v interface{}) (interface{}, error) {
 func unpackUnpaddedBase64(v interface{}) (interface{}, error) {
 	b, ok := v.([]byte)
 	if !ok {
-		return "", errors.TraceNew("expected []byte type")
+		return nil, errors.TraceNew("expected []byte type")
 	}
 	return base64.RawStdEncoding.EncodeToString(b), nil
 }
@@ -457,7 +457,7 @@ func unpackUnpaddedBase64(v interface{}) (interface{}, error) {
 func packAuthorizations(v interface{}) (interface{}, error) {
 	auths, ok := v.([]string)
 	if !ok {
-		return "", errors.TraceNew("expected []string type")
+		return nil, errors.TraceNew("expected []string type")
 	}
 	packedAuths, err := accesscontrol.PackAuthorizations(auths, CBOREncoding)
 	if err != nil {
@@ -514,7 +514,7 @@ func unpackSliceOfJSONCompatibleMaps(v interface{}) (interface{}, error) {
 
 	packedEntries, ok := v.([]interface{})
 	if !ok {
-		return nil, errors.Tracef("expected []interface{} type")
+		return nil, errors.TraceNew("expected []interface{} type")
 	}
 
 	entries := make([]map[string]interface{}, len(packedEntries))
@@ -522,13 +522,13 @@ func unpackSliceOfJSONCompatibleMaps(v interface{}) (interface{}, error) {
 	for i, packedEntry := range packedEntries {
 		entry, ok := packedEntry.(map[interface{}]interface{})
 		if !ok {
-			return nil, errors.Tracef("expected map[interface{}]interface{} type")
+			return nil, errors.TraceNew("expected map[interface{}]interface{} type")
 		}
 		entries[i] = make(map[string]interface{})
 		for key, value := range entry {
 			strKey, ok := key.(string)
 			if !ok {
-				return nil, errors.Tracef("expected string type")
+				return nil, errors.TraceNew("expected string type")
 			}
 			entries[i][strKey] = value
 		}
@@ -733,31 +733,38 @@ func init() {
 		{112, "inproxy_webrtc_padded_messages_received", intConverter},
 		{113, "inproxy_webrtc_decoy_messages_sent", intConverter},
 		{114, "inproxy_webrtc_decoy_messages_received", intConverter},
+		{115, "inproxy_webrtc_local_ice_candidate_type", nil},
+		{116, "inproxy_webrtc_local_ice_candidate_is_initiator", intConverter},
+		{117, "inproxy_webrtc_local_ice_candidate_is_IPv6", intConverter},
+		{118, "inproxy_webrtc_local_ice_candidate_port", intConverter},
+		{119, "inproxy_webrtc_remote_ice_candidate_type", nil},
+		{120, "inproxy_webrtc_remote_ice_candidate_is_IPv6", intConverter},
+		{121, "inproxy_webrtc_remote_ice_candidate_port", intConverter},
 
 		// Specs: server.handshakeRequestParams
 
-		{115, "missing_server_entry_signature", base64Converter},
-		{116, "missing_server_entry_provider_id", base64Converter},
+		{122, "missing_server_entry_signature", base64Converter},
+		{123, "missing_server_entry_provider_id", base64Converter},
 
 		// Specs: server.uniqueUserParams
 		//
 		// - future enhancement: add a timestamp converter from RFC3339 to and
 		//   from 64-bit Unix time?
 
-		{117, "last_connected", nil},
+		{124, "last_connected", nil},
 
 		// Specs: server.connectedRequestParams
 
-		{118, "establishment_duration", intConverter},
+		{125, "establishment_duration", intConverter},
 
 		// Specs: server.remoteServerListStatParams
 
-		{119, "client_download_timestamp", nil},
-		{120, "tunneled", intConverter},
-		{121, "url", nil},
-		{122, "etag", nil},
-		{123, "bytes", intConverter},
-		{124, "duration", intConverter},
+		{126, "client_download_timestamp", nil},
+		{127, "tunneled", intConverter},
+		{128, "url", nil},
+		{129, "etag", nil},
+		{130, "bytes", intConverter},
+		{131, "duration", intConverter},
 
 		// Specs: server.failedTunnelStatParams
 		//
@@ -767,23 +774,25 @@ func init() {
 		//   key encodings; however, we prioritize reducing the handshake
 		//   size, since it comes earlier in the tunnel flow.
 
-		{125, "server_entry_tag", base64Converter},
-		{126, "client_failed_timestamp", nil},
-		{127, "record_probability", floatConverter},
-		{128, "liveness_test_upstream_bytes", intConverter},
-		{129, "liveness_test_sent_upstream_bytes", intConverter},
-		{130, "liveness_test_downstream_bytes", intConverter},
-		{131, "liveness_test_received_downstream_bytes", intConverter},
-		{132, "bytes_up", intConverter},
-		{133, "bytes_down", intConverter},
-		{134, "tunnel_error", nil},
+		{132, "server_entry_tag", base64Converter},
+		{133, "client_failed_timestamp", nil},
+		{134, "record_probability", floatConverter},
+		{135, "liveness_test_upstream_bytes", intConverter},
+		{136, "liveness_test_sent_upstream_bytes", intConverter},
+		{137, "liveness_test_downstream_bytes", intConverter},
+		{138, "liveness_test_received_downstream_bytes", intConverter},
+		{139, "bytes_up", intConverter},
+		{140, "bytes_down", intConverter},
+		{141, "tunnel_error", nil},
 
 		// Specs: status request payload
 		//
 		// - future enhancement: pack the statusData payload, which is
 		//   currently sent as unpacked JSON.
 
-		{135, "statusData", rawJSONConverter},
+		{142, "statusData", rawJSONConverter},
+
+		// Last key value = 142
 	}
 
 	for _, spec := range packedAPIParameterSpecs {

+ 12 - 12
psiphon/config.go

@@ -966,8 +966,8 @@ type Config struct {
 	InproxyCommonCompartmentIDs                            parameters.InproxyCompartmentIDsValue
 	InproxyMaxCompartmentIDListLength                      *int
 	InproxyProxyAnnounceRequestTimeoutMilliseconds         *int
-	InproxyProxyAnnounceRetryDelayMilliseconds             *int
-	InproxyProxyAnnounceRetryJitter                        *float64
+	InproxyProxyAnnounceDelayMilliseconds                  *int
+	InproxyProxyAnnounceDelayJitter                        *float64
 	InproxyProxyAnswerRequestTimeoutMilliseconds           *int
 	InproxyClientOfferRequestTimeoutMilliseconds           *int
 	InproxyClientOfferRetryDelayMilliseconds               *int
@@ -2371,12 +2371,12 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.InproxyProxyAnnounceRequestTimeout] = fmt.Sprintf("%dms", *config.InproxyProxyAnnounceRequestTimeoutMilliseconds)
 	}
 
-	if config.InproxyProxyAnnounceRetryDelayMilliseconds != nil {
-		applyParameters[parameters.InproxyProxyAnnounceRetryDelay] = fmt.Sprintf("%dms", *config.InproxyProxyAnnounceRetryDelayMilliseconds)
+	if config.InproxyProxyAnnounceDelayMilliseconds != nil {
+		applyParameters[parameters.InproxyProxyAnnounceDelay] = fmt.Sprintf("%dms", *config.InproxyProxyAnnounceDelayMilliseconds)
 	}
 
-	if config.InproxyProxyAnnounceRetryJitter != nil {
-		applyParameters[parameters.InproxyProxyAnnounceRetryJitter] = *config.InproxyProxyAnnounceRetryJitter
+	if config.InproxyProxyAnnounceDelayJitter != nil {
+		applyParameters[parameters.InproxyProxyAnnounceDelayJitter] = *config.InproxyProxyAnnounceDelayJitter
 	}
 
 	if config.InproxyProxyAnswerRequestTimeoutMilliseconds != nil {
@@ -3123,13 +3123,13 @@ func (config *Config) setDialParametersHash() {
 		hash.Write([]byte("InproxyProxyAnnounceRequestTimeoutMilliseconds"))
 		binary.Write(hash, binary.LittleEndian, int64(*config.InproxyProxyAnnounceRequestTimeoutMilliseconds))
 	}
-	if config.InproxyProxyAnnounceRetryDelayMilliseconds != nil {
-		hash.Write([]byte("InproxyProxyAnnounceRetryDelayMilliseconds"))
-		binary.Write(hash, binary.LittleEndian, int64(*config.InproxyProxyAnnounceRetryDelayMilliseconds))
+	if config.InproxyProxyAnnounceDelayMilliseconds != nil {
+		hash.Write([]byte("InproxyProxyAnnounceDelayMilliseconds"))
+		binary.Write(hash, binary.LittleEndian, int64(*config.InproxyProxyAnnounceDelayMilliseconds))
 	}
-	if config.InproxyProxyAnnounceRetryJitter != nil {
-		hash.Write([]byte("InproxyProxyAnnounceRetryJitter"))
-		binary.Write(hash, binary.LittleEndian, *config.InproxyProxyAnnounceRetryJitter)
+	if config.InproxyProxyAnnounceDelayJitter != nil {
+		hash.Write([]byte("InproxyProxyAnnounceDelayJitter"))
+		binary.Write(hash, binary.LittleEndian, *config.InproxyProxyAnnounceDelayJitter)
 	}
 	if config.InproxyProxyAnswerRequestTimeoutMilliseconds != nil {
 		hash.Write([]byte("InproxyProxyAnswerRequestTimeoutMilliseconds"))

+ 44 - 14
psiphon/inproxy.go

@@ -23,6 +23,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
+	std_errors "errors"
 	"fmt"
 	"io"
 	"net"
@@ -134,10 +135,19 @@ func (b *InproxyBrokerClientManager) GetBrokerClient(
 		nil
 }
 
-func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed() error {
+func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed(
+	brokerClientInstance *InproxyBrokerClientInstance) error {
+
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
 
+	if b.brokerClientInstance != brokerClientInstance {
+		// Ignore the reset if the signal comes from the non-current
+		// brokerClientInstance, which may occur when multiple in-flight
+		// round trips fail in close proximity.
+		return nil
+	}
+
 	return errors.Trace(b.reset())
 }
 
@@ -196,8 +206,8 @@ type InproxyBrokerClientInstance struct {
 	personalCompartmentIDs        []inproxy.ID
 	commonCompartmentIDs          []inproxy.ID
 	announceRequestTimeout        time.Duration
-	announceRetryDelay            time.Duration
-	announceRetryJitter           float64
+	announceDelay                 time.Duration
+	announceDelayJitter           float64
 	answerRequestTimeout          time.Duration
 	offerRequestTimeout           time.Duration
 	offerRetryDelay               time.Duration
@@ -357,8 +367,8 @@ func NewInproxyBrokerClientInstance(
 		commonCompartmentIDs:        commonCompartmentIDs,
 
 		announceRequestTimeout:        p.Duration(parameters.InproxyProxyAnnounceRequestTimeout),
-		announceRetryDelay:            p.Duration(parameters.InproxyProxyAnnounceRetryDelay),
-		announceRetryJitter:           p.Float(parameters.InproxyProxyAnnounceRetryJitter),
+		announceDelay:                 p.Duration(parameters.InproxyProxyAnnounceDelay),
+		announceDelayJitter:           p.Float(parameters.InproxyProxyAnnounceDelayJitter),
 		answerRequestTimeout:          p.Duration(parameters.InproxyProxyAnswerRequestTimeout),
 		offerRequestTimeout:           p.Duration(parameters.InproxyClientOfferRequestTimeout),
 		offerRetryDelay:               p.Duration(parameters.InproxyClientOfferRetryDelay),
@@ -556,7 +566,7 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperSucceeded(roundTri
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
 
-	if roundTripper != b.roundTripper {
+	if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
 		// Passing in the round tripper obtained from BrokerClientRoundTripper
 		// is just used for sanity check in this implementation, since each
 		// InproxyBrokerClientInstance has exactly one round tripper.
@@ -602,7 +612,7 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTrippe
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
 
-	if roundTripper != b.roundTripper {
+	if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
 		// Passing in the round tripper obtained from BrokerClientRoundTripper
 		// is just used for sanity check in this implementation, since each
 		// InproxyBrokerClientInstance has exactly one round tripper.
@@ -645,7 +655,7 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTrippe
 	// Limitation: a transport-level failure may unnecessarily reset the
 	// broker session state; see comment in NewInproxyBrokerClientInstance.
 
-	err := b.brokerClientManager.resetBrokerClientOnRoundTripperFailed()
+	err := b.brokerClientManager.resetBrokerClientOnRoundTripperFailed(b)
 	if err != nil {
 		NoticeWarning("reset broker client failed: %v", errors.Trace(err))
 		// Continue with old broker client instance.
@@ -658,13 +668,13 @@ func (b *InproxyBrokerClientInstance) AnnounceRequestTimeout() time.Duration {
 }
 
 // Implements the inproxy.BrokerDialCoordinator interface.
-func (b *InproxyBrokerClientInstance) AnnounceRetryDelay() time.Duration {
-	return b.announceRetryDelay
+func (b *InproxyBrokerClientInstance) AnnounceDelay() time.Duration {
+	return b.announceDelay
 }
 
 // Implements the inproxy.BrokerDialCoordinator interface.
-func (b *InproxyBrokerClientInstance) AnnounceRetryJitter() float64 {
-	return b.announceRetryJitter
+func (b *InproxyBrokerClientInstance) AnnounceDelayJitter() float64 {
+	return b.announceDelayJitter
 }
 
 // Implements the inproxy.BrokerDialCoordinator interface.
@@ -1209,7 +1219,17 @@ func (rt *InproxyBrokerRoundTripper) Close() error {
 // RoundTrip transports a request to the broker endpoint and returns a
 // response.
 func (rt *InproxyBrokerRoundTripper) RoundTrip(
-	ctx context.Context, requestPayload []byte) ([]byte, error) {
+	ctx context.Context,
+	preRoundTrip inproxy.PreRoundTripCallback,
+	requestPayload []byte) (_ []byte, retErr error) {
+
+	defer func() {
+		// Log any error which results in invoking BrokerClientRoundTripperFailed.
+		var failedError *inproxy.RoundTripperFailedError
+		if std_errors.As(retErr, &failedError) {
+			NoticeWarning("RoundTripperFailedError: %v", retErr)
+		}
+	}()
 
 	// Cancel DialMeek or MeekConn.RoundTrip when:
 	// - Close is called
@@ -1217,6 +1237,13 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 	ctx, cancelFunc := common.MergeContextCancel(ctx, rt.runCtx)
 	defer cancelFunc()
 
+	// Invoke the pre-round trip callback. Currently, this callback is used to
+	// apply an announce request delay post-waitToShareSession, pre-network
+	// round trip, and cancelable by the above merged context.
+	if preRoundTrip != nil {
+		preRoundTrip(ctx)
+	}
+
 	// The first RoundTrip caller will perform the DialMeek step, which
 	// establishes the TLS trasport connection to the fronted endpoint.
 	// Following callers will await that DialMeek or share an established
@@ -1307,7 +1334,10 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 		defer response.Body.Close()
 		if response.StatusCode != http.StatusOK {
 
-			err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+			err = fmt.Errorf(
+				"unexpected response status code %d after %v",
+				response.StatusCode,
+				roundTripDuration)
 
 			// Depending on the round trip duration, this case is treated as a
 			// temporary round tripper failure, since we received a response

+ 7 - 0
psiphon/server/api.go

@@ -1159,6 +1159,13 @@ var inproxyDialParams = []requestParamSpec{
 	{"inproxy_webrtc_padded_messages_received", isIntString, requestParamOptional | requestParamLogStringAsInt},
 	{"inproxy_webrtc_decoy_messages_sent", isIntString, requestParamOptional | requestParamLogStringAsInt},
 	{"inproxy_webrtc_decoy_messages_received", isIntString, requestParamOptional | requestParamLogStringAsInt},
+	{"inproxy_webrtc_local_ice_candidate_type", isAnyString, requestParamOptional},
+	{"inproxy_webrtc_local_ice_candidate_is_initiator", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+	{"inproxy_webrtc_local_ice_candidate_is_IPv6", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+	{"inproxy_webrtc_local_ice_candidate_port", isIntString, requestParamOptional | requestParamLogStringAsInt},
+	{"inproxy_webrtc_remote_ice_candidate_type", isAnyString, requestParamOptional},
+	{"inproxy_webrtc_remote_ice_candidate_is_IPv6", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+	{"inproxy_webrtc_remote_ice_candidate_port", isIntString, requestParamOptional | requestParamLogStringAsInt},
 }
 
 // baseSessionAndDialParams adds baseDialParams and inproxyDialParams to baseSessionParams.