Browse Source

Merge pull request #759 from rod-hynes/master

Fixes and minor enhancements
Rod Hynes 4 months ago
parent
commit
5366994927

+ 3 - 3
Server/main.go

@@ -386,9 +386,9 @@ func panicHandlerProtobuf(config *server.Config, output string) error {
 
 	psiphondMsg := &pb.Psiphond{
 		Timestamp:    timestamppb.Now(),
-		HostId:       &config.HostID,
-		HostBuildRev: &buildinfo.GetBuildInfo().BuildRev,
-		Provider:     &config.HostProvider,
+		HostId:       config.HostID,
+		HostBuildRev: buildinfo.GetBuildInfo().BuildRev,
+		Provider:     config.HostProvider,
 		Metric: &pb.Psiphond_ServerPanic{
 			ServerPanic: &pb.ServerPanic{
 				Panic: &output,

+ 20 - 22
psiphon/common/activity.go

@@ -48,17 +48,14 @@ import (
 // durationNanoseconds, which is the time since the last read, is reported
 // only on reads.
 type ActivityMonitoredConn struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
+	net.Conn
 	monotonicStartTime   int64
-	lastReadActivityTime int64
+	lastReadActivityTime atomic.Int64
 	realStartTime        time.Time
-	net.Conn
-	inactivityTimeout time.Duration
-	activeOnWrite     bool
-	activityUpdaters  []ActivityUpdater
-	lruEntry          *LRUConnsEntry
+	inactivityTimeout    time.Duration
+	activeOnWrite        bool
+	activityUpdaters     []ActivityUpdater
+	lruEntry             *LRUConnsEntry
 }
 
 // ActivityUpdater defines an interface for receiving updates for
@@ -88,16 +85,17 @@ func NewActivityMonitoredConn(
 
 	now := int64(monotime.Now())
 
-	return &ActivityMonitoredConn{
-		Conn:                 conn,
-		inactivityTimeout:    inactivityTimeout,
-		activeOnWrite:        activeOnWrite,
-		realStartTime:        time.Now(),
-		monotonicStartTime:   now,
-		lastReadActivityTime: now,
-		lruEntry:             lruEntry,
-		activityUpdaters:     activityUpdaters,
-	}, nil
+	activityConn := &ActivityMonitoredConn{
+		Conn:               conn,
+		inactivityTimeout:  inactivityTimeout,
+		activeOnWrite:      activeOnWrite,
+		realStartTime:      time.Now(),
+		monotonicStartTime: now,
+		lruEntry:           lruEntry,
+		activityUpdaters:   activityUpdaters,
+	}
+	activityConn.lastReadActivityTime.Store(now)
+	return activityConn, nil
 }
 
 // GetStartTime gets the time when the ActivityMonitoredConn was initialized.
@@ -110,7 +108,7 @@ func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
 // the ActivityMonitoredConn and the last Read. Only reads are used for this
 // calculation since writes may succeed locally due to buffering.
 func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
-	return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
+	return time.Duration(conn.lastReadActivityTime.Load() - conn.monotonicStartTime)
 }
 
 func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
@@ -124,10 +122,10 @@ func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
 			}
 		}
 
-		lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
+		lastReadActivityTime := conn.lastReadActivityTime.Load()
 		readActivityTime := int64(monotime.Now())
 
-		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
+		conn.lastReadActivityTime.Store(readActivityTime)
 
 		for _, activityUpdater := range conn.activityUpdaters {
 			activityUpdater.UpdateProgress(

+ 108 - 24
psiphon/common/inproxy/broker.go

@@ -35,6 +35,7 @@ import (
 	"github.com/cespare/xxhash"
 	lrucache "github.com/cognusion/go-cache-lru"
 	"github.com/fxamacker/cbor/v2"
+	"golang.org/x/time/rate"
 )
 
 const (
@@ -52,6 +53,9 @@ const (
 	brokerPendingServerReportsTTL     = 60 * time.Second
 	brokerPendingServerReportsMaxSize = 100000
 	brokerMetricName                  = "inproxy_broker"
+
+	brokerRateLimiterReapHistoryFrequencySeconds = 300
+	brokerRateLimiterMaxCacheEntries             = 1000000
 )
 
 // LookupGeoIP is a callback for providing GeoIP lookup service.
@@ -100,15 +104,18 @@ type Broker struct {
 	commonCompartmentsMutex sync.Mutex
 	commonCompartments      *consistent.Consistent
 
-	proxyAnnounceTimeout       int64
-	clientOfferTimeout         int64
-	clientOfferPersonalTimeout int64
-	pendingServerReportsTTL    int64
+	proxyAnnounceTimeout       atomic.Int64
+	clientOfferTimeout         atomic.Int64
+	clientOfferPersonalTimeout atomic.Int64
+	pendingServerReportsTTL    atomic.Int64
 	maxRequestTimeouts         atomic.Value
-	maxCompartmentIDs          int64
+	maxCompartmentIDs          atomic.Int64
 
 	enableProxyQualityMutex sync.Mutex
 	enableProxyQuality      atomic.Bool
+
+	dslRequestRateLimiters    *lrucache.Cache
+	dslRequestRateLimitParams atomic.Value
 }
 
 // BrokerConfig specifies the configuration for a Broker.
@@ -226,6 +233,10 @@ type BrokerConfig struct {
 	MatcherOfferRateLimitQuantity int
 	MatcherOfferRateLimitInterval time.Duration
 
+	// DSL request relay rate limit configuration.
+	DSLRequestRateLimitQuantity int
+	DSLRequestRateLimitInterval time.Duration
+
 	// MaxCompartmentIDs specifies the maximum number of compartment IDs that
 	// can be included, per list, in one request. If 0, the value
 	// MaxCompartmentIDs is used.
@@ -306,12 +317,10 @@ func NewBroker(config *BrokerConfig) (*Broker, error) {
 
 		proxyQualityState: proxyQuality,
 
-		proxyAnnounceTimeout:       int64(config.ProxyAnnounceTimeout),
-		clientOfferTimeout:         int64(config.ClientOfferTimeout),
-		clientOfferPersonalTimeout: int64(config.ClientOfferPersonalTimeout),
-		pendingServerReportsTTL:    int64(config.PendingServerReportsTTL),
-
-		maxCompartmentIDs: int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)),
+		dslRequestRateLimiters: lrucache.NewWithLRU(
+			0,
+			time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
+			brokerRateLimiterMaxCacheEntries),
 	}
 
 	b.pendingServerReports = lrucache.NewWithLRU(
@@ -319,6 +328,20 @@ func NewBroker(config *BrokerConfig) (*Broker, error) {
 		1*time.Minute,
 		brokerPendingServerReportsMaxSize)
 
+	b.proxyAnnounceTimeout.Store(int64(config.ProxyAnnounceTimeout))
+	b.clientOfferTimeout.Store(int64(config.ClientOfferTimeout))
+	b.clientOfferPersonalTimeout.Store(int64(config.ClientOfferPersonalTimeout))
+	b.pendingServerReportsTTL.Store(int64(config.PendingServerReportsTTL))
+
+	b.maxCompartmentIDs.Store(
+		int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)))
+
+	b.dslRequestRateLimitParams.Store(
+		&brokerRateLimitParams{
+			quantity: config.DSLRequestRateLimitQuantity,
+			interval: config.DSLRequestRateLimitInterval,
+		})
+
 	if len(config.CommonCompartmentIDs) > 0 {
 		err = b.initializeCommonCompartmentIDHashing(config.CommonCompartmentIDs)
 		if err != nil {
@@ -365,10 +388,10 @@ func (b *Broker) SetTimeouts(
 	pendingServerReportsTTL time.Duration,
 	maxRequestTimeouts map[string]time.Duration) {
 
-	atomic.StoreInt64(&b.proxyAnnounceTimeout, int64(proxyAnnounceTimeout))
-	atomic.StoreInt64(&b.clientOfferTimeout, int64(clientOfferTimeout))
-	atomic.StoreInt64(&b.clientOfferPersonalTimeout, int64(clientOfferPersonalTimeout))
-	atomic.StoreInt64(&b.pendingServerReportsTTL, int64(pendingServerReportsTTL))
+	b.proxyAnnounceTimeout.Store(int64(proxyAnnounceTimeout))
+	b.clientOfferTimeout.Store(int64(clientOfferTimeout))
+	b.clientOfferPersonalTimeout.Store(int64(clientOfferPersonalTimeout))
+	b.pendingServerReportsTTL.Store(int64(pendingServerReportsTTL))
 	b.maxRequestTimeouts.Store(maxRequestTimeouts)
 }
 
@@ -383,7 +406,9 @@ func (b *Broker) SetLimits(
 	matcherOfferLimitEntryCount int,
 	matcherOfferRateLimitQuantity int,
 	matcherOfferRateLimitInterval time.Duration,
-	maxCompartmentIDs int) {
+	maxCompartmentIDs int,
+	dslRequestRateLimitQuantity int,
+	dslRequestRateLimitInterval time.Duration) {
 
 	b.matcher.SetLimits(
 		matcherAnnouncementLimitEntryCount,
@@ -394,9 +419,14 @@ func (b *Broker) SetLimits(
 		matcherOfferRateLimitQuantity,
 		matcherOfferRateLimitInterval)
 
-	atomic.StoreInt64(
-		&b.maxCompartmentIDs,
+	b.maxCompartmentIDs.Store(
 		int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
+
+	b.dslRequestRateLimitParams.Store(
+		&brokerRateLimitParams{
+			quantity: dslRequestRateLimitQuantity,
+			interval: dslRequestRateLimitInterval,
+		})
 }
 
 func (b *Broker) SetProxyQualityParameters(
@@ -666,7 +696,7 @@ func (b *Broker) handleProxyAnnounce(
 
 	var apiParams common.APIParameters
 	apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
-		int(atomic.LoadInt64(&b.maxCompartmentIDs)),
+		int(b.maxCompartmentIDs.Load()),
 		b.config.APIParameterValidator,
 		b.config.APIParameterLogFieldFormatter,
 		geoIPData)
@@ -807,7 +837,7 @@ func (b *Broker) handleProxyAnnounce(
 	// Await client offer.
 
 	timeout := common.ValueOrDefault(
-		time.Duration(atomic.LoadInt64(&b.proxyAnnounceTimeout)),
+		time.Duration(b.proxyAnnounceTimeout.Load()),
 		brokerProxyAnnounceTimeout)
 
 	// Adjust the timeout to respect any shorter maximum request timeouts for
@@ -1038,7 +1068,7 @@ func (b *Broker) handleClientOffer(
 
 	var filteredSDP []byte
 	filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
-		int(atomic.LoadInt64(&b.maxCompartmentIDs)),
+		int(b.maxCompartmentIDs.Load()),
 		b.config.LookupGeoIP,
 		b.config.APIParameterValidator,
 		b.config.APIParameterLogFieldFormatter,
@@ -1111,9 +1141,9 @@ func (b *Broker) handleClientOffer(
 	// resulting broker rotation.
 	var timeout time.Duration
 	if hasPersonalCompartmentIDs {
-		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferPersonalTimeout))
+		timeout = time.Duration(b.clientOfferPersonalTimeout.Load())
 	} else {
-		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferTimeout))
+		timeout = time.Duration(b.clientOfferTimeout.Load())
 	}
 	timeout = common.ValueOrDefault(timeout, brokerClientOfferTimeout)
 
@@ -1680,6 +1710,26 @@ func (b *Broker) handleClientDSL(
 		}
 	}()
 
+	// Rate limit the number of relayed DSL requests. The DSL backend has its
+	// own rate limit enforcement, but avoiding excess requests here saves on
+	// resources consumed between the relay and backend.
+	//
+	// Unlike the announce/offer rate limit cases, there's no "limited" error
+	// flag returned to the client in this case, since this rate limiter is
+	// purely for abuse prevention and is expected to be configured with
+	// limits that won't be exceeded by legitimate clients.
+
+	rateLimitParams := b.dslRequestRateLimitParams.Load().(*brokerRateLimitParams)
+	err := brokerRateLimit(
+		b.dslRequestRateLimiters,
+		clientIP,
+		rateLimitParams.quantity,
+		rateLimitParams.interval)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
 	dslRequest, err := UnmarshalClientDSLRequest(requestPayload)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -1790,7 +1840,7 @@ func (b *Broker) initiateRelayedServerReport(
 			serverReport: serverReport,
 			roundTrip:    roundTrip,
 		},
-		time.Duration(atomic.LoadInt64(&b.pendingServerReportsTTL)))
+		time.Duration(b.pendingServerReportsTTL.Load()))
 
 	return relayPacket, nil
 }
@@ -2038,3 +2088,37 @@ func (b *Broker) selectCommonCompartmentID(proxyID ID) (ID, error) {
 
 	return compartmentID, nil
 }
+
+type brokerRateLimitParams struct {
+	quantity int
+	interval time.Duration
+}
+
+func brokerRateLimit(
+	rateLimiters *lrucache.Cache,
+	limitIP string,
+	quantity int,
+	interval time.Duration) error {
+
+	if quantity <= 0 || interval <= 0 {
+		return nil
+	}
+
+	var rateLimiter *rate.Limiter
+
+	entry, ok := rateLimiters.Get(limitIP)
+	if ok {
+		rateLimiter = entry.(*rate.Limiter)
+	} else {
+		limit := float64(quantity) / interval.Seconds()
+		rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
+		rateLimiters.Set(
+			limitIP, rateLimiter, interval)
+	}
+
+	if !rateLimiter.Allow() {
+		return errors.TraceNew("rate exceeded for IP")
+	}
+
+	return nil
+}

+ 15 - 27
psiphon/common/inproxy/matcher.go

@@ -30,7 +30,6 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	lrucache "github.com/cognusion/go-cache-lru"
-	"golang.org/x/time/rate"
 )
 
 // TTLs should be aligned with STUN hole punch lifetimes.
@@ -39,12 +38,9 @@ const (
 	matcherAnnouncementQueueMaxSize = 5000000
 	matcherOfferQueueMaxSize        = 5000000
 	matcherPendingAnswersTTL        = 30 * time.Second
-	matcherPendingAnswersMaxSize    = 100000
+	matcherPendingAnswersMaxSize    = 5000000
 	matcherMaxPreferredNATProbe     = 100
 	matcherMaxProbe                 = 1000
-
-	matcherRateLimiterReapHistoryFrequencySeconds = 300
-	matcherRateLimiterMaxCacheEntries             = 1000000
 )
 
 // Matcher matches proxy announcements with client offers. Matcher also
@@ -228,6 +224,7 @@ type MatchMetrics struct {
 	OfferQueueSize         int
 	AnnouncementMatchIndex int
 	AnnouncementQueueSize  int
+	PendingAnswersSize     int
 }
 
 // GetMetrics converts MatchMetrics to loggable fields.
@@ -240,6 +237,7 @@ func (metrics *MatchMetrics) GetMetrics() common.LogFields {
 		"offer_queue_size":         metrics.OfferQueueSize,
 		"announcement_match_index": metrics.AnnouncementMatchIndex,
 		"announcement_queue_size":  metrics.AnnouncementQueueSize,
+		"pending_answers_size":     metrics.PendingAnswersSize,
 	}
 }
 
@@ -306,15 +304,15 @@ func NewMatcher(config *MatcherConfig) *Matcher {
 		announcementQueueEntryCountByIP: make(map[string]int),
 		announcementQueueRateLimiters: lrucache.NewWithLRU(
 			0,
-			time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
-			matcherRateLimiterMaxCacheEntries),
+			time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
+			brokerRateLimiterMaxCacheEntries),
 
 		offerQueue:               list.New(),
 		offerQueueEntryCountByIP: make(map[string]int),
 		offerQueueRateLimiters: lrucache.NewWithLRU(
 			0,
-			time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
-			matcherRateLimiterMaxCacheEntries),
+			time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
+			brokerRateLimiterMaxCacheEntries),
 
 		matchSignal: make(chan struct{}, 1),
 
@@ -704,6 +702,7 @@ func (m *Matcher) matchAllOffers() {
 			OfferQueueSize:         m.offerQueue.Len(),
 			AnnouncementMatchIndex: announcementMatchIndex,
 			AnnouncementQueueSize:  m.announcementQueue.getLen(),
+			PendingAnswersSize:     m.pendingAnswers.ItemCount(),
 		}
 
 		offerEntry.matchMetrics.Store(matchMetrics)
@@ -1061,24 +1060,13 @@ func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID)
 	// that the rate limit state is updated regardless of the max count check
 	// outcome.
 
-	if quantity > 0 && interval > 0 {
-
-		var rateLimiter *rate.Limiter
-
-		entry, ok := queueRateLimiters.Get(limitIP)
-		if ok {
-			rateLimiter = entry.(*rate.Limiter)
-		} else {
-			limit := float64(quantity) / interval.Seconds()
-			rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
-			queueRateLimiters.Set(
-				limitIP, rateLimiter, interval)
-		}
-
-		if !rateLimiter.Allow() {
-			return errors.Trace(
-				NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
-		}
+	err := brokerRateLimit(
+		queueRateLimiters,
+		limitIP,
+		quantity,
+		interval)
+	if err != nil {
+		return errors.Trace(NewMatcherLimitError(err))
 	}
 
 	if limitEntryCount > 0 {

+ 13 - 16
psiphon/common/inproxy/proxy.go

@@ -46,13 +46,10 @@ const (
 // Proxy is the in-proxy proxying component, which relays traffic from a
 // client to a Psiphon server.
 type Proxy struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	bytesUp           int64
-	bytesDown         int64
-	peakBytesUp       int64
-	peakBytesDown     int64
+	bytesUp           atomic.Int64
+	bytesDown         atomic.Int64
+	peakBytesUp       atomic.Int64
+	peakBytesDown     atomic.Int64
 	connectingClients int32
 	connectedClients  int32
 
@@ -189,8 +186,8 @@ type activityUpdateWrapper struct {
 }
 
 func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
-	atomic.AddInt64(&w.p.bytesUp, bytesWritten)
-	atomic.AddInt64(&w.p.bytesDown, bytesRead)
+	w.p.bytesUp.Add(bytesWritten)
+	w.p.bytesDown.Add(bytesRead)
 }
 
 // Run runs the proxy. The proxy sends requests to the Broker announcing its
@@ -297,8 +294,8 @@ func (p *Proxy) activityUpdate(period time.Duration) {
 
 	connectingClients := atomic.LoadInt32(&p.connectingClients)
 	connectedClients := atomic.LoadInt32(&p.connectedClients)
-	bytesUp := atomic.SwapInt64(&p.bytesUp, 0)
-	bytesDown := atomic.SwapInt64(&p.bytesDown, 0)
+	bytesUp := p.bytesUp.Swap(0)
+	bytesDown := p.bytesDown.Swap(0)
 
 	greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
 	greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
@@ -324,14 +321,14 @@ func (p *Proxy) activityUpdate(period time.Duration) {
 		period)
 }
 
-func greaterThanSwapInt64(addr *int64, new int64) bool {
+func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
 
 	// Limitation: if there are two concurrent calls, the greater value could
 	// get overwritten.
 
-	old := atomic.LoadInt64(addr)
+	old := addr.Load()
 	if new > old {
-		return atomic.CompareAndSwapInt64(addr, old, new)
+		return addr.CompareAndSwap(old, new)
 	}
 	return false
 }
@@ -1048,7 +1045,7 @@ func (p *Proxy) getMetrics(
 		ConnectedClients:              atomic.LoadInt32(&p.connectedClients),
 		LimitUpstreamBytesPerSecond:   int64(p.config.LimitUpstreamBytesPerSecond),
 		LimitDownstreamBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
-		PeakUpstreamBytesPerSecond:    atomic.LoadInt64(&p.peakBytesUp),
-		PeakDownstreamBytesPerSecond:  atomic.LoadInt64(&p.peakBytesDown),
+		PeakUpstreamBytesPerSecond:    p.peakBytesUp.Load(),
+		PeakDownstreamBytesPerSecond:  p.peakBytesDown.Load(),
 	}, tacticsNetworkID, compressTactics, nil
 }

+ 17 - 17
psiphon/common/inproxy/quality.go

@@ -288,10 +288,10 @@ type ProxyQualityReporter struct {
 
 	brokerPublicKeys             atomic.Value
 	brokerRootObfuscationSecrets atomic.Value
-	requestDelay                 int64
-	maxRequestEntries            int64
-	requestTimeout               int64
-	requestRetries               int64
+	requestDelay                 atomic.Int64
+	maxRequestEntries            atomic.Int64
+	requestTimeout               atomic.Int64
+	requestRetries               atomic.Int64
 
 	signalReport chan struct{}
 }
@@ -351,11 +351,6 @@ func NewProxyQualityReporter(
 
 		waitGroup: new(sync.WaitGroup),
 
-		requestDelay:      int64(proxyQualityReporterRequestDelay),
-		maxRequestEntries: proxyQualityReporterMaxRequestEntries,
-		requestTimeout:    int64(proxyQualityReporterRequestTimeout),
-		requestRetries:    proxyQualityReporterRequestRetries,
-
 		reportQueue:       list.New(),
 		proxyIDQueueEntry: make(map[ProxyQualityKey]*list.Element),
 
@@ -367,6 +362,11 @@ func NewProxyQualityReporter(
 		return nil, errors.Trace(err)
 	}
 
+	r.requestDelay.Store(int64(proxyQualityReporterRequestDelay))
+	r.maxRequestEntries.Store(int64(proxyQualityReporterMaxRequestEntries))
+	r.requestTimeout.Store(int64(proxyQualityReporterRequestTimeout))
+	r.requestRetries.Store(int64(proxyQualityReporterRequestRetries))
+
 	return r, nil
 }
 
@@ -392,10 +392,10 @@ func (r *ProxyQualityReporter) SetRequestParameters(
 	requestTimeout time.Duration,
 	requestRetries int) {
 
-	atomic.StoreInt64(&r.requestDelay, int64(requestDelay))
-	atomic.StoreInt64(&r.maxRequestEntries, int64(maxRequestEntries))
-	atomic.StoreInt64(&r.requestTimeout, int64(requestTimeout))
-	atomic.StoreInt64(&r.requestRetries, int64(requestRetries))
+	r.requestDelay.Store(int64(requestDelay))
+	r.maxRequestEntries.Store(int64(maxRequestEntries))
+	r.requestTimeout.Store(int64(requestTimeout))
+	r.requestRetries.Store(int64(requestRetries))
 }
 
 // Start launches the request workers.
@@ -499,7 +499,7 @@ func (r *ProxyQualityReporter) requestScheduler(ctx context.Context) {
 		// Delay, for a brief moment, sending requests in an effort to batch
 		// up more data for the requests.
 
-		requestDelay := time.Duration(atomic.LoadInt64(&r.requestDelay))
+		requestDelay := time.Duration(r.requestDelay.Load())
 		if requestDelay > 0 {
 
 			// TODO: SleepWithContext creates and discards a timer per call;
@@ -565,7 +565,7 @@ func (r *ProxyQualityReporter) prepareNextRequest() ProxyQualityRequestCounts {
 	// different client ASN counts per entry. In practice, there shouldn't be
 	// an excessive number of client ASNs.
 
-	for queueEntry != nil && int64(len(counts)) < atomic.LoadInt64(&r.maxRequestEntries) {
+	for queueEntry != nil && int64(len(counts)) < r.maxRequestEntries.Load() {
 
 		entry := queueEntry.Value.(proxyQualityReportQueueEntry)
 
@@ -721,7 +721,7 @@ func (r *ProxyQualityReporter) sendToBrokers(
 		go func(brokerClient *serverBrokerClient) {
 			defer sendWaitGroup.Done()
 
-			retries := int(atomic.LoadInt64(&r.requestRetries))
+			retries := int(r.requestRetries.Load())
 			for i := 0; i <= retries; i++ {
 				err := r.sendBrokerRequest(ctx, brokerClient, requestCounts)
 				if err != nil {
@@ -778,7 +778,7 @@ func (r *ProxyQualityReporter) sendBrokerRequest(
 	brokerClient *serverBrokerClient,
 	requestCounts ProxyQualityRequestCounts) error {
 
-	requestTimeout := time.Duration(atomic.LoadInt64(&r.requestTimeout))
+	requestTimeout := time.Duration(r.requestTimeout.Load())
 
 	// While the request payload, requestCounts, is the same for every broker,
 	// each broker round tripper may have different dial parameters, so each

+ 42 - 28
psiphon/common/osl/osl.go

@@ -186,6 +186,34 @@ type TrafficValues struct {
 	PortForwardDurationNanoseconds int64
 }
 
+type trafficCounters struct {
+	bytesRead                      atomic.Int64
+	bytesWritten                   atomic.Int64
+	portForwardDurationNanoseconds atomic.Int64
+}
+
+func newTrafficCounters() *trafficCounters {
+	return &trafficCounters{}
+}
+
+func (c *trafficCounters) add(bytesRead, bytesWritten, durationNanoseconds int64) {
+	c.bytesRead.Add(bytesRead)
+	c.bytesWritten.Add(bytesWritten)
+	c.portForwardDurationNanoseconds.Add(durationNanoseconds)
+}
+
+func (c *trafficCounters) exceeds(target *TrafficValues) bool {
+	return c.bytesRead.Load() >= target.BytesRead &&
+		c.bytesWritten.Load() >= target.BytesWritten &&
+		c.portForwardDurationNanoseconds.Load() >= target.PortForwardDurationNanoseconds
+}
+
+func (c *trafficCounters) reset() {
+	c.bytesRead.Store(0)
+	c.bytesWritten.Store(0)
+	c.portForwardDurationNanoseconds.Store(0)
+}
+
 // KeySplit defines a secret key splitting scheme where the secret is split
 // into N (total) shares and any K (threshold) of N shares must be known
 // to reconstruct the split secret.
@@ -208,12 +236,9 @@ type ClientSeedState struct {
 // ClientSeedProgress tracks client progress towards seeding SLOKs for
 // a particular scheme.
 type ClientSeedProgress struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	progressSLOKTime int64
+	progressSLOKTime atomic.Int64
 	scheme           *Scheme
-	trafficProgress  []*TrafficValues
+	trafficProgress  []*trafficCounters
 }
 
 // ClientSeedPortForward map a client port forward, which is relaying
@@ -413,17 +438,17 @@ func (config *Config) NewClientSeedState(
 
 			// Empty progress is initialized up front for all seed specs. Once
 			// created, the progress structure is read-only (the slice, not the
-			// TrafficValue fields); this permits lock-free operation.
-			trafficProgress := make([]*TrafficValues, len(scheme.SeedSpecs))
+			// counter fields); this permits lock-free operation.
+			trafficProgress := make([]*trafficCounters, len(scheme.SeedSpecs))
 			for index := 0; index < len(scheme.SeedSpecs); index++ {
-				trafficProgress[index] = &TrafficValues{}
+				trafficProgress[index] = newTrafficCounters()
 			}
 
 			seedProgress := &ClientSeedProgress{
-				scheme:           scheme,
-				progressSLOKTime: getSLOKTime(scheme.SeedPeriodNanoseconds),
-				trafficProgress:  trafficProgress,
+				scheme:          scheme,
+				trafficProgress: trafficProgress,
 			}
+			seedProgress.progressSLOKTime.Store(getSLOKTime(scheme.SeedPeriodNanoseconds))
 
 			state.seedProgress = append(state.seedProgress, seedProgress)
 		}
@@ -585,7 +610,7 @@ func (portForward *ClientSeedPortForward) UpdateProgress(
 		// As it acquires the state mutex, issueSLOKs may stall other port
 		// forwards for this client. The delay is minimized by SLOK caching,
 		// which avoids redundant crypto operations.
-		if slokTime != atomic.LoadInt64(&seedProgress.progressSLOKTime) {
+		if slokTime != seedProgress.progressSLOKTime.Load() {
 			portForward.state.mutex.Lock()
 			portForward.state.issueSLOKs()
 			portForward.state.mutex.Unlock()
@@ -608,9 +633,7 @@ func (portForward *ClientSeedPortForward) UpdateProgress(
 
 		alreadyExceedsTargets := trafficProgress.exceeds(&seedSpec.Targets)
 
-		atomic.AddInt64(&trafficProgress.BytesRead, bytesRead)
-		atomic.AddInt64(&trafficProgress.BytesWritten, bytesWritten)
-		atomic.AddInt64(&trafficProgress.PortForwardDurationNanoseconds, durationNanoseconds)
+		trafficProgress.add(bytesRead, bytesWritten, durationNanoseconds)
 
 		// With the target newly met for a SeedSpec, a new
 		// SLOK *may* be issued.
@@ -620,13 +643,6 @@ func (portForward *ClientSeedPortForward) UpdateProgress(
 	}
 }
 
-func (lhs *TrafficValues) exceeds(rhs *TrafficValues) bool {
-	return atomic.LoadInt64(&lhs.BytesRead) >= atomic.LoadInt64(&rhs.BytesRead) &&
-		atomic.LoadInt64(&lhs.BytesWritten) >= atomic.LoadInt64(&rhs.BytesWritten) &&
-		atomic.LoadInt64(&lhs.PortForwardDurationNanoseconds) >=
-			atomic.LoadInt64(&rhs.PortForwardDurationNanoseconds)
-}
-
 // issueSLOKs checks client progress against each candidate seed spec
 // and seeds SLOKs when the client traffic levels are achieved. After
 // checking progress, and if the SLOK time period has changed since
@@ -645,7 +661,7 @@ func (state *ClientSeedState) issueSLOKs() {
 
 	for _, seedProgress := range state.seedProgress {
 
-		progressSLOKTime := time.Unix(0, seedProgress.progressSLOKTime)
+		progressSLOKTime := time.Unix(0, seedProgress.progressSLOKTime.Load())
 
 		for index, trafficProgress := range seedProgress.trafficProgress {
 
@@ -680,15 +696,13 @@ func (state *ClientSeedState) issueSLOKs() {
 
 		slokTime := getSLOKTime(seedProgress.scheme.SeedPeriodNanoseconds)
 
-		if slokTime != atomic.LoadInt64(&seedProgress.progressSLOKTime) {
-			atomic.StoreInt64(&seedProgress.progressSLOKTime, slokTime)
+		if slokTime != seedProgress.progressSLOKTime.Load() {
+			seedProgress.progressSLOKTime.Store(slokTime)
 			// The progress map structure is not reset or modified; instead
 			// the mapped accumulator values are zeroed. Concurrently, port
 			// forward relay goroutines continue to add to these accumulators.
 			for _, trafficProgress := range seedProgress.trafficProgress {
-				atomic.StoreInt64(&trafficProgress.BytesRead, 0)
-				atomic.StoreInt64(&trafficProgress.BytesWritten, 0)
-				atomic.StoreInt64(&trafficProgress.PortForwardDurationNanoseconds, 0)
+				trafficProgress.reset()
 			}
 		}
 	}

+ 4 - 0
psiphon/common/parameters/parameters.go

@@ -434,6 +434,8 @@ const (
 	InproxyBrokerClientOfferTimeout                    = "InproxyBrokerClientOfferTimeout"
 	InproxyBrokerClientOfferPersonalTimeout            = "InproxyBrokerClientOfferPersonalTimeout"
 	InproxyBrokerPendingServerRequestsTTL              = "InproxyBrokerPendingServerRequestsTTL"
+	InproxyBrokerDSLRequestRateLimitQuantity           = "InproxyBrokerDSLRequestRateLimitQuantity"
+	InproxyBrokerDSLRequestRateLimitInterval           = "InproxyBrokerDSLRequestRateLimitInterval"
 	InproxySessionHandshakeRoundTripTimeout            = "InproxySessionHandshakeRoundTripTimeout"
 	InproxyProxyAnnounceRequestTimeout                 = "InproxyProxyAnnounceRequestTimeout"
 	InproxyProxyAnnounceDelay                          = "InproxyProxyAnnounceDelay"
@@ -1054,6 +1056,8 @@ var defaultParameters = map[string]struct {
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferPersonalTimeout:            {value: 5 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerPendingServerRequestsTTL:              {value: 60 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyBrokerDSLRequestRateLimitQuantity:           {value: 20, minimum: 0, flags: serverSideOnly},
+	InproxyBrokerDSLRequestRateLimitInterval:           {value: 1 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxySessionHandshakeRoundTripTimeout:            {value: 10 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyProxyAnnounceRequestTimeout:                 {value: 2*time.Minute + 10*time.Second, minimum: time.Duration(0)},
 	InproxyProxyAnnounceDelay:                          {value: 100 * time.Millisecond, minimum: time.Duration(0)},

+ 15 - 18
psiphon/common/throttled.go

@@ -63,13 +63,11 @@ type RateLimits struct {
 // The underlying rate limiter uses the token bucket algorithm to
 // calculate delay times for read and write operations.
 type ThrottledConn struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	readUnthrottledBytes  int64
-	readBytesPerSecond    int64
-	writeUnthrottledBytes int64
-	writeBytesPerSecond   int64
+	net.Conn
+	readUnthrottledBytes  atomic.Int64
+	readBytesPerSecond    atomic.Int64
+	writeUnthrottledBytes atomic.Int64
+	writeBytesPerSecond   atomic.Int64
 	closeAfterExhausted   int32
 	readLock              sync.Mutex
 	readRateLimiter       *rate.Limiter
@@ -80,7 +78,6 @@ type ThrottledConn struct {
 	isClosed              int32
 	stopBroadcast         chan struct{}
 	isStream              bool
-	net.Conn
 }
 
 // NewThrottledConn initializes a new ThrottledConn.
@@ -120,15 +117,15 @@ func (conn *ThrottledConn) SetLimits(limits RateLimits) {
 	if rate < 0 {
 		rate = 0
 	}
-	atomic.StoreInt64(&conn.readBytesPerSecond, rate)
-	atomic.StoreInt64(&conn.readUnthrottledBytes, limits.ReadUnthrottledBytes)
+	conn.readBytesPerSecond.Store(rate)
+	conn.readUnthrottledBytes.Store(limits.ReadUnthrottledBytes)
 
 	rate = limits.WriteBytesPerSecond
 	if rate < 0 {
 		rate = 0
 	}
-	atomic.StoreInt64(&conn.writeBytesPerSecond, rate)
-	atomic.StoreInt64(&conn.writeUnthrottledBytes, limits.WriteUnthrottledBytes)
+	conn.writeBytesPerSecond.Store(rate)
+	conn.writeUnthrottledBytes.Store(limits.WriteUnthrottledBytes)
 
 	closeAfterExhausted := int32(0)
 	if limits.CloseAfterExhausted {
@@ -153,9 +150,9 @@ func (conn *ThrottledConn) Read(buffer []byte) (int, error) {
 	// exhausted. This is only an approximate enforcement
 	// since this read, or concurrent reads, could exceed
 	// the remaining count.
-	if atomic.LoadInt64(&conn.readUnthrottledBytes) > 0 {
+	if conn.readUnthrottledBytes.Load() > 0 {
 		n, err := conn.Conn.Read(buffer)
-		atomic.AddInt64(&conn.readUnthrottledBytes, -int64(n))
+		conn.readUnthrottledBytes.Add(-int64(n))
 		return n, err
 	}
 
@@ -164,7 +161,7 @@ func (conn *ThrottledConn) Read(buffer []byte) (int, error) {
 		return 0, errors.TraceNew("throttled conn exhausted")
 	}
 
-	readRate := atomic.SwapInt64(&conn.readBytesPerSecond, -1)
+	readRate := conn.readBytesPerSecond.Swap(-1)
 
 	if readRate != -1 {
 		// SetLimits has been called and a new rate limiter
@@ -257,9 +254,9 @@ func (conn *ThrottledConn) Write(buffer []byte) (int, error) {
 		return 0, errors.TraceNew("throttled conn closed")
 	}
 
-	if atomic.LoadInt64(&conn.writeUnthrottledBytes) > 0 {
+	if conn.writeUnthrottledBytes.Load() > 0 {
 		n, err := conn.Conn.Write(buffer)
-		atomic.AddInt64(&conn.writeUnthrottledBytes, -int64(n))
+		conn.writeUnthrottledBytes.Add(-int64(n))
 		return n, err
 	}
 
@@ -268,7 +265,7 @@ func (conn *ThrottledConn) Write(buffer []byte) (int, error) {
 		return 0, errors.TraceNew("throttled conn exhausted")
 	}
 
-	writeRate := atomic.SwapInt64(&conn.writeBytesPerSecond, -1)
+	writeRate := conn.writeBytesPerSecond.Swap(-1)
 
 	if writeRate != -1 {
 		if writeRate == 0 {

+ 45 - 51
psiphon/common/tun/tun.go

@@ -460,7 +460,6 @@ func (server *Server) ClientConnected(
 
 		clientSession = &session{
 			allowBogons:              server.config.AllowBogons,
-			lastActivity:             int64(monotime.Now()),
 			sessionID:                sessionID,
 			metrics:                  new(packetMetrics),
 			enableDNSFlowTracking:    server.config.EnableDNSFlowTracking,
@@ -468,6 +467,7 @@ func (server *Server) ClientConnected(
 			DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
 			workers:                  new(sync.WaitGroup),
 		}
+		clientSession.lastActivity.Store(int64(monotime.Now()))
 
 		// One-time, for this session, random resolver selection for TCP transparent
 		// DNS forwarding. See comment in processPacket.
@@ -1104,11 +1104,8 @@ func (server *Server) convertIndexToIPv6Address(index int32) net.IP {
 }
 
 type session struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastActivity             int64
-	lastFlowReapIndex        int64
+	lastActivity             atomic.Int64
+	lastFlowReapIndex        atomic.Int64
 	downstreamPackets        unsafe.Pointer
 	checkAllowedTCPPortFunc  unsafe.Pointer
 	checkAllowedUDPPortFunc  unsafe.Pointer
@@ -1141,11 +1138,11 @@ type session struct {
 }
 
 func (session *session) touch() {
-	atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
+	session.lastActivity.Store(int64(monotime.Now()))
 }
 
 func (session *session) expired(idleExpiry time.Duration) bool {
-	lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
+	lastActivity := monotime.Time(session.lastActivity.Load())
 	return monotime.Since(lastActivity) > idleExpiry
 }
 
@@ -1300,13 +1297,10 @@ func (f *flowID) set(
 }
 
 type flowState struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	firstUpstreamPacketTime   int64
-	lastUpstreamPacketTime    int64
-	firstDownstreamPacketTime int64
-	lastDownstreamPacketTime  int64
+	firstUpstreamPacketTime   atomic.Int64
+	lastUpstreamPacketTime    atomic.Int64
+	firstDownstreamPacketTime atomic.Int64
+	lastDownstreamPacketTime  atomic.Int64
 	isDNS                     bool
 	dnsQualityReporter        DNSQualityReporter
 	activityUpdaters          []FlowActivityUpdater
@@ -1319,8 +1313,8 @@ func (flowState *flowState) expired(idleExpiry time.Duration) bool {
 	// lastUpstreamPacketTime or lastDownstreamPacketTime will be set by
 	// startTrackingFlow, and the other value will be 0 and evaluate as expired.
 
-	return (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastUpstreamPacketTime))) > idleExpiry) &&
-		(now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastDownstreamPacketTime))) > idleExpiry)
+	return (now.Sub(monotime.Time(flowState.lastUpstreamPacketTime.Load())) > idleExpiry) &&
+		(now.Sub(monotime.Time(flowState.lastDownstreamPacketTime.Load())) > idleExpiry)
 }
 
 // isTrackingFlow checks if a flow is being tracked.
@@ -1378,9 +1372,9 @@ func (session *session) startTrackingFlow(
 
 	// Once every period, iterate over flows and reap expired entries.
 	reapIndex := now / int64(monotime.Time(FLOW_IDLE_EXPIRY/2))
-	previousReapIndex := atomic.LoadInt64(&session.lastFlowReapIndex)
+	previousReapIndex := session.lastFlowReapIndex.Load()
 	if reapIndex != previousReapIndex &&
-		atomic.CompareAndSwapInt64(&session.lastFlowReapIndex, previousReapIndex, reapIndex) {
+		session.lastFlowReapIndex.CompareAndSwap(previousReapIndex, reapIndex) {
 		session.reapFlows()
 	}
 
@@ -1412,11 +1406,11 @@ func (session *session) startTrackingFlow(
 	}
 
 	if direction == packetDirectionServerUpstream {
-		flowState.firstUpstreamPacketTime = now
-		flowState.lastUpstreamPacketTime = now
+		flowState.firstUpstreamPacketTime.Store(now)
+		flowState.lastUpstreamPacketTime.Store(now)
 	} else {
-		flowState.firstDownstreamPacketTime = now
-		flowState.lastDownstreamPacketTime = now
+		flowState.firstDownstreamPacketTime.Store(now)
+		flowState.lastDownstreamPacketTime.Store(now)
 	}
 
 	// LoadOrStore will retain any existing entry
@@ -1445,14 +1439,14 @@ func (session *session) updateFlow(
 	if direction == packetDirectionServerUpstream {
 		upstreamBytes = int64(len(applicationData))
 
-		atomic.CompareAndSwapInt64(&flowState.firstUpstreamPacketTime, 0, now)
+		flowState.firstUpstreamPacketTime.CompareAndSwap(0, now)
 
-		atomic.StoreInt64(&flowState.lastUpstreamPacketTime, now)
+		flowState.lastUpstreamPacketTime.Store(now)
 
 	} else {
 		downstreamBytes = int64(len(applicationData))
 
-		atomic.CompareAndSwapInt64(&flowState.firstDownstreamPacketTime, 0, now)
+		flowState.firstDownstreamPacketTime.CompareAndSwap(0, now)
 
 		// Follows common.ActivityMonitoredConn semantics, where
 		// duration is updated only for downstream activity. This
@@ -1460,7 +1454,7 @@ func (session *session) updateFlow(
 		// forward clients (tracked with ActivityUpdaters) and
 		// packet tunnel clients (tracked with FlowActivityUpdaters).
 
-		durationNanoseconds = now - atomic.SwapInt64(&flowState.lastDownstreamPacketTime, now)
+		durationNanoseconds = now - flowState.lastDownstreamPacketTime.Swap(now)
 	}
 
 	for _, updater := range flowState.activityUpdaters {
@@ -1476,7 +1470,7 @@ func (session *session) deleteFlow(ID flowID, flowState *flowState) {
 	if flowState.isDNS {
 
 		dnsStartTime := monotime.Time(
-			atomic.LoadInt64(&flowState.firstUpstreamPacketTime))
+			flowState.firstUpstreamPacketTime.Load())
 
 		if dnsStartTime > 0 {
 
@@ -1490,7 +1484,7 @@ func (session *session) deleteFlow(ID flowID, flowState *flowState) {
 			// recording of the DNS metric.
 
 			dnsEndTime := monotime.Time(
-				atomic.LoadInt64(&flowState.firstDownstreamPacketTime))
+				flowState.firstDownstreamPacketTime.Load())
 
 			dnsSuccess := true
 			if dnsEndTime == 0 {
@@ -1532,8 +1526,8 @@ func (session *session) deleteFlows() {
 }
 
 type packetMetrics struct {
-	upstreamRejectReasons   [packetRejectReasonCount]int64
-	downstreamRejectReasons [packetRejectReasonCount]int64
+	upstreamRejectReasons   [packetRejectReasonCount]atomic.Int64
+	downstreamRejectReasons [packetRejectReasonCount]atomic.Int64
 	TCPIPv4                 relayedPacketMetrics
 	TCPIPv6                 relayedPacketMetrics
 	UDPIPv4                 relayedPacketMetrics
@@ -1541,12 +1535,12 @@ type packetMetrics struct {
 }
 
 type relayedPacketMetrics struct {
-	packetsUp            int64
-	packetsDown          int64
-	bytesUp              int64
-	bytesDown            int64
-	applicationBytesUp   int64
-	applicationBytesDown int64
+	packetsUp            atomic.Int64
+	packetsDown          atomic.Int64
+	bytesUp              atomic.Int64
+	bytesDown            atomic.Int64
+	applicationBytesUp   atomic.Int64
+	applicationBytesDown atomic.Int64
 }
 
 func (metrics *packetMetrics) rejectedPacket(
@@ -1556,11 +1550,11 @@ func (metrics *packetMetrics) rejectedPacket(
 	if direction == packetDirectionServerUpstream ||
 		direction == packetDirectionClientUpstream {
 
-		atomic.AddInt64(&metrics.upstreamRejectReasons[reason], 1)
+		metrics.upstreamRejectReasons[reason].Add(1)
 
 	} else { // packetDirectionDownstream
 
-		atomic.AddInt64(&metrics.downstreamRejectReasons[reason], 1)
+		metrics.downstreamRejectReasons[reason].Add(1)
 
 	}
 }
@@ -1571,7 +1565,7 @@ func (metrics *packetMetrics) relayedPacket(
 	protocol internetProtocol,
 	packetLength, applicationDataLength int) {
 
-	var packetsMetric, bytesMetric, applicationBytesMetric *int64
+	var packetsMetric, bytesMetric, applicationBytesMetric *atomic.Int64
 
 	if direction == packetDirectionServerUpstream ||
 		direction == packetDirectionClientUpstream {
@@ -1629,9 +1623,9 @@ func (metrics *packetMetrics) relayedPacket(
 		}
 	}
 
-	atomic.AddInt64(packetsMetric, 1)
-	atomic.AddInt64(bytesMetric, int64(packetLength))
-	atomic.AddInt64(applicationBytesMetric, int64(applicationDataLength))
+	packetsMetric.Add(1)
+	bytesMetric.Add(int64(packetLength))
+	applicationBytesMetric.Add(int64(applicationDataLength))
 }
 
 const (
@@ -1652,9 +1646,9 @@ func (metrics *packetMetrics) checkpoint(
 
 		for i := 0; i < packetRejectReasonCount; i++ {
 			logFields["upstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
-				atomic.SwapInt64(&metrics.upstreamRejectReasons[i], 0)
+				metrics.upstreamRejectReasons[i].Swap(0)
 			logFields["downstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
-				atomic.SwapInt64(&metrics.downstreamRejectReasons[i], 0)
+				metrics.downstreamRejectReasons[i].Swap(0)
 		}
 	}
 
@@ -1677,16 +1671,16 @@ func (metrics *packetMetrics) checkpoint(
 
 		for _, r := range relayedMetrics {
 
-			applicationBytesUp := atomic.SwapInt64(&r.metrics.applicationBytesUp, 0)
-			applicationBytesDown := atomic.SwapInt64(&r.metrics.applicationBytesDown, 0)
+			applicationBytesUp := r.metrics.applicationBytesUp.Swap(0)
+			applicationBytesDown := r.metrics.applicationBytesDown.Swap(0)
 
 			*r.updaterBytesUp += applicationBytesUp
 			*r.updaterBytesDown += applicationBytesDown
 
-			logFields[r.prefix+"packets_up"] = atomic.SwapInt64(&r.metrics.packetsUp, 0)
-			logFields[r.prefix+"packets_down"] = atomic.SwapInt64(&r.metrics.packetsDown, 0)
-			logFields[r.prefix+"bytes_up"] = atomic.SwapInt64(&r.metrics.bytesUp, 0)
-			logFields[r.prefix+"bytes_down"] = atomic.SwapInt64(&r.metrics.bytesDown, 0)
+			logFields[r.prefix+"packets_up"] = r.metrics.packetsUp.Swap(0)
+			logFields[r.prefix+"packets_down"] = r.metrics.packetsDown.Swap(0)
+			logFields[r.prefix+"bytes_up"] = r.metrics.bytesUp.Swap(0)
+			logFields[r.prefix+"bytes_down"] = r.metrics.bytesDown.Swap(0)
 			logFields[r.prefix+"application_bytes_up"] = applicationBytesUp
 			logFields[r.prefix+"application_bytes_down"] = applicationBytesDown
 		}

+ 9 - 12
psiphon/common/tun/tun_test.go

@@ -301,26 +301,23 @@ func testTunneledTCP(t *testing.T, useIPv6 bool) {
 }
 
 type bytesTransferredCounter struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	downstreamBytes     int64
-	upstreamBytes       int64
-	durationNanoseconds int64
+	downstreamBytes     atomic.Int64
+	upstreamBytes       atomic.Int64
+	durationNanoseconds atomic.Int64
 }
 
 func (counter *bytesTransferredCounter) UpdateProgress(
 	downstreamBytes, upstreamBytes int64, durationNanoseconds int64) {
 
-	atomic.AddInt64(&counter.downstreamBytes, downstreamBytes)
-	atomic.AddInt64(&counter.upstreamBytes, upstreamBytes)
-	atomic.AddInt64(&counter.durationNanoseconds, durationNanoseconds)
+	counter.downstreamBytes.Add(downstreamBytes)
+	counter.upstreamBytes.Add(upstreamBytes)
+	counter.durationNanoseconds.Add(durationNanoseconds)
 }
 
 func (counter *bytesTransferredCounter) Get() (int64, int64, int64) {
-	return atomic.LoadInt64(&counter.downstreamBytes),
-		atomic.LoadInt64(&counter.upstreamBytes),
-		atomic.LoadInt64(&counter.durationNanoseconds)
+	return counter.downstreamBytes.Load(),
+		counter.upstreamBytes.Load(),
+		counter.durationNanoseconds.Load()
 }
 
 type testServer struct {

+ 9 - 3
psiphon/dialParameters.go

@@ -491,7 +491,10 @@ func MakeDialParameters(
 	// server.TacticsListener.accept.
 	if protocol.TunnelProtocolIsDirect(dialParams.TunnelProtocol) &&
 		common.ContainsAny(
-			p.KeyStrings(parameters.RestrictDirectProviderRegions, dialParams.ServerEntry.ProviderID), []string{"", serverEntry.Region}) {
+			p.KeyStrings(
+				parameters.RestrictDirectProviderRegions,
+				dialParams.ServerEntry.ProviderID),
+			[]string{"", serverEntry.Region}) {
 		if p.WeightedCoinFlip(
 			parameters.RestrictDirectProviderIDsClientProbability) {
 
@@ -512,7 +515,10 @@ func MakeDialParameters(
 	// server.sshClient.setHandshakeState.
 	if protocol.TunnelProtocolUsesInproxy(dialParams.TunnelProtocol) &&
 		common.ContainsAny(
-			p.KeyStrings(parameters.RestrictInproxyProviderRegions, dialParams.ServerEntry.ProviderID), []string{"", serverEntry.Region}) {
+			p.KeyStrings(
+				parameters.RestrictInproxyProviderRegions,
+				dialParams.ServerEntry.ProviderID),
+			[]string{"", serverEntry.Region}) {
 		if p.WeightedCoinFlip(
 			parameters.RestrictInproxyProviderIDsClientProbability) {
 
@@ -521,7 +527,7 @@ func MakeDialParameters(
 			// of server entry, at most once per session.
 
 			NoticeSkipServerEntry(
-				"restricted provider ID: %s",
+				"restricted in-proxy provider ID: %s",
 				dialParams.ServerEntry.ProviderID)
 
 			return nil, nil

+ 9 - 12
psiphon/packetTunnelTransport.go

@@ -35,11 +35,8 @@ import (
 // disconnect from and reconnect to the same or different Psiphon servers. PacketTunnelTransport
 // allows the Psiphon client to substitute new transport channels on-the-fly.
 type PacketTunnelTransport struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	readTimeout   int64
-	readDeadline  int64
+	readTimeout   atomic.Int64
+	readDeadline  atomic.Int64
 	closed        int32
 	workers       *sync.WaitGroup
 	readMutex     sync.Mutex
@@ -88,7 +85,7 @@ func (p *PacketTunnelTransport) Read(data []byte) (int, error) {
 
 		// Clear the read deadline now that a read has succeeded.
 		// See read deadline comment in Write.
-		atomic.StoreInt64(&p.readDeadline, 0)
+		p.readDeadline.Store(0)
 	}
 
 	return n, errors.Trace(err)
@@ -148,7 +145,7 @@ func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
 		// Access to readDeadline/readTimeout is not intended to be completely
 		// atomic.
 
-		readDeadline := monotime.Time(atomic.LoadInt64(&p.readDeadline))
+		readDeadline := monotime.Time(p.readDeadline.Load())
 
 		if readDeadline > 0 {
 
@@ -160,7 +157,7 @@ func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
 				}
 
 				// Clear the deadline now that a probe is triggered.
-				atomic.StoreInt64(&p.readDeadline, 0)
+				p.readDeadline.Store(0)
 			}
 
 			// Keep an existing deadline as set: subsequent writes attempts shouldn't
@@ -168,9 +165,9 @@ func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
 
 		} else {
 
-			readTimeout := time.Duration(atomic.LoadInt64(&p.readTimeout))
+			readTimeout := time.Duration(p.readTimeout.Load())
 			readDeadline := monotime.Now().Add(readTimeout)
-			atomic.StoreInt64(&p.readDeadline, int64(readDeadline))
+			p.readDeadline.Store(int64(readDeadline))
 		}
 	}
 
@@ -273,8 +270,8 @@ func (p *PacketTunnelTransport) setChannel(
 		GetParameters().
 		GetCustom(channelTunnel.dialParams.NetworkLatencyMultiplier).
 		Duration(parameters.PacketTunnelReadTimeout)
-	atomic.StoreInt64(&p.readTimeout, int64(timeout))
-	atomic.StoreInt64(&p.readDeadline, 0)
+	p.readTimeout.Store(int64(timeout))
+	p.readDeadline.Store(0)
 
 	p.channelReady.Broadcast()
 }

+ 20 - 0
psiphon/server/api.go

@@ -1018,6 +1018,26 @@ func dslAPIRequestHandler(
 	sshClient *sshClient,
 	requestPayload []byte) ([]byte, error) {
 
+	// Sanity check: don't relay more than the modest number of DSL requests
+	// expected in the tunneled case. The DSL backend has its own rate limit
+	// enforcement, but avoiding excess requests here saves on resources
+	// consumed between the relay and backend.
+	//
+	// The equivalent pre-relay check in the in-proxy broker uses an explicit
+	// rate limiter; here a simpler hard limit per tunnel suffices due to the
+	// low limit size and the fact that tunnel dials are themselves rate
+	// limited.
+	ok := false
+	sshClient.Lock()
+	if sshClient.dslRequestCount < SSH_CLIENT_MAX_DSL_REQUEST_COUNT {
+		ok = true
+		sshClient.dslRequestCount += 1
+	}
+	sshClient.Unlock()
+	if !ok {
+		return nil, errors.TraceNew("too many DSL requests")
+	}
+
 	responsePayload, err := dslHandleRequest(
 		sshClient.runCtx,
 		support,

+ 7 - 11
psiphon/server/dns.go

@@ -43,13 +43,10 @@ const (
 // "/etc/resolv.conf" on platforms where it is available; and
 // otherwise using a default value.
 type DNSResolver struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastReloadTime int64
 	common.ReloadableFile
-	isReloading int32
-	resolvers   []net.IP
+	isReloading    int32
+	lastReloadTime atomic.Int64
+	resolvers      []net.IP
 }
 
 // NewDNSResolver initializes a new DNSResolver, loading it with
@@ -71,9 +68,8 @@ type DNSResolver struct {
 //     maybe_update_dns: 2 seconds
 func NewDNSResolver(defaultResolver string) (*DNSResolver, error) {
 
-	dns := &DNSResolver{
-		lastReloadTime: int64(monotime.Now()),
-	}
+	dns := &DNSResolver{}
+	dns.lastReloadTime.Store(int64(monotime.Now()))
 
 	dns.ReloadableFile = common.NewReloadableFile(
 		DNS_SYSTEM_CONFIG_FILENAME,
@@ -146,7 +142,7 @@ func (dns *DNSResolver) reloadWhenStale() {
 	// write lock, we only incur write lock blocking when "/etc/resolv.conf"
 	// has actually changed.
 
-	lastReloadTime := monotime.Time(atomic.LoadInt64(&dns.lastReloadTime))
+	lastReloadTime := monotime.Time(dns.lastReloadTime.Load())
 	stale := monotime.Now().After(lastReloadTime.Add(DNS_SYSTEM_CONFIG_RELOAD_PERIOD))
 
 	if stale {
@@ -157,7 +153,7 @@ func (dns *DNSResolver) reloadWhenStale() {
 
 			// Unconditionally set last reload time. Even on failure only
 			// want to retry after another DNS_SYSTEM_CONFIG_RELOAD_PERIOD.
-			atomic.StoreInt64(&dns.lastReloadTime, int64(monotime.Now()))
+			dns.lastReloadTime.Store(int64(monotime.Now()))
 
 			_, err := dns.Reload()
 			if err != nil {

+ 42 - 24
psiphon/server/meek.go

@@ -503,11 +503,30 @@ func (server *MeekServer) Run() error {
 	return err
 }
 
+func handleServeHTTPPanic() {
+
+	// Disable panic recovery, to ensure panics are captured and logged by
+	// panicwrap.
+	//
+	// The net.http ServeHTTP caller will recover any ServeHTTP panic, so
+	// re-panic in another goroutine after capturing the panicking goroutine
+	// call stack.
+
+	if r := recover(); r != nil {
+		var stack [4096]byte
+		n := runtime.Stack(stack[:], false)
+		err := errors.Tracef("ServeHTTP panic: %v\n%s", r, stack[:n])
+		go panic(err.Error())
+	}
+}
+
 // ServeHTTP handles meek client HTTP requests, where the request body
 // contains upstream traffic and the response will contain downstream
 // traffic.
 func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
 
+	defer handleServeHTTPPanic()
+
 	// Note: no longer requiring that the request method is POST
 
 	// Check for required headers and values. For fronting, required headers
@@ -696,7 +715,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// sending data to the cached response, but if that buffer fills, the
 	// session will be lost.
 
-	requestNumber := atomic.AddInt64(&session.requestCount, 1)
+	requestNumber := session.requestCount.Add(1)
 
 	// Wait for the existing request to complete.
 	session.lock.Lock()
@@ -710,7 +729,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// interface which includes a pointer, and the previous value cannot
 	// be garbage collected until session.underlyingConn is updated.
 	if session.underlyingConn != underlyingConn {
-		atomic.AddInt64(&session.metricUnderlyingConnCount, 1)
+		session.metricUnderlyingConnCount.Add(1)
 		session.underlyingConn = underlyingConn
 	}
 
@@ -721,7 +740,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 	// discard this request. The session is no longer valid, and the final call
 	// to session.cachedResponse.Reset may have already occured, so any further
 	// session.cachedResponse access may deplete resources (fail to refill the pool).
-	if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
+	if session.requestCount.Load() > requestNumber || session.deleted {
 		common.TerminateHTTPConnection(responseWriter, request)
 		return
 	}
@@ -790,7 +809,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 	position, isRetry := checkRangeHeader(request)
 	if isRetry {
-		atomic.AddInt64(&session.metricClientRetries, 1)
+		session.metricClientRetries.Add(1)
 	}
 
 	hasCompleteCachedResponse := session.cachedResponse.HasPosition(0)
@@ -1876,7 +1895,9 @@ func (server *MeekServer) inproxyReloadTactics() error {
 		p.Int(parameters.InproxyBrokerMatcherOfferLimitEntryCount),
 		p.Int(parameters.InproxyBrokerMatcherOfferRateLimitQuantity),
 		p.Duration(parameters.InproxyBrokerMatcherOfferRateLimitInterval),
-		p.Int(parameters.InproxyMaxCompartmentIDListLength))
+		p.Int(parameters.InproxyMaxCompartmentIDListLength),
+		p.Int(parameters.InproxyBrokerDSLRequestRateLimitQuantity),
+		p.Duration(parameters.InproxyBrokerDSLRequestRateLimitInterval))
 
 	server.inproxyBroker.SetProxyQualityParameters(
 		p.Bool(parameters.InproxyEnableProxyQuality),
@@ -2173,17 +2194,14 @@ func (server *MeekServer) inproxyBrokerHandler(
 }
 
 type meekSession struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastActivity                     int64
-	requestCount                     int64
-	metricClientRetries              int64
-	metricPeakResponseSize           int64
-	metricPeakCachedResponseSize     int64
-	metricPeakCachedResponseHitSize  int64
-	metricCachedResponseMissPosition int64
-	metricUnderlyingConnCount        int64
+	lastActivity                     atomic.Int64
+	requestCount                     atomic.Int64
+	metricClientRetries              atomic.Int64
+	metricPeakResponseSize           atomic.Int64
+	metricPeakCachedResponseSize     atomic.Int64
+	metricPeakCachedResponseHitSize  atomic.Int64
+	metricCachedResponseMissPosition atomic.Int64
+	metricUnderlyingConnCount        atomic.Int64
 	lock                             sync.Mutex
 	deleted                          bool
 	underlyingConn                   net.Conn
@@ -2197,7 +2215,7 @@ type meekSession struct {
 }
 
 func (session *meekSession) touch() {
-	atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
+	session.lastActivity.Store(int64(monotime.Now()))
 }
 
 func (session *meekSession) expired() bool {
@@ -2206,7 +2224,7 @@ func (session *meekSession) expired() bool {
 		// the session to MeekServer.sessions.
 		return false
 	}
-	lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
+	lastActivity := monotime.Time(session.lastActivity.Load())
 	return monotime.Since(lastActivity) >
 		session.clientConn.meekServer.maxSessionStaleness
 }
@@ -2256,12 +2274,12 @@ func (session *meekSession) delete(haveLock bool) {
 // GetMetrics implements the common.MetricsSource interface.
 func (session *meekSession) GetMetrics() common.LogFields {
 	logFields := make(common.LogFields)
-	logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
-	logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
-	logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
-	logFields["meek_peak_cached_response_hit_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseHitSize)
-	logFields["meek_cached_response_miss_position"] = atomic.LoadInt64(&session.metricCachedResponseMissPosition)
-	logFields["meek_underlying_connection_count"] = atomic.LoadInt64(&session.metricUnderlyingConnCount)
+	logFields["meek_client_retries"] = session.metricClientRetries.Load()
+	logFields["meek_peak_response_size"] = session.metricPeakResponseSize.Load()
+	logFields["meek_peak_cached_response_size"] = session.metricPeakCachedResponseSize.Load()
+	logFields["meek_peak_cached_response_hit_size"] = session.metricPeakCachedResponseHitSize.Load()
+	logFields["meek_cached_response_miss_position"] = session.metricCachedResponseMissPosition.Load()
+	logFields["meek_underlying_connection_count"] = session.metricUnderlyingConnCount.Load()
 	logFields["meek_cookie_name"] = session.cookieName
 	logFields["meek_content_type"] = session.contentType
 	logFields["meek_server_http_version"] = session.httpVersion

+ 3 - 3
psiphon/server/pb/psiphond/dial_params.pb.go

@@ -68,7 +68,7 @@ type DialParams struct {
 	ServerReplayFragmentation         *bool                  `protobuf:"varint,42,opt,name=server_replay_fragmentation,json=serverReplayFragmentation,proto3,oneof" json:"server_replay_fragmentation,omitempty"`
 	ServerReplayPacketManipulation    *bool                  `protobuf:"varint,43,opt,name=server_replay_packet_manipulation,json=serverReplayPacketManipulation,proto3,oneof" json:"server_replay_packet_manipulation,omitempty"`
 	ServerEntryValid                  *bool                  `protobuf:"varint,44,opt,name=server_entry_valid,json=serverEntryValid,proto3,oneof" json:"server_entry_valid,omitempty"`
-	CandidateNumber                   *int32                 `protobuf:"varint,45,opt,name=candidate_number,json=candidateNumber,proto3,oneof" json:"candidate_number,omitempty"`
+	CandidateNumber                   *int64                 `protobuf:"varint,45,opt,name=candidate_number,json=candidateNumber,proto3,oneof" json:"candidate_number,omitempty"`
 	IsReplay                          *bool                  `protobuf:"varint,46,opt,name=is_replay,json=isReplay,proto3,oneof" json:"is_replay,omitempty"`
 	DialPortNumber                    *int64                 `protobuf:"varint,47,opt,name=dial_port_number,json=dialPortNumber,proto3,oneof" json:"dial_port_number,omitempty"`
 	DialDuration                      *int64                 `protobuf:"varint,48,opt,name=dial_duration,json=dialDuration,proto3,oneof" json:"dial_duration,omitempty"`
@@ -449,7 +449,7 @@ func (x *DialParams) GetServerEntryValid() bool {
 	return false
 }
 
-func (x *DialParams) GetCandidateNumber() int32 {
+func (x *DialParams) GetCandidateNumber() int64 {
 	if x != nil && x.CandidateNumber != nil {
 		return *x.CandidateNumber
 	}
@@ -778,7 +778,7 @@ const file_ca_psiphon_psiphond_dial_params_proto_rawDesc = "" +
 	"\x1bserver_replay_fragmentation\x18* \x01(\bH)R\x19serverReplayFragmentation\x88\x01\x01\x12N\n" +
 	"!server_replay_packet_manipulation\x18+ \x01(\bH*R\x1eserverReplayPacketManipulation\x88\x01\x01\x121\n" +
 	"\x12server_entry_valid\x18, \x01(\bH+R\x10serverEntryValid\x88\x01\x01\x12.\n" +
-	"\x10candidate_number\x18- \x01(\x05H,R\x0fcandidateNumber\x88\x01\x01\x12 \n" +
+	"\x10candidate_number\x18- \x01(\x03H,R\x0fcandidateNumber\x88\x01\x01\x12 \n" +
 	"\tis_replay\x18. \x01(\bH-R\bisReplay\x88\x01\x01\x12-\n" +
 	"\x10dial_port_number\x18/ \x01(\x03H.R\x0edialPortNumber\x88\x01\x01\x12(\n" +
 	"\rdial_duration\x180 \x01(\x03H/R\fdialDuration\x88\x01\x01\x125\n" +

+ 22 - 12
psiphon/server/pb/psiphond/inproxy_broker.pb.go

@@ -24,8 +24,8 @@ const (
 type InproxyBroker struct {
 	state                         protoimpl.MessageState `protogen:"open.v1"`
 	BaseParams                    *BaseParams            `protobuf:"bytes,1,opt,name=base_params,json=baseParams,proto3,oneof" json:"base_params,omitempty"`
-	AnnouncementMatchIndex        *int32                 `protobuf:"varint,100,opt,name=announcement_match_index,json=announcementMatchIndex,proto3,oneof" json:"announcement_match_index,omitempty"`
-	AnnouncementQueueSize         *int32                 `protobuf:"varint,101,opt,name=announcement_queue_size,json=announcementQueueSize,proto3,oneof" json:"announcement_queue_size,omitempty"`
+	AnnouncementMatchIndex        *int64                 `protobuf:"varint,100,opt,name=announcement_match_index,json=announcementMatchIndex,proto3,oneof" json:"announcement_match_index,omitempty"`
+	AnnouncementQueueSize         *int64                 `protobuf:"varint,101,opt,name=announcement_queue_size,json=announcementQueueSize,proto3,oneof" json:"announcement_queue_size,omitempty"`
 	AnswerError                   *string                `protobuf:"bytes,102,opt,name=answer_error,json=answerError,proto3,oneof" json:"answer_error,omitempty"`
 	BrokerEvent                   *string                `protobuf:"bytes,103,opt,name=broker_event,json=brokerEvent,proto3,oneof" json:"broker_event,omitempty"`
 	BrokerId                      *string                `protobuf:"bytes,104,opt,name=broker_id,json=brokerId,proto3,oneof" json:"broker_id,omitempty"`
@@ -55,7 +55,7 @@ type InproxyBroker struct {
 	PeakUpstreamBytesPerSecond    *int64                 `protobuf:"varint,128,opt,name=peak_upstream_bytes_per_second,json=peakUpstreamBytesPerSecond,proto3,oneof" json:"peak_upstream_bytes_per_second,omitempty"`
 	PortMappingTypes              []string               `protobuf:"bytes,129,rep,name=port_mapping_types,json=portMappingTypes,proto3" json:"port_mapping_types,omitempty"`
 	PreferredNatMatch             *bool                  `protobuf:"varint,130,opt,name=preferred_nat_match,json=preferredNatMatch,proto3,oneof" json:"preferred_nat_match,omitempty"`
-	ProtocolVersion               *int32                 `protobuf:"varint,131,opt,name=protocol_version,json=protocolVersion,proto3,oneof" json:"protocol_version,omitempty"`
+	ProtocolVersion               *int64                 `protobuf:"varint,131,opt,name=protocol_version,json=protocolVersion,proto3,oneof" json:"protocol_version,omitempty"`
 	ProxyId                       *string                `protobuf:"bytes,132,opt,name=proxy_id,json=proxyId,proto3,oneof" json:"proxy_id,omitempty"`
 	ProxyNatType                  *string                `protobuf:"bytes,133,opt,name=proxy_nat_type,json=proxyNatType,proto3,oneof" json:"proxy_nat_type,omitempty"`
 	ProxyPortMappingTypes         []string               `protobuf:"bytes,134,rep,name=proxy_port_mapping_types,json=proxyPortMappingTypes,proto3" json:"proxy_port_mapping_types,omitempty"`
@@ -63,6 +63,7 @@ type InproxyBroker struct {
 	StoredTacticsTag              *string                `protobuf:"bytes,136,opt,name=stored_tactics_tag,json=storedTacticsTag,proto3,oneof" json:"stored_tactics_tag,omitempty"`
 	TimedOut                      *bool                  `protobuf:"varint,137,opt,name=timed_out,json=timedOut,proto3,oneof" json:"timed_out,omitempty"`
 	MeekServerHttpVersion         *string                `protobuf:"bytes,138,opt,name=meek_server_http_version,json=meekServerHttpVersion,proto3,oneof" json:"meek_server_http_version,omitempty"`
+	PendingAnswersSize            *int64                 `protobuf:"varint,139,opt,name=pending_answers_size,json=pendingAnswersSize,proto3,oneof" json:"pending_answers_size,omitempty"`
 	unknownFields                 protoimpl.UnknownFields
 	sizeCache                     protoimpl.SizeCache
 }
@@ -104,14 +105,14 @@ func (x *InproxyBroker) GetBaseParams() *BaseParams {
 	return nil
 }
 
-func (x *InproxyBroker) GetAnnouncementMatchIndex() int32 {
+func (x *InproxyBroker) GetAnnouncementMatchIndex() int64 {
 	if x != nil && x.AnnouncementMatchIndex != nil {
 		return *x.AnnouncementMatchIndex
 	}
 	return 0
 }
 
-func (x *InproxyBroker) GetAnnouncementQueueSize() int32 {
+func (x *InproxyBroker) GetAnnouncementQueueSize() int64 {
 	if x != nil && x.AnnouncementQueueSize != nil {
 		return *x.AnnouncementQueueSize
 	}
@@ -321,7 +322,7 @@ func (x *InproxyBroker) GetPreferredNatMatch() bool {
 	return false
 }
 
-func (x *InproxyBroker) GetProtocolVersion() int32 {
+func (x *InproxyBroker) GetProtocolVersion() int64 {
 	if x != nil && x.ProtocolVersion != nil {
 		return *x.ProtocolVersion
 	}
@@ -377,16 +378,23 @@ func (x *InproxyBroker) GetMeekServerHttpVersion() string {
 	return ""
 }
 
+func (x *InproxyBroker) GetPendingAnswersSize() int64 {
+	if x != nil && x.PendingAnswersSize != nil {
+		return *x.PendingAnswersSize
+	}
+	return 0
+}
+
 var File_ca_psiphon_psiphond_inproxy_broker_proto protoreflect.FileDescriptor
 
 const file_ca_psiphon_psiphond_inproxy_broker_proto_rawDesc = "" +
 	"\n" +
-	"(ca.psiphon.psiphond/inproxy_broker.proto\x12\x13ca.psiphon.psiphond\x1a%ca.psiphon.psiphond/base_params.proto\"\xfc\x15\n" +
+	"(ca.psiphon.psiphond/inproxy_broker.proto\x12\x13ca.psiphon.psiphond\x1a%ca.psiphon.psiphond/base_params.proto\"\xcd\x16\n" +
 	"\rInproxyBroker\x12E\n" +
 	"\vbase_params\x18\x01 \x01(\v2\x1f.ca.psiphon.psiphond.BaseParamsH\x00R\n" +
 	"baseParams\x88\x01\x01\x12=\n" +
-	"\x18announcement_match_index\x18d \x01(\x05H\x01R\x16announcementMatchIndex\x88\x01\x01\x12;\n" +
-	"\x17announcement_queue_size\x18e \x01(\x05H\x02R\x15announcementQueueSize\x88\x01\x01\x12&\n" +
+	"\x18announcement_match_index\x18d \x01(\x03H\x01R\x16announcementMatchIndex\x88\x01\x01\x12;\n" +
+	"\x17announcement_queue_size\x18e \x01(\x03H\x02R\x15announcementQueueSize\x88\x01\x01\x12&\n" +
 	"\fanswer_error\x18f \x01(\tH\x03R\vanswerError\x88\x01\x01\x12&\n" +
 	"\fbroker_event\x18g \x01(\tH\x04R\vbrokerEvent\x88\x01\x01\x12 \n" +
 	"\tbroker_id\x18h \x01(\tH\x05R\bbrokerId\x88\x01\x01\x120\n" +
@@ -419,14 +427,15 @@ const file_ca_psiphon_psiphond_inproxy_broker_proto_rawDesc = "" +
 	"\x1epeak_upstream_bytes_per_second\x18\x80\x01 \x01(\x03H\x1bR\x1apeakUpstreamBytesPerSecond\x88\x01\x01\x12-\n" +
 	"\x12port_mapping_types\x18\x81\x01 \x03(\tR\x10portMappingTypes\x124\n" +
 	"\x13preferred_nat_match\x18\x82\x01 \x01(\bH\x1cR\x11preferredNatMatch\x88\x01\x01\x12/\n" +
-	"\x10protocol_version\x18\x83\x01 \x01(\x05H\x1dR\x0fprotocolVersion\x88\x01\x01\x12\x1f\n" +
+	"\x10protocol_version\x18\x83\x01 \x01(\x03H\x1dR\x0fprotocolVersion\x88\x01\x01\x12\x1f\n" +
 	"\bproxy_id\x18\x84\x01 \x01(\tH\x1eR\aproxyId\x88\x01\x01\x12*\n" +
 	"\x0eproxy_nat_type\x18\x85\x01 \x01(\tH\x1fR\fproxyNatType\x88\x01\x01\x128\n" +
 	"\x18proxy_port_mapping_types\x18\x86\x01 \x03(\tR\x15proxyPortMappingTypes\x12!\n" +
 	"\tserver_id\x18\x87\x01 \x01(\tH R\bserverId\x88\x01\x01\x122\n" +
 	"\x12stored_tactics_tag\x18\x88\x01 \x01(\tH!R\x10storedTacticsTag\x88\x01\x01\x12!\n" +
 	"\ttimed_out\x18\x89\x01 \x01(\bH\"R\btimedOut\x88\x01\x01\x12=\n" +
-	"\x18meek_server_http_version\x18\x8a\x01 \x01(\tH#R\x15meekServerHttpVersion\x88\x01\x01B\x0e\n" +
+	"\x18meek_server_http_version\x18\x8a\x01 \x01(\tH#R\x15meekServerHttpVersion\x88\x01\x01\x126\n" +
+	"\x14pending_answers_size\x18\x8b\x01 \x01(\x03H$R\x12pendingAnswersSize\x88\x01\x01B\x0e\n" +
 	"\f_base_paramsB\x1b\n" +
 	"\x19_announcement_match_indexB\x1a\n" +
 	"\x18_announcement_queue_sizeB\x0f\n" +
@@ -465,7 +474,8 @@ const file_ca_psiphon_psiphond_inproxy_broker_proto_rawDesc = "" +
 	"\x13_stored_tactics_tagB\f\n" +
 	"\n" +
 	"_timed_outB\x1b\n" +
-	"\x19_meek_server_http_versionBHZFgithub.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/pb/psiphondb\x06proto3"
+	"\x19_meek_server_http_versionB\x17\n" +
+	"\x15_pending_answers_sizeBHZFgithub.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/pb/psiphondb\x06proto3"
 
 var (
 	file_ca_psiphon_psiphond_inproxy_broker_proto_rawDescOnce sync.Once

+ 3 - 3
psiphon/server/pb/psiphond/irregular_tunnel.pb.go

@@ -34,7 +34,7 @@ type IrregularTunnel struct {
 	DuplicateElapsedTimeMs             *int64                 `protobuf:"varint,107,opt,name=duplicate_elapsed_time_ms,json=duplicateElapsedTimeMs,proto3,oneof" json:"duplicate_elapsed_time_ms,omitempty"`
 	DuplicateSeed                      *string                `protobuf:"bytes,108,opt,name=duplicate_seed,json=duplicateSeed,proto3,oneof" json:"duplicate_seed,omitempty"`
 	DuplicateSeedType                  *string                `protobuf:"bytes,109,opt,name=duplicate_seed_type,json=duplicateSeedType,proto3,oneof" json:"duplicate_seed_type,omitempty"`
-	ListenerPortNumber                 *uint32                `protobuf:"varint,110,opt,name=listener_port_number,json=listenerPortNumber,proto3,oneof" json:"listener_port_number,omitempty"`
+	ListenerPortNumber                 *int64                 `protobuf:"varint,110,opt,name=listener_port_number,json=listenerPortNumber,proto3,oneof" json:"listener_port_number,omitempty"`
 	ListenerProtocol                   *string                `protobuf:"bytes,111,opt,name=listener_protocol,json=listenerProtocol,proto3,oneof" json:"listener_protocol,omitempty"`
 	TunnelError                        *string                `protobuf:"bytes,112,opt,name=tunnel_error,json=tunnelError,proto3,oneof" json:"tunnel_error,omitempty"`
 	unknownFields                      protoimpl.UnknownFields
@@ -148,7 +148,7 @@ func (x *IrregularTunnel) GetDuplicateSeedType() string {
 	return ""
 }
 
-func (x *IrregularTunnel) GetListenerPortNumber() uint32 {
+func (x *IrregularTunnel) GetListenerPortNumber() int64 {
 	if x != nil && x.ListenerPortNumber != nil {
 		return *x.ListenerPortNumber
 	}
@@ -189,7 +189,7 @@ const file_ca_psiphon_psiphond_irregular_tunnel_proto_rawDesc = "" +
 	"\x0eduplicate_seed\x18l \x01(\tH\tR\rduplicateSeed\x88\x01\x01\x123\n" +
 	"\x13duplicate_seed_type\x18m \x01(\tH\n" +
 	"R\x11duplicateSeedType\x88\x01\x01\x125\n" +
-	"\x14listener_port_number\x18n \x01(\rH\vR\x12listenerPortNumber\x88\x01\x01\x120\n" +
+	"\x14listener_port_number\x18n \x01(\x03H\vR\x12listenerPortNumber\x88\x01\x01\x120\n" +
 	"\x11listener_protocol\x18o \x01(\tH\fR\x10listenerProtocol\x88\x01\x01\x12&\n" +
 	"\ftunnel_error\x18p \x01(\tH\rR\vtunnelError\x88\x01\x01B\x0e\n" +
 	"\f_base_paramsB%\n" +

+ 21 - 29
psiphon/server/pb/psiphond/psiphond.pb.go

@@ -24,11 +24,11 @@ const (
 
 type Psiphond struct {
 	state        protoimpl.MessageState `protogen:"open.v1"`
-	Timestamp    *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3,oneof" json:"timestamp,omitempty"`
-	HostId       *string                `protobuf:"bytes,2,opt,name=host_id,json=hostId,proto3,oneof" json:"host_id,omitempty"`
-	HostType     *string                `protobuf:"bytes,3,opt,name=host_type,json=hostType,proto3,oneof" json:"host_type,omitempty"`
-	HostBuildRev *string                `protobuf:"bytes,4,opt,name=host_build_rev,json=hostBuildRev,proto3,oneof" json:"host_build_rev,omitempty"`
-	Provider     *string                `protobuf:"bytes,5,opt,name=provider,proto3,oneof" json:"provider,omitempty"`
+	Timestamp    *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+	HostId       string                 `protobuf:"bytes,2,opt,name=host_id,json=hostId,proto3" json:"host_id,omitempty"`
+	HostType     string                 `protobuf:"bytes,3,opt,name=host_type,json=hostType,proto3" json:"host_type,omitempty"`
+	HostBuildRev string                 `protobuf:"bytes,4,opt,name=host_build_rev,json=hostBuildRev,proto3" json:"host_build_rev,omitempty"`
+	Provider     string                 `protobuf:"bytes,5,opt,name=provider,proto3" json:"provider,omitempty"`
 	// Types that are valid to be assigned to Metric:
 	//
 	//	*Psiphond_DomainBytes
@@ -91,29 +91,29 @@ func (x *Psiphond) GetTimestamp() *timestamppb.Timestamp {
 }
 
 func (x *Psiphond) GetHostId() string {
-	if x != nil && x.HostId != nil {
-		return *x.HostId
+	if x != nil {
+		return x.HostId
 	}
 	return ""
 }
 
 func (x *Psiphond) GetHostType() string {
-	if x != nil && x.HostType != nil {
-		return *x.HostType
+	if x != nil {
+		return x.HostType
 	}
 	return ""
 }
 
 func (x *Psiphond) GetHostBuildRev() string {
-	if x != nil && x.HostBuildRev != nil {
-		return *x.HostBuildRev
+	if x != nil {
+		return x.HostBuildRev
 	}
 	return ""
 }
 
 func (x *Psiphond) GetProvider() string {
-	if x != nil && x.Provider != nil {
-		return *x.Provider
+	if x != nil {
+		return x.Provider
 	}
 	return ""
 }
@@ -388,13 +388,13 @@ var File_ca_psiphon_psiphond_psiphond_proto protoreflect.FileDescriptor
 
 const file_ca_psiphon_psiphond_psiphond_proto_rawDesc = "" +
 	"\n" +
-	"\"ca.psiphon.psiphond/psiphond.proto\x12\x13ca.psiphon.psiphond\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&ca.psiphon.psiphond/domain_bytes.proto\x1a'ca.psiphon.psiphond/failed_tunnel.proto\x1a(ca.psiphon.psiphond/inproxy_broker.proto\x1a*ca.psiphon.psiphond/irregular_tunnel.proto\x1a'ca.psiphon.psiphond/orphan_packet.proto\x1a,ca.psiphon.psiphond/remote_server_list.proto\x1a*ca.psiphon.psiphond/server_blocklist.proto\x1a%ca.psiphon.psiphond/server_load.proto\x1a&ca.psiphon.psiphond/server_panic.proto\x1a'ca.psiphon.psiphond/server_packet.proto\x1a'ca.psiphon.psiphond/server_tunnel.proto\x1a!ca.psiphon.psiphond/tactics.proto\x1a%ca.psiphon.psiphond/unique_user.proto\"\xde\f\n" +
-	"\bPsiphond\x12=\n" +
-	"\ttimestamp\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampH\x01R\ttimestamp\x88\x01\x01\x12\x1c\n" +
-	"\ahost_id\x18\x02 \x01(\tH\x02R\x06hostId\x88\x01\x01\x12 \n" +
-	"\thost_type\x18\x03 \x01(\tH\x03R\bhostType\x88\x01\x01\x12)\n" +
-	"\x0ehost_build_rev\x18\x04 \x01(\tH\x04R\fhostBuildRev\x88\x01\x01\x12\x1f\n" +
-	"\bprovider\x18\x05 \x01(\tH\x05R\bprovider\x88\x01\x01\x12E\n" +
+	"\"ca.psiphon.psiphond/psiphond.proto\x12\x13ca.psiphon.psiphond\x1a\x1fgoogle/protobuf/timestamp.proto\x1a&ca.psiphon.psiphond/domain_bytes.proto\x1a'ca.psiphon.psiphond/failed_tunnel.proto\x1a(ca.psiphon.psiphond/inproxy_broker.proto\x1a*ca.psiphon.psiphond/irregular_tunnel.proto\x1a'ca.psiphon.psiphond/orphan_packet.proto\x1a,ca.psiphon.psiphond/remote_server_list.proto\x1a*ca.psiphon.psiphond/server_blocklist.proto\x1a%ca.psiphon.psiphond/server_load.proto\x1a&ca.psiphon.psiphond/server_panic.proto\x1a'ca.psiphon.psiphond/server_packet.proto\x1a'ca.psiphon.psiphond/server_tunnel.proto\x1a!ca.psiphon.psiphond/tactics.proto\x1a%ca.psiphon.psiphond/unique_user.proto\"\xfd\v\n" +
+	"\bPsiphond\x128\n" +
+	"\ttimestamp\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12\x17\n" +
+	"\ahost_id\x18\x02 \x01(\tR\x06hostId\x12\x1b\n" +
+	"\thost_type\x18\x03 \x01(\tR\bhostType\x12$\n" +
+	"\x0ehost_build_rev\x18\x04 \x01(\tR\fhostBuildRev\x12\x1a\n" +
+	"\bprovider\x18\x05 \x01(\tR\bprovider\x12E\n" +
 	"\fdomain_bytes\x18e \x01(\v2 .ca.psiphon.psiphond.DomainBytesH\x00R\vdomainBytes\x12H\n" +
 	"\rfailed_tunnel\x18f \x01(\v2!.ca.psiphon.psiphond.FailedTunnelH\x00R\ffailedTunnel\x12K\n" +
 	"\x0einproxy_broker\x18g \x01(\v2\".ca.psiphon.psiphond.InproxyBrokerH\x00R\rinproxyBroker\x12Q\n" +
@@ -414,15 +414,7 @@ const file_ca_psiphon_psiphond_psiphond_proto_rawDesc = "" +
 	"\x11tactics_speedtest\x18t \x01(\v2%.ca.psiphon.psiphond.TacticsSpeedTestH\x00R\x10tacticsSpeedtest\x12B\n" +
 	"\vunique_user\x18u \x01(\v2\x1f.ca.psiphon.psiphond.UniqueUserH\x00R\n" +
 	"uniqueUserB\b\n" +
-	"\x06metricB\f\n" +
-	"\n" +
-	"_timestampB\n" +
-	"\n" +
-	"\b_host_idB\f\n" +
-	"\n" +
-	"_host_typeB\x11\n" +
-	"\x0f_host_build_revB\v\n" +
-	"\t_providerBHZFgithub.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/pb/psiphondb\x06proto3"
+	"\x06metricBHZFgithub.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/pb/psiphondb\x06proto3"
 
 var (
 	file_ca_psiphon_psiphond_psiphond_proto_rawDescOnce sync.Once

+ 27 - 27
psiphon/server/pb/psiphond/server_load.pb.go

@@ -169,20 +169,20 @@ func (x *ServerLoadDNS) GetDnsFailedDuration() int64 {
 type ServerLoad struct {
 	state                                   protoimpl.MessageState `protogen:"open.v1"`
 	CpuPercent                              *float64               `protobuf:"fixed64,100,opt,name=cpu_percent,json=cpuPercent,proto3,oneof" json:"cpu_percent,omitempty"`
-	HeapAlloc                               *uint64                `protobuf:"varint,101,opt,name=heap_alloc,json=heapAlloc,proto3,oneof" json:"heap_alloc,omitempty"`
-	HeapIdle                                *uint64                `protobuf:"varint,102,opt,name=heap_idle,json=heapIdle,proto3,oneof" json:"heap_idle,omitempty"`
-	HeapInuse                               *uint64                `protobuf:"varint,103,opt,name=heap_inuse,json=heapInuse,proto3,oneof" json:"heap_inuse,omitempty"`
-	HeapObjects                             *uint64                `protobuf:"varint,104,opt,name=heap_objects,json=heapObjects,proto3,oneof" json:"heap_objects,omitempty"`
-	HeapReleased                            *uint64                `protobuf:"varint,105,opt,name=heap_released,json=heapReleased,proto3,oneof" json:"heap_released,omitempty"`
-	HeapSys                                 *uint64                `protobuf:"varint,106,opt,name=heap_sys,json=heapSys,proto3,oneof" json:"heap_sys,omitempty"`
+	HeapAlloc                               *int64                 `protobuf:"varint,101,opt,name=heap_alloc,json=heapAlloc,proto3,oneof" json:"heap_alloc,omitempty"`
+	HeapIdle                                *int64                 `protobuf:"varint,102,opt,name=heap_idle,json=heapIdle,proto3,oneof" json:"heap_idle,omitempty"`
+	HeapInuse                               *int64                 `protobuf:"varint,103,opt,name=heap_inuse,json=heapInuse,proto3,oneof" json:"heap_inuse,omitempty"`
+	HeapObjects                             *int64                 `protobuf:"varint,104,opt,name=heap_objects,json=heapObjects,proto3,oneof" json:"heap_objects,omitempty"`
+	HeapReleased                            *int64                 `protobuf:"varint,105,opt,name=heap_released,json=heapReleased,proto3,oneof" json:"heap_released,omitempty"`
+	HeapSys                                 *int64                 `protobuf:"varint,106,opt,name=heap_sys,json=heapSys,proto3,oneof" json:"heap_sys,omitempty"`
 	NetworkBytesReceived                    *int64                 `protobuf:"varint,107,opt,name=network_bytes_received,json=networkBytesReceived,proto3,oneof" json:"network_bytes_received,omitempty"`
 	NetworkBytesSent                        *int64                 `protobuf:"varint,108,opt,name=network_bytes_sent,json=networkBytesSent,proto3,oneof" json:"network_bytes_sent,omitempty"`
 	EstablishTunnels                        *bool                  `protobuf:"varint,109,opt,name=establish_tunnels,json=establishTunnels,proto3,oneof" json:"establish_tunnels,omitempty"`
 	EstablishTunnelsLimitedCount            *int64                 `protobuf:"varint,110,opt,name=establish_tunnels_limited_count,json=establishTunnelsLimitedCount,proto3,oneof" json:"establish_tunnels_limited_count,omitempty"`
 	LastGc                                  *timestamppb.Timestamp `protobuf:"bytes,111,opt,name=last_gc,json=lastGc,proto3,oneof" json:"last_gc,omitempty"`
-	NumForcedGc                             *uint32                `protobuf:"varint,112,opt,name=num_forced_gc,json=numForcedGc,proto3,oneof" json:"num_forced_gc,omitempty"`
-	NumGc                                   *uint32                `protobuf:"varint,113,opt,name=num_gc,json=numGc,proto3,oneof" json:"num_gc,omitempty"`
-	NumGoroutine                            *uint32                `protobuf:"varint,114,opt,name=num_goroutine,json=numGoroutine,proto3,oneof" json:"num_goroutine,omitempty"`
+	NumForcedGc                             *int64                 `protobuf:"varint,112,opt,name=num_forced_gc,json=numForcedGc,proto3,oneof" json:"num_forced_gc,omitempty"`
+	NumGc                                   *int64                 `protobuf:"varint,113,opt,name=num_gc,json=numGc,proto3,oneof" json:"num_gc,omitempty"`
+	NumGoroutine                            *int64                 `protobuf:"varint,114,opt,name=num_goroutine,json=numGoroutine,proto3,oneof" json:"num_goroutine,omitempty"`
 	ReplayDeleteReplayCount                 *int64                 `protobuf:"varint,115,opt,name=replay_delete_replay_count,json=replayDeleteReplayCount,proto3,oneof" json:"replay_delete_replay_count,omitempty"`
 	ReplayFailedReplayCount                 *int64                 `protobuf:"varint,116,opt,name=replay_failed_replay_count,json=replayFailedReplayCount,proto3,oneof" json:"replay_failed_replay_count,omitempty"`
 	ReplayGetReplayHitCount                 *int64                 `protobuf:"varint,117,opt,name=replay_get_replay_hit_count,json=replayGetReplayHitCount,proto3,oneof" json:"replay_get_replay_hit_count,omitempty"`
@@ -254,42 +254,42 @@ func (x *ServerLoad) GetCpuPercent() float64 {
 	return 0
 }
 
-func (x *ServerLoad) GetHeapAlloc() uint64 {
+func (x *ServerLoad) GetHeapAlloc() int64 {
 	if x != nil && x.HeapAlloc != nil {
 		return *x.HeapAlloc
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetHeapIdle() uint64 {
+func (x *ServerLoad) GetHeapIdle() int64 {
 	if x != nil && x.HeapIdle != nil {
 		return *x.HeapIdle
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetHeapInuse() uint64 {
+func (x *ServerLoad) GetHeapInuse() int64 {
 	if x != nil && x.HeapInuse != nil {
 		return *x.HeapInuse
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetHeapObjects() uint64 {
+func (x *ServerLoad) GetHeapObjects() int64 {
 	if x != nil && x.HeapObjects != nil {
 		return *x.HeapObjects
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetHeapReleased() uint64 {
+func (x *ServerLoad) GetHeapReleased() int64 {
 	if x != nil && x.HeapReleased != nil {
 		return *x.HeapReleased
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetHeapSys() uint64 {
+func (x *ServerLoad) GetHeapSys() int64 {
 	if x != nil && x.HeapSys != nil {
 		return *x.HeapSys
 	}
@@ -331,21 +331,21 @@ func (x *ServerLoad) GetLastGc() *timestamppb.Timestamp {
 	return nil
 }
 
-func (x *ServerLoad) GetNumForcedGc() uint32 {
+func (x *ServerLoad) GetNumForcedGc() int64 {
 	if x != nil && x.NumForcedGc != nil {
 		return *x.NumForcedGc
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetNumGc() uint32 {
+func (x *ServerLoad) GetNumGc() int64 {
 	if x != nil && x.NumGc != nil {
 		return *x.NumGc
 	}
 	return 0
 }
 
-func (x *ServerLoad) GetNumGoroutine() uint32 {
+func (x *ServerLoad) GetNumGoroutine() int64 {
 	if x != nil && x.NumGoroutine != nil {
 		return *x.NumGoroutine
 	}
@@ -594,22 +594,22 @@ const file_ca_psiphon_psiphond_server_load_proto_rawDesc = "" +
 	"\vcpu_percent\x18d \x01(\x01H\x00R\n" +
 	"cpuPercent\x88\x01\x01\x12\"\n" +
 	"\n" +
-	"heap_alloc\x18e \x01(\x04H\x01R\theapAlloc\x88\x01\x01\x12 \n" +
-	"\theap_idle\x18f \x01(\x04H\x02R\bheapIdle\x88\x01\x01\x12\"\n" +
+	"heap_alloc\x18e \x01(\x03H\x01R\theapAlloc\x88\x01\x01\x12 \n" +
+	"\theap_idle\x18f \x01(\x03H\x02R\bheapIdle\x88\x01\x01\x12\"\n" +
 	"\n" +
-	"heap_inuse\x18g \x01(\x04H\x03R\theapInuse\x88\x01\x01\x12&\n" +
-	"\fheap_objects\x18h \x01(\x04H\x04R\vheapObjects\x88\x01\x01\x12(\n" +
-	"\rheap_released\x18i \x01(\x04H\x05R\fheapReleased\x88\x01\x01\x12\x1e\n" +
-	"\bheap_sys\x18j \x01(\x04H\x06R\aheapSys\x88\x01\x01\x129\n" +
+	"heap_inuse\x18g \x01(\x03H\x03R\theapInuse\x88\x01\x01\x12&\n" +
+	"\fheap_objects\x18h \x01(\x03H\x04R\vheapObjects\x88\x01\x01\x12(\n" +
+	"\rheap_released\x18i \x01(\x03H\x05R\fheapReleased\x88\x01\x01\x12\x1e\n" +
+	"\bheap_sys\x18j \x01(\x03H\x06R\aheapSys\x88\x01\x01\x129\n" +
 	"\x16network_bytes_received\x18k \x01(\x03H\aR\x14networkBytesReceived\x88\x01\x01\x121\n" +
 	"\x12network_bytes_sent\x18l \x01(\x03H\bR\x10networkBytesSent\x88\x01\x01\x120\n" +
 	"\x11establish_tunnels\x18m \x01(\bH\tR\x10establishTunnels\x88\x01\x01\x12J\n" +
 	"\x1festablish_tunnels_limited_count\x18n \x01(\x03H\n" +
 	"R\x1cestablishTunnelsLimitedCount\x88\x01\x01\x128\n" +
 	"\alast_gc\x18o \x01(\v2\x1a.google.protobuf.TimestampH\vR\x06lastGc\x88\x01\x01\x12'\n" +
-	"\rnum_forced_gc\x18p \x01(\rH\fR\vnumForcedGc\x88\x01\x01\x12\x1a\n" +
-	"\x06num_gc\x18q \x01(\rH\rR\x05numGc\x88\x01\x01\x12(\n" +
-	"\rnum_goroutine\x18r \x01(\rH\x0eR\fnumGoroutine\x88\x01\x01\x12@\n" +
+	"\rnum_forced_gc\x18p \x01(\x03H\fR\vnumForcedGc\x88\x01\x01\x12\x1a\n" +
+	"\x06num_gc\x18q \x01(\x03H\rR\x05numGc\x88\x01\x01\x12(\n" +
+	"\rnum_goroutine\x18r \x01(\x03H\x0eR\fnumGoroutine\x88\x01\x01\x12@\n" +
 	"\x1areplay_delete_replay_count\x18s \x01(\x03H\x0fR\x17replayDeleteReplayCount\x88\x01\x01\x12@\n" +
 	"\x1areplay_failed_replay_count\x18t \x01(\x03H\x10R\x17replayFailedReplayCount\x88\x01\x01\x12A\n" +
 	"\x1breplay_get_replay_hit_count\x18u \x01(\x03H\x11R\x17replayGetReplayHitCount\x88\x01\x01\x12C\n" +

+ 1 - 1
psiphon/server/proto/ca.psiphon.psiphond/dial_params.proto

@@ -57,7 +57,7 @@ message DialParams {
     optional bool server_replay_packet_manipulation = 43;
     optional bool server_entry_valid = 44;
 
-    optional int32 candidate_number = 45;
+    optional int64 candidate_number = 45;
     optional bool is_replay = 46;
     optional int64 dial_port_number = 47;
     optional int64 dial_duration = 48;

+ 4 - 3
psiphon/server/proto/ca.psiphon.psiphond/inproxy_broker.proto

@@ -11,8 +11,8 @@ message InproxyBroker {
 
     // Fields 1-99 are reserved for field groupings.
 
-    optional int32 announcement_match_index = 100;
-    optional int32 announcement_queue_size = 101;
+    optional int64 announcement_match_index = 100;
+    optional int64 announcement_queue_size = 101;
     optional string answer_error = 102;
     optional string broker_event = 103;
     optional string broker_id = 104;
@@ -42,7 +42,7 @@ message InproxyBroker {
     optional int64 peak_upstream_bytes_per_second = 128;
     repeated string port_mapping_types = 129;
     optional bool preferred_nat_match = 130;
-    optional int32 protocol_version = 131;
+    optional int64 protocol_version = 131;
     optional string proxy_id = 132;
     optional string proxy_nat_type = 133;
     repeated string proxy_port_mapping_types = 134;
@@ -50,4 +50,5 @@ message InproxyBroker {
     optional string stored_tactics_tag = 136;
     optional bool timed_out = 137;
     optional string meek_server_http_version = 138;
+    optional int64 pending_answers_size = 139;
 }

+ 1 - 1
psiphon/server/proto/ca.psiphon.psiphond/irregular_tunnel.proto

@@ -21,7 +21,7 @@ message IrregularTunnel {
     optional int64 duplicate_elapsed_time_ms = 107;
     optional string duplicate_seed = 108;
     optional string duplicate_seed_type = 109;
-    optional uint32 listener_port_number = 110;
+    optional int64 listener_port_number = 110;
     optional string listener_protocol = 111;
     optional string tunnel_error = 112;
 }

+ 5 - 5
psiphon/server/proto/ca.psiphon.psiphond/psiphond.proto

@@ -20,12 +20,12 @@ import "ca.psiphon.psiphond/unique_user.proto";
 option go_package = "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/pb/psiphond";
 
 message Psiphond {
-  optional google.protobuf.Timestamp timestamp = 1;
+  google.protobuf.Timestamp timestamp = 1;
 
-  optional string host_id = 2;
-  optional string host_type = 3;
-  optional string host_build_rev = 4;
-  optional string provider = 5;
+  string host_id = 2;
+  string host_type = 3;
+  string host_build_rev = 4;
+  string provider = 5;
  
   oneof metric {
     ca.psiphon.psiphond.DomainBytes domain_bytes = 101;

+ 9 - 9
psiphon/server/proto/ca.psiphon.psiphond/server_load.proto

@@ -31,12 +31,12 @@ message ServerLoad {
 
   optional double cpu_percent = 100;
 
-  optional uint64 heap_alloc = 101;
-  optional uint64 heap_idle = 102;
-  optional uint64 heap_inuse = 103;
-  optional uint64 heap_objects = 104;
-  optional uint64 heap_released = 105;
-  optional uint64 heap_sys = 106;
+  optional int64 heap_alloc = 101;
+  optional int64 heap_idle = 102;
+  optional int64 heap_inuse = 103;
+  optional int64 heap_objects = 104;
+  optional int64 heap_released = 105;
+  optional int64 heap_sys = 106;
 
   optional int64 network_bytes_received = 107;
   optional int64 network_bytes_sent = 108;
@@ -45,9 +45,9 @@ message ServerLoad {
   optional int64 establish_tunnels_limited_count = 110;
 
   optional google.protobuf.Timestamp last_gc = 111;
-  optional uint32 num_forced_gc = 112;
-  optional uint32 num_gc = 113;
-  optional uint32 num_goroutine = 114;
+  optional int64 num_forced_gc = 112;
+  optional int64 num_gc = 113;
+  optional int64 num_goroutine = 114;
 
   optional int64 replay_delete_replay_count = 115;
   optional int64 replay_failed_replay_count = 116;

+ 4 - 8
psiphon/server/protobufConverter.go

@@ -122,14 +122,10 @@ func newPsiphondProtobufMessageWrapper(ts *timestamppb.Timestamp, hostType strin
 	}
 
 	wrapper.Timestamp = ts
-
-	wrapper.HostId = &logHostID
-	wrapper.HostBuildRev = &logBuildRev
-	if logHostProvider != "" {
-		wrapper.Provider = &logHostProvider
-	}
-
-	wrapper.HostType = &hostType
+	wrapper.HostId = logHostID
+	wrapper.HostBuildRev = logBuildRev
+	wrapper.Provider = logHostProvider
+	wrapper.HostType = hostType
 
 	return wrapper
 }

+ 9 - 9
psiphon/server/services.go

@@ -461,15 +461,15 @@ func getRuntimeMetrics() LogFields {
 	}
 
 	return LogFields{
-		"num_goroutine": numGoroutine,
-		"heap_alloc":    memStats.HeapAlloc,
-		"heap_sys":      memStats.HeapSys,
-		"heap_idle":     memStats.HeapIdle,
-		"heap_inuse":    memStats.HeapInuse,
-		"heap_released": memStats.HeapReleased,
-		"heap_objects":  memStats.HeapObjects,
-		"num_gc":        memStats.NumGC,
-		"num_forced_gc": memStats.NumForcedGC,
+		"num_goroutine": int64(numGoroutine),
+		"heap_alloc":    int64(memStats.HeapAlloc),
+		"heap_sys":      int64(memStats.HeapSys),
+		"heap_idle":     int64(memStats.HeapIdle),
+		"heap_inuse":    int64(memStats.HeapInuse),
+		"heap_released": int64(memStats.HeapReleased),
+		"heap_objects":  int64(memStats.HeapObjects),
+		"num_gc":        int64(memStats.NumGC),
+		"num_forced_gc": int64(memStats.NumForcedGC),
 		"last_gc":       lastGC,
 	}
 }

+ 43 - 39
psiphon/server/tunnelServer.go

@@ -78,6 +78,7 @@ const (
 	RANDOM_STREAM_MAX_BYTES               = 10485760
 	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
 	SSH_MAX_CLIENT_COUNT                  = 100000
+	SSH_CLIENT_MAX_DSL_REQUEST_COUNT      = 32
 )
 
 // TunnelServer is the main server that accepts Psiphon client
@@ -150,6 +151,13 @@ func (server *TunnelServer) Run() error {
 
 	var listeners []*sshListener
 
+	defer func() {
+		// Ensure listeners are closed on early error returns.
+		for _, listener := range listeners {
+			listener.Close()
+		}
+	}()
+
 	for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
 
 		localAddress := net.JoinHostPort(
@@ -213,26 +221,40 @@ func (server *TunnelServer) Run() error {
 				maxPacketSizeAdjustment,
 				support.Config.ObfuscatedSSHKey,
 				enableGQUIC)
+			if err != nil {
+				return errors.Trace(err)
+			}
 
 		} else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
 
 			listener, err = refraction.Listen(localAddress)
+			if err != nil {
+				return errors.Trace(err)
+			}
 
 		} else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
 
 			listener, err = net.Listen("tcp", localAddress)
+			if err != nil {
+				return errors.Trace(err)
+			}
 
 		} else {
 
 			// Only direct, unfronted protocol listeners use TCP BPF circumvention
 			// programs.
 			listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
+			if err != nil {
+				return errors.Trace(err)
+			}
 
 			if protocol.TunnelProtocolUsesTLSOSSH(tunnelProtocol) {
+
 				listener, err = ListenTLSTunnel(support, listener, tunnelProtocol, listenPort)
 				if err != nil {
 					return errors.Trace(err)
 				}
+
 			} else if protocol.TunnelProtocolUsesShadowsocks(tunnelProtocol) {
 
 				logTunnelProtocol := tunnelProtocol
@@ -252,13 +274,6 @@ func (server *TunnelServer) Run() error {
 			}
 		}
 
-		if err != nil {
-			for _, existingListener := range listeners {
-				existingListener.Listener.Close()
-			}
-			return errors.Trace(err)
-		}
-
 		tacticsListener := NewTacticsListener(
 			support,
 			listener,
@@ -402,14 +417,11 @@ func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
 }
 
 type sshServer struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastAuthLog             int64
-	authFailedCount         int64
-	establishLimitedCount   int64
 	support                 *SupportServices
 	establishTunnels        int32
+	lastAuthLog             atomic.Int64
+	authFailedCount         atomic.Int64
+	establishLimitedCount   atomic.Int64
 	concurrentSSHHandshakes semaphore.Semaphore
 	shutdownBroadcast       <-chan struct{}
 	sshHostKey              ssh.Signer
@@ -616,7 +628,7 @@ func (sshServer *sshServer) setEstablishTunnels(establish bool) {
 func (sshServer *sshServer) checkEstablishTunnels() bool {
 	establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
 	if !establishTunnels {
-		atomic.AddInt64(&sshServer.establishLimitedCount, 1)
+		sshServer.establishLimitedCount.Add(1)
 	}
 	return establishTunnels
 }
@@ -631,7 +643,7 @@ func (sshServer *sshServer) checkLoadLimiting() bool {
 
 func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
 	return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
-		atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
+		sshServer.establishLimitedCount.Swap(0)
 }
 
 // additionalTransportData is additional data gathered at transport level,
@@ -1967,6 +1979,7 @@ type sshClient struct {
 	checkedServerEntryTags               int
 	invalidServerEntryTags               int
 	sshProtocolBytesTracker              *sshProtocolBytesTracker
+	dslRequestCount                      int
 }
 
 type trafficState struct {
@@ -2184,11 +2197,8 @@ func (lookup *splitTunnelLookup) lookup(region string) bool {
 }
 
 type inproxyProxyQualityTracker struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	bytesUp         int64
-	bytesDown       int64
+	bytesUp         atomic.Int64
+	bytesDown       atomic.Int64
 	reportTriggered int32
 
 	sshClient       *sshClient
@@ -2226,8 +2236,8 @@ func (t *inproxyProxyQualityTracker) UpdateProgress(
 		return
 	}
 
-	bytesUp := atomic.AddInt64(&t.bytesUp, upstreamBytes)
-	bytesDown := atomic.AddInt64(&t.bytesDown, downstreamBytes)
+	bytesUp := t.bytesUp.Add(upstreamBytes)
+	bytesDown := t.bytesDown.Add(downstreamBytes)
 
 	if (t.targetBytesUp == 0 || bytesUp >= t.targetBytesUp) &&
 		(t.targetBytesDown == 0 || bytesDown >= t.targetBytesDown) &&
@@ -2266,18 +2276,12 @@ func (t *inproxyProxyQualityTracker) UpdateProgress(
 }
 
 type sshProtocolBytesTracker struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	totalBytesRead    int64
-	totalBytesWritten int64
+	totalBytesRead    atomic.Int64
+	totalBytesWritten atomic.Int64
 }
 
 func newSSHProtocolBytesTracker(sshClient *sshClient) *sshProtocolBytesTracker {
-	return &sshProtocolBytesTracker{
-		totalBytesRead:    0,
-		totalBytesWritten: 0,
-	}
+	return &sshProtocolBytesTracker{}
 }
 
 func (t *sshProtocolBytesTracker) UpdateProgress(
@@ -2286,8 +2290,8 @@ func (t *sshProtocolBytesTracker) UpdateProgress(
 	// Concurrency: UpdateProgress may be called concurrently; all accesses to
 	// mutated fields use atomic operations.
 
-	atomic.AddInt64(&t.totalBytesRead, bytesRead)
-	atomic.AddInt64(&t.totalBytesWritten, bytesWritten)
+	t.totalBytesRead.Add(bytesRead)
+	t.totalBytesWritten.Add(bytesWritten)
 }
 
 func newSshClient(
@@ -2978,13 +2982,13 @@ func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string
 		// retain some record of this activity in case this is relevant to, e.g., a performance
 		// investigation.
 
-		atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)
+		sshClient.sshServer.authFailedCount.Add(1)
 
-		lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
+		lastAuthLog := monotime.Time(sshClient.sshServer.lastAuthLog.Load())
 		if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
 			now := int64(monotime.Now())
-			if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
-				count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
+			if sshClient.sshServer.lastAuthLog.CompareAndSwap(int64(lastAuthLog), now) {
+				count := sshClient.sshServer.authFailedCount.Swap(0)
 				log.WithTraceFields(
 					LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
 			}
@@ -3848,8 +3852,8 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 	logFields["bytes"] = bytes
 
 	// Pre-calculate ssh protocol bytes and overhead.
-	sshProtocolBytes := sshClient.sshProtocolBytesTracker.totalBytesWritten +
-		sshClient.sshProtocolBytesTracker.totalBytesRead
+	sshProtocolBytes := sshClient.sshProtocolBytesTracker.totalBytesWritten.Load() +
+		sshClient.sshProtocolBytesTracker.totalBytesRead.Load()
 	logFields["ssh_protocol_bytes"] = sshProtocolBytes
 	logFields["ssh_protocol_bytes_overhead"] = sshProtocolBytes - bytes
 

+ 14 - 19
psiphon/server/udp.go

@@ -296,14 +296,12 @@ func (mux *udpgwPortForwardMultiplexer) run() {
 				dialIP:         dialIP,
 				conn:           conn,
 				lruEntry:       lruEntry,
-				bytesUp:        0,
-				bytesDown:      0,
 				relayWaitGroup: new(sync.WaitGroup),
 				mux:            mux,
 			}
 
 			if message.forwardDNS {
-				portForward.dnsFirstWriteTime = int64(monotime.Now())
+				portForward.dnsFirstWriteTime.Store(int64(monotime.Now()))
 			}
 
 			mux.portForwardsMutex.Lock()
@@ -326,7 +324,7 @@ func (mux *udpgwPortForwardMultiplexer) run() {
 
 		portForward.lruEntry.Touch()
 
-		atomic.AddInt64(&portForward.bytesUp, int64(len(message.packet)))
+		portForward.bytesUp.Add(int64(len(message.packet)))
 	}
 
 	// Cleanup all udpgw port forward workers when exiting
@@ -348,13 +346,10 @@ func (mux *udpgwPortForwardMultiplexer) removePortForward(connID uint16) {
 }
 
 type udpgwPortForward struct {
-	// Note: 64-bit ints used with atomic operations are placed
-	// at the start of struct to ensure 64-bit alignment.
-	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	dnsFirstWriteTime int64
-	dnsFirstReadTime  int64
-	bytesUp           int64
-	bytesDown         int64
+	dnsFirstWriteTime atomic.Int64
+	dnsFirstReadTime  atomic.Int64
+	bytesUp           atomic.Int64
+	bytesDown         atomic.Int64
 	connID            uint16
 	preambleSize      int
 	remoteIP          []byte
@@ -417,9 +412,9 @@ func (portForward *udpgwPortForward) relayDownstream() {
 			break
 		}
 
-		if atomic.LoadInt64(&portForward.dnsFirstWriteTime) > 0 &&
-			atomic.LoadInt64(&portForward.dnsFirstReadTime) == 0 { // Check if already set before invoking Now.
-			atomic.CompareAndSwapInt64(&portForward.dnsFirstReadTime, 0, int64(monotime.Now()))
+		if portForward.dnsFirstWriteTime.Load() > 0 &&
+			portForward.dnsFirstReadTime.Load() == 0 { // Check if already set before invoking Now.
+			portForward.dnsFirstReadTime.CompareAndSwap(0, int64(monotime.Now()))
 		}
 
 		err = writeUdpgwPreamble(
@@ -448,7 +443,7 @@ func (portForward *udpgwPortForward) relayDownstream() {
 
 		portForward.lruEntry.Touch()
 
-		atomic.AddInt64(&portForward.bytesDown, int64(packetSize))
+		portForward.bytesDown.Add(int64(packetSize))
 	}
 
 	portForward.mux.removePortForward(portForward.connID)
@@ -457,11 +452,11 @@ func (portForward *udpgwPortForward) relayDownstream() {
 
 	portForward.conn.Close()
 
-	bytesUp := atomic.LoadInt64(&portForward.bytesUp)
-	bytesDown := atomic.LoadInt64(&portForward.bytesDown)
+	bytesUp := portForward.bytesUp.Load()
+	bytesDown := portForward.bytesDown.Load()
 	portForward.mux.sshClient.closedPortForward(portForwardTypeUDP, bytesUp, bytesDown)
 
-	dnsStartTime := monotime.Time(atomic.LoadInt64(&portForward.dnsFirstWriteTime))
+	dnsStartTime := monotime.Time(portForward.dnsFirstWriteTime.Load())
 	if dnsStartTime > 0 {
 
 		// Record DNS metrics using a heuristic: if a UDP packet was written and
@@ -471,7 +466,7 @@ func (portForward *udpgwPortForward) relayDownstream() {
 		// assume a resolver will not respond when, e.g., rate limiting; we ignore
 		// subsequent requests made via the same UDP port forward.
 
-		dnsEndTime := monotime.Time(atomic.LoadInt64(&portForward.dnsFirstReadTime))
+		dnsEndTime := monotime.Time(portForward.dnsFirstReadTime.Load())
 
 		dnsSuccess := true
 		if dnsEndTime == 0 {

+ 3 - 3
psiphon/server/utils.go

@@ -79,10 +79,10 @@ func min(a, b int) int {
 	return b
 }
 
-func greaterThanSwapInt64(addr *int64, new int64) bool {
-	old := atomic.LoadInt64(addr)
+func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
+	old := addr.Load()
 	if new > old {
-		return atomic.CompareAndSwapInt64(addr, old, new)
+		return addr.CompareAndSwap(old, new)
 	}
 	return false
 }