Просмотр исходного кода

Add InproxyBrokerMatcherPrioritizeProxies mechanism

Rod Hynes 1 год назад
Родитель
Сommit
5b012e0bf2

+ 43 - 13
psiphon/common/inproxy/broker.go

@@ -122,6 +122,15 @@ type BrokerConfig struct {
 	// clients. Proxies with personal compartment IDs are always allowed.
 	AllowProxy func(common.GeoIPData) bool
 
+	// PrioritizeProxy is a callback which can indicate whether proxy
+	// announcements from proxies with the specified GeoIPData and
+	// APIParameters should be prioritized in the matcher queue. Priority
+	// proxy announcements match ahead of other proxy announcements,
+	// regardless of announcement age/deadline. Priority status takes
+	// precedence over preferred NAT matching. Prioritization applies only to
+	// common compartment IDs and not personal pairing mode.
+	PrioritizeProxy func(common.GeoIPData, common.APIParameters) bool
+
 	// AllowClient is a callback which can indicate whether a client with the
 	// given GeoIP data is allowed to match with common compartment ID
 	// proxies. Clients are always allowed to match based on personal
@@ -548,6 +557,8 @@ func (b *Broker) handleProxyAnnounce(
 		return nil, errors.Trace(err)
 	}
 
+	hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
+
 	// Return MustUpgrade when the proxy's protocol version is less than the
 	// minimum required.
 	if announceRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
@@ -596,7 +607,7 @@ func (b *Broker) handleProxyAnnounce(
 	// compartment IDs are always allowed, as they will be used only by
 	// clients specifically configured to use them.
 
-	if len(announceRequest.PersonalCompartmentIDs) == 0 &&
+	if !hasPersonalCompartmentIDs &&
 		!b.config.AllowProxy(geoIPData) {
 
 		return nil, errors.TraceNew("proxy disallowed")
@@ -608,7 +619,7 @@ func (b *Broker) handleProxyAnnounce(
 	// assigned to the same compartment.
 
 	var commonCompartmentIDs []ID
-	if len(announceRequest.PersonalCompartmentIDs) == 0 {
+	if !hasPersonalCompartmentIDs {
 		compartmentID, err := b.selectCommonCompartmentID(proxyID)
 		if err != nil {
 			return nil, errors.Trace(err)
@@ -616,6 +627,28 @@ func (b *Broker) handleProxyAnnounce(
 		commonCompartmentIDs = []ID{compartmentID}
 	}
 
+	// In the common compartment ID case, invoke the callback to check if the
+	// announcement should be prioritized.
+
+	isPriority := false
+	if b.config.PrioritizeProxy != nil && !hasPersonalCompartmentIDs {
+
+		// Limitation: Of the two return values from
+		// ValidateAndGetParametersAndLogFields, apiParams and logFields,
+		// only logFields contains fields such as max_clients
+		// and *_bytes_per_second, and so these cannot be part of any
+		// filtering performed by the PrioritizeProxy callback.
+		//
+		// TODO: include the additional fields in logFields. Since the
+		// logFields return value is the output of server.getRequestLogFields
+		// processing, it's not safe to use it directly. In addition,
+		// filtering by fields such as max_clients and *_bytes_per_second
+		// calls for range filtering, which is not yet supported in the
+		// psiphon/server.MeekServer PrioritizeProxy provider.
+
+		isPriority = b.config.PrioritizeProxy(geoIPData, apiParams)
+	}
+
 	// Await client offer.
 
 	timeout := common.ValueOrDefault(
@@ -638,6 +671,7 @@ func (b *Broker) handleProxyAnnounce(
 		proxyIP,
 		&MatchAnnouncement{
 			Properties: MatchProperties{
+				IsPriority:             isPriority,
 				CommonCompartmentIDs:   commonCompartmentIDs,
 				PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
 				GeoIPData:              geoIPData,
@@ -830,6 +864,8 @@ func (b *Broker) handleClientOffer(
 		return nil, errors.Trace(err)
 	}
 
+	hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0
+
 	offerSDP := offerRequest.ClientOfferSDP
 	offerSDP.SDP = string(filteredSDP)
 
@@ -837,16 +873,10 @@ func (b *Broker) handleClientOffer(
 	// from offering. Clients are always allowed to match proxies with shared
 	// personal compartment IDs.
 
-	commonCompartmentIDs := offerRequest.CommonCompartmentIDs
-
-	if !b.config.AllowClient(geoIPData) {
-
-		if len(offerRequest.PersonalCompartmentIDs) == 0 {
-			return nil, errors.TraceNew("client disallowed")
-		}
+	if !hasPersonalCompartmentIDs &&
+		!b.config.AllowClient(geoIPData) {
 
-		// Only match personal compartment IDs.
-		commonCompartmentIDs = nil
+		return nil, errors.TraceNew("client disallowed")
 	}
 
 	// Validate that the proxy destination specified by the client is a valid
@@ -884,7 +914,7 @@ func (b *Broker) handleClientOffer(
 	// personal pairing mode, to facilitate a faster no-match result and
 	// resulting broker rotation.
 	var timeout time.Duration
-	if len(offerRequest.PersonalCompartmentIDs) > 0 {
+	if hasPersonalCompartmentIDs {
 		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferPersonalTimeout))
 	} else {
 		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferTimeout))
@@ -901,7 +931,7 @@ func (b *Broker) handleClientOffer(
 
 	clientMatchOffer = &MatchOffer{
 		Properties: MatchProperties{
-			CommonCompartmentIDs:   commonCompartmentIDs,
+			CommonCompartmentIDs:   offerRequest.CommonCompartmentIDs,
 			PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
 			GeoIPData:              geoIPData,
 			NetworkType:            GetNetworkType(offerRequest.Metrics.BaseAPIParameters),

+ 87 - 28
psiphon/common/inproxy/matcher.go

@@ -114,6 +114,7 @@ type Matcher struct {
 // MatchProperties specifies the compartment, GeoIP, and network topology
 // matching roperties of clients and proxies.
 type MatchProperties struct {
+	IsPriority             bool
 	CommonCompartmentIDs   []ID
 	PersonalCompartmentIDs []ID
 	GeoIPData              common.GeoIPData
@@ -775,16 +776,27 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 	var bestMatch *announcementEntry
 	bestMatchIndex := -1
+	bestMatchIsPriority := false
 	bestMatchNAT := false
 
 	candidateIndex := -1
 	for {
 
-		announcementEntry := matchIterator.getNext()
+		announcementEntry, isPriority := matchIterator.getNext()
 		if announcementEntry == nil {
 			break
 		}
 
+		if !isPriority && bestMatchIsPriority {
+
+			// There is a priority match, but it wasn't bestMatchNAT and we
+			// continued to iterate. Now that isPriority is false, we're past the
+			// end of the priority items, so stop looking for any best NAT match
+			// and return the previous priority match. When there are zero
+			// priority items to begin with, this case should not be hit.
+			break
+		}
+
 		candidateIndex += 1
 
 		// Skip and remove this announcement if its deadline has already
@@ -829,8 +841,8 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 			bestMatch = announcementEntry
 			bestMatchIndex = candidateIndex
+			bestMatchIsPriority = isPriority
 			bestMatchNAT = matchNAT
-
 		}
 
 		// Stop as soon as we have the best possible match, or have reached
@@ -1107,9 +1119,10 @@ func getRateLimitIP(strIP string) string {
 // matching a specified list of compartment IDs. announcementMultiQueue and
 // its underlying data structures are not safe for concurrent access.
 type announcementMultiQueue struct {
-	commonCompartmentQueues   map[ID]*announcementCompartmentQueue
-	personalCompartmentQueues map[ID]*announcementCompartmentQueue
-	totalEntries              int
+	priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
+	commonCompartmentQueues         map[ID]*announcementCompartmentQueue
+	personalCompartmentQueues       map[ID]*announcementCompartmentQueue
+	totalEntries                    int
 }
 
 // announcementCompartmentQueue is a single compartment queue within an
@@ -1120,6 +1133,7 @@ type announcementMultiQueue struct {
 // matches may be possible.
 type announcementCompartmentQueue struct {
 	isCommonCompartment      bool
+	isPriority               bool
 	compartmentID            ID
 	entries                  *list.List
 	unlimitedNATCount        int
@@ -1131,11 +1145,10 @@ type announcementCompartmentQueue struct {
 // subset of announcementMultiQueue compartment queues. Concurrent
 // announcementMatchIterators are not supported.
 type announcementMatchIterator struct {
-	multiQueue           *announcementMultiQueue
-	isCommonCompartments bool
-	compartmentQueues    []*announcementCompartmentQueue
-	compartmentIDs       []ID
-	nextEntries          []*list.Element
+	multiQueue        *announcementMultiQueue
+	compartmentQueues []*announcementCompartmentQueue
+	compartmentIDs    []ID
+	nextEntries       []*list.Element
 }
 
 // announcementQueueReference represents the queue position for a given
@@ -1148,8 +1161,9 @@ type announcementQueueReference struct {
 
 func newAnnouncementMultiQueue() *announcementMultiQueue {
 	return &announcementMultiQueue{
-		commonCompartmentQueues:   make(map[ID]*announcementCompartmentQueue),
-		personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
+		priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
+		commonCompartmentQueues:         make(map[ID]*announcementCompartmentQueue),
+		personalCompartmentQueues:       make(map[ID]*announcementCompartmentQueue),
 	}
 }
 
@@ -1179,22 +1193,31 @@ func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) e
 		return errors.TraceNew("announcement must specify exactly one compartment ID")
 	}
 
+	isPriority := announcementEntry.announcement.Properties.IsPriority
+
 	isCommonCompartment := true
 	var compartmentID ID
 	var compartmentQueues map[ID]*announcementCompartmentQueue
 	if len(commonCompartmentIDs) > 0 {
 		compartmentID = commonCompartmentIDs[0]
 		compartmentQueues = q.commonCompartmentQueues
+		if isPriority {
+			compartmentQueues = q.priorityCommonCompartmentQueues
+		}
 	} else {
 		isCommonCompartment = false
 		compartmentID = personalCompartmentIDs[0]
 		compartmentQueues = q.personalCompartmentQueues
+		if isPriority {
+			return errors.TraceNew("priority not supported for personal compartments")
+		}
 	}
 
 	compartmentQueue, ok := compartmentQueues[compartmentID]
 	if !ok {
 		compartmentQueue = &announcementCompartmentQueue{
 			isCommonCompartment: isCommonCompartment,
+			isPriority:          isPriority,
 			compartmentID:       compartmentID,
 			entries:             list.New(),
 		}
@@ -1250,9 +1273,14 @@ func (r *announcementQueueReference) dequeue() bool {
 
 	if r.compartmentQueue.entries.Len() == 0 {
 		// Remove empty compartment queue.
-		queues := r.multiQueue.commonCompartmentQueues
-		if !r.compartmentQueue.isCommonCompartment {
-			queues = r.multiQueue.personalCompartmentQueues
+		queues := r.multiQueue.personalCompartmentQueues
+		if r.compartmentQueue.isCommonCompartment {
+			if r.compartmentQueue.isPriority {
+				queues = r.multiQueue.priorityCommonCompartmentQueues
+
+			} else {
+				queues = r.multiQueue.commonCompartmentQueues
+			}
 		}
 		delete(queues, r.compartmentQueue.compartmentID)
 	}
@@ -1270,8 +1298,7 @@ func (q *announcementMultiQueue) startMatching(
 	compartmentIDs []ID) *announcementMatchIterator {
 
 	iter := &announcementMatchIterator{
-		multiQueue:           q,
-		isCommonCompartments: isCommonCompartments,
+		multiQueue: q,
 	}
 
 	// Find the matching compartment queues and initialize iteration over
@@ -1280,16 +1307,29 @@ func (q *announcementMultiQueue) startMatching(
 	// maxCompartmentIDs, as enforced in
 	// ClientOfferRequest.ValidateAndGetLogFields).
 
-	compartmentQueues := q.commonCompartmentQueues
-	if !isCommonCompartments {
-		compartmentQueues = q.personalCompartmentQueues
+	// Priority queues, when in use, must all be added to the beginning of
+	// iter.compartmentQueues in order to ensure that the iteration logic in
+	// getNext visits all priority items first.
+
+	var compartmentQueuesList []map[ID]*announcementCompartmentQueue
+	if isCommonCompartments {
+		compartmentQueuesList = append(
+			compartmentQueuesList,
+			q.priorityCommonCompartmentQueues,
+			q.commonCompartmentQueues)
+	} else {
+		compartmentQueuesList = append(
+			compartmentQueuesList,
+			q.personalCompartmentQueues)
 	}
 
-	for _, ID := range compartmentIDs {
-		if compartmentQueue, ok := compartmentQueues[ID]; ok {
-			iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
-			iter.compartmentIDs = append(iter.compartmentIDs, ID)
-			iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
+	for _, compartmentQueues := range compartmentQueuesList {
+		for _, ID := range compartmentIDs {
+			if compartmentQueue, ok := compartmentQueues[ID]; ok {
+				iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
+				iter.compartmentIDs = append(iter.compartmentIDs, ID)
+				iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
+			}
 		}
 	}
 
@@ -1327,10 +1367,16 @@ func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
 // are not supported during iteration. Iteration and dequeue should all be
 // performed with a lock over the entire announcementMultiQueue, and with
 // only one concurrent announcementMatchIterator.
-func (iter *announcementMatchIterator) getNext() *announcementEntry {
+//
+// getNext returns a nil *announcementEntry when there are no more items.
+// getNext also returns an isPriority flag, indicating the announcement is a
+// priority candidate. All priority candidates are guaranteed to be returned
+// before any non-priority candidates.
+func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
 
 	// Assumes announcements are enqueued in announcementEntry.ctx.Deadline
-	// order.
+	// order. Also assumes that any priority queues are all at the front of
+	// iter.compartmentQueues.
 
 	// Select the oldest item, by deadline, from all the candidate queue head
 	// items. This operation is linear in the number of matching compartment
@@ -1338,6 +1384,10 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 	// compartment IDs (no more than maxCompartmentIDs, as enforced in
 	// ClientOfferRequest.ValidateAndGetLogFields).
 	//
+	// When there are priority candidates, they are selected first, regardless
+	// of the deadlines of non-priority candidates. Multiple priority
+	// candidates are processed in FIFO deadline order.
+	//
 	// A potential future enhancement is to add more iterator state to track
 	// which queue has the next oldest time to select on the following
 	// getNext call. Another potential enhancement is to remove fully
@@ -1345,14 +1395,22 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 
 	var selectedCandidate *announcementEntry
 	selectedIndex := -1
+	selectedPriority := false
 
 	for i := 0; i < len(iter.compartmentQueues); i++ {
 		if iter.nextEntries[i] == nil {
 			continue
 		}
+		isPriority := iter.compartmentQueues[i].isPriority
+		if selectedPriority && !isPriority {
+			// Ignore older of non-priority entries when there are priority
+			// candidates.
+			break
+		}
 		if selectedCandidate == nil {
 			selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
 			selectedIndex = i
+			selectedPriority = isPriority
 		} else {
 			candidate := iter.nextEntries[i].Value.(*announcementEntry)
 			deadline, deadlineOk := candidate.ctx.Deadline()
@@ -1360,6 +1418,7 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 			if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
 				selectedCandidate = candidate
 				selectedIndex = i
+				selectedPriority = isPriority
 			}
 		}
 	}
@@ -1371,5 +1430,5 @@ func (iter *announcementMatchIterator) getNext() *announcementEntry {
 		iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
 	}
 
-	return selectedCandidate
+	return selectedCandidate, selectedPriority
 }

+ 130 - 31
psiphon/common/inproxy/matcher_test.go

@@ -617,6 +617,31 @@ func runTestMatcher() error {
 		return errors.Trace(err)
 	}
 
+	// Test: priority supercedes preferred NAT match
+
+	go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
+	time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
+	proxy2Properties.IsPriority = true
+	go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
+	time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
+	go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
+
+	err = <-proxy1ResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
+	}
+
+	// proxy2 should match since it's the priority, but not preferred NAT match
+	err = <-proxy2ResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = <-clientResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	// Test: many matches
 
 	// Reduce test log noise for this phase of the test
@@ -675,24 +700,26 @@ func TestMatcherMultiQueue(t *testing.T) {
 
 func runTestMatcherMultiQueue() error {
 
-	q := newAnnouncementMultiQueue()
-
 	// Test: invalid compartment IDs
 
-	err := q.enqueue(&announcementEntry{
-		announcement: &MatchAnnouncement{
-			Properties: MatchProperties{}}})
+	q := newAnnouncementMultiQueue()
+
+	err := q.enqueue(
+		&announcementEntry{
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{}}})
 	if err == nil {
 		return errors.TraceNew("unexpected success")
 	}
 
 	compartmentID, _ := MakeID()
-	err = q.enqueue(&announcementEntry{
-		announcement: &MatchAnnouncement{
-			Properties: MatchProperties{
-				CommonCompartmentIDs:   []ID{compartmentID},
-				PersonalCompartmentIDs: []ID{compartmentID},
-			}}})
+	err = q.enqueue(
+		&announcementEntry{
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{
+					CommonCompartmentIDs:   []ID{compartmentID},
+					PersonalCompartmentIDs: []ID{compartmentID},
+				}}})
 	if err == nil {
 		return errors.TraceNew("unexpected success")
 	}
@@ -716,25 +743,27 @@ func runTestMatcherMultiQueue() error {
 		ctx, cancel := context.WithDeadline(
 			context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
 		defer cancel()
-		err := q.enqueue(&announcementEntry{
-			ctx: ctx,
-			announcement: &MatchAnnouncement{
-				Properties: MatchProperties{
-					CommonCompartmentIDs: []ID{
-						otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
-					NATType: NATTypeSymmetric,
-				}}})
+		err := q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					Properties: MatchProperties{
+						CommonCompartmentIDs: []ID{
+							otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
+						NATType: NATTypeSymmetric,
+					}}})
 		if err != nil {
 			return errors.Trace(err)
 		}
-		err = q.enqueue(&announcementEntry{
-			ctx: ctx,
-			announcement: &MatchAnnouncement{
-				Properties: MatchProperties{
-					PersonalCompartmentIDs: []ID{
-						otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
-					NATType: NATTypeSymmetric,
-				}}})
+		err = q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					Properties: MatchProperties{
+						PersonalCompartmentIDs: []ID{
+							otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
+						NATType: NATTypeSymmetric,
+					}}})
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -797,7 +826,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected NAT counts")
 	}
 
-	match := iter.getNext()
+	match, _ := iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -826,7 +855,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected NAT counts")
 	}
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -844,7 +873,7 @@ func runTestMatcherMultiQueue() error {
 
 	// Test: getNext after dequeue
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -856,7 +885,7 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected already dequeued")
 	}
 
-	match = iter.getNext()
+	match, _ = iter.getNext()
 	if match == nil {
 		return errors.TraceNew("unexpected missing match")
 	}
@@ -882,6 +911,76 @@ func runTestMatcherMultiQueue() error {
 		return errors.TraceNew("unexpected compartment queue count")
 	}
 
+	// Test: priority
+
+	q = newAnnouncementMultiQueue()
+
+	var commonCompartmentIDs []ID
+	numCompartmentIDs := 10
+	for i := 0; i < numCompartmentIDs; i++ {
+		commonCompartmentID, _ := MakeID()
+		commonCompartmentIDs = append(
+			commonCompartmentIDs, commonCompartmentID)
+	}
+
+	priorityProxyID, _ := MakeID()
+	nonPriorityProxyID, _ := MakeID()
+
+	ctx, cancel := context.WithDeadline(
+		context.Background(), time.Now().Add(10*time.Minute))
+	defer cancel()
+
+	numEntries := 10000
+	for i := 0; i < numEntries; i++ {
+		// Enqueue every other announcement as a priority
+		isPriority := i%2 == 0
+		proxyID := priorityProxyID
+		if !isPriority {
+			proxyID = nonPriorityProxyID
+		}
+		err := q.enqueue(
+			&announcementEntry{
+				ctx: ctx,
+				announcement: &MatchAnnouncement{
+					ProxyID: proxyID,
+					Properties: MatchProperties{
+						IsPriority: isPriority,
+						CommonCompartmentIDs: []ID{
+							commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
+						NATType: NATTypeUnknown,
+					}}})
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	iter = q.startMatching(true, commonCompartmentIDs)
+	for i := 0; i < numEntries; i++ {
+		match, isPriority := iter.getNext()
+		if match == nil {
+			return errors.TraceNew("unexpected missing match")
+		}
+		// First half, and only first half, of matches is priority
+		expectPriority := i < numEntries/2
+		if isPriority != expectPriority {
+			return errors.TraceNew("unexpected isPriority")
+		}
+		expectedProxyID := priorityProxyID
+		if !expectPriority {
+			expectedProxyID = nonPriorityProxyID
+		}
+		if match.announcement.ProxyID != expectedProxyID {
+			return errors.TraceNew("unexpected ProxyID")
+		}
+		if !match.queueReference.dequeue() {
+			return errors.TraceNew("unexpected already dequeued")
+		}
+	}
+	match, _ = iter.getNext()
+	if match != nil {
+		return errors.TraceNew("unexpected  match")
+	}
+
 	return nil
 }
 

+ 11 - 0
psiphon/common/parameters/parameters.go

@@ -401,6 +401,8 @@ const (
 	InproxyBrokerMatcherOfferLimitEntryCount           = "InproxyBrokerMatcherOfferLimitEntryCount"
 	InproxyBrokerMatcherOfferRateLimitQuantity         = "InproxyBrokerMatcherOfferRateLimitQuantity"
 	InproxyBrokerMatcherOfferRateLimitInterval         = "InproxyBrokerMatcherOfferRateLimitInterval"
+	InproxyBrokerMatcherPrioritizeProxiesProbability   = "InproxyBrokerMatcherPrioritizeProxiesProbability"
+	InproxyBrokerMatcherPrioritizeProxiesFilter        = "InproxyBrokerMatcherPrioritizeProxiesFilter"
 	InproxyBrokerProxyAnnounceTimeout                  = "InproxyBrokerProxyAnnounceTimeout"
 	InproxyBrokerClientOfferTimeout                    = "InproxyBrokerClientOfferTimeout"
 	InproxyBrokerClientOfferPersonalTimeout            = "InproxyBrokerClientOfferPersonalTimeout"
@@ -916,6 +918,8 @@ var defaultParameters = map[string]struct {
 	InproxyBrokerMatcherOfferLimitEntryCount:           {value: 10, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitQuantity:         {value: 50, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitInterval:         {value: 1 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
+	InproxyBrokerMatcherPrioritizeProxiesProbability:   {value: 1.0, minimum: 0.0},
+	InproxyBrokerMatcherPrioritizeProxiesFilter:        {value: KeyStrings{}},
 	InproxyBrokerProxyAnnounceTimeout:                  {value: 2 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferPersonalTimeout:            {value: 5 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
@@ -1946,6 +1950,13 @@ func (p ParametersAccessor) KeyStrings(name, key string) []string {
 	return value[key]
 }
 
+// KeyStringsValue returns a complete KeyStrings parameter value.
+func (p ParametersAccessor) KeyStringsValue(name string) KeyStrings {
+	value := KeyStrings{}
+	p.snapshot.getValue(name, &value)
+	return value
+}
+
 // KeyDurations returns a KeyDurations parameter value, with string durations
 // converted to time.Duration.
 func (p ParametersAccessor) KeyDurations(name string) map[string]time.Duration {

+ 4 - 0
psiphon/common/parameters/parameters_test.go

@@ -135,6 +135,10 @@ func TestGetDefaultParameters(t *testing.T) {
 					t.Fatalf("KeyStrings returned %+v expected %+v", g, strings)
 				}
 			}
+			g := p.Get().KeyStringsValue(name)
+			if !reflect.DeepEqual(v, g) {
+				t.Fatalf("KeyStrings returned %+v expected %+v", g, v)
+			}
 		case KeyDurations:
 			g := p.Get().KeyDurations(name)
 			durations := make(map[string]time.Duration)

+ 33 - 0
psiphon/server/meek.go

@@ -351,6 +351,7 @@ func NewMeekServer(
 			&inproxy.BrokerConfig{
 				Logger:                         CommonLogger(log),
 				AllowProxy:                     meekServer.inproxyBrokerAllowProxy,
+				PrioritizeProxy:                meekServer.inproxyBrokerPrioritizeProxy,
 				AllowClient:                    meekServer.inproxyBrokerAllowClient,
 				AllowDomainFrontedDestinations: meekServer.inproxyBrokerAllowDomainFrontedDestinations,
 				LookupGeoIP:                    lookupGeoIPData,
@@ -1825,6 +1826,7 @@ func (server *MeekServer) inproxyReloadTactics() error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	defer p.Close()
 	if p.IsNil() {
 		return nil
 	}
@@ -1863,12 +1865,14 @@ func (server *MeekServer) inproxyReloadTactics() error {
 }
 
 func (server *MeekServer) lookupAllowTactic(geoIPData common.GeoIPData, parameterName string) bool {
+
 	// Fallback to not-allow on failure or nil tactics.
 	p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(geoIPData))
 	if err != nil {
 		log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
 		return false
 	}
+	defer p.Close()
 	if p.IsNil() {
 		return false
 	}
@@ -1887,6 +1891,35 @@ func (server *MeekServer) inproxyBrokerAllowDomainFrontedDestinations(clientGeoI
 	return server.lookupAllowTactic(clientGeoIPData, parameters.InproxyAllowDomainFrontedDestinations)
 }
 
+func (server *MeekServer) inproxyBrokerPrioritizeProxy(
+	proxyGeoIPData common.GeoIPData, proxyAPIParams common.APIParameters) bool {
+
+	// Fallback to not-prioritized on failure or nil tactics.
+	p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(proxyGeoIPData))
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": err}).Warning("ServerTacticsParametersCache.Get failed")
+		return false
+	}
+	defer p.Close()
+	if p.IsNil() {
+		return false
+	}
+	filter := p.KeyStringsValue(parameters.InproxyBrokerMatcherPrioritizeProxiesFilter)
+	if len(filter) == 0 {
+		return false
+	}
+	for name, values := range filter {
+		proxyValue, err := getStringRequestParam(proxyAPIParams, name)
+		if err != nil || !common.ContainsWildcard(values, proxyValue) {
+			return false
+		}
+	}
+	if !p.WeightedCoinFlip(parameters.InproxyBrokerMatcherPrioritizeProxiesProbability) {
+		return false
+	}
+	return true
+}
+
 // inproxyBrokerGetTacticsPayload is a callback used by the in-proxy broker to
 // provide tactics to proxies.
 //