Browse Source

Merge pull request #697 from rod-hynes/master

In-proxy broker enhancements
Rod Hynes 1 year ago
parent
commit
d16a328ba7

+ 8 - 1
psiphon/common/inproxy/api.go

@@ -215,7 +215,7 @@ type ClientMetrics struct {
 // ProxyAnnounceRequest is an API request sent from a proxy to a broker,
 // announcing that it is available for a client connection. Proxies send one
 // ProxyAnnounceRequest for each available client connection. The broker will
-// match the proxy with a a client and return WebRTC connection information
+// match the proxy with a client and return WebRTC connection information
 // in the response.
 //
 // PersonalCompartmentIDs limits the clients to those that supply one of the
@@ -223,11 +223,18 @@ type ClientMetrics struct {
 // proxy operators to client users out-of-band and provide optional access
 // control.
 //
+// When CheckTactics is set, the broker will check for new tactics or indicate
+// that the proxy's cached tactics TTL may be extended. Tactics information
+// is returned in the response TacticsPayload. To minimize broker processing
+// overhead, proxies with multiple workers should designate just one worker
+// to set CheckTactics.
+//
 // The proxy's session public key is an implicit and cryptographically
 // verified proxy ID.
 type ProxyAnnounceRequest struct {
 	PersonalCompartmentIDs []ID          `cbor:"1,keyasint,omitempty"`
 	Metrics                *ProxyMetrics `cbor:"2,keyasint,omitempty"`
+	CheckTactics           bool          `cbor:"3,keyasint,omitempty"`
 }
 
 // WebRTCSessionDescription is compatible with pion/webrtc.SessionDescription

+ 68 - 25
psiphon/common/inproxy/broker.go

@@ -122,6 +122,15 @@ type BrokerConfig struct {
 	// clients. Proxies with personal compartment IDs are always allowed.
 	AllowProxy func(common.GeoIPData) bool
 
+	// PrioritizeProxy is a callback which can indicate whether proxy
+	// announcements from proxies with the specified GeoIPData and
+	// APIParameters should be prioritized in the matcher queue. Priority
+	// proxy announcements match ahead of other proxy announcements,
+	// regardless of announcement age/deadline. Priority status takes
+	// precedence over preferred NAT matching. Prioritization applies only to
+	// common compartment IDs and not personal pairing mode.
+	PrioritizeProxy func(common.GeoIPData, common.APIParameters) bool
+
 	// AllowClient is a callback which can indicate whether a client with the
 	// given GeoIP data is allowed to match with common compartment ID
 	// proxies. Clients are always allowed to match based on personal
@@ -155,6 +164,14 @@ type BrokerConfig struct {
 	// entry tags.
 	IsValidServerEntryTag func(serverEntryTag string) bool
 
+	// IsLoadLimiting is a callback which checks if the broker process is in a
+	// load limiting state, where consumed resources, including allocated
+	// system memory and CPU load, exceed determined thresholds. When load
+	// limiting is indicated, the broker will attempt to reduce load by
+	// immediately rejecting either proxy announces or client offers,
+	// depending on the state of the corresponding queues.
+	IsLoadLimiting func() bool
+
 	// PrivateKey is the broker's secure session long term private key.
 	PrivateKey SessionPrivateKey
 
@@ -233,6 +250,8 @@ func NewBroker(config *BrokerConfig) (*Broker, error) {
 			OfferLimitEntryCount:           config.MatcherOfferLimitEntryCount,
 			OfferRateLimitQuantity:         config.MatcherOfferRateLimitQuantity,
 			OfferRateLimitInterval:         config.MatcherOfferRateLimitInterval,
+
+			IsLoadLimiting: config.IsLoadLimiting,
 		}),
 
 		proxyAnnounceTimeout:       int64(config.ProxyAnnounceTimeout),
@@ -548,6 +567,8 @@ func (b *Broker) handleProxyAnnounce(
 		return nil, errors.Trace(err)
 	}
 
+	hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
+
 	// Return MustUpgrade when the proxy's protocol version is less than the
 	// minimum required.
 	if announceRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
@@ -573,22 +594,25 @@ func (b *Broker) handleProxyAnnounce(
 	// proxy can store and apply the new tactics before announcing again.
 
 	var tacticsPayload []byte
-	tacticsPayload, newTacticsTag, err = b.config.GetTacticsPayload(geoIPData, apiParams)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	if tacticsPayload != nil && newTacticsTag != "" {
-		responsePayload, err := MarshalProxyAnnounceResponse(
-			&ProxyAnnounceResponse{
-				TacticsPayload: tacticsPayload,
-				NoMatch:        true,
-			})
+	if announceRequest.CheckTactics {
+		tacticsPayload, newTacticsTag, err =
+			b.config.GetTacticsPayload(geoIPData, apiParams)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 
-		return responsePayload, nil
+		if tacticsPayload != nil && newTacticsTag != "" {
+			responsePayload, err := MarshalProxyAnnounceResponse(
+				&ProxyAnnounceResponse{
+					TacticsPayload: tacticsPayload,
+					NoMatch:        true,
+				})
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			return responsePayload, nil
+		}
 	}
 
 	// AllowProxy may be used to disallow proxies from certain geolocations,
@@ -596,7 +620,7 @@ func (b *Broker) handleProxyAnnounce(
 	// compartment IDs are always allowed, as they will be used only by
 	// clients specifically configured to use them.
 
-	if len(announceRequest.PersonalCompartmentIDs) == 0 &&
+	if !hasPersonalCompartmentIDs &&
 		!b.config.AllowProxy(geoIPData) {
 
 		return nil, errors.TraceNew("proxy disallowed")
@@ -608,7 +632,7 @@ func (b *Broker) handleProxyAnnounce(
 	// assigned to the same compartment.
 
 	var commonCompartmentIDs []ID
-	if len(announceRequest.PersonalCompartmentIDs) == 0 {
+	if !hasPersonalCompartmentIDs {
 		compartmentID, err := b.selectCommonCompartmentID(proxyID)
 		if err != nil {
 			return nil, errors.Trace(err)
@@ -616,6 +640,28 @@ func (b *Broker) handleProxyAnnounce(
 		commonCompartmentIDs = []ID{compartmentID}
 	}
 
+	// In the common compartment ID case, invoke the callback to check if the
+	// announcement should be prioritized.
+
+	isPriority := false
+	if b.config.PrioritizeProxy != nil && !hasPersonalCompartmentIDs {
+
+		// Limitation: Of the two return values from
+		// ValidateAndGetParametersAndLogFields, apiParams and logFields,
+		// only logFields contains fields such as max_clients
+		// and *_bytes_per_second, and so these cannot be part of any
+		// filtering performed by the PrioritizeProxy callback.
+		//
+		// TODO: include the additional fields in logFields. Since the
+		// logFields return value is the output of server.getRequestLogFields
+		// processing, it's not safe to use it directly. In addition,
+		// filtering by fields such as max_clients and *_bytes_per_second
+		// calls for range filtering, which is not yet supported in the
+		// psiphon/server.MeekServer PrioritizeProxy provider.
+
+		isPriority = b.config.PrioritizeProxy(geoIPData, apiParams)
+	}
+
 	// Await client offer.
 
 	timeout := common.ValueOrDefault(
@@ -638,6 +684,7 @@ func (b *Broker) handleProxyAnnounce(
 		proxyIP,
 		&MatchAnnouncement{
 			Properties: MatchProperties{
+				IsPriority:             isPriority,
 				CommonCompartmentIDs:   commonCompartmentIDs,
 				PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
 				GeoIPData:              geoIPData,
@@ -830,6 +877,8 @@ func (b *Broker) handleClientOffer(
 		return nil, errors.Trace(err)
 	}
 
+	hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0
+
 	offerSDP := offerRequest.ClientOfferSDP
 	offerSDP.SDP = string(filteredSDP)
 
@@ -837,16 +886,10 @@ func (b *Broker) handleClientOffer(
 	// from offering. Clients are always allowed to match proxies with shared
 	// personal compartment IDs.
 
-	commonCompartmentIDs := offerRequest.CommonCompartmentIDs
-
-	if !b.config.AllowClient(geoIPData) {
-
-		if len(offerRequest.PersonalCompartmentIDs) == 0 {
-			return nil, errors.TraceNew("client disallowed")
-		}
+	if !hasPersonalCompartmentIDs &&
+		!b.config.AllowClient(geoIPData) {
 
-		// Only match personal compartment IDs.
-		commonCompartmentIDs = nil
+		return nil, errors.TraceNew("client disallowed")
 	}
 
 	// Validate that the proxy destination specified by the client is a valid
@@ -884,7 +927,7 @@ func (b *Broker) handleClientOffer(
 	// personal pairing mode, to facilitate a faster no-match result and
 	// resulting broker rotation.
 	var timeout time.Duration
-	if len(offerRequest.PersonalCompartmentIDs) > 0 {
+	if hasPersonalCompartmentIDs {
 		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferPersonalTimeout))
 	} else {
 		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferTimeout))
@@ -901,7 +944,7 @@ func (b *Broker) handleClientOffer(
 
 	clientMatchOffer = &MatchOffer{
 		Properties: MatchProperties{
-			CommonCompartmentIDs:   commonCompartmentIDs,
+			CommonCompartmentIDs:   offerRequest.CommonCompartmentIDs,
 			PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
 			GeoIPData:              geoIPData,
 			NetworkType:            GetNetworkType(offerRequest.Metrics.BaseAPIParameters),

+ 184 - 35
psiphon/common/inproxy/matcher.go

@@ -114,6 +114,7 @@ type Matcher struct {
 // MatchProperties specifies the compartment, GeoIP, and network topology
 // matching properties of clients and proxies.
 type MatchProperties struct {
+	IsPriority             bool
 	CommonCompartmentIDs   []ID
 	PersonalCompartmentIDs []ID
 	GeoIPData              common.GeoIPData
@@ -271,7 +272,7 @@ type MatcherConfig struct {
 	// Logger is used to log events.
 	Logger common.Logger
 
-	// Accouncement queue limits.
+	// Announcement queue limits.
 	AnnouncementLimitEntryCount    int
 	AnnouncementRateLimitQuantity  int
 	AnnouncementRateLimitInterval  time.Duration
@@ -281,6 +282,9 @@ type MatcherConfig struct {
 	OfferLimitEntryCount   int
 	OfferRateLimitQuantity int
 	OfferRateLimitInterval time.Duration
+
+	// Broker process load limit state callback. See Broker.Config.
+	IsLoadLimiting func() bool
 }
 
 // NewMatcher creates a new Matcher.
@@ -431,6 +435,12 @@ func (m *Matcher) Announce(
 		return nil, nil, errors.TraceNew("unexpected compartment ID count")
 	}
 
+	isAnnouncement := true
+	err := m.applyLoadLimit(isAnnouncement)
+	if err != nil {
+		return nil, nil, errors.Trace(err)
+	}
+
 	announcementEntry := &announcementEntry{
 		ctx:          ctx,
 		limitIP:      getRateLimitIP(proxyIP),
@@ -438,7 +448,7 @@ func (m *Matcher) Announce(
 		offerChan:    make(chan *MatchOffer, 1),
 	}
 
-	err := m.addAnnouncementEntry(announcementEntry)
+	err = m.addAnnouncementEntry(announcementEntry)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -485,6 +495,12 @@ func (m *Matcher) Offer(
 		return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
 	}
 
+	isAnnouncement := false
+	err := m.applyLoadLimit(isAnnouncement)
+	if err != nil {
+		return nil, nil, nil, errors.Trace(err)
+	}
+
 	offerEntry := &offerEntry{
 		ctx:        ctx,
 		limitIP:    getRateLimitIP(clientIP),
@@ -492,7 +508,7 @@ func (m *Matcher) Offer(
 		answerChan: make(chan *answerInfo, 1),
 	}
 
-	err := m.addOfferEntry(offerEntry)
+	err = m.addOfferEntry(offerEntry)
 	if err != nil {
 		return nil, nil, nil, errors.Trace(err)
 	}
@@ -775,16 +791,27 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 	var bestMatch *announcementEntry
 	bestMatchIndex := -1
+	bestMatchIsPriority := false
 	bestMatchNAT := false
 
 	candidateIndex := -1
 	for {
 
-		announcementEntry := matchIterator.getNext()
+		announcementEntry, isPriority := matchIterator.getNext()
 		if announcementEntry == nil {
 			break
 		}
 
+		if !isPriority && bestMatchIsPriority {
+
+			// There is a priority match, but it wasn't bestMatchNAT and we
+			// continued to iterate. Now that isPriority is false, we're past the
+			// end of the priority items, so stop looking for any best NAT match
+			// and return the previous priority match. When there are zero
+			// priority items to begin with, this case should not be hit.
+			break
+		}
+
 		candidateIndex += 1
 
 		// Skip and remove this announcement if its deadline has already
@@ -829,8 +856,8 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 			bestMatch = announcementEntry
 			bestMatchIndex = candidateIndex
+			bestMatchIsPriority = isPriority
 			bestMatchNAT = matchNAT
-
 		}
 
 		// Stop as soon as we have the best possible match, or have reached
@@ -847,6 +874,79 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 	return bestMatch, bestMatchIndex
 }
 
+// applyLoadLimit checks if the broker process is in the load limiting state
+// and, in order to reduce load, determines if new proxy announces or client
+// offers should be rejected immediately instead of enqueued.
+func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
+
+	if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
+		return nil
+	}
+
+	// Acquire the queue locks only when in the load limit state, and in the
+	// same order as matchAllOffers.
+
+	m.announcementQueueMutex.Lock()
+	defer m.announcementQueueMutex.Unlock()
+	m.offerQueueMutex.Lock()
+	defer m.offerQueueMutex.Unlock()
+
+	announcementLen := m.announcementQueue.getLen()
+	offerLen := m.offerQueue.Len()
+
+	// When the load limit had been reached, and assuming the broker process
+	// is running only an in-proxy broker, it's likely, in practice, that
+	// only one of the two queues has hundreds of thousands of entries while
+	// the other has few, and there are no matches clearing the queue.
+	//
+	// Instead of simply rejecting all enqueue requests, allow the request
+	// type, announce or offer, that is in shorter supply as these are likely
+	// to match and draw down the larger queue. This attempts to make
+	// productive use of enqueued items, and also attempts to avoid simply
+	// emptying both queues -- as will happen in any case due to timeouts --
+	// and then have the same larger queue refill again after the load limit
+	// state exits.
+	//
+	// This approach assumes some degree of slack in available system memory
+	// and CPU in the load limiting state, similar to how the tunnel server
+	// continues to operate existing tunnels in the same state.
+	//
+	// The heuristic below of allowing when less than half the size of the
+	// larger queue puts a cap on the amount the shorter queue can continue
+	// to grow in the load limiting state, in the worst case.
+	//
+	// Limitation: in some scenarios that are expected to be rare, it can
+	// happen that allowed requests don't result in a match and memory
+	// consumption continues to grow, leading to a broker process OOM kill.
+
+	var allow bool
+	if isAnnouncement {
+		allow = announcementLen < offerLen/2
+	} else {
+		allow = offerLen < announcementLen/2
+	}
+	if allow {
+		return nil
+	}
+
+	// Do not return a MatcherLimitError, as is done in applyIPLimits. A
+	// MatcherLimitError results in a Response.Limited error response, which
+	// causes a proxy to back off and a client to abort its dial; but in
+	// neither case is the broker client reset. The error returned here will
+	// result in a fast 404 response to the proxy or client, which will
+	// instead trigger a broker client reset, and a chance of moving to a
+	// different broker that is not overloaded.
+	//
+	// Limitation: the 404 response won't be distinguishable, in client or
+	// proxy diagnostics, from other error conditions.
+	//
+	// TODO: add a new Response.LoadLimited flag which the proxy/client can
+	// use to log a distinct error and also ensure that it doesn't reselect
+	// the same broker again in the broker client reset random selection.
+
+	return errors.TraceNew("load limited")
+}
+
 // MatcherLimitError is the error type returned by Announce or Offer when the
 // caller has exceeded configured queue entry or rate limits.
 type MatcherLimitError struct {
@@ -861,9 +961,11 @@ func (e MatcherLimitError) Error() string {
 	return e.err.Error()
 }
 
-func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
+// applyIPLimits checks per-proxy or per-client -- as determined by peer IP
+// address -- rate limits and queue entry limits.
+func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
 
-	// Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
+	// Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
 
 	var entryCountByIP map[string]int
 	var queueRateLimiters *lrucache.Cache
@@ -946,7 +1048,7 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
 	// Ensure no single peer IP can enqueue a large number of entries or
 	// rapidly enqueue beyond the configured rate.
 	isAnnouncement := true
-	err := m.applyLimits(
+	err := m.applyIPLimits(
 		isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
 	if err != nil {
 		return errors.Trace(err)
@@ -1029,7 +1131,7 @@ func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
 	// Ensure no single peer IP can enqueue a large number of entries or
 	// rapidly enqueue beyond the configured rate.
 	isAnnouncement := false
-	err := m.applyLimits(
+	err := m.applyIPLimits(
 		isAnnouncement, offerEntry.limitIP, ID{})
 	if err != nil {
 		return errors.Trace(err)
@@ -1107,9 +1209,10 @@ func getRateLimitIP(strIP string) string {
 // matching a specified list of compartment IDs. announcementMultiQueue and
 // its underlying data structures are not safe for concurrent access.
 type announcementMultiQueue struct {
-	commonCompartmentQueues   map[ID]*announcementCompartmentQueue
-	personalCompartmentQueues map[ID]*announcementCompartmentQueue
-	totalEntries              int
+	priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
+	commonCompartmentQueues         map[ID]*announcementCompartmentQueue
+	personalCompartmentQueues       map[ID]*announcementCompartmentQueue
+	totalEntries                    int
 }
 
 // announcementCompartmentQueue is a single compartment queue within an
@@ -1120,6 +1223,7 @@ type announcementMultiQueue struct {
 // matches may be possible.
 type announcementCompartmentQueue struct {
 	isCommonCompartment      bool
+	isPriority               bool
 	compartmentID            ID
 	entries                  *list.List
 	unlimitedNATCount        int
@@ -1131,11 +1235,10 @@ type announcementCompartmentQueue struct {
 // subset of announcementMultiQueue compartment queues. Concurrent
 // announcementMatchIterators are not supported.
 type announcementMatchIterator struct {
-	multiQueue           *announcementMultiQueue
-	isCommonCompartments bool
-	compartmentQueues    []*announcementCompartmentQueue
-	compartmentIDs       []ID
-	nextEntries          []*list.Element
+	multiQueue        *announcementMultiQueue
+	compartmentQueues []*announcementCompartmentQueue
+	compartmentIDs    []ID
+	nextEntries       []*list.Element
 }
 
 // announcementQueueReference represents the queue position for a given
@@ -1148,8 +1251,9 @@ type announcementQueueReference struct {
 
 func newAnnouncementMultiQueue() *announcementMultiQueue {
 	return &announcementMultiQueue{
-		commonCompartmentQueues:   make(map[ID]*announcementCompartmentQueue),
-		personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
+		priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
+		commonCompartmentQueues:         make(map[ID]*announcementCompartmentQueue),
+		personalCompartmentQueues:       make(map[ID]*announcementCompartmentQueue),
 	}
 }
 
@@ -1179,22 +1283,31 @@ func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) e
 		return errors.TraceNew("announcement must specify exactly one compartment ID")
 	}
 
+	isPriority := announcementEntry.announcement.Properties.IsPriority
+
 	isCommonCompartment := true
 	var compartmentID ID
 	var compartmentQueues map[ID]*announcementCompartmentQueue
 	if len(commonCompartmentIDs) > 0 {
 		compartmentID = commonCompartmentIDs[0]
 		compartmentQueues = q.commonCompartmentQueues
+		if isPriority {
+			compartmentQueues = q.priorityCommonCompartmentQueues
+		}
 	} else {
 		isCommonCompartment = false
 		compartmentID = personalCompartmentIDs[0]
 		compartmentQueues = q.personalCompartmentQueues
+		if isPriority {
+			return errors.TraceNew("priority not supported for personal compartments")
+		}
 	}
 
 	compartmentQueue, ok := compartmentQueues[compartmentID]
 	if !ok {
 		compartmentQueue = &announcementCompartmentQueue{
 			isCommonCompartment: isCommonCompartment,
+			isPriority:          isPriority,
 			compartmentID:       compartmentID,
 			entries:             list.New(),
 		}
@@ -1250,9 +1363,14 @@ func (r *announcementQueueReference) dequeue() bool {
 
 	if r.compartmentQueue.entries.Len() == 0 {
 		// Remove empty compartment queue.
-		queues := r.multiQueue.commonCompartmentQueues
-		if !r.compartmentQueue.isCommonCompartment {
-			queues = r.multiQueue.personalCompartmentQueues
+		queues := r.multiQueue.personalCompartmentQueues
+		if r.compartmentQueue.isCommonCompartment {
+			if r.compartmentQueue.isPriority {
+				queues = r.multiQueue.priorityCommonCompartmentQueues
+
+			} else {
+				queues = r.multiQueue.commonCompartmentQueues
+			}
 		}
 		delete(queues, r.compartmentQueue.compartmentID)
 	}
@@ -1270,8 +1388,7 @@ func (q *announcementMultiQueue) startMatching(
 	compartmentIDs []ID) *announcementMatchIterator {
 
 	iter := &announcementMatchIterator{
-		multiQueue:           q,
-		isCommonCompartments: isCommonCompartments,
+		multiQueue: q,
 	}
 
 	// Find the matching compartment queues and initialize iteration over
@@ -1280,16 +1397,29 @@ func (q *announcementMultiQueue) startMatching(
 	// maxCompartmentIDs, as enforced in
 	// ClientOfferRequest.ValidateAndGetLogFields).
 
-	compartmentQueues := q.commonCompartmentQueues
-	if !isCommonCompartments {
-		compartmentQueues = q.personalCompartmentQueues
+	// Priority queues, when in use, must all be added to the beginning of
+	// iter.compartmentQueues in order to ensure that the iteration logic in
+	// getNext visits all priority items first.
+
+	var compartmentQueuesList []map[ID]*announcementCompartmentQueue
+	if isCommonCompartments {
+		compartmentQueuesList = append(
+			compartmentQueuesList,
+			q.priorityCommonCompartmentQueues,
+			q.commonCompartmentQueues)
+	} else {
+		compartmentQueuesList = append(
+			compartmentQueuesList,
+			q.personalCompartmentQueues)
 	}
 
-	for _, ID := range compartmentIDs {
-		if compartmentQueue, ok := compartmentQueues[ID]; ok {
-			iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
-			iter.compartmentIDs = append(iter.compartmentIDs, ID)
-			iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
+	for _, compartmentQueues := range compartmentQueuesList {
+		for _, ID := range compartmentIDs {
+			if compartmentQueue, ok := compartmentQueues[ID]; ok {
+				iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
+				iter.compartmentIDs = append(iter.compartmentIDs, ID)
+				iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
+			}
 		}
 	}
 
@@ -1327,10 +1457,16 @@ func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
 // are not supported during iteration. Iteration and dequeue should all be
 // performed with a lock over the entire announcementMultiQueue, and with
 // only one concurrent announcementMatchIterator.
-func (iter *announcementMatchIterator) getNext() *announcementEntry {
+//
+// getNext returns a nil *announcementEntry when there are no more items.
+// getNext also returns an isPriority flag, indicating the announcement is a
+// priority candidate. All priority candidates are guaranteed to be returned
+// before any non-priority candidates.
+func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
 
 	// Assumes announcements are enqueued in announcementEntry.ctx.Deadline
-	// order.
+	// order. Also assumes that any priority queues are all at the front of
+	// iter.compartmentQueues.
 
 	// Select the oldest item, by deadline, from all the candidate queue head
 	// items. This operation is linear in the number of matching compartment
@@ -1338,6 +1474,10 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 	// compartment IDs (no more than maxCompartmentIDs, as enforced in
 	// ClientOfferRequest.ValidateAndGetLogFields).
 	//
+	// When there are priority candidates, they are selected first, regardless
+	// of the deadlines of non-priority candidates. Multiple priority
+	// candidates are processed in FIFO deadline order.
+	//
 	// A potential future enhancement is to add more iterator state to track
 	// which queue has the next oldest time to select on the following
 	// getNext call. Another potential enhancement is to remove fully
@@ -1345,14 +1485,22 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 
 	var selectedCandidate *announcementEntry
 	selectedIndex := -1
+	selectedPriority := false
 
 	for i := 0; i < len(iter.compartmentQueues); i++ {
 		if iter.nextEntries[i] == nil {
 			continue
 		}
+		isPriority := iter.compartmentQueues[i].isPriority
+		if selectedPriority && !isPriority {
+			// Ignore non-priority entries, even older ones, when there are
+			// priority candidates.
+			break
+		}
 		if selectedCandidate == nil {
 			selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
 			selectedIndex = i
+			selectedPriority = isPriority
 		} else {
 			candidate := iter.nextEntries[i].Value.(*announcementEntry)
 			deadline, deadlineOk := candidate.ctx.Deadline()
@@ -1360,6 +1508,7 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 			if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
 				selectedCandidate = candidate
 				selectedIndex = i
+				selectedPriority = isPriority
 			}
 		}
 	}
@@ -1371,5 +1520,5 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 		iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
 	}
 
-	return selectedCandidate
+	return selectedCandidate, selectedPriority
 }

+ 130 - 31
psiphon/common/inproxy/matcher_test.go

@@ -617,6 +617,31 @@ func runTestMatcher() error {
 		return errors.Trace(err)
 	}
 
+	// Test: priority supersedes preferred NAT match
+
+	go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
+	time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
+	proxy2Properties.IsPriority = true
+	go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
+	time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
+	go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
+
+	err = <-proxy1ResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
+	}
+
+	// proxy2 should match since it's the priority, but not preferred NAT match
+	err = <-proxy2ResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = <-clientResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	// Test: many matches
 
 	// Reduce test log noise for this phase of the test
@@ -675,24 +700,26 @@ func TestMatcherMultiQueue(t *testing.T) {
 
 func runTestMatcherMultiQueue() error {
 
-	q := newAnnouncementMultiQueue()
-
 	// Test: invalid compartment IDs
 
-	err := q.enqueue(&announcementEntry{
-		announcement: &MatchAnnouncement{
-			Properties: MatchProperties{}}})
+	q := newAnnouncementMultiQueue()
+
+	err := q.enqueue(
+		&announcementEntry{
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{}}})
 	if err == nil {
 		return errors.TraceNew("unexpected success")
 	}
 
 	compartmentID, _ := MakeID()
-	err = q.enqueue(&announcementEntry{
-		announcement: &MatchAnnouncement{
-			Properties: MatchProperties{
-				CommonCompartmentIDs:   []ID{compartmentID},
-				PersonalCompartmentIDs: []ID{compartmentID},
-			}}})
+	err = q.enqueue(
+		&announcementEntry{
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{
+					CommonCompartmentIDs:   []ID{compartmentID},
+					PersonalCompartmentIDs: []ID{compartmentID},
+				}}})
 	if err == nil {
 		return errors.TraceNew("unexpected success")
 	}
@@ -716,25 +743,27 @@ func runTestMatcherMultiQueue() error {
 		ctx, cancel := context.WithDeadline(
 			context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
 		defer cancel()
-		err := q.enqueue(&announcementEntry{
-			ctx: ctx,
-			announcement: &MatchAnnouncement{
-				Properties: MatchProperties{
-					CommonCompartmentIDs: []ID{
-						otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
-					NATType: NATTypeSymmetric,
-				}}})
+		err := q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					Properties: MatchProperties{
+						CommonCompartmentIDs: []ID{
+							otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
+						NATType: NATTypeSymmetric,
+					}}})
 		if err != nil {
 			return errors.Trace(err)
 		}
-		err = q.enqueue(&announcementEntry{
-			ctx: ctx,
-			announcement: &MatchAnnouncement{
-				Properties: MatchProperties{
-					PersonalCompartmentIDs: []ID{
-						otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
-					NATType: NATTypeSymmetric,
-				}}})
+		err = q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					Properties: MatchProperties{
+						PersonalCompartmentIDs: []ID{
+							otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
+						NATType: NATTypeSymmetric,
+					}}})
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -797,7 +826,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected NAT counts")
 	}
 
-	match := iter.getNext()
+	match, _ := iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -826,7 +855,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected NAT counts")
 	}
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -844,7 +873,7 @@ func runTestMatcherMultiQueue() error {
 
 	// Test: getNext after dequeue
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -856,7 +885,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected already dequeued")
 	}
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -882,6 +911,76 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected compartment queue count")
 	}
 
+	// Test: priority
+
+	q = newAnnouncementMultiQueue()
+
+	var commonCompartmentIDs []ID
+	numCompartmentIDs := 10
+	for i := 0; i < numCompartmentIDs; i++ {
+		commonCompartmentID, _ := MakeID()
+		commonCompartmentIDs = append(
+			commonCompartmentIDs, commonCompartmentID)
+	}
+
+	priorityProxyID, _ := MakeID()
+	nonPriorityProxyID, _ := MakeID()
+
+	ctx, cancel := context.WithDeadline(
+		context.Background(), time.Now().Add(10*time.Minute))
+	defer cancel()
+
+	numEntries := 10000
+	for i := 0; i < numEntries; i++ {
+		// Enqueue every other announcement as a priority
+		isPriority := i%2 == 0
+		proxyID := priorityProxyID
+		if !isPriority {
+			proxyID = nonPriorityProxyID
+		}
+		err := q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					ProxyID: proxyID,
+					Properties: MatchProperties{
+						IsPriority: isPriority,
+						CommonCompartmentIDs: []ID{
+							commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
+						NATType: NATTypeUnknown,
+					}}})
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	iter = q.startMatching(true, commonCompartmentIDs)
+	for i := 0; i < numEntries; i++ {
+		match, isPriority := iter.getNext()
+		if match == nil {
+			return errors.TraceNew("unexpected missing match")
+		}
+		// First half, and only first half, of matches is priority
+		expectPriority := i < numEntries/2
+		if isPriority != expectPriority {
+			return errors.TraceNew("unexpected isPriority")
+		}
+		expectedProxyID := priorityProxyID
+		if !expectPriority {
+			expectedProxyID = nonPriorityProxyID
+		}
+		if match.announcement.ProxyID != expectedProxyID {
+			return errors.TraceNew("unexpected ProxyID")
+		}
+		if !match.queueReference.dequeue() {
+			return errors.TraceNew("unexpected already dequeued")
+		}
+	}
+	match, _ = iter.getNext()
+	if match != nil {
+		return errors.TraceNew("unexpected  match")
+	}
+
 	return nil
 }
 

+ 12 - 3
psiphon/common/inproxy/proxy.go

@@ -202,14 +202,16 @@ func (p *Proxy) Run(ctx context.Context) {
 	// trip is awaited so that:
 	//
 	// - The first announce response will arrive with any new tactics,
-	//   avoiding a start up case where MaxClients initial, concurrent
-	//   announces all return with no-match and a tactics payload.
+//   which may be applied before launching additional workers.
 	//
 	// - The first worker gets no announcement delay and is also guaranteed to
 	//   be the shared session establisher. Since the announcement delays are
 	//   applied _after_ waitToShareSession, it would otherwise be possible,
 	//   with a race of MaxClient initial, concurrent announces, for the
 	//   session establisher to be a different worker than the no-delay worker.
+	//
+	// The first worker is the only proxy worker which sets
+	// ProxyAnnounceRequest.CheckTactics.
 
 	signalFirstAnnounceCtx, signalFirstAnnounceDone :=
 		context.WithCancel(context.Background())
@@ -612,6 +614,10 @@ func (p *Proxy) proxyOneClient(
 	}
 	p.nextAnnounceMutex.Unlock()
 
+	// Only the first worker, which has signalAnnounceDone configured, checks
+	// for tactics.
+	checkTactics := signalAnnounceDone != nil
+
 	// A proxy ID is implicitly sent with requests; it's the proxy's session
 	// public key.
 	//
@@ -625,6 +631,7 @@ func (p *Proxy) proxyOneClient(
 		&ProxyAnnounceRequest{
 			PersonalCompartmentIDs: personalCompartmentIDs,
 			Metrics:                metrics,
+			CheckTactics:           checkTactics,
 		})
 	if logAnnounce() {
 		p.config.Logger.WithTraceFields(common.LogFields{
@@ -645,7 +652,9 @@ func (p *Proxy) proxyOneClient(
 		// discovery but proceed with handling the proxy announcement
 		// response as there may still be a match.
 
-		if p.config.HandleTacticsPayload(tacticsNetworkID, announceResponse.TacticsPayload) {
+		if p.config.HandleTacticsPayload(
+			tacticsNetworkID, announceResponse.TacticsPayload) {
+
 			p.resetNetworkDiscovery()
 		}
 	}

+ 13 - 2
psiphon/common/parameters/parameters.go

@@ -401,6 +401,8 @@ const (
 	InproxyBrokerMatcherOfferLimitEntryCount           = "InproxyBrokerMatcherOfferLimitEntryCount"
 	InproxyBrokerMatcherOfferRateLimitQuantity         = "InproxyBrokerMatcherOfferRateLimitQuantity"
 	InproxyBrokerMatcherOfferRateLimitInterval         = "InproxyBrokerMatcherOfferRateLimitInterval"
+	InproxyBrokerMatcherPrioritizeProxiesProbability   = "InproxyBrokerMatcherPrioritizeProxiesProbability"
+	InproxyBrokerMatcherPrioritizeProxiesFilter        = "InproxyBrokerMatcherPrioritizeProxiesFilter"
 	InproxyBrokerProxyAnnounceTimeout                  = "InproxyBrokerProxyAnnounceTimeout"
 	InproxyBrokerClientOfferTimeout                    = "InproxyBrokerClientOfferTimeout"
 	InproxyBrokerClientOfferPersonalTimeout            = "InproxyBrokerClientOfferPersonalTimeout"
@@ -916,6 +918,8 @@ var defaultParameters = map[string]struct {
 	InproxyBrokerMatcherOfferLimitEntryCount:           {value: 10, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitQuantity:         {value: 50, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitInterval:         {value: 1 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyBrokerMatcherPrioritizeProxiesProbability:   {value: 1.0, minimum: 0.0},
+	InproxyBrokerMatcherPrioritizeProxiesFilter:        {value: KeyStrings{}},
 	InproxyBrokerProxyAnnounceTimeout:                  {value: 2 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferPersonalTimeout:            {value: 5 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
@@ -964,8 +968,8 @@ var defaultParameters = map[string]struct {
 	InproxyPsiphonAPIRequestTimeout:                    {value: 10 * time.Second, minimum: 1 * time.Second, flags: useNetworkLatencyMultiplier},
 	InproxyProxyTotalActivityNoticePeriod:              {value: 5 * time.Minute, minimum: 1 * time.Second},
 	InproxyPersonalPairingConnectionWorkerPoolSize:     {value: 2, minimum: 1},
-	InproxyClientDialRateLimitQuantity:                 {value: 10, minimum: 0},
-	InproxyClientDialRateLimitInterval:                 {value: 1 * time.Minute, minimum: time.Duration(0)},
+	InproxyClientDialRateLimitQuantity:                 {value: 2, minimum: 0},
+	InproxyClientDialRateLimitInterval:                 {value: 500 * time.Millisecond, minimum: time.Duration(0)},
 	InproxyClientNoMatchFailoverProbability:            {value: 0.5, minimum: 0.0},
 	InproxyClientNoMatchFailoverPersonalProbability:    {value: 1.0, minimum: 0.0},
 	InproxyFrontingProviderClientMaxRequestTimeouts:    {value: KeyDurations{}},
@@ -1946,6 +1950,13 @@ func (p ParametersAccessor) KeyStrings(name, key string) []string {
 	return value[key]
 }
 
+// KeyStringsValue returns a complete KeyStrings parameter value.
+func (p ParametersAccessor) KeyStringsValue(name string) KeyStrings {
+	value := KeyStrings{}
+	p.snapshot.getValue(name, &value)
+	return value
+}
+
 // KeyDurations returns a KeyDurations parameter value, with string durations
 // converted to time.Duration.
 func (p ParametersAccessor) KeyDurations(name string) map[string]time.Duration {

+ 4 - 0
psiphon/common/parameters/parameters_test.go

@@ -135,6 +135,10 @@ func TestGetDefaultParameters(t *testing.T) {
 					t.Fatalf("KeyStrings returned %+v expected %+v", g, strings)
 				}
 			}
+			g := p.Get().KeyStringsValue(name)
+			if !reflect.DeepEqual(v, g) {
+				t.Fatalf("KeyStrings returned %+v expected %+v", g, v)
+			}
 		case KeyDurations:
 			g := p.Get().KeyDurations(name)
 			durations := make(map[string]time.Duration)

+ 123 - 21
psiphon/common/tactics/tactics.go

@@ -157,6 +157,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -164,6 +165,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	lrucache "github.com/cognusion/go-cache-lru"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -189,6 +191,7 @@ const (
 	AGGREGATION_MINIMUM                = "Minimum"
 	AGGREGATION_MAXIMUM                = "Maximum"
 	AGGREGATION_MEDIAN                 = "Median"
+	PAYLOAD_CACHE_SIZE                 = 256
 )
 
 var (
@@ -250,6 +253,9 @@ type Server struct {
 	logger                common.Logger
 	logFieldFormatter     common.APIParameterLogFieldFormatter
 	apiParameterValidator common.APIParameterValidator
+
+	cachedTacticsData *lrucache.Cache
+	filterMatches     []bool
 }
 
 const (
@@ -442,6 +448,8 @@ func NewServer(
 		logger:                logger,
 		logFieldFormatter:     logFieldFormatter,
 		apiParameterValidator: apiParameterValidator,
+		cachedTacticsData: lrucache.NewWithLRU(
+			lrucache.NoExpiration, 1*time.Minute, PAYLOAD_CACHE_SIZE),
 	}
 
 	server.ReloadableFile = common.NewReloadableFile(
@@ -467,6 +475,18 @@ func NewServer(
 			server.DefaultTactics = newServer.DefaultTactics
 			server.FilteredTactics = newServer.FilteredTactics
 
+			// Any cached, merged tactics data is flushed when the
+			// configuration changes.
+			//
+			// A single filterMatches, used in getTactics, is allocated here
+			// to avoid allocating a slice per getTactics call.
+			//
+			// Server.ReloadableFile.RLock/RUnlock is the mutex for accessing
+			// these and other Server fields.
+
+			server.cachedTacticsData.Flush()
+			server.filterMatches = make([]bool, len(server.FilteredTactics))
+
 			server.initLookups()
 
 			server.loaded = true
@@ -730,6 +750,8 @@ func (server *Server) GetFilterGeoIPScope(geoIPData common.GeoIPData) int {
 //
 // Elements of the returned Payload, e.g., tactics parameters, will point to
 // data in DefaultTactics and FilteredTactics and must not be modifed.
+//
+// Callers must not mutate returned tactics data, which is cached.
 func (server *Server) GetTacticsPayload(
 	geoIPData common.GeoIPData,
 	apiParams common.APIParameters) (*Payload, error) {
@@ -737,22 +759,17 @@ func (server *Server) GetTacticsPayload(
 	// includeServerSideOnly is false: server-side only parameters are not
 	// used by the client, so including them wastes space and unnecessarily
 	// exposes the values.
-	tactics, err := server.GetTactics(false, geoIPData, apiParams)
+	tacticsData, err := server.getTactics(false, geoIPData, apiParams)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	if tactics == nil {
+	if tacticsData == nil {
 		return nil, nil
 	}
 
-	marshaledTactics, tag, err := marshalTactics(tactics)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
 	payload := &Payload{
-		Tag: tag,
+		Tag: tacticsData.tag,
 	}
 
 	// New clients should always send STORED_TACTICS_TAG_PARAMETER_NAME. When they have no
@@ -777,7 +794,7 @@ func (server *Server) GetTacticsPayload(
 	}
 
 	if sendPayloadTactics {
-		payload.Tactics = marshaledTactics
+		payload.Tactics = tacticsData.payload
 	}
 
 	return payload, nil
@@ -797,37 +814,63 @@ func marshalTactics(tactics *Tactics) ([]byte, string, error) {
 }
 
 // GetTacticsWithTag returns a GetTactics value along with the associated tag value.
+//
+// Callers must not mutate returned tactics data, which is cached.
 func (server *Server) GetTacticsWithTag(
 	includeServerSideOnly bool,
 	geoIPData common.GeoIPData,
 	apiParams common.APIParameters) (*Tactics, string, error) {
 
-	tactics, err := server.GetTactics(
+	tacticsData, err := server.getTactics(
 		includeServerSideOnly, geoIPData, apiParams)
 	if err != nil {
 		return nil, "", errors.Trace(err)
 	}
 
-	if tactics == nil {
+	if tacticsData == nil {
 		return nil, "", nil
 	}
 
-	_, tag, err := marshalTactics(tactics)
+	return tacticsData.tactics, tacticsData.tag, nil
+}
+
+// tacticsData is cached tactics data, including the merged Tactics object,
+// the JSON marshaled payload, and hashed tag.
+type tacticsData struct {
+	tactics *Tactics
+	payload []byte
+	tag     string
+}
+
+func newTacticsData(tactics *Tactics) (*tacticsData, error) {
+
+	payload, err := json.Marshal(tactics)
 	if err != nil {
-		return nil, "", errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
 
-	return tactics, tag, nil
+	// MD5 hash is used solely as a data checksum and not for any security
+	// purpose.
+	digest := md5.Sum(payload)
+	tag := hex.EncodeToString(digest[:])
+
+	return &tacticsData{
+		tactics: tactics,
+		payload: payload,
+		tag:     tag,
+	}, nil
 }
 
 // GetTactics assembles and returns tactics data for a client with the
 // specified GeoIP, API parameter, and speed test attributes.
 //
 // The tactics return value may be nil.
-func (server *Server) GetTactics(
+//
+// Callers must not mutate returned tactics data, which is cached.
+func (server *Server) getTactics(
 	includeServerSideOnly bool,
 	geoIPData common.GeoIPData,
-	apiParams common.APIParameters) (*Tactics, error) {
+	apiParams common.APIParameters) (*tacticsData, error) {
 
 	server.ReloadableFile.RLock()
 	defer server.ReloadableFile.RUnlock()
@@ -837,11 +880,19 @@ func (server *Server) GetTactics(
 		return nil, nil
 	}
 
-	tactics := server.DefaultTactics.clone(includeServerSideOnly)
+	// Two passes are performed, one to get the list of matching filters, and
+	// then, if no merged tactics data is found for that filter match set,
+	// another pass to merge all the tactics parameters.
 
 	var aggregatedValues map[string]int
+	filterMatchCount := 0
 
-	for _, filteredTactics := range server.FilteredTactics {
+	// Use the preallocated slice to avoid an allocation per getTactics call.
+	filterMatches := server.filterMatches
+
+	for filterIndex, filteredTactics := range server.FilteredTactics {
+
+		filterMatches[filterIndex] = false
 
 		if len(filteredTactics.Filter.Regions) > 0 {
 			if filteredTactics.Filter.regionLookup != nil {
@@ -944,15 +995,66 @@ func (server *Server) GetTactics(
 			}
 		}
 
-		tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
+		filterMatchCount += 1
+		filterMatches[filterIndex] = true
+
+		// Continue to check for more matches. Last matching tactics filter
+		// has priority for any field.
+	}
+
+	// For any filter match set, the merged tactics parameters are the same,
+	// so the resulting merge is cached, along with the JSON encoding of the
+	// payload and hash tag. This cache reduces, for repeated tactics
+	// requests, heavy allocations from the JSON marshal and CPU load from
+	// both the marshal and hashing the marshal result.
+	//
+	// getCacheKey still allocates a strings.Builder buffer.
+	//
+	// TODO: log cache metrics; similar to what is done in
+	// psiphon/server.ServerTacticsParametersCache.GetMetrics.
 
-		// Continue to apply more matches. Last matching tactics has priority for any field.
+	cacheKey := getCacheKey(filterMatchCount > 0, filterMatches)
+
+	cacheValue, ok := server.cachedTacticsData.Get(cacheKey)
+	if ok {
+		return cacheValue.(*tacticsData), nil
+	}
+
+	tactics := server.DefaultTactics.clone(includeServerSideOnly)
+	if filterMatchCount > 0 {
+		for filterIndex, filteredTactics := range server.FilteredTactics {
+			if filterMatches[filterIndex] {
+				tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
+			}
+		}
 	}
 
 	// See Tactics.Probability doc comment.
 	tactics.Probability = 1.0
 
-	return tactics, nil
+	tacticsData, err := newTacticsData(tactics)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	server.cachedTacticsData.Set(cacheKey, tacticsData, 0)
+
+	return tacticsData, nil
+}
+
+func getCacheKey(hasFilterMatches bool, filterMatches []bool) string {
+	// When no filters match, the key is "". The input hasFilterMatches allows
+	// for skipping the strings.Builder setup and loop entirely.
+	if !hasFilterMatches {
+		return ""
+	}
+	var b strings.Builder
+	for filterIndex, match := range filterMatches {
+		if match {
+			fmt.Fprintf(&b, "%x-", filterIndex)
+		}
+	}
+	return b.String()
 }
 
 // TODO: refactor this copy of psiphon/server.getStringRequestParam into common?

+ 71 - 0
psiphon/common/tactics/tactics_test.go

@@ -93,6 +93,16 @@ func TestTactics(t *testing.T) {
             }
           }
         },
+        {
+          "Filter" : {
+            "APIParameters" : {"client_platform" : ["P2"], "client_version": ["V2"]}
+          },
+          "Tactics" : {
+            "Parameters" : {
+              "ConnectionWorkerPoolSize" : 1
+            }
+          }
+        },
         {
           "Filter" : {
             "Regions": ["R2"]
@@ -323,6 +333,24 @@ func TestTactics(t *testing.T) {
 		}
 	}
 
+	// Helper to check server-side cachedTacticsData state
+
+	checkServerCache := func(cacheEntryFilterMatches ...[]bool) {
+
+		cacheItems := server.cachedTacticsData.Items()
+		if len(cacheItems) != len(cacheEntryFilterMatches) {
+			t.Fatalf("Unexpected cachedTacticsData size: %v", len(cacheItems))
+		}
+
+		for _, filterMatches := range cacheEntryFilterMatches {
+			cacheKey := getCacheKey(true, filterMatches)
+			_, ok := server.cachedTacticsData.Get(cacheKey)
+			if !ok {
+				t.Fatalf("Unexpected missing cachedTacticsData entry: %v", filterMatches)
+			}
+		}
+	}
+
 	// Initial tactics request; will also run a speed test
 
 	// Request should complete in < 1 second
@@ -352,6 +380,10 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(initialFetchTacticsRecord)
 
+	// Server should be caching tactics data for tactics matching first two
+	// filters.
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// There should now be cached local tactics
 
 	storedTacticsRecord, err := UseStoredTactics(storer, networkID)
@@ -434,6 +466,9 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(fetchTacticsRecord)
 
+	// Server cache should be the same
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Modify tactics configuration to change payload
 
 	tacticsConnectionWorkerPoolSize = 6
@@ -474,6 +509,9 @@ func TestTactics(t *testing.T) {
 		t.Fatalf("Server config failed to reload")
 	}
 
+	// Server cache should be flushed
+	checkServerCache()
+
 	// Next fetch should return a different payload
 
 	fetchTacticsRecord, err = FetchTactics(
@@ -509,6 +547,8 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(fetchTacticsRecord)
 
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Exercise handshake transport of tactics
 
 	// Wait for tactics to expire; handshake should renew
@@ -563,6 +603,8 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(handshakeTacticsRecord)
 
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Now there should be stored tactics
 
 	storedTacticsRecord, err = UseStoredTactics(storer, networkID)
@@ -596,6 +638,35 @@ func TestTactics(t *testing.T) {
 		t.Fatalf("unexpected stored tactics record")
 	}
 
+	// Server should cache a new entry for different filter matches
+
+	apiParams2 := common.APIParameters{
+		"client_platform": "P2",
+		"client_version":  "V2"}
+
+	fetchTacticsRecord, err = FetchTactics(
+		context.Background(),
+		params,
+		storer,
+		getNetworkID,
+		apiParams2,
+		endPointProtocol,
+		endPointRegion,
+		encodedRequestPublicKey,
+		encodedObfuscatedKey,
+		obfuscatedRoundTripper)
+	if err != nil {
+		t.Fatalf("FetchTactics failed: %s", err)
+	}
+
+	if fetchTacticsRecord == nil {
+		t.Fatalf("expected tactics record")
+	}
+
+	checkServerCache(
+		[]bool{true, true, false, false, false},
+		[]bool{false, false, true, false, false})
+
 	// Exercise speed test sample truncation
 
 	maxSamples := params.Get().Int(parameters.SpeedTestMaxSampleCount)

+ 23 - 8
psiphon/server/config.go

@@ -496,11 +496,12 @@ type Config struct {
 	peakUpstreamFailureRateMinimumSampleSize       int
 	periodicGarbageCollection                      time.Duration
 	stopEstablishTunnelsEstablishedClientThreshold int
-	dumpProfilesOnStopEstablishTunnelsDone         int32
+	dumpProfilesOnStopEstablishTunnelsDoneOnce     int32
 	providerID                                     string
 	frontingProviderID                             string
 	region                                         string
 	runningProtocols                               []string
+	runningOnlyInproxyBroker                       bool
 }
 
 // GetLogFileReopenConfig gets the reopen retries, and create/mode inputs for
@@ -547,11 +548,18 @@ func (config *Config) DumpProfilesOnStopEstablishTunnels(establishedClientsCount
 	if config.stopEstablishTunnelsEstablishedClientThreshold < 0 {
 		return false
 	}
-	if atomic.LoadInt32(&config.dumpProfilesOnStopEstablishTunnelsDone) != 0 {
+	if config.runningOnlyInproxyBroker {
+		// There will always be zero established clients when running only the
+		// in-proxy broker and no tunnel protocols.
+		return false
+	}
+	if atomic.LoadInt32(&config.dumpProfilesOnStopEstablishTunnelsDoneOnce) != 0 {
 		return false
 	}
 	dump := (establishedClientsCount <= config.stopEstablishTunnelsEstablishedClientThreshold)
-	atomic.StoreInt32(&config.dumpProfilesOnStopEstablishTunnelsDone, 1)
+	if dump {
+		atomic.StoreInt32(&config.dumpProfilesOnStopEstablishTunnelsDoneOnce, 1)
+	}
 	return dump
 }
 
@@ -584,6 +592,12 @@ func (config *Config) GetRunningProtocols() []string {
 	return config.runningProtocols
 }
 
+// GetRunningOnlyInproxyBroker indicates if the server is running only the
+// in-proxy broker and no tunnel protocols.
+func (config *Config) GetRunningOnlyInproxyBroker() bool {
+	return config.runningOnlyInproxyBroker
+}
+
 // LoadConfig loads and validates a JSON encoded server config.
 func LoadConfig(configJSON []byte) (*Config, error) {
 
@@ -627,6 +641,9 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 		}
 	}
 
+	config.runningProtocols = []string{}
+	config.runningOnlyInproxyBroker = config.MeekServerRunInproxyBroker
+
 	for tunnelProtocol := range config.TunnelProtocolPorts {
 		if !common.Contains(protocol.SupportedTunnelProtocols, tunnelProtocol) {
 			return nil, errors.Tracef("Unsupported tunnel protocol: %s", tunnelProtocol)
@@ -693,6 +710,9 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 					protocol.TUNNEL_PROTOCOL_FRONTED_MEEK)
 			}
 		}
+
+		config.runningProtocols = append(config.runningProtocols, tunnelProtocol)
+		config.runningOnlyInproxyBroker = false
 	}
 
 	for tunnelProtocol, address := range config.TunnelProtocolPassthroughAddresses {
@@ -796,11 +816,6 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 		}
 	}
 
-	config.runningProtocols = []string{}
-	for tunnelProtocol := range config.TunnelProtocolPorts {
-		config.runningProtocols = append(config.runningProtocols, tunnelProtocol)
-	}
-
 	return &config, nil
 }
 

+ 34 - 0
psiphon/server/meek.go

@@ -351,6 +351,7 @@ func NewMeekServer(
 			&inproxy.BrokerConfig{
 				Logger:                         CommonLogger(log),
 				AllowProxy:                     meekServer.inproxyBrokerAllowProxy,
+				PrioritizeProxy:                meekServer.inproxyBrokerPrioritizeProxy,
 				AllowClient:                    meekServer.inproxyBrokerAllowClient,
 				AllowDomainFrontedDestinations: meekServer.inproxyBrokerAllowDomainFrontedDestinations,
 				LookupGeoIP:                    lookupGeoIPData,
@@ -358,6 +359,7 @@ func NewMeekServer(
 				APIParameterLogFieldFormatter:  getInproxyBrokerAPIParameterLogFieldFormatter(),
 				IsValidServerEntryTag:          support.PsinetDatabase.IsValidServerEntryTag,
 				GetTacticsPayload:              meekServer.inproxyBrokerGetTacticsPayload,
+				IsLoadLimiting:                 meekServer.support.TunnelServer.CheckLoadLimiting,
 				PrivateKey:                     sessionPrivateKey,
 				ObfuscationRootSecret:          obfuscationRootSecret,
 				ServerEntrySignaturePublicKey:  support.Config.InproxyBrokerServerEntrySignaturePublicKey,
@@ -1825,6 +1827,7 @@ func (server *MeekServer) inproxyReloadTactics() error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	defer p.Close()
 	if p.IsNil() {
 		return nil
 	}
@@ -1863,12 +1866,14 @@ func (server *MeekServer) inproxyReloadTactics() error {
 }
 
 func (server *MeekServer) lookupAllowTactic(geoIPData common.GeoIPData, parameterName string) bool {
+
 	// Fallback to not-allow on failure or nil tactics.
 	p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(geoIPData))
 	if err != nil {
 		log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
 		return false
 	}
+	defer p.Close()
 	if p.IsNil() {
 		return false
 	}
@@ -1887,6 +1892,35 @@ func (server *MeekServer) inproxyBrokerAllowDomainFrontedDestinations(clientGeoI
 	return server.lookupAllowTactic(clientGeoIPData, parameters.InproxyAllowDomainFrontedDestinations)
 }
 
+func (server *MeekServer) inproxyBrokerPrioritizeProxy(
+	proxyGeoIPData common.GeoIPData, proxyAPIParams common.APIParameters) bool {
+
+	// Fallback to not-prioritized on failure or nil tactics.
+	p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(proxyGeoIPData))
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
+		return false
+	}
+	defer p.Close()
+	if p.IsNil() {
+		return false
+	}
+	filter := p.KeyStringsValue(parameters.InproxyBrokerMatcherPrioritizeProxiesFilter)
+	if len(filter) == 0 {
+		return false
+	}
+	for name, values := range filter {
+		proxyValue, err := getStringRequestParam(proxyAPIParams, name)
+		if err != nil || !common.ContainsWildcard(values, proxyValue) {
+			return false
+		}
+	}
+	if !p.WeightedCoinFlip(parameters.InproxyBrokerMatcherPrioritizeProxiesProbability) {
+		return false
+	}
+	return true
+}
+
 // inproxyBrokerGetTacticsPayload is a callback used by the in-proxy broker to
 // provide tactics to proxies.
 //

+ 9 - 2
psiphon/server/services.go

@@ -315,10 +315,14 @@ func RunServices(configJSON []byte) (retErr error) {
 	// SIGUSR2 triggers an immediate load log and optional process profile output
 	logServerLoadSignal := makeSIGUSR2Channel()
 
-	// SIGTSTP triggers tunnelServer to stop establishing new tunnels
+	// SIGTSTP triggers tunnelServer to stop establishing new tunnels. The
+	// in-proxy broker, if running, may also stop enqueueing announces or
+	// offers.
 	stopEstablishingTunnelsSignal := makeSIGTSTPChannel()
 
-	// SIGCONT triggers tunnelServer to resume establishing new tunnels
+	// SIGCONT triggers tunnelServer to resume establishing new tunnels. The
+	// in-proxy broker, if running, will resume enqueueing all announces and
+	// offers.
 	resumeEstablishingTunnelsSignal := makeSIGCONTChannel()
 
 	err = nil
@@ -329,6 +333,9 @@ loop:
 		case <-stopEstablishingTunnelsSignal:
 			tunnelServer.SetEstablishTunnels(false)
 
+			// Dump profiles when entering the load limiting state with an
+			// unexpectedly low established client count, as determined by
+			// DumpProfilesOnStopEstablishTunnels.
 			if config.DumpProfilesOnStopEstablishTunnels(
 				tunnelServer.GetEstablishedClientCount()) {
 

+ 11 - 0
psiphon/server/tactics.go

@@ -135,6 +135,17 @@ func (c *ServerTacticsParametersCache) Get(
 
 	// Construct parameters from tactics.
 
+	// Note: since ServerTacticsParametersCache was implemented,
+	// tactics.Server.cachedTacticsData was added. This new cache is
+	// primarily intended to reduce server allocations and computations
+	// when _clients_ request tactics. cachedTacticsData also impacts
+	// GetTacticsWithTag.
+	//
+	// ServerTacticsParametersCache still optimizes performance for
+	// server-side tactics, since cachedTacticsData doesn't avoid filter
+	// checks, and ServerTacticsParametersCache includes a prepared
+	// parameters.ParametersAccessor.
+
 	tactics, tag, err := c.support.TacticsServer.GetTacticsWithTag(
 		true, common.GeoIPData(geoIPData), make(common.APIParameters))
 	if err != nil {

+ 15 - 0
psiphon/server/tunnelServer.go

@@ -324,6 +324,13 @@ func (server *TunnelServer) CheckEstablishTunnels() bool {
 	return server.sshServer.checkEstablishTunnels()
 }
 
+// CheckLoadLimiting returns whether the server is in the load limiting state,
+// which is when EstablishTunnels is false. CheckLoadLimiting is intended to
+// be checked by non-tunnel components; no metrics are updated by this call.
+func (server *TunnelServer) CheckLoadLimiting() bool {
+	return server.sshServer.checkLoadLimiting()
+}
+
 // GetEstablishTunnelsMetrics returns whether tunnel establishment is
 // currently allowed and the number of tunnels rejected since due to not
 // establishing since the last GetEstablishTunnelsMetrics call.
@@ -525,6 +532,14 @@ func (sshServer *sshServer) checkEstablishTunnels() bool {
 	return establishTunnels
 }
 
+func (sshServer *sshServer) checkLoadLimiting() bool {
+
+	// The server is in a general load limiting state when
+	// sshServer.establishTunnels is false (0). This check is intended to be
+	// used by non-tunnel components and no metrics are updated by this call.
+	return atomic.LoadInt32(&sshServer.establishTunnels) == 0
+}
+
 func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
 	return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
 		atomic.SwapInt64(&sshServer.establishLimitedCount, 0)