Просмотр исходного кода

Prioritization of in-proxy proxies by tunnel quality

Rod Hynes 1 год назад
Родитель
Commit
1490ae0faf

+ 58 - 0
psiphon/common/certificate.go

@@ -148,3 +148,61 @@ func GenerateWebServerCertificate(hostname string) (string, string, string, erro
 
 	return string(webServerCertificate), string(webServerPrivateKey), pin, nil
 }
+
+// VerifyServerCertificate and VerifyCertificatePins test coverage provided by
+// psiphon/controller_test and psiphon/server/server_test.
+
+// VerifyServerCertificate parses and verifies the provided chain. If
+// successful, it returns the verified chains that were built.
+func VerifyServerCertificate(
+	rootCAs *x509.CertPool, rawCerts [][]byte, verifyServerName string) ([][]*x509.Certificate, error) {
+
+	// This duplicates the verification logic in utls (and standard crypto/tls).
+
+	certs := make([]*x509.Certificate, len(rawCerts))
+	for i, rawCert := range rawCerts {
+		cert, err := x509.ParseCertificate(rawCert)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		certs[i] = cert
+	}
+
+	opts := x509.VerifyOptions{
+		Roots:         rootCAs,
+		DNSName:       verifyServerName,
+		Intermediates: x509.NewCertPool(),
+	}
+
+	for i, cert := range certs {
+		if i == 0 {
+			continue
+		}
+		opts.Intermediates.AddCert(cert)
+	}
+
+	verifiedChains, err := certs[0].Verify(opts)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return verifiedChains, nil
+}
+
+// VerifyCertificatePins checks whether any specified certificate pin -- a
+// SHA-256 hash of a certificate public key -- is found in the given
+// certificate chain.
+func VerifyCertificatePins(pins []string, verifiedChains [][]*x509.Certificate) error {
+	for _, chain := range verifiedChains {
+		for _, cert := range chain {
+			publicKeyDigest := sha256.Sum256(cert.RawSubjectPublicKeyInfo)
+			expectedPin := base64.StdEncoding.EncodeToString(publicKeyDigest[:])
+			if Contains(pins, expectedPin) {
+				// Return success on the first match of any certificate public key to any
+				// pin.
+				return nil
+			}
+		}
+	}
+	return errors.TraceNew("no pin found")
+}

+ 120 - 0
psiphon/common/inproxy/api.go

@@ -23,6 +23,9 @@ import (
 	"crypto/rand"
 	"crypto/subtle"
 	"encoding/base64"
+	"encoding/binary"
+	"math"
+	"strconv"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
@@ -510,6 +513,65 @@ type BrokerServerReport struct {
 	ProxyPortMappingTypes PortMappingTypes `cbor:"6,keyasint,omitempty"`
 }
 
+// ProxyQualityKey is the key on which proxy quality is indexed: a proxy ID
+// and a proxy ASN. Quality is tracked at a fine-grained level, with the proxy ID
+// representing, typically, an individual device, and the proxy ASN
+// representing the network the device used at the time a quality tunnel was
+// reported.
+type ProxyQualityKey [36]byte
+
+// MakeProxyQualityKey creates a ProxyQualityKey using the given proxy ID and
+// proxy ASN. In the key, the proxy ID remains encoded as-is, and the ASN is
+// encoded in the 4-byte representation (see RFC6793).
+func MakeProxyQualityKey(proxyID ID, proxyASN string) ProxyQualityKey {
+	var key ProxyQualityKey
+	copy(key[0:32], proxyID[:])
+	ASN, err := strconv.Atoi(proxyASN)
+	if err != nil || ASN < 0 || ASN > math.MaxUint32 {
+		// In cases including failed or misconfigured GeoIP lookups -- with
+		// values such as server.GEOIP_UNKNOWN_VALUE or invalid AS numbers --
+		// fall back to a reserved AS number (see RFC5398). This is, effectively, a less
+		// fine-grained key.
+		//
+		// Note that GeoIP lookups are performed server-side and a proxy
+		// itself cannot force this downgrade (to obtain false quality
+		// classification across different networks).
+		ASN = 65536
+	}
+	binary.BigEndian.PutUint32(key[32:36], uint32(ASN))
+	return key
+}
+
+// ProxyQualityASNCounts is tunnel quality data, a map from client ASNs to
+// counts of quality tunnels that a proxy relayed for those client ASNs.
+type ProxyQualityASNCounts map[string]int
+
+// ProxyQualityRequestCounts is ProxyQualityASNCounts for a set of proxies.
+type ProxyQualityRequestCounts map[ProxyQualityKey]ProxyQualityASNCounts
+
+// ServerProxyQualityRequest is an API request sent from a server to a broker,
+// reporting a set of proxy IDs/ASNs that have relayed quality tunnels -- as
+// determined by bytes transferred and duration thresholds -- for clients in
+// the given ASNs. This quality data is used, by brokers, to prioritize
+// well-performing proxies, and to match clients with proxies that worked
+// successfully for the client's ASN.
+//
+// QualityCounts is a map from proxy ID/ASN to ASN quality tunnel counts.
+//
+// DialParameters specifies additional parameters to log with proxy quality
+// broker events, including any relevant server broker dial parameters.
+// Unlike clients and proxies, servers do not send BaseAPIParameters to
+// brokers.
+type ServerProxyQualityRequest struct {
+	QualityCounts  ProxyQualityRequestCounts    `cbor:"1,keyasint,omitempty"`
+	DialParameters protocol.PackedAPIParameters `cbor:"2,keyasint,omitempty"`
+}
+
+// ServerProxyQualityResponse is the acknowledgement for a
+// ServerProxyQualityRequest.
+type ServerProxyQualityResponse struct {
+}
+
 // GetNetworkType extracts the network_type from base API metrics and returns
 // a corresponding NetworkType. This is the one base metric that is used in
 // the broker logic, and not simply logged.
@@ -540,6 +602,8 @@ const (
 	maxPaddingSize    = 16384
 	maxDecoyMessages  = 100
 	maxDecoySize      = 16384
+
+	maxQualityCounts = 10000
 )
 
 // ValidateAndGetParametersAndLogFields validates the ProxyMetrics and returns
@@ -944,6 +1008,40 @@ func (report *BrokerServerReport) ValidateAndGetLogFields(
 	return logFields, nil
 }
 
+// ValidateAndGetLogFields validates the ServerProxyQualityRequest and returns
+// common.LogFields for logging.
+func (request *ServerProxyQualityRequest) ValidateAndGetLogFields() (common.LogFields, error) {
+
+	if len(request.QualityCounts) > maxQualityCounts {
+		return nil, errors.Tracef("invalid quality count length: %d", len(request.QualityCounts))
+	}
+
+	// Currently, there is no custom validator or formatter for
+	// DialParameters, as there is for the BaseAPIParameters sent by clients
+	// and proxies:
+	//
+	// - The DialParameters inputs, used only to annotate logs, are from a
+	//   trusted Psiphon server.
+	//
+	// - Psiphon servers do not send fields required by the existing
+	//   BaseAPIParameters validators, such as sponsor ID.
+	//
+	// - No formatter transforms, such as "0"/"1" to bool, are currently
+	//   expected; and server.getRequestLogFields is inefficient when a
+	//   couple of log fields are expected; for an example for any future
+	//   special case formatter, see
+	//   server.getInproxyBrokerServerReportParameterLogFieldFormatter.
+
+	dialParams, err := protocol.DecodePackedAPIParameters(request.DialParameters)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	logFields := common.LogFields(dialParams)
+
+	return logFields, nil
+}
+
 func MarshalProxyAnnounceRequest(request *ProxyAnnounceRequest) ([]byte, error) {
 	payload, err := marshalRecord(request, recordTypeAPIProxyAnnounceRequest)
 	return payload, errors.Trace(err)
@@ -1042,3 +1140,25 @@ func UnmarshalBrokerServerReport(payload []byte) (*BrokerServerReport, error) {
 	err := unmarshalRecord(recordTypeAPIBrokerServerReport, payload, &request)
 	return request, errors.Trace(err)
 }
+
+func MarshalServerProxyQualityRequest(request *ServerProxyQualityRequest) ([]byte, error) {
+	payload, err := marshalRecord(request, recordTypeAPIServerProxyQualityRequest)
+	return payload, errors.Trace(err)
+}
+
+func UnmarshalServerProxyQualityRequest(payload []byte) (*ServerProxyQualityRequest, error) {
+	var request *ServerProxyQualityRequest
+	err := unmarshalRecord(recordTypeAPIServerProxyQualityRequest, payload, &request)
+	return request, errors.Trace(err)
+}
+
+func MarshalServerProxyQualityResponse(response *ServerProxyQualityResponse) ([]byte, error) {
+	payload, err := marshalRecord(response, recordTypeAPIServerProxyQualityResponse)
+	return payload, errors.Trace(err)
+}
+
+func UnmarshalServerProxyQualityResponse(payload []byte) (*ServerProxyQualityResponse, error) {
+	var response *ServerProxyQualityResponse
+	err := unmarshalRecord(recordTypeAPIServerProxyQualityResponse, payload, &response)
+	return response, errors.Trace(err)
+}

+ 188 - 12
psiphon/common/inproxy/broker.go

@@ -77,12 +77,14 @@ type GetTacticsPayload func(
 // runs a Broker and calls Broker.HandleSessionPacket to handle web requests
 // encapsulating secure session packets.
 type Broker struct {
-	config               *BrokerConfig
-	brokerID             ID
-	initiatorSessions    *InitiatorSessions
-	responderSessions    *ResponderSessions
-	matcher              *Matcher
-	pendingServerReports *lrucache.Cache
+	config                  *BrokerConfig
+	brokerID                ID
+	initiatorSessions       *InitiatorSessions
+	responderSessions       *ResponderSessions
+	matcher                 *Matcher
+	pendingServerReports    *lrucache.Cache
+	proxyQualityState       *ProxyQualityState
+	knownServerInitiatorIDs sync.Map
 
 	commonCompartmentsMutex sync.Mutex
 	commonCompartments      *consistent.Consistent
@@ -235,6 +237,8 @@ func NewBroker(config *BrokerConfig) (*Broker, error) {
 		return nil, errors.Trace(err)
 	}
 
+	proxyQuality := NewProxyQuality()
+
 	b := &Broker{
 		config:            config,
 		brokerID:          ID(brokerID),
@@ -247,13 +251,18 @@ func NewBroker(config *BrokerConfig) (*Broker, error) {
 			AnnouncementRateLimitQuantity:  config.MatcherAnnouncementRateLimitQuantity,
 			AnnouncementRateLimitInterval:  config.MatcherAnnouncementRateLimitInterval,
 			AnnouncementNonlimitedProxyIDs: config.MatcherAnnouncementNonlimitedProxyIDs,
-			OfferLimitEntryCount:           config.MatcherOfferLimitEntryCount,
-			OfferRateLimitQuantity:         config.MatcherOfferRateLimitQuantity,
-			OfferRateLimitInterval:         config.MatcherOfferRateLimitInterval,
+
+			OfferLimitEntryCount:   config.MatcherOfferLimitEntryCount,
+			OfferRateLimitQuantity: config.MatcherOfferRateLimitQuantity,
+			OfferRateLimitInterval: config.MatcherOfferRateLimitInterval,
+
+			ProxyQualityState: proxyQuality,
 
 			IsLoadLimiting: config.IsLoadLimiting,
 		}),
 
+		proxyQualityState: proxyQuality,
+
 		proxyAnnounceTimeout:       int64(config.ProxyAnnounceTimeout),
 		clientOfferTimeout:         int64(config.ClientOfferTimeout),
 		clientOfferPersonalTimeout: int64(config.ClientOfferPersonalTimeout),
@@ -347,6 +356,17 @@ func (b *Broker) SetLimits(
 		int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
 }
 
+func (b *Broker) SetProxyQualityParameters(
+	qualityTTL time.Duration,
+	pendingFailedMatchDeadline time.Duration,
+	failedMatchThreshold int) {
+
+	b.proxyQualityState.SetParameters(
+		qualityTTL,
+		pendingFailedMatchDeadline,
+		failedMatchThreshold)
+}
+
 // HandleSessionPacket handles a session packet from a client or proxy and
 // provides a response packet. The packet is part of a secure session and may
 // be a session handshake message, an expired session reset token, or a
@@ -432,6 +452,18 @@ func (b *Broker) HandleSessionPacket(
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
+		case recordTypeAPIServerProxyQualityRequest:
+			responsePayload, err = b.handleServerProxyQuality(
+				ctx,
+				extendTransportTimeout,
+				transportLogFields,
+				brokerClientIP,
+				geoIPData,
+				initiatorID,
+				unwrappedRequestPayload)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
 		case recordTypeAPIClientRelayedPacketRequest:
 			responsePayload, err = b.handleClientRelayedPacket(
 				ctx,
@@ -643,10 +675,39 @@ func (b *Broker) handleProxyAnnounce(
 		commonCompartmentIDs = []ID{compartmentID}
 	}
 
-	// In the common compartment ID case, invoke the callback to check if the
-	// announcement should be prioritized.
+	// Determine whether to enqueue the proxy announcement in the priority
+	// queue. To be prioritized, a proxy, identified by its ID and ASN, must
+	// have a recent quality tunnel recorded in the quality state. In
+	// addition, when the PrioritizeProxy callback is set, invoke this
+	// additional condition, which can filter by proxy geolocation and other
+	// properties.
+	//
+	// There is no prioritization for personal pairing announcements.
+
+	// Potential future enhancements:
+	//
+	// - For a proxy with unknown quality (neither reported quality tunnels,
+	//   nor known failed matches), prioritize with some low probability to
+	//   give unknown proxies a chance to qualify? This could be limited, for
+	//   example, to proxies in the same ASN as other quality proxies. To
+	//   implement this, ProxyQualityState would need to record proxy IDs
+	//   with failed matches; and proxy ASNs would need to be input to
+	//   ProxyQualityState.
+	//
+	// - Consider using the Psiphon server region, as given in the signed
+	//   server entry, as part of the prioritization logic.
+
+	if !hasPersonalCompartmentIDs {
+
+		// Here, no specific client ASN is specified for HasQuality. As long
+		// as a proxy has a quality tunnel for any client ASN, it is
+		// prioritized. In the matching process, an attempt is made to match
+		// using HasQuality using the client ASN. See Matcher.matchOffer.
 
-	if b.config.PrioritizeProxy != nil && !hasPersonalCompartmentIDs {
+		isPriority = b.proxyQualityState.HasQuality(proxyID, geoIPData.ASN, "")
+	}
+
+	if isPriority && b.config.PrioritizeProxy != nil {
 
 		// Limitation: Of the two return values from
 		// ValidateAndGetParametersAndLogFields, apiParams and logFields,
@@ -788,6 +849,18 @@ func (b *Broker) handleProxyAnnounce(
 		return nil, errors.Trace(err)
 	}
 
+	// Set the "failed match" trigger, which will progress towards clearing
+	// the quality state for this proxyID unless quality tunnels are reported
+	// soon enough after matches. This includes failure, by the proxy, to
+	// return a proxy answer, as well as any tunnel failures after that.
+	//
+	// Failures are expected even for good quality proxies, due to cases such
+	// as the in-proxy protocol losing the client tunnel establishment horse
+	// race. There is a threshold number of failed matches that must be
+	// reached before a quality state is cleared.
+
+	b.proxyQualityState.Matched(proxyID, geoIPData.ASN)
+
 	return responsePayload, nil
 }
 
@@ -1233,6 +1306,88 @@ func (b *Broker) handleProxyAnswer(
 	return responsePayload, nil
 }
 
+// handleServerProxyQuality receives, from servers, proxy tunnel quality and
+// records that in the proxy quality state that is used to prioritize
+// well-performing proxies.
+func (b *Broker) handleServerProxyQuality(
+	ctx context.Context,
+	extendTransportTimeout ExtendTransportTimeout,
+	transportLogFields common.LogFields,
+	proxyIP string,
+	geoIPData common.GeoIPData,
+	initiatorID ID,
+	requestPayload []byte) (retResponse []byte, retErr error) {
+
+	startTime := time.Now()
+
+	var logFields common.LogFields
+
+	// Only known, trusted Psiphon server initiators are allowed to send proxy
+	// quality requests. knownServerInitiatorIDs is populated with the
+	// Curve25519 public keys -- initiator IDs -- corresponding to the
+	// session public keys found in signed Psiphon server entries.
+	//
+	// Currently, knownServerInitiatorIDs is populated with destination server
+	// entries received in client offers, so the broker must first receive a
+	// client offer before a given server is trusted, which means
+	// that "invalid initiator" errors may occur, and some quality requests
+	// may be dropped, in some expected situations, including a broker restart.
+
+	// serverID is the server entry diagnostic ID of the server.
+	serverIDValue, ok := b.knownServerInitiatorIDs.Load(initiatorID)
+	if !ok {
+		return nil, errors.TraceNew("invalid initiator")
+	}
+	serverID := serverIDValue.(string)
+
+	// Always log the outcome.
+	defer func() {
+
+		// Typically, a server will send the same proxy quality request to all
+		// brokers. For the one "broadcast" request, server-proxy-quality is
+		// logged by each broker, as an indication that every server/broker
+		// request pair is successful.
+		//
+		// TODO: log more details from ServerProxyQualityRequest.QualityCounts?
+
+		if logFields == nil {
+			logFields = common.LogFields{}
+		}
+		logFields["broker_event"] = "server-proxy-quality"
+		logFields["broker_id"] = b.brokerID
+		logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
+		logFields["server_id"] = serverID
+		logFields.Add(transportLogFields)
+		b.config.Logger.LogMetric(brokerMetricName, logFields)
+	}()
+
+	qualityRequest, err := UnmarshalServerProxyQualityRequest(requestPayload)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	logFields, err = qualityRequest.ValidateAndGetLogFields()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	// Add the quality counts into the existing proxy quality state.
+	for proxyKey, counts := range qualityRequest.QualityCounts {
+		b.proxyQualityState.AddQuality(proxyKey, counts)
+	}
+
+	// There is no data in this response, it's simply an acknowledgement that
+	// the request was received.
+
+	responsePayload, err := MarshalServerProxyQualityResponse(
+		&ServerProxyQualityResponse{})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return responsePayload, nil
+}
+
 // handleClientRelayedPacket facilitates broker/server sessions. The initial
 // packet from the broker is sent to the client in the ClientOfferResponse.
 // The client sends that to the server in the Psiphon handshake. If the
@@ -1558,6 +1713,27 @@ func (b *Broker) validateDestination(
 		return nil, errors.Trace(err)
 	}
 
+	// Record that this server is a known and trusted ServerProxyQualityRequest
+	// sender. The serverID is stored for logging in handleServerProxyQuality.
+	//
+	// There is no expiry for knownServerInitiatorIDs entries, and they will
+	// clear only if the broker is restarted (which is the same lifetime as
+	// ServerEntrySignaturePublicKey).
+	//
+	// Limitation: in time, the above IsValidServerEntryTag check could become
+	// false for a retired server, while its entry remains in
+	// knownServerInitiatorIDs. However, unlike the case of a recycled
+	// Psiphon server IP being used as a proxy destination, it's safer to
+	// assume that a retired server's session private key does not become
+	// exposed.
+
+	serverInitiatorID, err := params.sessionPublicKey.ToCurve25519()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	b.knownServerInitiatorIDs.Store(ID(serverInitiatorID), serverID)
+
 	return params, nil
 }
 

+ 5 - 1
psiphon/common/inproxy/coordinator.go

@@ -32,15 +32,19 @@ import (
 // fronted HTTPS. RoundTripper is used by clients and proxies to make
 // requests to brokers.
 //
-// The round trip implementation must apply any specified delay before the
+// The RoundTrip implementation must apply any specified delay before the
 // network round trip begins; and apply the specified timeout to the network
 // round trip, excluding any delay.
+//
+// Close must interrupt any in-flight requests and close all network
+// resources.
 type RoundTripper interface {
 	RoundTrip(
 		ctx context.Context,
 		roundTripDelay time.Duration,
 		roundTripTimeout time.Duration,
 		requestPayload []byte) (responsePayload []byte, err error)
+	Close() error
 }
 
 // RoundTripperFailedError is an error type that should be returned from

+ 93 - 7
psiphon/common/inproxy/inproxy_test.go

@@ -309,13 +309,45 @@ func runTestInproxy(doMustUpgrade bool) error {
 	// Stub server broker request handler (in Psiphon, this will be the
 	// destination Psiphon server; here, it's not necessary to build this
 	// handler into the destination echo server)
+	//
+	// The stub server broker request handler also triggers a server proxy
+	// quality request in the other direction.
+
+	makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
+		RoundTripper, common.APIParameters, error) {
+
+		return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
+	}
+
+	serverSessionsConfig := &ServerBrokerSessionsConfig{
+		Logger:                       logger,
+		ServerPrivateKey:             serverPrivateKey,
+		ServerRootObfuscationSecret:  serverRootObfuscationSecret,
+		BrokerPublicKeys:             []SessionPublicKey{brokerPublicKey},
+		BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
+		BrokerRoundTripperMaker:      makeServerBrokerClientRoundTripper,
+		ProxyMetricsValidator:        apiParameterValidator,
+		ProxyMetricsFormatter:        apiParameterLogFieldFormatter,
+		ProxyMetricsPrefix:           "",
+	}
+
+	serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
-	serverSessions, err := NewServerBrokerSessions(
-		serverPrivateKey, serverRootObfuscationSecret, []SessionPublicKey{brokerPublicKey},
-		apiParameterValidator, apiParameterLogFieldFormatter, "")
+	err = serverSessions.Start()
 	if err != nil {
 		return errors.Trace(err)
 	}
+	defer serverSessions.Stop()
+
+	// Don't delay reporting quality.
+	serverSessions.SetProxyQualityRequestParameters(
+		proxyQualityReporterMaxRequestEntries,
+		0,
+		proxyQualityReporterRequestTimeout,
+		proxyQualityReporterRequestRetries)
 
 	var pendingBrokerServerReportsMutex sync.Mutex
 	pendingBrokerServerReports := make(map[ID]bool)
@@ -326,20 +358,51 @@ func runTestInproxy(doMustUpgrade bool) error {
 		pendingBrokerServerReports[connectionID] = true
 	}
 
+	removePendingBrokerServerReport := func(connectionID ID) {
+		pendingBrokerServerReportsMutex.Lock()
+		defer pendingBrokerServerReportsMutex.Unlock()
+		delete(pendingBrokerServerReports, connectionID)
+	}
+
 	hasPendingBrokerServerReports := func() bool {
 		pendingBrokerServerReportsMutex.Lock()
 		defer pendingBrokerServerReportsMutex.Unlock()
 		return len(pendingBrokerServerReports) > 0
 	}
 
+	serverQualityGroup := new(errgroup.Group)
+	var serverQualityProxyIDsMutex sync.Mutex
+	serverQualityProxyIDs := make(map[ID]struct{})
+	testProxyASN := "65537"
+	testClientASN := "65538"
+
 	handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
 
-		handler := func(brokerVerifiedOriginalClientIP string, logFields common.LogFields) {
-			pendingBrokerServerReportsMutex.Lock()
-			defer pendingBrokerServerReportsMutex.Unlock()
+		handler := func(
+			brokerVerifiedOriginalClientIP string,
+			brokerReportedProxyID ID,
+			brokerMatchedPersonalCompartments bool,
+			logFields common.LogFields) {
 
 			// Mark the report as no longer outstanding
-			delete(pendingBrokerServerReports, clientConnectionID)
+			removePendingBrokerServerReport(clientConnectionID)
+
+			// Trigger an asynchronous proxy quality request to the broker.
+			// This roughly follows the Psiphon server functionality, where a
+			// quality request is made sometime after the Psiphon handshake
+			// completes, once tunnel quality thresholds are achieved.
+
+			serverQualityGroup.Go(func() error {
+				serverSessions.ReportQuality(
+					brokerReportedProxyID, testProxyASN, testClientASN)
+
+				serverQualityProxyIDsMutex.Lock()
+				serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
+				serverQualityProxyIDsMutex.Unlock()
+
+				return nil
+			})
+
 		}
 
 		out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
@@ -876,6 +939,29 @@ func runTestInproxy(doMustUpgrade bool) error {
 			return errors.TraceNew("unexpected pending proxy tactics callback")
 		}
 
+		err = serverQualityGroup.Wait()
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		// Inspect the broker's proxy quality state, to verify that the proxy
+		// quality request was processed.
+		//
+		// Limitation: currently we don't check the priority
+		// announcement _queue_, as announcements may have arrived before the
+		// quality request, and announcements are promoted between queues.
+
+		serverQualityProxyIDsMutex.Lock()
+		defer serverQualityProxyIDsMutex.Unlock()
+		for proxyID, _ := range serverQualityProxyIDs {
+			if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
+				return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
+			}
+			if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
+				return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
+			}
+		}
+
 		// TODO: check that elapsed time is consistent with rate limit (+/-)
 
 		// Check if STUN server replay callbacks were triggered

+ 50 - 21
psiphon/common/inproxy/matcher.go

@@ -112,6 +112,30 @@ type Matcher struct {
 	pendingAnswers *lrucache.Cache
 }
 
+// MatcherConfig specifies the configuration for a matcher.
+type MatcherConfig struct {
+
+	// Logger is used to log events.
+	Logger common.Logger
+
+	// Announcement queue limits.
+	AnnouncementLimitEntryCount    int
+	AnnouncementRateLimitQuantity  int
+	AnnouncementRateLimitInterval  time.Duration
+	AnnouncementNonlimitedProxyIDs []ID
+
+	// Offer queue limits.
+	OfferLimitEntryCount   int
+	OfferRateLimitQuantity int
+	OfferRateLimitInterval time.Duration
+
+	// Proxy quality state.
+	ProxyQualityState *ProxyQualityState
+
+	// Broker process load limit state callback. See Broker.Config.
+	IsLoadLimiting func() bool
+}
+
 // MatchProperties specifies the compartment, GeoIP, and network topology
 // matching properties of clients and proxies.
 type MatchProperties struct {
@@ -267,27 +291,6 @@ type pendingAnswer struct {
 	answerChan   chan *answerInfo
 }
 
-// MatcherConfig specifies the configuration for a matcher.
-type MatcherConfig struct {
-
-	// Logger is used to log events.
-	Logger common.Logger
-
-	// Announcement queue limits.
-	AnnouncementLimitEntryCount    int
-	AnnouncementRateLimitQuantity  int
-	AnnouncementRateLimitInterval  time.Duration
-	AnnouncementNonlimitedProxyIDs []ID
-
-	// Offer queue limits.
-	OfferLimitEntryCount   int
-	OfferRateLimitQuantity int
-	OfferRateLimitInterval time.Duration
-
-	// Broker process load limit state callback. See Broker.Config.
-	IsLoadLimiting func() bool
-}
-
 // NewMatcher creates a new Matcher.
 func NewMatcher(config *MatcherConfig) *Matcher {
 
@@ -872,6 +875,32 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 		matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
 
+		// Use proxy ASN quality as an alternative to preferred NAT matches.
+		//
+		// The NAT matching logic depends on RFC5780 NAT discovery test
+		// results, which may not be entirely accurate, and may not be
+		// available in the first place, especially if skipped for clients,
+		// which is the default.
+		//
+		// Proxy ASN quality leverages the quality data, provided by servers,
+		// indicating that the particular proxy recently relayed a successful
+		// tunnel for some client in the given ASN. When this quality data is
+		// present, NAT compatibility is assumed, with the caveat that the
+		// client device and immediate router may not be the same.
+		//
+		// Limitations:
+		// - existsPreferredNATMatch doesn't reflect existence of matching
+		//   proxy ASN quality, so the NAT match probe can end prematurely.
+		// - IsPreferredNATMatch currently takes precedence over proxy ASN
+		//   quality.
+
+		if !matchNAT && isPriority {
+			matchNAT = m.config.ProxyQualityState.HasQuality(
+				announcementEntry.announcement.ProxyID,
+				announcementEntry.announcement.Properties.GeoIPData.ASN,
+				offerProperties.GeoIPData.ASN)
+		}
+
 		// At this point, the candidate is a match. Determine if this is a new
 		// best match, either if there was no previous match, or this is a
 		// better NAT match.

+ 2 - 0
psiphon/common/inproxy/matcher_test.go

@@ -60,6 +60,8 @@ func runTestMatcher() error {
 			OfferLimitEntryCount:   limitEntryCount,
 			OfferRateLimitQuantity: rateLimitQuantity,
 			OfferRateLimitInterval: rateLimitInterval,
+
+			ProxyQualityState: NewProxyQuality(),
 		})
 	err := m.Start()
 	if err != nil {

+ 810 - 0
psiphon/common/inproxy/quality.go

@@ -0,0 +1,810 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package inproxy
+
+import (
+	"container/list"
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	lrucache "github.com/cognusion/go-cache-lru"
+)
+
+// Default parameters for ProxyQualityState and ProxyQualityReporter; these
+// may be overridden via SetParameters and SetRequestParameters respectively.
+const (
+	proxyQualityMaxEntries                 = 10000000
+	proxyQualityTTL                        = 24 * time.Hour
+	proxyQualityMaxPendingFailedMatches    = 1000000
+	proxyQualityPendingFailedMatchDeadline = 5 * time.Minute
+	proxyQualityFailedMatchThreshold       = 10
+	proxyQualityReporterMaxQueueEntries    = 5000000
+	proxyQualityReporterMaxRequestEntries  = 1000
+	proxyQualityReporterRequestDelay       = 10 * time.Second
+	proxyQualityReporterRequestTimeout     = 10 * time.Second
+	proxyQualityReporterRequestRetries     = 1
+)
+
+// ProxyQualityState records and manages proxy tunnel quality data reported by
+// servers and used to prioritize proxies in the broker matching process.
+type ProxyQualityState struct {
+	mutex                      sync.Mutex
+	qualityTTL                 time.Duration
+	pendingFailedMatchDeadline time.Duration
+	failedMatchThreshold       int
+	entries                    *lrucache.Cache
+	pendingFailedMatches       *lrucache.Cache
+}
+
+// proxyQualityEntry is a single proxy's quality record: quality tunnel
+// counts keyed by client ASN, plus a consecutive failed match count.
+type proxyQualityEntry struct {
+	clientASNCounts  ProxyQualityASNCounts
+	failedMatchCount int
+}
+
+// NewProxyQuality creates a new ProxyQualityState.
+func NewProxyQuality() *ProxyQualityState {
+
+	// Limitation: max cache sizes are not dynamically configurable and are
+	// set to fixed values that are in line with other, indirectly related
+	// limits, such as matcherAnnouncementQueueMaxSize.
+
+	// TODO: lrucache.Cache.DeleteExpired is a linear scan; review the
+	// performance of scanning up to 10,000,000 entries every 1 minute.
+
+	q := &ProxyQualityState{
+		qualityTTL:                 proxyQualityTTL,
+		pendingFailedMatchDeadline: proxyQualityPendingFailedMatchDeadline,
+		failedMatchThreshold:       proxyQualityFailedMatchThreshold,
+
+		entries:              lrucache.NewWithLRU(0, 1*time.Minute, proxyQualityMaxEntries),
+		pendingFailedMatches: lrucache.NewWithLRU(0, 1*time.Minute, proxyQualityMaxPendingFailedMatches),
+	}
+
+	// addFailedMatch fires when a pending failed match expires or is
+	// evicted; AddQuality temporarily clears this handler around Delete.
+	q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
+
+	return q
+}
+
+// SetParameters overrides default values for proxy quality
+// state management parameters.
+//
+// qualityTTL is the TTL for a proxy's quality entry. Each AddQuality call
+// extends an entry's TTL.
+//
+// pendingFailedMatchDeadline is the elapsed time between calling Match for a
+// given proxy, and subsequently incrementing that proxy's failed match
+// count, unless an AddQuality call is made in the meantime.
+//
+// failedMatchThreshold is the threshold failed match count after which a
+// proxy's quality entry is deleted.
+func (q *ProxyQualityState) SetParameters(
+	qualityTTL time.Duration,
+	pendingFailedMatchDeadline time.Duration,
+	failedMatchThreshold int) {
+
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	q.qualityTTL = qualityTTL
+	q.pendingFailedMatchDeadline = pendingFailedMatchDeadline
+	q.failedMatchThreshold = failedMatchThreshold
+}
+
+// HasQuality indicates if the specified proxy, defined by its ID and ASN, has
+// a quality entry. If the input client ASN is blank, any entry suffices. If
+// a client ASN is given, the proxy must have a quality tunnel for a client in
+// that ASN. HasQuality is consulted by the broker matcher as an alternative
+// NAT-compatibility signal.
+func (q *ProxyQualityState) HasQuality(
+	proxyID ID, proxyASN string, clientASN string) bool {
+
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
+
+	strProxyKey := string(proxyKey[:])
+
+	entryValue, ok := q.entries.Get(strProxyKey)
+
+	if !ok {
+		return false
+	}
+
+	entry := entryValue.(*proxyQualityEntry)
+
+	// Currently, the actual count value is not used; any count > 0
+	// is "quality".
+
+	if clientASN == "" {
+		// No specific ASN.
+		return len(entry.clientASNCounts) > 0
+	}
+
+	return entry.clientASNCounts[clientASN] > 0
+}
+
+// AddQuality adds a new quality entry or adds counts to an existing quality
+// entry for the specified proxy, defined by its ID and ASN. For an existing
+// entry, its TTL is extended, and any failed match count is reset to zero.
+// AddQuality deletes any pending failed match, set by Matched, for the
+// proxy.
+func (q *ProxyQualityState) AddQuality(
+	proxyKey ProxyQualityKey, counts ProxyQualityASNCounts) {
+
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	strProxyKey := string(proxyKey[:])
+
+	entryValue, ok := q.entries.Get(strProxyKey)
+
+	var entry *proxyQualityEntry
+	if ok {
+		entry = entryValue.(*proxyQualityEntry)
+	} else {
+		entry = &proxyQualityEntry{
+			clientASNCounts: make(ProxyQualityASNCounts),
+		}
+	}
+
+	// Reset the consecutive failed match count for existing entry.
+	entry.failedMatchCount = 0
+
+	// Add in counts.
+	for ASN, count := range counts {
+		entry.clientASNCounts[ASN] += count
+	}
+
+	// Set both updates the value and extends the TTL for any existing entry.
+	q.entries.Set(strProxyKey, entry, q.qualityTTL)
+
+	// Delete any pending failed match. The actual pending match may still be
+	// in progress and may even fail, but the new quality event is considered
+	// sufficient to ignore that outcome.
+	//
+	// lrucache.Cache.Delete invokes OnEvicted, so OnEvicted is temporarily
+	// cleared to avoid incrementing the failed match count. In addition,
+	// avoiding OnEvicted here ensures that addFailedMatch can assume that
+	// the mutex lock is not held.
+
+	q.pendingFailedMatches.OnEvicted(nil)
+	q.pendingFailedMatches.Delete(strProxyKey)
+	q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
+}
+
+// Matched reports that, for the specified proxy, defined by its ID and ASN, a
+// proxy announcement was just matched with a client offer, and an announcement
+// response returned to the proxy. Matched begins a "countdown" until a
+// subsequent, expected AddQuality call for the same proxy: if too much time
+// elapses with no AddQuality, the match is considered to have failed to
+// produce a successful tunnel. After exceeding a threshold count of
+// consecutive failed matches, a proxy's quality entry is deleted.
+//
+// Matched/AddQuality do not track the outcome of specific matches -- for a
+// given proxy, any successful, quality tunnel will cancel any pending failed
+// match.
+func (q *ProxyQualityState) Matched(proxyID ID, proxyASN string) {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	// This uses a lrucache.Cache and OnEvicted events as an implementation of
+	// the failed match deadline without requiring a timer or goroutine per
+	// pending match. When the cache entry expires due to TTL, the failed
+	// match deadline is met.
+
+	proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
+
+	strProxyKey := string(proxyKey[:])
+
+	_, ok := q.pendingFailedMatches.Get(strProxyKey)
+	if ok {
+		// When there's already a pending failed match, leave the existing
+		// deadline in place and don't extend it.
+		return
+	}
+
+	q.pendingFailedMatches.Add(
+		strProxyKey, struct{}{}, q.pendingFailedMatchDeadline)
+}
+
+// addFailedMatch is invoked when a pendingFailedMatches entry expires,
+// increments the failed match count, and removes a quality entry when the
+// failed match threshold count is exceeded.
+func (q *ProxyQualityState) addFailedMatch(strProxyKey string, _ interface{}) {
+
+	// Assumes pendingFailedMatches.OnEvicted is not invoked while already
+	// holding the mutex lock.
+
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	entryValue, ok := q.entries.Get(strProxyKey)
+	if !ok {
+		// No quality to remove.
+		return
+	}
+
+	entry := entryValue.(*proxyQualityEntry)
+
+	entry.failedMatchCount += 1
+
+	if entry.failedMatchCount >= q.failedMatchThreshold {
+		// Remove quality.
+		q.entries.Delete(strProxyKey)
+	}
+}
+
+// ProxyQualityReporter manages sending proxy quality requests to brokers.
+type ProxyQualityReporter struct {
+	logger                  common.Logger
+	serverBrokerSessions    *ServerBrokerSessions
+	serverSessionPrivateKey SessionPrivateKey
+	roundTripperMaker       ProxyQualityBrokerRoundTripperMaker
+
+	mutex             sync.Mutex
+	runContext        context.Context
+	stopRunning       context.CancelFunc
+	waitGroup         *sync.WaitGroup
+	reportQueue       *list.List
+	proxyIDQueueEntry map[ProxyQualityKey]*list.Element
+
+	// The following fields are accessed via atomic.Value/atomic int
+	// operations so ReportQuality and the scheduler don't contend on mutex.
+	brokerPublicKeys             atomic.Value
+	brokerRootObfuscationSecrets atomic.Value
+	requestDelay                 int64
+	maxRequestEntries            int64
+	requestTimeout               int64
+	requestRetries               int64
+
+	signalReport chan struct{}
+}
+
+// ProxyQualityBrokerRoundTripperMaker is a callback which creates a new
+// RoundTripper for sending requests to the broker specified by the given
+// session public key.
+//
+// The optional common.APIParameters are broker dial parameter metrics to be
+// reported to the broker.
+type ProxyQualityBrokerRoundTripperMaker func(SessionPublicKey) (
+	RoundTripper, common.APIParameters, error)
+
+// proxyQualityReportQueueEntry is one FIFO report queue element: per-client
+// ASN quality counts for a single proxy key.
+type proxyQualityReportQueueEntry struct {
+	proxyKey ProxyQualityKey
+	counts   ProxyQualityASNCounts
+}
+
+// serverBrokerClient bundles the round tripper and Noise session state
+// retained, per broker, across sendToBrokers calls.
+type serverBrokerClient struct {
+	publicKey             SessionPublicKey
+	rootObfuscationSecret ObfuscationSecret
+	brokerInitiatorID     ID
+	sessions              *InitiatorSessions
+	roundTripper          RoundTripper
+	dialParams            common.APIParameters
+}
+
+// NewProxyQualityReporter creates a new ProxyQualityReporter.
+//
+// serverBrokerSessions is the server's ServerBrokerSessions instance which
+// manages inbound reports from the broker; the ServerBrokerSessions is
+// consulted to determine which brokers have recently communicated with the
+// server, and are therefore expected to trust the server's public key.
+//
+// serverSessionPrivateKey is the server's session private key to be used in
+// the quality reporting Noise sessions established with the brokers.
+// brokerPublicKeys specify the brokers to send to.
+//
+// roundTripperMaker is a callback which creates RoundTrippers for these
+// brokers. The ProxyQualityReporter will invoke roundTripperMaker when
+// attempting to send requests to a given broker; each RoundTripper will be
+// retained and reused as long as it continues to work successfully.
+func NewProxyQualityReporter(
+	logger common.Logger,
+	serverBrokerSessions *ServerBrokerSessions,
+	serverSessionPrivateKey SessionPrivateKey,
+	brokerPublicKeys []SessionPublicKey,
+	brokerRootObfuscationSecrets []ObfuscationSecret,
+	roundTripperMaker ProxyQualityBrokerRoundTripperMaker) (
+	*ProxyQualityReporter, error) {
+
+	r := &ProxyQualityReporter{
+		logger:                  logger,
+		serverBrokerSessions:    serverBrokerSessions,
+		serverSessionPrivateKey: serverSessionPrivateKey,
+		roundTripperMaker:       roundTripperMaker,
+
+		waitGroup: new(sync.WaitGroup),
+
+		requestDelay:      int64(proxyQualityReporterRequestDelay),
+		maxRequestEntries: proxyQualityReporterMaxRequestEntries,
+		requestTimeout:    int64(proxyQualityReporterRequestTimeout),
+		requestRetries:    proxyQualityReporterRequestRetries,
+
+		reportQueue:       list.New(),
+		proxyIDQueueEntry: make(map[ProxyQualityKey]*list.Element),
+
+		// Buffer size 1: a pending signal coalesces subsequent signals.
+		signalReport: make(chan struct{}, 1),
+	}
+
+	err := r.SetKnownBrokers(brokerPublicKeys, brokerRootObfuscationSecrets)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return r, nil
+}
+
+// SetKnownBrokers updates the set of brokers to send to. The two input
+// slices must be parallel: brokerRootObfuscationSecrets[i] corresponds to
+// brokerPublicKeys[i].
+func (r *ProxyQualityReporter) SetKnownBrokers(
+	brokerPublicKeys []SessionPublicKey,
+	brokerRootObfuscationSecrets []ObfuscationSecret) error {
+
+	if len(brokerPublicKeys) != len(brokerRootObfuscationSecrets) {
+		return errors.TraceNew("invalid broker specs")
+	}
+
+	r.brokerPublicKeys.Store(brokerPublicKeys)
+	r.brokerRootObfuscationSecrets.Store(brokerRootObfuscationSecrets)
+
+	return nil
+}
+
+// SetRequestParameters overrides default values for request parameters.
+// Values are stored atomically, so SetRequestParameters may be called
+// concurrently with an active reporter.
+func (r *ProxyQualityReporter) SetRequestParameters(
+	maxRequestEntries int,
+	requestDelay time.Duration,
+	requestTimeout time.Duration,
+	requestRetries int) {
+
+	atomic.StoreInt64(&r.requestDelay, int64(requestDelay))
+	atomic.StoreInt64(&r.maxRequestEntries, int64(maxRequestEntries))
+	atomic.StoreInt64(&r.requestTimeout, int64(requestTimeout))
+	atomic.StoreInt64(&r.requestRetries, int64(requestRetries))
+}
+
+// Start launches the request workers. Start returns an error if the reporter
+// is already running.
+func (r *ProxyQualityReporter) Start() error {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	if r.runContext != nil {
+		return errors.TraceNew("already running")
+	}
+
+	r.runContext, r.stopRunning = context.WithCancel(context.Background())
+
+	r.waitGroup.Add(1)
+	go func() {
+		defer r.waitGroup.Done()
+		r.requestScheduler(r.runContext)
+	}()
+
+	return nil
+}
+
+// Stop terminates the request workers. Stop is a no-op when the reporter is
+// not running, so calling Stop before Start, or twice, is safe.
+func (r *ProxyQualityReporter) Stop() {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	// Guard against Stop before Start or a redundant Stop; stopRunning is
+	// nil in both cases and invoking it would panic.
+	if r.stopRunning == nil {
+		return
+	}
+
+	r.stopRunning()
+	r.waitGroup.Wait()
+	r.runContext, r.stopRunning = nil, nil
+}
+
+// ReportQuality registers a quality tunnel for the specified proxy, defined
+// by its ID and ASN, and client ASN. Broker requests are scheduled to be
+// sent after a short delay -- intended to batch up additional data -- or
+// once sufficient request data is accumulated.
+func (r *ProxyQualityReporter) ReportQuality(
+	proxyID ID, proxyASN string, clientASN string) {
+
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	proxyKey := MakeProxyQualityKey(proxyID, proxyASN)
+
+	// Proxy quality data is stored in a FIFO queue. New reports are merged
+	// into existing entries for that same proxy ID when possible.
+
+	entry, ok := r.proxyIDQueueEntry[proxyKey]
+	if ok {
+		// counts is a map, so this increment is visible via the queue entry
+		// even though the type assertion yields a struct copy.
+		entry.Value.(proxyQualityReportQueueEntry).counts[clientASN] += 1
+		return
+	}
+
+	// Sanity check against an unbounded queue. When the queue is full, new
+	// reports are simply dropped. There is no back pressure to slow down the
+	// rate of quality tunnels, since the overall goal is to establish
+	// quality tunnels.
+	if r.reportQueue.Len() > proxyQualityReporterMaxQueueEntries {
+		r.logger.WithTrace().Warning("proxyQualityReporterMaxQueueEntries exceeded")
+		return
+	}
+
+	counts := make(ProxyQualityASNCounts)
+	counts[clientASN] += 1
+
+	entry = r.reportQueue.PushBack(
+		proxyQualityReportQueueEntry{
+			proxyKey: proxyKey,
+			counts:   counts,
+		})
+	r.proxyIDQueueEntry[proxyKey] = entry
+
+	// signalReport has a buffer size of 1, so when a signal can't be sent to
+	// the channel, it's already signalled.
+	select {
+	case r.signalReport <- struct{}{}:
+	default:
+	}
+}
+
+// requestScheduler is the reporter's main loop: it awaits report signals,
+// briefly delays to batch data, then drains the report queue, sending each
+// batch to all known brokers. Runs until ctx is cancelled.
+func (r *ProxyQualityReporter) requestScheduler(ctx context.Context) {
+
+	// Retain a set of serverBrokerClients, with established round trip
+	// transports and Noise sessions, for reuse across many requests.
+	// sendToBrokers will add to and trim this set.
+
+	brokerClients := make(map[SessionPublicKey]*serverBrokerClient)
+
+	for {
+
+		// Await the signal that there is quality data to report.
+
+		select {
+		case <-r.signalReport:
+		case <-ctx.Done():
+			return
+		}
+
+		// Delay, for a brief moment, sending requests in an effort to batch
+		// up more data for the requests.
+
+		requestDelay := time.Duration(atomic.LoadInt64(&r.requestDelay))
+		if requestDelay > 0 {
+
+			// TODO: SleepWithContext creates and discards a timer per call;
+			// instead reuse an inline timer?
+			common.SleepWithContext(ctx, requestDelay)
+		}
+
+		// Loop and drain the quality data queue, sending the same payload to
+		// each broker in each iteration. sendToBrokers performs the broker
+		// requests in parallel, but sendToBrokers doesn't return until all
+		// requests are complete, meaning no broker will get far ahead of any
+		// other.
+		//
+		// If a certain broker request fails, including retries, that may
+		// delay the overall schedule, up to requestTimeout * requestRetries.
+		// Furthermore, after all retries fail, the failing broker simply
+		// never receives the payload.
+
+		// Future enhancements:
+		//
+		// - Use a dynamic request timeout for failing brokers, to avoid
+		//   repeatedly delaying every round when one broker persistently fails?
+		//
+		// - Consider skipping sending a quality payload if it contains only
+		//   the exact same proxy ID(s) and client ASNs reported in a very
+		//   recent request? Currently, the quality _count_ values aren't used
+		//   as distinguisher, so the primary benefit for sending additional
+		//   counts for the same proxy ID and client ASN are TTL extensions
+		//   in the ProxyQualityState.
+
+		for r.hasMoreRequests() {
+
+			requestCounts := r.prepareNextRequest()
+
+			r.sendToBrokers(ctx, brokerClients, requestCounts)
+		}
+	}
+}
+
+// hasMoreRequests indicates whether the report queue has pending entries.
+func (r *ProxyQualityReporter) hasMoreRequests() bool {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	return r.reportQueue.Len() > 0
+}
+
+// prepareNextRequest dequeues up to maxRequestEntries report queue entries
+// and assembles them into the counts payload for the next broker request.
+func (r *ProxyQualityReporter) prepareNextRequest() ProxyQualityRequestCounts {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	// prepareNextRequest should not hold the mutex for a long period, as this
+	// blocks ReportQuality, which in turn could block tunnel I/O operations.
+
+	counts := make(ProxyQualityRequestCounts)
+
+	queueEntry := r.reportQueue.Front()
+
+	// Limit the size of each request to maxRequestEntries.
+	//
+	// Limitation: maxRequestEntries doesn't take into account the number of
+	// different client ASN counts per entry. In practice, there shouldn't be
+	// an excessive number of client ASNs.
+
+	for queueEntry != nil && int64(len(counts)) < atomic.LoadInt64(&r.maxRequestEntries) {
+
+		entry := queueEntry.Value.(proxyQualityReportQueueEntry)
+
+		// Reuse queueEntry.counts rather than make a copy. As queueEntry is
+		// removed from the queue here, this should be safe as no subsequent
+		// ReportQuality will add to the same entry.
+
+		counts[entry.proxyKey] = entry.counts
+
+		removeEntry := queueEntry
+		queueEntry = queueEntry.Next()
+
+		r.reportQueue.Remove(removeEntry)
+		delete(r.proxyIDQueueEntry, entry.proxyKey)
+	}
+
+	return counts
+}
+
+// sendToBrokers sends the given quality counts payload to every known
+// broker, in parallel, reusing retained broker clients where possible.
+// sendToBrokers does not return until all per-broker request workers
+// complete.
+func (r *ProxyQualityReporter) sendToBrokers(
+	ctx context.Context,
+	brokerClients map[SessionPublicKey]*serverBrokerClient,
+	requestCounts ProxyQualityRequestCounts) {
+
+	// Iterate over the current list of brokers, as identified by the public
+	// keys in brokerPublicKeys. For each broker, reuse any existing broker
+	// client or create a new one. Spawns short term goroutine workers to
+	// send requests to each broker in parallel, and await all worker
+	// completion. Leave all working broker clients in place for future use,
+	// but prune failed or unused broker clients from brokerClients. Assumes
+	// only a handful of brokers.
+
+	// This implementation is not using BrokerClient, the type used as the
+	// proxy/client broker client, as BrokerClient uses a BrokerDialCoordinator
+	// and is oriented to proxy/client functionality.
+
+	var sendWaitGroup sync.WaitGroup
+
+	var retainBrokerClientsMutex sync.Mutex
+	retainBrokerClients := make(map[SessionPublicKey]struct{})
+
+	brokerPublicKeys := r.brokerPublicKeys.Load().([]SessionPublicKey)
+	brokerRootObfuscationSecrets := r.brokerRootObfuscationSecrets.Load().([]ObfuscationSecret)
+
+	establishedBrokerIDs := r.serverBrokerSessions.sessions.GetEstablishedKnownInitiatorIDs()
+
+	for i, brokerPublicKey := range brokerPublicKeys {
+
+		// Get or create the brokerClient for brokerPublicKey.
+
+		brokerClient, ok := brokerClients[brokerPublicKey]
+		if !ok {
+
+			initiatorID, err := brokerPublicKey.ToCurve25519()
+			if err != nil {
+				r.logger.WithTraceFields(
+					common.LogFields{"brokerID": brokerPublicKey, "error": err.Error()},
+				).Warning("ToCurve25519 failed")
+				continue
+			}
+
+			brokerClient = &serverBrokerClient{
+				publicKey:             brokerPublicKey,
+				rootObfuscationSecret: brokerRootObfuscationSecrets[i],
+				brokerInitiatorID:     ID(initiatorID),
+			}
+
+			// This partially initialized brokerClient will be retained even
+			// if the following establishedBrokerIDs check fails, as this
+			// caches the result of the ToCurve25519. The next sendToBrokers
+			// call will check the same brokerPublicKey again -- unless
+			// brokerPublicKeys changes.
+
+			brokerClients[brokerPublicKey] = brokerClient
+		}
+
+		// Currently, brokers will only trust and allow proxy quality requests
+		// from servers for which the broker has seen the corresponding
+		// signed server entries as client proxy destinations. As such, the
+		// following request is expected to fail unless the broker has
+		// established a session with this server as indicated in
+		// establishedBrokerIDs. Skip any broker that's not in
+		// establishedBrokerIDs; those brokers will not receive this proxy
+		// quality request payload.
+		//
+		// Mitigating factor: due to proxy affinity to a single broker, it's
+		// likely that the proxy in any local ReportQuality call used and is
+		// using a broker that has relayed a BrokerServerReport to this server.
+		//
+		// Future enhancement: the server could send its own signed server
+		// entry to a broker, instead of relying on the broker to receive
+		// that signed server entry in a client offer.
+
+		if _, ok := establishedBrokerIDs[brokerClient.brokerInitiatorID]; !ok {
+
+			// If there is a brokerClient for brokerPublicKey but the
+			// establishedBrokerIDs check _no longer_ passes, remove and
+			// garbage collect any round tripper and Noise session. The
+			// remaining brokerClient is still retained, for the cached
+			// ToCurve25519 conversion.
+
+			brokerClient.sessions = nil
+			if brokerClient.roundTripper != nil {
+				// Close all network connections.
+				brokerClient.roundTripper.Close()
+			}
+			brokerClient.roundTripper = nil
+
+			retainBrokerClientsMutex.Lock()
+			retainBrokerClients[brokerPublicKey] = struct{}{}
+			retainBrokerClientsMutex.Unlock()
+
+			continue
+		}
+
+		if brokerClient.sessions == nil {
+
+			// Initialize the rest of the brokerClient: the round tripper and
+			// the Noise session.
+			//
+			// Once initialized, these are retained after a successful round
+			// trip, so that subsequent sendToBrokers calls can reuse the
+			// existing, established network transport and Noise session.
+			//
+			// This implementation uses one Noise InitiatorSessions per
+			// broker, instead of sharing a single instance, since
+			// InitiatorSessions currently lacks an API to discard a
+			// particular session.
+
+			roundTripper, dialParams, err := r.roundTripperMaker(brokerPublicKey)
+			if err != nil {
+				r.logger.WithTraceFields(
+					common.LogFields{"brokerID": brokerPublicKey, "error": err.Error()},
+				).Warning("roundTripperMaker failed")
+				continue
+			}
+
+			brokerClient.sessions = NewInitiatorSessions(r.serverSessionPrivateKey)
+			brokerClient.roundTripper = roundTripper
+			brokerClient.dialParams = dialParams
+		}
+
+		// Spawn a goroutine to send the request to this brokerClient.
+		// Spawning goroutines for every request round should be efficient
+		// enough, and avoids additional complexity in alternatives such as
+		// maintaining long-running goroutine workers per broker.
+
+		sendWaitGroup.Add(1)
+		go func(brokerClient *serverBrokerClient) {
+			defer sendWaitGroup.Done()
+
+			// Attempt the request up to 1 + requestRetries times, stopping
+			// at the first success; without the break, a successful request
+			// would be redundantly resent on each remaining iteration.
+			retries := int(atomic.LoadInt64(&r.requestRetries))
+			succeeded := false
+			for i := 0; i <= retries; i++ {
+				err := r.sendBrokerRequest(ctx, brokerClient, requestCounts)
+				if err == nil {
+					succeeded = true
+					break
+				}
+				r.logger.WithTraceFields(
+					common.LogFields{"brokerID": brokerClient.publicKey, "error": err.Error()},
+				).Warning("sendBrokerRequest failed")
+			}
+			if !succeeded {
+				// All attempts failed; don't retain, so the brokerClient is
+				// pruned and rebuilt on the next round.
+				return
+			}
+
+			// Retain the successful brokerClient.
+			retainBrokerClientsMutex.Lock()
+			retainBrokerClients[brokerClient.publicKey] = struct{}{}
+			retainBrokerClientsMutex.Unlock()
+		}(brokerClient)
+
+	}
+
+	// Await all request worker completion.
+	//
+	// Currently there is no backoff for brokers whose requests fail. Unlike
+	// proxies (and to some degree clients), there is only one concurrent
+	// request, from this server, per broker, so there is less expectation of
+	// hitting rate limiting by some intermediary, such as a CDN. The
+	// requestDelay, primarily intended for batching data payloads, should
+	// also provide a short cool-down period after failures.
+
+	sendWaitGroup.Wait()
+
+	// Trim the set of broker clients. Broker clients in brokerClients but not
+	// in retainBrokerClients include cases where the request failed and
+	// where the broker is no longer in brokerPublicKeys.
+
+	for brokerPublicKey, brokerClient := range brokerClients {
+		if _, ok := retainBrokerClients[brokerPublicKey]; !ok {
+			// roundTripper may be nil when initialization never completed
+			// (e.g., roundTripperMaker failed); guard against a nil
+			// interface call, as done in the establishedBrokerIDs path.
+			if brokerClient.roundTripper != nil {
+				// Close all network connections.
+				brokerClient.roundTripper.Close()
+			}
+			delete(brokerClients, brokerPublicKey)
+		}
+	}
+}
+
+// sendBrokerRequest marshals the quality counts into a
+// ServerProxyQualityRequest and performs one Noise-session round trip to the
+// given broker. requestCounts is shared across concurrent workers and is not
+// mutated here.
+func (r *ProxyQualityReporter) sendBrokerRequest(
+	ctx context.Context,
+	brokerClient *serverBrokerClient,
+	requestCounts ProxyQualityRequestCounts) error {
+
+	requestTimeout := time.Duration(atomic.LoadInt64(&r.requestTimeout))
+
+	// While the request payload, requestCounts, is the same for every broker,
+	// each broker round tripper may have different dial parameters, so each
+	// request worker encodes and marshals its own request. requestCounts is
+	// shared across multiple concurrent workers and must not be mutated.
+
+	dialParams, err := protocol.EncodePackedAPIParameters(brokerClient.dialParams)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	request := &ServerProxyQualityRequest{
+		QualityCounts:  requestCounts,
+		DialParameters: dialParams,
+	}
+
+	requestPayload, err := MarshalServerProxyQualityRequest(request)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Unlike clients and proxies, there is no Noise session sharing, as
+	// there's only one, sequentially invoked sendBrokerRequest worker per
+	// broker. The ServerProxyQualityRequest is not a long polling request,
+	// so there's no special case, shorter Noise handshake timeout. There's
+	// no request delay at this level.
+
+	waitToShareSession := false
+	sessionHandshakeTimeout := requestTimeout
+	requestDelay := time.Duration(0)
+
+	responsePayload, err := brokerClient.sessions.RoundTrip(
+		ctx,
+		brokerClient.roundTripper,
+		brokerClient.publicKey,
+		brokerClient.rootObfuscationSecret,
+		waitToShareSession,
+		sessionHandshakeTimeout,
+		requestDelay,
+		requestTimeout,
+		requestPayload)
+	if err != nil {
+
+		// TODO: check if the error is a RoundTripperFailedError and,
+		// if not, potentially retain the RoundTripper? At this time,
+		// the server.InproxyProxyQualityBrokerRoundTripper.RoundTrip
+		// implementation always returns RoundTripperFailedError.
+
+		return errors.Trace(err)
+	}
+
+	// The response is simply an acknowledgement of the request.
+
+	_, err = UnmarshalServerProxyQualityResponse(responsePayload)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}

+ 194 - 0
psiphon/common/inproxy/quality_test.go

@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package inproxy
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	lrucache "github.com/cognusion/go-cache-lru"
+)
+
+// TestProxyQualityState exercises ProxyQualityState TTL expiry and the
+// failed match threshold; see runTestProxyQualityState.
+func TestProxyQualityState(t *testing.T) {
+	err := runTestProxyQualityState()
+	if err != nil {
+		t.Errorf(errors.Trace(err).Error())
+	}
+}
+
+// TestProxyQualityReporter exercises the reporter's report queue state; see
+// runTestProxyQualityReporter.
+func TestProxyQualityReporter(t *testing.T) {
+	err := runTestProxyQualityReporter()
+	if err != nil {
+		t.Errorf(errors.Trace(err).Error())
+	}
+}
+
+// runTestProxyQualityState checks quality entry add/lookup, TTL expiry, and
+// that failedMatchThreshold consecutive failed matches remove quality.
+func runTestProxyQualityState() error {
+
+	qualityTTL := 100 * time.Millisecond
+	pendingFailedMatchDeadline := 100 * time.Millisecond
+	failedMatchThreshold := 3
+
+	q := NewProxyQuality()
+
+	// Substitute a cache with a much shorter janitor interval, to ensure
+	// evictions happen within the artificially short test intervals.
+	q.pendingFailedMatches = lrucache.NewWithLRU(0, 1*time.Millisecond, proxyQualityMaxPendingFailedMatches)
+	q.pendingFailedMatches.OnEvicted(q.addFailedMatch)
+
+	q.SetParameters(
+		qualityTTL, pendingFailedMatchDeadline, failedMatchThreshold)
+
+	testProxyASN := "65537"
+	testClientASN1 := "65538"
+	testClientASN2 := "65539"
+	testClientASN3 := "65540"
+
+	proxyID, err := MakeID()
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	proxyKey := MakeProxyQualityKey(proxyID, testProxyASN)
+
+	q.AddQuality(proxyKey, ProxyQualityASNCounts{testClientASN1: 1, testClientASN2: 2})
+
+	if !q.HasQuality(proxyID, testProxyASN, "") {
+		return errors.TraceNew("unexpected HasQuality")
+	}
+
+	if !q.HasQuality(proxyID, testProxyASN, testClientASN1) {
+		return errors.TraceNew("unexpected HasQuality")
+	}
+
+	if q.HasQuality(proxyID, testProxyASN, testClientASN3) {
+		return errors.TraceNew("unexpected HasQuality")
+	}
+
+	// Quality entries must expire after qualityTTL with no AddQuality.
+	time.Sleep(qualityTTL + 1*time.Millisecond)
+
+	if q.HasQuality(proxyID, testProxyASN, "") {
+		return errors.TraceNew("unexpected HasQuality")
+	}
+
+	qualityTTL = proxyQualityTTL
+
+	q.SetParameters(
+		qualityTTL, pendingFailedMatchDeadline, failedMatchThreshold)
+
+	q.AddQuality(proxyKey, ProxyQualityASNCounts{testClientASN1: 1, testClientASN2: 2})
+
+	// Each unanswered Matched counts as one failed match; quality persists
+	// until failedMatchThreshold consecutive failures.
+	for i := 0; i < failedMatchThreshold; i++ {
+
+		q.Matched(proxyID, testProxyASN)
+
+		time.Sleep(pendingFailedMatchDeadline + 10*time.Millisecond)
+
+		expectQuality := i < failedMatchThreshold-1
+
+		if q.HasQuality(proxyID, testProxyASN, "") != expectQuality {
+			return errors.TraceNew("unexpected HasQuality")
+		}
+	}
+
+	return nil
+}
+
+// runTestProxyQualityReporter checks ReportQuality queue merging and
+// prepareNextRequest batching (maxRequestEntries per request).
+func runTestProxyQualityReporter() error {
+
+	// This unit test exercises the report queue state, but not the report
+	// requests. ProxyQualityReporter.requestScheduler/sendToBrokers are
+	// exercised in TestInproxy.
+
+	r, err := NewProxyQualityReporter(
+		newTestLogger(),
+		nil,
+		SessionPrivateKey{},
+		nil,
+		nil,
+		nil)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	maxEntries := 10
+	expectedRequestCount := 2
+
+	r.SetRequestParameters(maxEntries, 0, 0, 0)
+
+	var proxyKeys []ProxyQualityKey
+	testProxyASN := "65537"
+
+	// 20 proxies x 10 client ASNs; repeated reports for the same proxy must
+	// merge into one queue entry with incremented counts.
+	for i := 0; i < 20; i++ {
+		proxyID, err := MakeID()
+		if err != nil {
+			return errors.Trace(err)
+		}
+		proxyKey := MakeProxyQualityKey(proxyID, testProxyASN)
+		for j := 0; j < 10; j++ {
+			testClientASN := fmt.Sprintf("%d", 65538+j)
+			for k := 0; k <= i; k++ {
+				r.ReportQuality(
+					proxyID, testProxyASN, testClientASN)
+			}
+		}
+		proxyKeys = append(proxyKeys, proxyKey)
+	}
+
+	if r.reportQueue.Len() != len(proxyKeys) {
+		return errors.TraceNew("unexpected queue size")
+
+	}
+
+	for count := 0; count < expectedRequestCount; count++ {
+
+		if !r.hasMoreRequests() {
+			return errors.TraceNew("unexpected hasMoreRequests")
+		}
+
+		requestCounts := r.prepareNextRequest()
+
+		// FIFO order: each request drains the next maxEntries proxies.
+		for i := count * 10; i < count*10+10; i++ {
+			counts, ok := requestCounts[proxyKeys[i]]
+			if !ok {
+				return errors.TraceNew("missing proxyID")
+			}
+			for j := 0; j < 10; j++ {
+				testClientASN := fmt.Sprintf("%d", 65538+j)
+				count, ok := counts[testClientASN]
+				if !ok {
+					return errors.TraceNew("missing client ASN")
+				}
+				if count != i+1 {
+					return errors.Tracef("unexpected count")
+				}
+			}
+		}
+
+	}
+
+	if r.hasMoreRequests() {
+		return errors.TraceNew("unexpected hasMoreRequests")
+	}
+
+	return nil
+}

+ 3 - 1
psiphon/common/inproxy/records.go

@@ -47,7 +47,9 @@ const (
 	recordTypeAPIClientRelayedPacketRequest  = 9
 	recordTypeAPIClientRelayedPacketResponse = 10
 	recordTypeAPIBrokerServerReport          = 11
-	recordTypeLast                           = 11
+	recordTypeAPIServerProxyQualityRequest   = 12
+	recordTypeAPIServerProxyQualityResponse  = 13
+	recordTypeLast                           = 13
 )
 
 func marshalRecord(record interface{}, recordType int) ([]byte, error) {

+ 139 - 23
psiphon/common/inproxy/server.go

@@ -20,6 +20,8 @@
 package inproxy
 
 import (
+	"time"
+
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 )
@@ -34,48 +36,155 @@ const MaxRelayRoundTrips = 10
 // maintains a ServerBrokerSessions, with a set of established sessions for
 // each broker. Session messages are relayed between the broker and the
 // server by the client.
+//
+// ServerBrokerSessions runs a ProxyQualityReporter which sends proxy quality
+// reports back to the same brokers.
 type ServerBrokerSessions struct {
-	sessions *ResponderSessions
+	config               *ServerBrokerSessionsConfig
+	sessions             *ResponderSessions
+	proxyQualityReporter *ProxyQualityReporter
+}
+
+// ServerBrokerSessionsConfig specifies the configuration for a
+// ServerBrokerSessions instance.
+type ServerBrokerSessionsConfig struct {
+
+	// Logger provides a logging facility.
+	Logger common.Logger
+
+	// ServerPrivateKey is the server's session private key. It must
+	// correspond to the server session public key that a broker finds in a
+	// signed Psiphon server entry.
+	ServerPrivateKey SessionPrivateKey
+
+	// ServerRootObfuscationSecret is the server's root obfuscation secret, as
+	// found in the server's signed Psiphon server entry.
+	ServerRootObfuscationSecret ObfuscationSecret
+
+	// BrokerPublicKeys specifies the public keys corresponding to known
+	// brokers that are trusted to connect to the server; which are also the
+	// brokers to which the server will send its proxy quality reports.
+	BrokerPublicKeys []SessionPublicKey
+
+	// BrokerRootObfuscationSecrets are the obfuscation secrets corresponding
+	// to the entries in BrokerPublicKeys.
+	BrokerRootObfuscationSecrets []ObfuscationSecret
+
+	// BrokerRoundTripperMaker constructs round trip transports used to send
+	// proxy quality requests to the specified broker.
+	BrokerRoundTripperMaker ProxyQualityBrokerRoundTripperMaker
+
+	// ProxyMetricsValidator is used to further validate the proxy metrics
+	// fields relayed by the broker in broker server reports.
+	ProxyMetricsValidator common.APIParameterValidator
 
-	proxyMetricsValidator common.APIParameterValidator
-	proxyMetricsFormatter common.APIParameterLogFieldFormatter
-	proxyMetricsPrefix    string
+	// ProxyMetricsFormatter is used to log-format the proxy metrics fields
+	// relayed by the broker in broker server reports.
+	ProxyMetricsFormatter common.APIParameterLogFieldFormatter
+
+	// ProxyMetricsPrefix specifies an optional prefix to be prepended to
+	// proxy metric fields when logging.
+	ProxyMetricsPrefix string
 }
 
 // NewServerBrokerSessions create a new ServerBrokerSessions, with the
 // specified key material. The expected brokers are authenticated with
 // brokerPublicKeys, an allow list.
 func NewServerBrokerSessions(
-	serverPrivateKey SessionPrivateKey,
-	serverRootObfuscationSecret ObfuscationSecret,
-	brokerPublicKeys []SessionPublicKey,
-	proxyMetricsValidator common.APIParameterValidator,
-	proxyMetricsFormatter common.APIParameterLogFieldFormatter,
-	proxyMetricsPrefix string) (*ServerBrokerSessions, error) {
+	config *ServerBrokerSessionsConfig) (*ServerBrokerSessions, error) {
 
 	sessions, err := NewResponderSessionsForKnownInitiators(
-		serverPrivateKey, serverRootObfuscationSecret, brokerPublicKeys)
+		config.ServerPrivateKey,
+		config.ServerRootObfuscationSecret,
+		config.BrokerPublicKeys)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	return &ServerBrokerSessions{
+	s := &ServerBrokerSessions{
+		config:   config,
 		sessions: sessions,
+	}
+
+	s.proxyQualityReporter, err = NewProxyQualityReporter(
+		config.Logger,
+		s,
+		config.ServerPrivateKey,
+		config.BrokerPublicKeys,
+		config.BrokerRootObfuscationSecrets,
+		config.BrokerRoundTripperMaker)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return s, nil
+}
+
+// Start launches the proxy quality reporter.
+func (s *ServerBrokerSessions) Start() error {
 
-		proxyMetricsValidator: proxyMetricsValidator,
-		proxyMetricsFormatter: proxyMetricsFormatter,
-		proxyMetricsPrefix:    proxyMetricsPrefix,
-	}, nil
+	err := s.proxyQualityReporter.Start()
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+// Stop terminates the proxy quality reporter.
+func (s *ServerBrokerSessions) Stop() {
+
+	s.proxyQualityReporter.Stop()
 }
 
-// SetKnownBrokerPublicKeys updates the set of broker public keys which are
+// SetKnownBrokers updates the set of broker public keys which are
 // allowed to establish sessions with the server. Any existing sessions with
 // keys not in the new list are deleted. Existing sessions with keys which
 // remain in the list are retained.
-func (s *ServerBrokerSessions) SetKnownBrokerPublicKeys(
-	brokerPublicKeys []SessionPublicKey) error {
+//
+// The broker public keys also identify those brokers to which the proxy
+// quality reporter will send quality requests. The broker obfuscation
+// secrets are used by the reporter.
+func (s *ServerBrokerSessions) SetKnownBrokers(
+	brokerPublicKeys []SessionPublicKey,
+	brokerRootObfuscationSecrets []ObfuscationSecret) error {
+
+	err := s.sessions.SetKnownInitiatorPublicKeys(
+		brokerPublicKeys)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
-	return errors.Trace(s.sessions.SetKnownInitiatorPublicKeys(brokerPublicKeys))
+	err = s.proxyQualityReporter.SetKnownBrokers(
+		brokerPublicKeys, brokerRootObfuscationSecrets)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+// SetProxyQualityRequestParameters overrides default values for proxy quality
+// reporter parameters.
+func (s *ServerBrokerSessions) SetProxyQualityRequestParameters(
+	maxRequestEntries int,
+	requestDelay time.Duration,
+	requestTimeout time.Duration,
+	requestRetries int) {
+
+	s.proxyQualityReporter.SetRequestParameters(
+		maxRequestEntries,
+		requestDelay,
+		requestTimeout,
+		requestRetries)
+}
+
+// ReportQuality enqueues a proxy quality event in the proxy quality reporter.
+// See ProxyQualityReporter.ReportQuality for details.
+func (s *ServerBrokerSessions) ReportQuality(
+	proxyID ID, proxyASN string, clientASN string) {
+
+	s.proxyQualityReporter.ReportQuality(proxyID, proxyASN, clientASN)
 }
 
 // ProxiedConnectionHandler is a callback, provided by the Psiphon server,
@@ -99,6 +208,8 @@ func (s *ServerBrokerSessions) SetKnownBrokerPublicKeys(
 // The fields in logFields should be added to server_tunnel logs.
 type ProxiedConnectionHandler func(
 	brokerVerifiedOriginalClientIP string,
+	brokerReportedProxyID ID,
+	brokerMatchedPersonalCompartments bool,
 	logFields common.LogFields)
 
 // HandlePacket handles a broker/server session packet, which are relayed by
@@ -137,7 +248,9 @@ func (s *ServerBrokerSessions) HandlePacket(
 		}
 
 		logFields, err := brokerReport.ValidateAndGetLogFields(
-			s.proxyMetricsValidator, s.proxyMetricsFormatter, s.proxyMetricsPrefix)
+			s.config.ProxyMetricsValidator,
+			s.config.ProxyMetricsFormatter,
+			s.config.ProxyMetricsPrefix)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -165,8 +278,11 @@ func (s *ServerBrokerSessions) HandlePacket(
 		}
 
 		if ok {
-
-			handler(brokerReport.ClientIP, logFields)
+			handler(
+				brokerReport.ClientIP,
+				brokerReport.ProxyID,
+				brokerReport.MatchedPersonalCompartments,
+				logFields)
 		}
 
 		// Returns nil, as there is no response to the report, and so no

+ 37 - 0
psiphon/common/inproxy/session.go

@@ -970,6 +970,43 @@ func (s *ResponderSessions) SetKnownInitiatorPublicKeys(
 	return nil
 }
 
+// GetEstablishedKnownInitiatorIDs returns a list of known initiator IDs, the
+// Curve25519 equivalents of known initiator public keys, with currently
+// established sessions.
+//
+// The return value is a map that may be used for lookups, supporting the
+// ProxyQualityReporter use case of sending server proxy quality requests
+// only to brokers that are expected to already trust the server's session
+// public key.
+//
+// GetEstablishedKnownInitiatorIDs requires KnownInitiators mode, and is
+// intended for use with only a small number of known initiators.
+func (s *ResponderSessions) GetEstablishedKnownInitiatorIDs() map[ID]struct{} {
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	initiatorIDs := make(map[ID]struct{})
+
+	if s.expectedInitiatorPublicKeys == nil {
+		// Exit immediately when not in known initiator mode. Don't
+		// accidentally iterate over potentially millions of sessions.
+		return initiatorIDs
+	}
+
+	for _, entry := range s.sessions.Items() {
+		session := entry.Object.(*session)
+		initiatorID, err := session.getPeerID()
+		if err != nil {
+			// When getPeerID fails, the session is not yet established.
+			continue
+		}
+		initiatorIDs[initiatorID] = struct{}{}
+	}
+
+	return initiatorIDs
+}
+
 // RequestHandler is an application-level handler that receives the decrypted
 // request payload and returns a response payload to be encrypted and sent to
 // the initiator. The initiatorID is the authenticated identifier of the

+ 14 - 3
psiphon/common/parameters/inproxy.go

@@ -20,6 +20,8 @@
 package parameters
 
 import (
+	"reflect"
+
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
@@ -37,14 +39,23 @@ type InproxyBrokerSpec struct {
 }
 
 // Validate checks that the in-proxy broker specs values are well-formed.
-func (specs InproxyBrokerSpecsValue) Validate(checkBrokerPublicKeyList *[]string) error {
+func (specs InproxyBrokerSpecsValue) Validate(checkBrokerSpecsList *InproxyBrokerSpecsValue) error {
 
 	for _, spec := range specs {
 		if _, err := inproxy.SessionPublicKeyFromString(spec.BrokerPublicKey); err != nil {
 			return errors.Tracef("invalid broker public key: %w", err)
 		}
-		if checkBrokerPublicKeyList != nil && !common.Contains(*checkBrokerPublicKeyList, spec.BrokerPublicKey) {
-			return errors.TraceNew("unknown broker public key")
+		if checkBrokerSpecsList != nil {
+			found := false
+			for _, checkBrokerSpec := range *checkBrokerSpecsList {
+				if reflect.DeepEqual(spec, checkBrokerSpec) {
+					found = true
+					break
+				}
+			}
+			if !found {
+				return errors.TraceNew("unknown broker spec")
+			}
 		}
 		if _, err := inproxy.ObfuscationSecretFromString(spec.BrokerRootObfuscationSecret); err != nil {
 			return errors.Tracef("invalid broker root obfuscation secret: %w", err)

+ 39 - 10
psiphon/common/parameters/parameters.go

@@ -402,7 +402,7 @@ const (
 	InproxyAllowClient                                 = "InproxyAllowClient"
 	InproxyAllowDomainFrontedDestinations              = "InproxyAllowDomainFrontedDestinations"
 	InproxyTunnelProtocolSelectionProbability          = "InproxyTunnelProtocolSelectionProbability"
-	InproxyAllBrokerPublicKeys                         = "InproxyAllBrokerPublicKeys"
+	InproxyAllBrokerSpecs                              = "InproxyAllBrokerSpecs"
 	InproxyBrokerSpecs                                 = "InproxyBrokerSpecs"
 	InproxyPersonalPairingBrokerSpecs                  = "InproxyPersonalPairingBrokerSpecs"
 	InproxyProxyBrokerSpecs                            = "InproxyProxyBrokerSpecs"
@@ -487,6 +487,20 @@ const (
 	InproxyProxyOnBrokerClientFailedRetryPeriod        = "InproxyProxyOnBrokerClientFailedRetryPeriod"
 	InproxyProxyIncompatibleNetworkTypes               = "InproxyProxyIncompatibleNetworkTypes"
 	InproxyClientIncompatibleNetworkTypes              = "InproxyClientIncompatibleNetworkTypes"
+	InproxyEnableProxyQuality                          = "InproxyEnableProxyQuality"
+	InproxyEnableProxyQualityClientRegions             = "InproxyEnableProxyQualityClientRegions"
+	InproxyProxyQualityTargetUpstreamBytes             = "InproxyProxyQualityTargetUpstreamBytes"
+	InproxyProxyQualityTargetDownstreamBytes           = "InproxyProxyQualityTargetDownstreamBytes"
+	InproxyProxyQualityTargetDuration                  = "InproxyProxyQualityTargetDuration"
+	InproxyProxyQualityReporterTrustedCACertificates   = "InproxyProxyQualityReporterTrustedCACertificates"
+	InproxyProxyQualityReporterAdditionalHeaders       = "InproxyProxyQualityReporterAdditionalHeaders"
+	InproxyProxyQualityReporterMaxRequestEntries       = "InproxyProxyQualityReporterMaxRequestEntries"
+	InproxyProxyQualityReporterRequestDelay            = "InproxyProxyQualityReporterRequestDelay"
+	InproxyProxyQualityReporterRequestTimeout          = "InproxyProxyQualityReporterRequestTimeout"
+	InproxyProxyQualityReporterRequestRetries          = "InproxyProxyQualityReporterRequestRetries"
+	InproxyProxyQualityTTL                             = "InproxyProxyQualityTTL"
+	InproxyProxyQualityPendingFailedMatchDeadline      = "InproxyProxyQualityPendingFailedMatchDeadline"
+	InproxyProxyQualityFailedMatchThreshold            = "InproxyProxyQualityFailedMatchThreshold"
 	NetworkIDCacheTTL                                  = "NetworkIDCacheTTL"
 
 	// Retired parameters
@@ -954,7 +968,7 @@ var defaultParameters = map[string]struct {
 	InproxyAllowClient:                                 {value: false, flags: serverSideOnly},
 	InproxyAllowDomainFrontedDestinations:              {value: false, flags: serverSideOnly},
 	InproxyTunnelProtocolSelectionProbability:          {value: 0.5, minimum: 0.0},
-	InproxyAllBrokerPublicKeys:                         {value: []string{}, flags: serverSideOnly},
+	InproxyAllBrokerSpecs:                              {value: InproxyBrokerSpecsValue{}, flags: serverSideOnly},
 	InproxyBrokerSpecs:                                 {value: InproxyBrokerSpecsValue{}},
 	InproxyPersonalPairingBrokerSpecs:                  {value: InproxyBrokerSpecsValue{}},
 	InproxyProxyBrokerSpecs:                            {value: InproxyBrokerSpecsValue{}},
@@ -1040,6 +1054,21 @@ var defaultParameters = map[string]struct {
 	InproxyProxyIncompatibleNetworkTypes:               {value: []string{}},
 	InproxyClientIncompatibleNetworkTypes:              {value: []string{}},
 
+	InproxyEnableProxyQuality:                        {value: false, flags: serverSideOnly},
+	InproxyEnableProxyQualityClientRegions:           {value: []string{}, flags: serverSideOnly},
+	InproxyProxyQualityTargetUpstreamBytes:           {value: 0, minimum: 0, flags: serverSideOnly},
+	InproxyProxyQualityTargetDownstreamBytes:         {value: 0, minimum: 0, flags: serverSideOnly},
+	InproxyProxyQualityTargetDuration:                {value: time.Duration(0), minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyProxyQualityReporterMaxRequestEntries:     {value: 1000, minimum: 1, flags: serverSideOnly},
+	InproxyProxyQualityReporterTrustedCACertificates: {value: "", flags: serverSideOnly},
+	InproxyProxyQualityReporterAdditionalHeaders:     {value: http.Header{}, flags: serverSideOnly},
+	InproxyProxyQualityReporterRequestDelay:          {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyProxyQualityReporterRequestTimeout:        {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyProxyQualityReporterRequestRetries:        {value: 2, minimum: 0, flags: serverSideOnly},
+	InproxyProxyQualityTTL:                           {value: 24 * time.Hour, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyProxyQualityPendingFailedMatchDeadline:    {value: 5 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyProxyQualityFailedMatchThreshold:          {value: 10, minimum: 1, flags: serverSideOnly},
+
 	NetworkIDCacheTTL: {value: 500 * time.Millisecond, minimum: time.Duration(0)},
 }
 
@@ -1291,19 +1320,19 @@ func (p *Parameters) Set(
 	}
 	shadowsocksPrefixSpecs, _ := shadowsocksPrefixSpecsValue.(transforms.Specs)
 
-	// Special case: in-proxy broker public keys in InproxyBrokerSpecs must
-	// appear in InproxyAllBrokerPublicKeys; and inproxy common compartment
-	// IDs must appear in InproxyAllCommonCompartmentIDs. This check is
+	// Special case: in-proxy broker specs in any InproxyBrokerSpecs must
+	// appear in InproxyAllBrokerSpecs; and in-proxy common compartment IDs
+	// must appear in InproxyAllCommonCompartmentIDs. This check is
 	// server-side only as the "All" parameters are serverSideOnly.
 
 	checkInproxyLists := !skipOnError && serverSide
 
-	inproxyAllBrokerPublicKeysValue, err := getAppliedValue(
-		InproxyAllBrokerPublicKeys, parameters, applyParameters)
+	inproxyAllBrokerSpecsValue, err := getAppliedValue(
+		InproxyAllBrokerSpecs, parameters, applyParameters)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	inproxyAllBrokerPublicKeys, _ := inproxyAllBrokerPublicKeysValue.([]string)
+	inproxyAllBrokerSpecs, _ := inproxyAllBrokerSpecsValue.(InproxyBrokerSpecsValue)
 
 	inproxyAllCommonCompartmentIDsValue, err := getAppliedValue(
 		InproxyAllCommonCompartmentIDs, parameters, applyParameters)
@@ -1566,9 +1595,9 @@ func (p *Parameters) Set(
 				}
 			case InproxyBrokerSpecsValue:
 
-				var checkList *[]string
+				var checkList *InproxyBrokerSpecsValue
 				if checkInproxyLists && name == InproxyBrokerSpecs {
-					checkList = &inproxyAllBrokerPublicKeys
+					checkList = &inproxyAllBrokerSpecs
 				}
 
 				err := v.Validate(checkList)

+ 29 - 13
psiphon/server/api.go

@@ -166,6 +166,8 @@ func handshakeAPIRequestHandler(
 	var clientGeoIPData GeoIPData
 
 	var inproxyClientIP string
+	var inproxyProxyID inproxy.ID
+	var inproxyMatchedPersonalCompartments bool
 	var inproxyClientGeoIPData GeoIPData
 	var inproxyRelayLogFields common.LogFields
 
@@ -204,10 +206,15 @@ func handshakeAPIRequestHandler(
 			return nil, errors.Trace(err)
 		}
 
-		inproxyClientIP, inproxyRelayLogFields, err = doHandshakeInproxyBrokerRelay(
-			sshClient,
-			inproxyConnectionID,
-			inproxyRelayPacket)
+		inproxyClientIP,
+			inproxyProxyID,
+			inproxyMatchedPersonalCompartments,
+			inproxyRelayLogFields,
+			err =
+			doHandshakeInproxyBrokerRelay(
+				sshClient,
+				inproxyConnectionID,
+				inproxyRelayPacket)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -307,6 +314,8 @@ func handshakeAPIRequestHandler(
 			newTacticsTag:           newTacticsTag,
 			inproxyClientIP:         inproxyClientIP,
 			inproxyClientGeoIPData:  inproxyClientGeoIPData,
+			inproxyProxyID:          inproxyProxyID,
+			inproxyMatchedPersonal:  inproxyMatchedPersonalCompartments,
 			inproxyRelayLogFields:   inproxyRelayLogFields,
 		},
 		authorizations)
@@ -454,14 +463,16 @@ func handshakeAPIRequestHandler(
 func doHandshakeInproxyBrokerRelay(
 	sshClient *sshClient,
 	clientConnectionID string,
-	initialRelayPacket []byte) (string, common.LogFields, error) {
+	initialRelayPacket []byte) (string, inproxy.ID, bool, common.LogFields, error) {
 
 	connectionID, err := inproxy.IDFromString(clientConnectionID)
 	if err != nil {
-		return "", nil, errors.Trace(err)
+		return "", inproxy.ID{}, false, nil, errors.Trace(err)
 	}
 
 	clientIP := ""
+	var proxyID inproxy.ID
+	var matchedPersonalCompartments bool
 	var logFields common.LogFields
 
 	// This first packet from broker arrives via the client handshake. If
@@ -481,10 +492,14 @@ func doHandshakeInproxyBrokerRelay(
 			connectionID,
 			func(
 				brokerVerifiedOriginalClientIP string,
+				brokerReportedProxyID inproxy.ID,
+				brokerMatchedPersonalCompartments bool,
 				fields common.LogFields) {
 
 				// Once the broker report is received, this callback is invoked.
 				clientIP = brokerVerifiedOriginalClientIP
+				proxyID = brokerReportedProxyID
+				matchedPersonalCompartments = brokerMatchedPersonalCompartments
 				logFields = fields
 			})
 		if err != nil {
@@ -494,7 +509,7 @@ func doHandshakeInproxyBrokerRelay(
 				// invalid. Drop the packet and return an error. Do _not_
 				// reset the session, otherwise a malicious client could
 				// interrupt a valid broker/server session with a malformed packet.
-				return "", nil, errors.Trace(err)
+				return "", inproxy.ID{}, false, nil, errors.Trace(err)
 			}
 
 			// In the case of expired sessions, a reset session token is sent
@@ -509,7 +524,7 @@ func doHandshakeInproxyBrokerRelay(
 
 			// The relay is complete; the handler recording the clientIP and
 			// logFields was invoked.
-			return clientIP, logFields, nil
+			return clientIP, proxyID, matchedPersonalCompartments, logFields, nil
 		}
 
 		// server -> broker
@@ -523,7 +538,7 @@ func doHandshakeInproxyBrokerRelay(
 		}
 		requestPayload, err := protocol.CBOREncoding.Marshal(request)
 		if err != nil {
-			return "", nil, errors.Trace(err)
+			return "", inproxy.ID{}, false, nil, errors.Trace(err)
 		}
 
 		ok, responsePayload, err := sshClient.sshConn.SendRequest(
@@ -531,22 +546,23 @@ func doHandshakeInproxyBrokerRelay(
 			true,
 			requestPayload)
 		if err != nil {
-			return "", nil, errors.Trace(err)
+			return "", inproxy.ID{}, false, nil, errors.Trace(err)
 		}
 		if !ok {
-			return "", nil, errors.TraceNew("client rejected request")
+			return "", inproxy.ID{}, false, nil, errors.TraceNew("client rejected request")
 		}
 
 		var response protocol.InproxyRelayResponse
 		err = cbor.Unmarshal(responsePayload, &response)
 		if err != nil {
-			return "", nil, errors.Trace(err)
+			return "", inproxy.ID{}, false, nil, errors.Trace(err)
 		}
 
 		relayPacket = response.Packet
 	}
 
-	return "", nil, errors.Tracef("exceeded %d relay round trips", inproxy.MaxRelayRoundTrips)
+	return "", inproxy.ID{}, false, nil, errors.Tracef(
+		"exceeded %d relay round trips", inproxy.MaxRelayRoundTrips)
 }
 
 // uniqueUserParams are the connected request parameters which are logged for

+ 301 - 0
psiphon/server/inproxy.go

@@ -0,0 +1,301 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+)
+
+// MakeInproxyProxyQualityBrokerRoundTripper creates a new
+// InproxyProxyQualityBrokerRoundTripper for an in-proxy broker specified by
+// public key.
+func MakeInproxyProxyQualityBrokerRoundTripper(
+	support *SupportServices,
+	brokerPublicKey inproxy.SessionPublicKey) (
+	*InproxyProxyQualityBrokerRoundTripper, common.APIParameters, error) {
+
+	// Lookup the broker dial information in InproxyAllBrokerSpecs.
+	//
+	// Assumes no GeoIP targeting for InproxyAllBrokerSpecs.
+
+	p, err := support.ServerTacticsParametersCache.Get(NewGeoIPData())
+	if err != nil {
+		return nil, nil, errors.Trace(err)
+	}
+
+	if p.IsNil() {
+		return nil, nil, errors.TraceNew("missing tactics")
+	}
+
+	brokerSpecs := p.InproxyBrokerSpecs(parameters.InproxyAllBrokerSpecs)
+
+	// InproxyProxyQualityReporterTrustedCACertificates and
+	// InproxyProxyQualityReporterAdditionalHeaders are intended to support
+	// testing.
+	trustedCACertificates := p.String(
+		parameters.InproxyProxyQualityReporterTrustedCACertificates)
+	if trustedCACertificates != "" {
+		// Convert JSON-escaped "\n" back to PEM newlines.
+		trustedCACertificates = strings.ReplaceAll(trustedCACertificates, "\\n", "\n")
+	}
+	additionalHeaders := p.HTTPHeaders(
+		parameters.InproxyProxyQualityReporterAdditionalHeaders)
+
+	// This linear search over all broker specs is suitable for a handful of
+	// brokers, and assumes broker round trippers are reused and not
+	// recreated continuously.
+
+	brokerPublicKeyStr := brokerPublicKey.String()
+
+	for _, brokerSpec := range brokerSpecs {
+		if brokerSpec.BrokerPublicKey == brokerPublicKeyStr {
+			roundTripper, err := NewInproxyProxyQualityBrokerRoundTripper(
+				brokerSpec, trustedCACertificates, additionalHeaders)
+			if err != nil {
+				return nil, nil, errors.Trace(err)
+			}
+			return roundTripper, roundTripper.dialParams, nil
+		}
+	}
+
+	return nil, nil, errors.Tracef("broker public key not found: %s", brokerPublicKeyStr)
+}
+
+// InproxyProxyQualityBrokerRoundTripper is a broker request round trip
+// network transport which implements the inproxy.RoundTripper interface.
+type InproxyProxyQualityBrokerRoundTripper struct {
+	transport         *http.Transport
+	conns             *common.Conns[net.Conn]
+	requestURL        string
+	additionalHeaders http.Header
+	dialParams        common.APIParameters
+}
+
+// NewInproxyProxyQualityBrokerRoundTripper initializes a new
+// InproxyProxyQualityBrokerRoundTripper, using the dial parameters in the
+// input InproxyBrokerSpec.
+func NewInproxyProxyQualityBrokerRoundTripper(
+	brokerSpec *parameters.InproxyBrokerSpec,
+	trustedCACertificates string,
+	additionalHeaders http.Header) (*InproxyProxyQualityBrokerRoundTripper, error) {
+
+	// While server to broker connections are not expected to be subject to
+	// blocking, this transport currently uses CDN fronts, as already
+	// specified for client and proxy broker connections. Direct server to
+	// broker connections are not supported, but could be added in the future.
+	//
+	// The CDN path may, in fact, assist with performance and scaling, given
+	// that requests routed through CDNs will be multiplexed over existing
+	// CDN-to-origin connections, and not use additional ephemeral ports on
+	// the broker origin host.
+
+	frontingProviderID,
+		frontingTransport,
+		frontingDialAddress,
+		_, // SNIServerName is ignored
+		verifyServerName,
+		verifyPins,
+		hostHeader,
+		err := brokerSpec.BrokerFrontingSpecs.SelectParameters()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	if frontingTransport != protocol.FRONTING_TRANSPORT_HTTPS {
+		return nil, errors.TraceNew("unsupported fronting transport")
+	}
+
+	// The following wires up simplified domain fronted requests, including
+	// the basic, distinct dial/SNI and host header values. Certificate
+	// validation based on FrontingSpec parameters, including pins, includes
+	// a subset of the functionality from psiphon.CustomTLSDial.
+	//
+	// psiphon.DialMeek/CustomTLSDial features omitted here include dial
+	// parameter replay, the QUIC transport, and obfuscation techniques
+	// including custom DNS resolution, SNI transforms, utls TLS
+	// fingerprints, and TCP and TLS fragmentation, TLS padding, and other
+	// mechanisms.
+
+	var tlsConfigRootCAs *x509.CertPool
+	if trustedCACertificates != "" {
+		tlsConfigRootCAs = x509.NewCertPool()
+		if !tlsConfigRootCAs.AppendCertsFromPEM([]byte(trustedCACertificates)) {
+			return nil, errors.TraceNew("AppendCertsFromPEM failed")
+		}
+	}
+
+	conns := common.NewConns[net.Conn]()
+
+	transport := &http.Transport{
+		DialTLSContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
+			conn, err := (&net.Dialer{}).DialContext(ctx, network, frontingDialAddress)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			// Track conn to facilitate InproxyProxyQualityBrokerRoundTripper.Close
+			// interrupting and closing all connections.
+			conn = &inproxyProxyQualityBrokerRoundTripperConn{Conn: conn, conns: conns}
+			if !conns.Add(conn) {
+				conn.Close()
+				return nil, errors.Trace(err)
+			}
+
+			tlsConn := tls.Client(
+				conn,
+				&tls.Config{
+					InsecureSkipVerify: true,
+					VerifyPeerCertificate: func(
+						rawCerts [][]byte, _ [][]*x509.Certificate) error {
+
+						var verifiedChains [][]*x509.Certificate
+						var err error
+						if verifyServerName != "" {
+							verifiedChains, err = common.VerifyServerCertificate(
+								tlsConfigRootCAs, rawCerts, verifyServerName)
+							if err != nil {
+								return errors.Trace(err)
+							}
+						}
+						if len(verifyPins) > 0 {
+							err := common.VerifyCertificatePins(
+								verifyPins, verifiedChains)
+							if err != nil {
+								return errors.Trace(err)
+							}
+						}
+						return nil
+					},
+				})
+			err = tlsConn.HandshakeContext(ctx)
+			if err != nil {
+				conn.Close()
+				return nil, errors.Trace(err)
+			}
+			return tlsConn, nil
+		},
+	}
+
+	url := &url.URL{
+		Scheme: "https",
+		Host:   hostHeader,
+		Path:   "/",
+	}
+
+	// Note that there's currently no custom log formatter (or validator) in
+	// inproxy.ServerProxyQualityRequest.ValidateAndGetLogFields, so field
+	// transforms, such as "0"/"1" to bool, are not yet supported here.
+
+	dialParams := common.APIParameters{
+		"fronting_provider_id": frontingProviderID,
+	}
+
+	return &InproxyProxyQualityBrokerRoundTripper{
+		transport:         transport,
+		conns:             conns,
+		requestURL:        url.String(),
+		additionalHeaders: additionalHeaders,
+		dialParams:        dialParams,
+	}, nil
+}
+
+type inproxyProxyQualityBrokerRoundTripperConn struct {
+	net.Conn
+	conns *common.Conns[net.Conn]
+}
+
+func (conn *inproxyProxyQualityBrokerRoundTripperConn) Close() error {
+	conn.conns.Remove(conn)
+	return errors.Trace(conn.Conn.Close())
+}
+
+// RoundTrip performs a broker request round trip.
+func (r *InproxyProxyQualityBrokerRoundTripper) RoundTrip(
+	ctx context.Context,
+	roundTripDelay time.Duration,
+	roundTripTimeout time.Duration,
+	requestPayload []byte) (retResponsePayload []byte, retErr error) {
+
+	defer func() {
+		// Wrap every return with RoundTripperFailedError to conform with the
+		// inproxy.RoundTripper interface. This is a simplification of the
+		// logic in InproxyBrokerRoundTripper.RoundTrip, which conditionally
+		// wraps errors based on various heuristics and conditions that are
+		// more relevant to clients and proxies with long polling and
+		// multiple concurrent requests.
+		if retErr != nil {
+			retErr = inproxy.NewRoundTripperFailedError(retErr)
+		}
+	}()
+
+	// Proxy quality broker round trips are not expected to apply a delay here.
+	if roundTripDelay > 0 {
+		return nil, errors.TraceNew("roundTripDelay unsupported")
+	}
+
+	request, err := http.NewRequestWithContext(
+		ctx, "POST", r.requestURL, bytes.NewReader(requestPayload))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	for name, value := range r.additionalHeaders {
+		request.Header[name] = value
+	}
+
+	response, err := r.transport.RoundTrip(request)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	defer response.Body.Close()
+	if response.StatusCode != http.StatusOK {
+		return nil, errors.Tracef(
+			"unexpected response status code %d", response.StatusCode)
+	}
+
+	responsePayload, err := io.ReadAll(response.Body)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return responsePayload, nil
+}
+
+// Close interrupts any in-flight request and closes all underlying network
+// connections.
+func (r *InproxyProxyQualityBrokerRoundTripper) Close() error {
+	r.conns.CloseAll()
+	return nil
+}

+ 5 - 0
psiphon/server/meek.go

@@ -1865,6 +1865,11 @@ func (server *MeekServer) inproxyReloadTactics() error {
 		p.Duration(parameters.InproxyBrokerMatcherOfferRateLimitInterval),
 		p.Int(parameters.InproxyMaxCompartmentIDListLength))
 
+	server.inproxyBroker.SetProxyQualityParameters(
+		p.Duration(parameters.InproxyProxyQualityTTL),
+		p.Duration(parameters.InproxyProxyQualityPendingFailedMatchDeadline),
+		p.Int(parameters.InproxyProxyQualityFailedMatchThreshold))
+
 	return nil
 }
 

+ 95 - 18
psiphon/server/server_test.go

@@ -1095,6 +1095,11 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// 3. hot reload of server tactics (runConfig.doHotReload && doServerTactics)
 	discoveryLog := make(chan map[string]interface{}, 3)
 
+	inproxyProxyAnnounceLog := make(chan map[string]interface{}, 1)
+	inproxyClientOfferLog := make(chan map[string]interface{}, 1)
+	inproxyProxyAnswerLog := make(chan map[string]interface{}, 1)
+	inproxyServerProxyQualityLog := make(chan map[string]interface{}, 1)
+
 	setLogCallback(func(log []byte) {
 
 		logFields := make(map[string]interface{})
@@ -1131,18 +1136,31 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			default:
 			}
 		case "inproxy_broker":
-			// Check that broker receives the correct fronting provider ID.
-			//
-			// TODO: check inproxy_broker logs received when expected and
-			// check more fields
+
 			event, ok := logFields["broker_event"].(string)
 			if !ok {
 				t.Errorf("missing inproxy_broker.broker_event")
 			}
-			if event == "client-offer" || event == "proxy-announce" {
-				fronting_provider_id, ok := logFields["fronting_provider_id"].(string)
-				if !ok || fronting_provider_id != inproxyTestConfig.brokerFrontingProviderID {
-					t.Errorf("unexpected inproxy_broker.fronting_provider_id for %s", event)
+			switch event {
+			case "proxy-announce":
+				select {
+				case inproxyProxyAnnounceLog <- logFields:
+				default:
+				}
+			case "client-offer":
+				select {
+				case inproxyClientOfferLog <- logFields:
+				default:
+				}
+			case "proxy-answer":
+				select {
+				case inproxyProxyAnswerLog <- logFields:
+				default:
+				}
+			case "server-proxy-quality":
+				select {
+				case inproxyServerProxyQualityLog <- logFields:
+				default:
 				}
 			}
 		}
@@ -2025,6 +2043,46 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		}
 	}
 
+	// Check in-proxy broker logs. This check also confirms that the server
+	// proxy quality report request succeeded.
+
+	logChannels := []chan map[string]interface{}{
+		inproxyProxyAnnounceLog,
+		inproxyClientOfferLog,
+		inproxyProxyAnswerLog,
+		inproxyServerProxyQualityLog}
+	for _, logChannel := range logChannels {
+
+		// There's no proxy quality report in personal pairing mode.
+		expectLog := !(logChannel == inproxyServerProxyQualityLog && runConfig.doPersonalPairing)
+
+		if doInproxy && expectLog {
+			select {
+			case logFields := <-logChannel:
+
+				// Check that broker receives the correct fronting provider ID.
+				//
+				// TODO: check more fields
+				if logChannel == inproxyProxyAnnounceLog ||
+					logChannel == inproxyClientOfferLog ||
+					logChannel == inproxyServerProxyQualityLog {
+					fronting_provider_id, ok := logFields["fronting_provider_id"].(string)
+					if !ok || fronting_provider_id != inproxyTestConfig.brokerFrontingProviderID {
+						t.Errorf("unexpected inproxy_broker.fronting_provider_id")
+					}
+				}
+			default:
+				t.Fatalf("missing in-proxy broker log")
+			}
+		} else {
+			select {
+			case <-logChannel:
+				t.Fatalf("unexpected in-proxy broker log")
+			default:
+			}
+		}
+	}
+
 	// Check that datastore had retained/pruned server entries as expected.
 	checkPruneServerEntriesTest(t, runConfig, testDataDirName, pruneServerEntryTestCases)
 
@@ -3859,8 +3917,8 @@ func generateInproxyTestConfig(
 		verifyPins = "[]"
 	}
 
-	brokerSpecsJSONFormat := `
-            [{
+	brokerSpecJSONFormat := `
+            {
                 "BrokerPublicKey": "%s",
                 "BrokerRootObfuscationSecret": "%s",
                 "BrokerFrontingSpecs": [{
@@ -3872,11 +3930,11 @@ func generateInproxyTestConfig(
                     "VerifyPins": %s,
                     "Host": "%s"
                 }]
-            }]
+            }
     `
 
-	validBrokerSpecsJSON := fmt.Sprintf(
-		brokerSpecsJSONFormat,
+	validBrokerSpecJSON := fmt.Sprintf(
+		brokerSpecJSONFormat,
 		brokerSessionPublicKeyStr,
 		brokerRootObfuscationSecretStr,
 		brokerFrontingProviderID,
@@ -3890,8 +3948,8 @@ func generateInproxyTestConfig(
 	otherSessionPublicKey, _ := otherSessionPrivateKey.GetPublicKey()
 	otherRootObfuscationSecret, _ := inproxy.GenerateRootObfuscationSecret()
 
-	invalidBrokerSpecsJSON := fmt.Sprintf(
-		brokerSpecsJSONFormat,
+	invalidBrokerSpecJSON := fmt.Sprintf(
+		brokerSpecJSONFormat,
 		otherSessionPublicKey.String(),
 		otherRootObfuscationSecret.String(),
 		prng.HexString(16),
@@ -3901,6 +3959,10 @@ func generateInproxyTestConfig(
 		fmt.Sprintf("[\"%s\"]", prng.HexString(16)),
 		prng.HexString(16))
 
+	validBrokerSpecsJSON := fmt.Sprintf("[%s]", validBrokerSpecJSON)
+	invalidBrokerSpecsJSON := fmt.Sprintf("[%s]", invalidBrokerSpecJSON)
+	allBrokerSpecsJSON := fmt.Sprintf("[%s, %s]", validBrokerSpecJSON, invalidBrokerSpecJSON)
+
 	var brokerSpecsJSON, proxyBrokerSpecsJSON, clientBrokerSpecsJSON string
 	if doTargetBrokerSpecs {
 		// invalidBrokerSpecsJSON should be ignored when specific proxy/client
@@ -3914,6 +3976,13 @@ func generateInproxyTestConfig(
 		clientBrokerSpecsJSON = "[]"
 	}
 
+	additionalHeaders := http.Header{}
+	for name, value := range brokerMeekRequiredHeaders {
+		additionalHeaders[name] = []string{value}
+	}
+	additionalHeadersJSONBytes, _ := json.Marshal(additionalHeaders)
+	additionalHeadersJSON := string(additionalHeadersJSONBytes)
+
 	maxRequestTimeoutsJSON := ""
 	if prng.FlipCoin() {
 		maxRequestTimeoutsJSONFormat := `
@@ -3930,7 +3999,7 @@ func generateInproxyTestConfig(
             "InproxyAllowProxy": true,
             "InproxyAllowClient": true,
             "InproxyTunnelProtocolSelectionProbability": 1.0,
-            "InproxyAllBrokerPublicKeys": ["%s", "%s"],
+            "InproxyAllBrokerSpecs": %s,
             "InproxyBrokerSpecs": %s,
             "InproxyProxyBrokerSpecs": %s,
             "InproxyClientBrokerSpecs": %s,
@@ -3941,6 +4010,13 @@ func generateInproxyTestConfig(
             "InproxyDisablePortMapping": true,
             "InproxyDisableIPv6ICECandidates": true,
             "InproxyWebRTCMediaStreamsProbability": %s,
+            "InproxyEnableProxyQuality": true,
+            "InproxyProxyQualityTargetUpstreamBytes": 1,
+            "InproxyProxyQualityTargetDownstreamBytes": 1,
+            "InproxyProxyQualityTargetDuration": "1ns",
+            "InproxyProxyQualityReporterTrustedCACertificates": "%s",
+            "InproxyProxyQualityReporterAdditionalHeaders": %s,
+            "InproxyProxyQualityReporterRequestDelay": 0,
             %s
     `
 
@@ -3951,14 +4027,15 @@ func generateInproxyTestConfig(
 
 	tacticsParametersJSON := fmt.Sprintf(
 		tacticsParametersJSONFormat,
-		brokerSessionPublicKeyStr,
-		otherSessionPublicKey.String(),
+		allBrokerSpecsJSON,
 		brokerSpecsJSON,
 		proxyBrokerSpecsJSON,
 		clientBrokerSpecsJSON,
 		commonCompartmentIDStr,
 		commonCompartmentIDStr,
 		mediaStreamsProbability,
+		strings.ReplaceAll(brokerServerCertificate, "\n", "\\n"),
+		additionalHeadersJSON,
 		maxRequestTimeoutsJSON)
 
 	config := &inproxyTestConfig{

+ 308 - 64
psiphon/server/tunnelServer.go

@@ -142,7 +142,6 @@ func NewTunnelServer(
 // comment in sshClient.stop(). TODO: fully synchronized shutdown.
 func (server *TunnelServer) Run() error {
 
-	// TODO: should TunnelServer hold its own support pointer?
 	support := server.sshServer.support
 
 	// First bind all listeners; once all are successful,
@@ -281,6 +280,17 @@ func (server *TunnelServer) Run() error {
 			})
 	}
 
+	if server.sshServer.inproxyBrokerSessions != nil {
+
+		// When running in-proxy tunnels, start the InproxyBrokerSession
+		// background worker, which includes the proxy quality reporter.
+		// Start this before any tunnels can be established.
+		err := server.sshServer.inproxyBrokerSessions.Start()
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
 	for _, listener := range listeners {
 		server.runWaitGroup.Add(1)
 		go func(listener *sshListener) {
@@ -317,6 +327,10 @@ func (server *TunnelServer) Run() error {
 	server.sshServer.stopClients()
 	server.runWaitGroup.Wait()
 
+	if server.sshServer.inproxyBrokerSessions != nil {
+		server.sshServer.inproxyBrokerSessions.Stop()
+	}
+
 	log.WithTrace().Info("stopped")
 
 	return err
@@ -466,12 +480,15 @@ func newSSHServer(
 	// original in-proxy client IP and the in-proxy proxy ID.
 	//
 	// Only brokers with public keys configured in the
-	// InproxyAllBrokerPublicKeys tactic parameter are allowed to connect to
+	// InproxyAllBrokerSpecs tactic parameter are allowed to connect to
 	// the server, and brokers verify the server's public key via the
 	// InproxySessionPublicKey server entry field.
 	//
 	// Sessions are initialized and run for all psiphond instances running any
 	// in-proxy tunnel protocol.
+	//
+	// inproxyBrokerSessions also run the server proxy quality reporter, which
+	// makes requests to brokers configured in InproxyAllBrokerSpecs.
 
 	var inproxyBrokerSessions *inproxy.ServerBrokerSessions
 
@@ -497,15 +514,33 @@ func newSSHServer(
 			return nil, errors.Trace(err)
 		}
 
-		// The expected broker public keys are set in reloadTactics directly
-		// below, so none are set here.
-		inproxyBrokerSessions, err = inproxy.NewServerBrokerSessions(
-			inproxyPrivateKey,
-			inproxyObfuscationSecret,
-			nil,
-			getInproxyBrokerAPIParameterValidator(support.Config),
-			getInproxyBrokerAPIParameterLogFieldFormatter(),
-			"inproxy_proxy_") // Prefix for proxy metrics log fields in server_tunnel
+		makeRoundTripper := func(
+			brokerPublicKey inproxy.SessionPublicKey) (
+			inproxy.RoundTripper, common.APIParameters, error) {
+
+			roundTripper, additionalParams, err := MakeInproxyProxyQualityBrokerRoundTripper(
+				support, brokerPublicKey)
+			if err != nil {
+				return nil, nil, errors.Trace(err)
+			}
+			return roundTripper, additionalParams, nil
+		}
+
+		// The expected broker specs and public keys are set in reloadTactics
+		// directly below, so none are set here.
+		config := &inproxy.ServerBrokerSessionsConfig{
+			Logger:                      CommonLogger(log),
+			ServerPrivateKey:            inproxyPrivateKey,
+			ServerRootObfuscationSecret: inproxyObfuscationSecret,
+			BrokerRoundTripperMaker:     makeRoundTripper,
+			ProxyMetricsValidator:       getInproxyBrokerAPIParameterValidator(support.Config),
+			ProxyMetricsFormatter:       getInproxyBrokerAPIParameterLogFieldFormatter(),
+
+			// Prefix for proxy metrics log fields in server_tunnel
+			ProxyMetricsPrefix: "inproxy_proxy_",
+		}
+
+		inproxyBrokerSessions, err = inproxy.NewServerBrokerSessions(config)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -1487,9 +1522,9 @@ func (sshServer *sshServer) reloadTactics() error {
 	// in-proxy tunnel protocols.
 	if sshServer.inproxyBrokerSessions != nil {
 
-		// Get InproxyAllBrokerPublicKeys from tactics.
+		// Get InproxyAllBrokerSpecs from tactics.
 		//
-		// Limitation: assumes no GeoIP targeting for InproxyAllBrokerPublicKeys.
+		// Limitation: assumes no GeoIP targeting for InproxyAllBrokerSpecs.
 
 		p, err := sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
 		if err != nil {
@@ -1498,20 +1533,47 @@ func (sshServer *sshServer) reloadTactics() error {
 
 		if !p.IsNil() {
 
-			brokerPublicKeys, err := inproxy.SessionPublicKeysFromStrings(
-				p.Strings(parameters.InproxyAllBrokerPublicKeys))
-			if err != nil {
-				return errors.Trace(err)
+			brokerSpecs := p.InproxyBrokerSpecs(parameters.InproxyAllBrokerSpecs)
+
+			var brokerPublicKeys []inproxy.SessionPublicKey
+			var brokerRootObfuscationSecrets []inproxy.ObfuscationSecret
+
+			for _, brokerSpec := range brokerSpecs {
+
+				brokerPublicKey, err := inproxy.SessionPublicKeyFromString(
+					brokerSpec.BrokerPublicKey)
+				if err != nil {
+					return errors.Trace(err)
+				}
+
+				brokerPublicKeys = append(
+					brokerPublicKeys, brokerPublicKey)
+
+				brokerRootObfuscationSecret, err := inproxy.ObfuscationSecretFromString(
+					brokerSpec.BrokerRootObfuscationSecret)
+				if err != nil {
+					return errors.Trace(err)
+				}
+
+				brokerRootObfuscationSecrets = append(
+					brokerRootObfuscationSecrets, brokerRootObfuscationSecret)
 			}
 
 			// SetKnownBrokerPublicKeys will terminate any existing sessions
 			// for broker public keys no longer in the known/expected list;
 			// but will retain any existing sessions for broker public keys
 			// that remain in the list.
-			err = sshServer.inproxyBrokerSessions.SetKnownBrokerPublicKeys(brokerPublicKeys)
+			err = sshServer.inproxyBrokerSessions.SetKnownBrokers(
+				brokerPublicKeys, brokerRootObfuscationSecrets)
 			if err != nil {
 				return errors.Trace(err)
 			}
+
+			sshServer.inproxyBrokerSessions.SetProxyQualityRequestParameters(
+				p.Int(parameters.InproxyProxyQualityReporterMaxRequestEntries),
+				p.Duration(parameters.InproxyProxyQualityReporterRequestDelay),
+				p.Duration(parameters.InproxyProxyQualityReporterRequestTimeout),
+				p.Int(parameters.InproxyProxyQualityReporterRequestRetries))
 		}
 	}
 
@@ -1855,6 +1917,7 @@ type sshClient struct {
 	sentAlertRequests                    map[string]bool
 	peakMetrics                          peakMetrics
 	destinationBytesMetrics              map[string]*protocolDestinationBytesMetrics
+	inproxyProxyQualityTracker           *inproxyProxyQualityTracker
 }
 
 type trafficState struct {
@@ -1981,6 +2044,8 @@ type handshakeState struct {
 	newTacticsTag           string
 	inproxyClientIP         string
 	inproxyClientGeoIPData  GeoIPData
+	inproxyProxyID          inproxy.ID
+	inproxyMatchedPersonal  bool
 	inproxyRelayLogFields   common.LogFields
 }
 
@@ -2069,6 +2134,73 @@ func (lookup *splitTunnelLookup) lookup(region string) bool {
 	}
 }
 
+type inproxyProxyQualityTracker struct {
+	sshClient       *sshClient
+	targetBytesUp   int64
+	targetBytesDown int64
+	targetDuration  time.Duration
+	startTime       time.Time
+
+	bytesUp         int64
+	bytesDown       int64
+	reportTriggered int32
+}
+
+func newInproxyProxyQualityTracker(
+	sshClient *sshClient,
+	targetBytesUp int64,
+	targetBytesDown int64,
+	targetDuration time.Duration) *inproxyProxyQualityTracker {
+
+	return &inproxyProxyQualityTracker{
+		sshClient:       sshClient,
+		targetBytesUp:   targetBytesUp,
+		targetBytesDown: targetBytesDown,
+		targetDuration:  targetDuration,
+
+		startTime: time.Now(),
+	}
+}
+
+func (t *inproxyProxyQualityTracker) UpdateProgress(
+	downstreamBytes, upstreamBytes, _ int64) {
+
+	// Concurrency: UpdateProgress may be called concurrently; all accesses to
+	// mutated fields use atomic operations.
+
+	if atomic.LoadInt32(&t.reportTriggered) != 0 {
+		// TODO: performance -- remove the updater once the target met,
+		// instead of making this residual, no-op update call per tunnel I/O?
+		return
+	}
+
+	bytesUp := atomic.AddInt64(&t.bytesUp, upstreamBytes)
+	bytesDown := atomic.AddInt64(&t.bytesDown, downstreamBytes)
+
+	if (t.targetBytesUp == 0 || bytesUp >= t.targetBytesUp) &&
+		(t.targetBytesDown == 0 || bytesDown >= t.targetBytesDown) &&
+		(t.targetDuration == 0 || time.Since(t.startTime) >= t.targetDuration) {
+
+		// The tunnel connection is wrapped with the quality tracker just
+		// before the SSH handshake. It's possible that the quality targets
+		// are met before the Psiphon handshake completes, due to sufficient
+		// bytes/duration during the intermediate handshakes, or during the
+		// liveness test. Since the proxy ID isn't known until the Psiphon
+		// handshake completes, delay any report until at least after the
+		// Psiphon handshake is completed.
+
+		handshaked, _ := t.sshClient.getHandshaked()
+		if handshaked {
+
+			if !atomic.CompareAndSwapInt32(&t.reportTriggered, 0, 1) {
+				return
+			}
+
+			t.sshClient.reportProxyQuality()
+		}
+	}
+}
+
 func newSshClient(
 	sshServer *sshServer,
 	sshListener *sshListener,
@@ -2158,11 +2290,20 @@ func (sshClient *sshClient) run(
 	// the connection active. Writes are not considered reliable activity indicators
 	// due to buffering.
 
+	// getTunnelActivityUpdaters wires up updaters that act on tunnel duration
+	// and bytes transferred, including the in-proxy proxy quality tracker.
+	// The quality tracker will include non-user traffic bytes, so it's not
+	// equivalent to server_tunnel bytes.
+	//
+	// Limitation: wrapping at this point omits some obfuscation layer bytes,
+	// including MEEK and QUIC.
+
 	activityConn, err := common.NewActivityMonitoredConn(
 		conn,
 		SSH_CONNECTION_READ_DEADLINE,
 		false,
-		nil)
+		nil,
+		sshClient.getTunnelActivityUpdaters()...)
 	if err != nil {
 		conn.Close()
 		if !isExpectedTunnelIOError(err) {
@@ -3173,7 +3314,8 @@ func (sshClient *sshClient) handleNewPacketTunnelChannel(
 			trafficType = portForwardTypeUDP
 		}
 
-		activityUpdaters := sshClient.getActivityUpdaters(trafficType, upstreamIPAddress)
+		activityUpdaters := sshClient.getPortForwardActivityUpdaters(
+			trafficType, upstreamIPAddress)
 
 		flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
 		for i, activityUpdater := range activityUpdaters {
@@ -3425,26 +3567,23 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 
 		destinationBytesMetricsASNs := []string{}
 		destinationBytesMetricsASN := ""
-		if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
 
-			// Target this using the client, not peer, GeoIP. In the case of
-			// in-proxy tunnel protocols, the client GeoIP fields will be None
-			// if the handshake does not complete. In that case, no bytes will
-			// have transferred.
+		// Target this using the client, not peer, GeoIP. In the case of
+		// in-proxy tunnel protocols, the client GeoIP fields will be None
+		// if the handshake does not complete. In that case, no bytes will
+		// have transferred.
 
-			p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
-			if err == nil && !p.IsNil() {
-				destinationBytesMetricsASNs = p.Strings(parameters.DestinationBytesMetricsASNs)
-				destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
-			}
-			p.Close()
+		p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
+		if err == nil && !p.IsNil() {
+			destinationBytesMetricsASNs = p.Strings(parameters.DestinationBytesMetricsASNs)
+			destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
 		}
+		p.Close()
 
 		if destinationBytesMetricsASN != "" {
 
 			// Log any parameters.DestinationBytesMetricsASN data in the
-			// legacy log field format. Zero values are not omitted in this
-			// format.
+			// legacy log field format.
 
 			destinationBytesMetrics, ok :=
 				sshClient.destinationBytesMetrics[destinationBytesMetricsASN]
@@ -3485,27 +3624,11 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 				bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
 				bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
 
-				// Zero values are omitted to reduce log size.
-
-				bytes := bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
-				if bytes <= 0 {
-					continue
-				}
-
 				destBytes[ASN] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
-
-				if bytesUpTCP > 0 {
-					destBytesUpTCP[ASN] = bytesUpTCP
-				}
-				if bytesDownTCP > 0 {
-					destBytesDownTCP[ASN] = bytesDownTCP
-				}
-				if bytesUpUDP > 0 {
-					destBytesUpUDP[ASN] = bytesUpUDP
-				}
-				if bytesDownUDP > 0 {
-					destBytesDownUDP[ASN] = bytesDownUDP
-				}
+				destBytesUpTCP[ASN] = bytesUpTCP
+				destBytesDownTCP[ASN] = bytesDownTCP
+				destBytesUpUDP[ASN] = bytesUpUDP
+				destBytesDownUDP[ASN] = bytesDownUDP
 			}
 
 			logFields["asn_dest_bytes"] = destBytes
@@ -3848,7 +3971,8 @@ func (sshClient *sshClient) setHandshakeState(
 
 	if sshClient.isInproxyTunnelProtocol {
 
-		p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
+		p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(
+			sshClient.clientGeoIPData)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -4241,17 +4365,13 @@ func (sshClient *sshClient) setDestinationBytesMetrics() {
 	// of an additional tactics filtering per tunnel. As this cache is
 	// designed for GeoIP filtering only, handshake API parameters are not
 	// applied to tactics filtering in this case.
-
-	tacticsCache := sshClient.sshServer.support.ServerTacticsParametersCache
-	if tacticsCache == nil {
-		return
-	}
-
+	//
 	// Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
 	// protocols, the client GeoIP fields will be populated using the
 	// original client IP already received, from the broker, in the handshake.
 
-	p, err := tacticsCache.Get(sshClient.clientGeoIPData)
+	p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(
+		sshClient.clientGeoIPData)
 	if err != nil {
 		log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
 		return
@@ -4285,7 +4405,9 @@ func (sshClient *sshClient) setDestinationBytesMetrics() {
 	}
 }
 
-func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
+func (sshClient *sshClient) newDestinationBytesMetricsUpdater(
+	portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
+
 	sshClient.Lock()
 	defer sshClient.Unlock()
 
@@ -4310,7 +4432,9 @@ func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType in
 	return &metrics.udpMetrics
 }
 
-func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
+func (sshClient *sshClient) getPortForwardActivityUpdaters(
+	portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
+
 	var updaters []common.ActivityUpdater
 
 	clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)
@@ -4326,6 +4450,126 @@ func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress n
 	return updaters
 }
 
+func (sshClient *sshClient) newInproxyProxyQualityTracker() *inproxyProxyQualityTracker {
+
+	sshClient.Lock()
+	defer sshClient.Unlock()
+
+	if !protocol.TunnelProtocolUsesInproxy(sshClient.tunnelProtocol) {
+		return nil
+	}
+
+	// Limitation: assumes no GeoIP targeting for in-proxy quality
+	// configuration. The original client GeoIP information is not available
+	// until after the Psiphon handshake completes, and we want to include
+	// earlier tunnel bytes, including any liveness test.
+	//
+	// As a future enhancement, quality tracker targets could be _extended_ in
+	// reportProxyQuality.
+
+	p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
+		return nil
+	}
+	if p.IsNil() {
+		return nil
+	}
+
+	// InproxyEnableProxyQuality indicates if proxy quality reporting is
+	// enabled or not.
+	//
+	// Note that flipping InproxyEnableProxyQuality to false in tactics does
+	// not interrupt any tracker already in progress.
+	if !p.Bool(parameters.InproxyEnableProxyQuality) {
+		return nil
+	}
+
+	tracker := newInproxyProxyQualityTracker(
+		sshClient,
+		int64(p.Int(parameters.InproxyProxyQualityTargetUpstreamBytes)),
+		int64(p.Int(parameters.InproxyProxyQualityTargetDownstreamBytes)),
+		p.Duration(parameters.InproxyProxyQualityTargetDuration))
+
+	sshClient.inproxyProxyQualityTracker = tracker
+
+	return tracker
+}
+
+func (sshClient *sshClient) reportProxyQuality() {
+
+	sshClient.Lock()
+	defer sshClient.Unlock()
+
+	if !protocol.TunnelProtocolUsesInproxy(sshClient.tunnelProtocol) ||
+		!sshClient.handshakeState.completed {
+		log.Warning("unexpected reportProxyQuality call")
+		return
+	}
+
+	if sshClient.handshakeState.inproxyMatchedPersonal {
+		// Skip quality reporting for personal paired proxies. Brokers don't use
+		// quality data for personal matching, and no quality data from personal
+		// pairing should not influence common matching prioritization.
+		return
+	}
+
+	// Enforce InproxyEnableProxyQualityClientRegions. If set, this is a
+	// restricted list of client regions for which quality is reported.
+	//
+	// Note that it's possible to have a soft client GeoIP limit given that
+	// in-proxy protocols are default disabled and enabled via
+	// LimitTunnelProtocols. However, that parameter is enforced on the
+	// client side.
+	//
+	// Now that the Psiphon handshake is complete, the original client IP
+	// is known. Here, as in newInproxyProxyQualityTracker, the tactics
+	// filter remains non-region specific, so
+	// InproxyEnableProxyQualityClientRegions should be a global list. This
+	// accommodates a simpler configuration vs., for example, using many
+	// region-specific filters to override InproxyEnableProxyQuality.
+	//
+	// Future enhancement: here, we could extend inproxyProxyQualityTracker
+	// targets with client GeoIP-specific values.
+
+	p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
+	if err != nil || p.IsNil() {
+		log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
+		return
+	}
+
+	enabledRegions := p.Strings(parameters.InproxyEnableProxyQualityClientRegions)
+	if len(enabledRegions) > 0 &&
+		!common.Contains(enabledRegions, sshClient.clientGeoIPData.Country) {
+
+		// Quality reporting is restricted to specific regions, and this client's region is not included.
+		return
+	}
+
+	// ReportQuality will enqueue the quality data to be sent to brokers.
+	// There's a delay before making broker requests, in an effort to batch
+	// up data. Requests may be made to only a subset of brokers in
+	// InproxyAllBrokerSpecs, depending on whether the broker is expected to
+	// trust this server's session public key; see ReportQuality.
+
+	sshClient.sshServer.inproxyBrokerSessions.ReportQuality(
+		sshClient.handshakeState.inproxyProxyID,
+		sshClient.peerGeoIPData.ASN,
+		sshClient.clientGeoIPData.ASN)
+}
+
+func (sshClient *sshClient) getTunnelActivityUpdaters() []common.ActivityUpdater {
+
+	var updaters []common.ActivityUpdater
+
+	inproxyProxyQualityTracker := sshClient.newInproxyProxyQualityTracker()
+	if inproxyProxyQualityTracker != nil {
+		updaters = append(updaters, inproxyProxyQualityTracker)
+	}
+
+	return updaters
+}
+
 // setTrafficRules resets the client's traffic rules based on the latest server config
 // and client properties. As sshClient.trafficRules may be reset by a concurrent
 // goroutine, trafficRules must only be accessed within the sshClient mutex.
@@ -5062,7 +5306,7 @@ func (sshClient *sshClient) handleTCPChannel(
 		sshClient.idleTCPPortForwardTimeout(),
 		true,
 		lruEntry,
-		sshClient.getActivityUpdaters(portForwardTypeTCP, IP)...)
+		sshClient.getPortForwardActivityUpdaters(portForwardTypeTCP, IP)...)
 	if err != nil {
 		log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
 		return

+ 2 - 1
psiphon/server/udp.go

@@ -271,7 +271,8 @@ func (mux *udpgwPortForwardMultiplexer) run() {
 			var activityUpdaters []common.ActivityUpdater
 			// Don't incur activity monitor overhead for DNS requests
 			if !message.forwardDNS {
-				activityUpdaters = mux.sshClient.getActivityUpdaters(portForwardTypeUDP, dialIP)
+				activityUpdaters = mux.sshClient.getPortForwardActivityUpdaters(
+					portForwardTypeUDP, dialIP)
 			}
 
 			conn, err := common.NewActivityMonitoredConn(

+ 2 - 56
psiphon/tlsDialer.go

@@ -54,9 +54,7 @@ package psiphon
 import (
 	"bytes"
 	"context"
-	"crypto/sha256"
 	"crypto/x509"
-	"encoding/base64"
 	"encoding/binary"
 	"encoding/hex"
 	std_errors "errors"
@@ -334,7 +332,7 @@ func CustomTLSDial(
 					return errors.TraceNew("unexpected verified chains")
 				}
 				var err error
-				verifiedChains, err = verifyServerCertificate(
+				verifiedChains, err = common.VerifyServerCertificate(
 					tlsConfigRootCAs, rawCerts, verifyServerName)
 				if err != nil {
 					return errors.Trace(err)
@@ -342,7 +340,7 @@ func CustomTLSDial(
 			}
 
 			if len(config.VerifyPins) > 0 {
-				err := verifyCertificatePins(
+				err := common.VerifyCertificatePins(
 					config.VerifyPins, verifiedChains)
 				if err != nil {
 					return errors.Trace(err)
@@ -719,58 +717,6 @@ func verifyLegacyCertificate(rawCerts [][]byte, expectedCertificate *x509.Certif
 	return nil
 }
 
-// verifyServerCertificate parses and verifies the provided chain. If
-// successful, it returns the verified chains that were built.
-func verifyServerCertificate(
-	rootCAs *x509.CertPool, rawCerts [][]byte, verifyServerName string) ([][]*x509.Certificate, error) {
-
-	// This duplicates the verification logic in utls (and standard crypto/tls).
-
-	certs := make([]*x509.Certificate, len(rawCerts))
-	for i, rawCert := range rawCerts {
-		cert, err := x509.ParseCertificate(rawCert)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		certs[i] = cert
-	}
-
-	opts := x509.VerifyOptions{
-		Roots:         rootCAs,
-		DNSName:       verifyServerName,
-		Intermediates: x509.NewCertPool(),
-	}
-
-	for i, cert := range certs {
-		if i == 0 {
-			continue
-		}
-		opts.Intermediates.AddCert(cert)
-	}
-
-	verifiedChains, err := certs[0].Verify(opts)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	return verifiedChains, nil
-}
-
-func verifyCertificatePins(pins []string, verifiedChains [][]*x509.Certificate) error {
-	for _, chain := range verifiedChains {
-		for _, cert := range chain {
-			publicKeyDigest := sha256.Sum256(cert.RawSubjectPublicKeyInfo)
-			expectedPin := base64.StdEncoding.EncodeToString(publicKeyDigest[:])
-			if common.Contains(pins, expectedPin) {
-				// Return success on the first match of any certificate public key to any
-				// pin.
-				return nil
-			}
-		}
-	}
-	return errors.TraceNew("no pin found")
-}
-
 func IsTLSConnUsingHTTP2(conn net.Conn) bool {
 	if t, ok := conn.(*tlsConn); ok {
 		if u, ok := t.Conn.(*utls.UConn); ok {