Browse Source

Optimize in-proxy matching queue

- Switch to per-compartment ID announcement multi-queue, enabling efficient
  lookup of announcements with matching compartment IDs.

- Add benchmark and document before/after performance, focusing on
  anticipated, detrimental personal pairing case.

- Broker now enforces that clients specify either all common or all personal
  compartment IDs (as clients already did). Matcher no longer matches both
  personal and common announcements.

- Proxy is now limited to specifying at most one personal compartment ID
  (per announcement).
Rod Hynes 1 year ago
parent
commit
76bba32f09

+ 6 - 2
psiphon/common/inproxy/api.go

@@ -544,8 +544,12 @@ func (request *ProxyAnnounceRequest) ValidateAndGetParametersAndLogFields(
 	formatter common.APIParameterLogFieldFormatter,
 	geoIPData common.GeoIPData) (common.APIParameters, common.LogFields, error) {
 
-	if len(request.PersonalCompartmentIDs) > maxCompartmentIDs {
-		return nil, nil, errors.Tracef("invalid compartment IDs length: %d", len(request.PersonalCompartmentIDs))
+	// A proxy may specify at most 1 personal compartment ID. This is
+	// currently a limitation of the multi-queue implementation; see comment
+	// in announcementMultiQueue.enqueue.
+	if len(request.PersonalCompartmentIDs) > 1 {
+		return nil, nil, errors.Tracef(
+			"invalid compartment IDs length: %d", len(request.PersonalCompartmentIDs))
 	}
 
 	if request.Metrics == nil {

+ 3 - 0
psiphon/common/inproxy/broker.go

@@ -587,6 +587,9 @@ func (b *Broker) handleProxyAnnounce(
 	defer cancelFunc()
 	extendTransportTimeout(timeout)
 
+	// Note that matcher.Announce assumes a monotonically increasing
+	// announceCtx.Deadline input for each successive call.
+
 	clientOffer, matchMetrics, err = b.matcher.Announce(
 		announceCtx,
 		proxyIP,

+ 1 - 1
psiphon/common/inproxy/inproxy_test.go

@@ -451,7 +451,7 @@ func runTestInproxy() error {
 	for {
 		time.Sleep(100 * time.Millisecond)
 		broker.matcher.announcementQueueMutex.Lock()
-		n := broker.matcher.announcementQueue.Len()
+		n := broker.matcher.announcementQueue.getLen()
 		broker.matcher.announcementQueueMutex.Unlock()
 		if n >= numProxies {
 			break

+ 436 - 186
psiphon/common/inproxy/matcher.go

@@ -19,6 +19,7 @@
 package inproxy
 
 import (
+	"container/list"
 	"context"
 	std_errors "errors"
 	"net"
@@ -29,7 +30,6 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	lrucache "github.com/cognusion/go-cache-lru"
-	"github.com/gammazero/deque"
 	"golang.org/x/time/rate"
 )
 
@@ -40,6 +40,7 @@ const (
 	matcherOfferQueueMaxSize        = 5000000
 	matcherPendingAnswersTTL        = 30 * time.Second
 	matcherPendingAnswersMaxSize    = 100000
+	matcherMaxPreferredNATProbe     = 100
 
 	matcherRateLimiterReapHistoryFrequencySeconds = 300
 	matcherRateLimiterMaxCacheEntries             = 1000000
@@ -53,12 +54,10 @@ const (
 // as they are closest to timing out.
 //
 // The client and proxy must supply matching personal or common compartment
-// IDs. Personal compartment matching is preferred. Common compartments are
-// managed by Psiphon and can be obtained via a tactics parameter or via an
-// OSL embedding.
-//
-// A client may opt form personal-only matching by not supplying any common
-// compartment IDs.
+// IDs. Common compartments are managed by Psiphon and can be obtained via a
+// tactics parameter or via an OSL embedding. Each proxy announcement or
+// client offer may specify only one of either common or personal compartment
+// IDs.
 //
 // Matching prefers to pair proxies and clients in a way that maximizes total
 // possible matches. For a client or proxy with less-limited NAT traversal, a
@@ -86,25 +85,21 @@ type Matcher struct {
 
 	// TODO: replace queue and counts with an indexed, in-memory database?
 
-	announcementQueueMutex                      sync.Mutex
-	announcementQueue                           *deque.Deque[*announcementEntry]
-	announcementQueueEntryCountByIP             map[string]int
-	announcementQueueRateLimiters               *lrucache.Cache
-	announcementLimitEntryCount                 int
-	announcementRateLimitQuantity               int
-	announcementRateLimitInterval               time.Duration
-	announcementNonlimitedProxyIDs              map[ID]struct{}
-	announcementsPersonalCompartmentalizedCount int
-	announcementsUnlimitedNATCount              int
-	announcementsPartiallyLimitedNATCount       int
-	announcementsStrictlyLimitedNATCount        int
+	announcementQueueMutex          sync.Mutex
+	announcementQueue               *announcementMultiQueue
+	announcementQueueEntryCountByIP map[string]int
+	announcementQueueRateLimiters   *lrucache.Cache
+	announcementLimitEntryCount     int
+	announcementRateLimitQuantity   int
+	announcementRateLimitInterval   time.Duration
+	announcementNonlimitedProxyIDs  map[ID]struct{}
 
 	// The offer queue is also implicitly sorted by offer age. Both an offer
 	// and announcement queue are required since either announcements or
 	// offers can arrive while there are no available pairings.
 
 	offerQueueMutex          sync.Mutex
-	offerQueue               *deque.Deque[*offerEntry]
+	offerQueue               *list.List
 	offerQueueEntryCountByIP map[string]int
 	offerQueueRateLimiters   *lrucache.Cache
 	offerLimitEntryCount     int
@@ -233,6 +228,10 @@ type announcementEntry struct {
 	announcement *MatchAnnouncement
 	offerChan    chan *MatchOffer
 	matchMetrics atomic.Value
+
+	// queueReference is initialized by addAnnouncementEntry, and used to
+	// efficiently dequeue the entry.
+	queueReference announcementQueueReference
 }
 
 func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
@@ -248,6 +247,10 @@ type offerEntry struct {
 	offer        *MatchOffer
 	answerChan   chan *answerInfo
 	matchMetrics atomic.Value
+
+	// queueReference is initialized by addOfferEntry, and used to efficiently
+	// dequeue the entry.
+	queueReference *list.Element
 }
 
 func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
@@ -294,14 +297,14 @@ func NewMatcher(config *MatcherConfig) *Matcher {
 
 		waitGroup: new(sync.WaitGroup),
 
-		announcementQueue:               deque.New[*announcementEntry](),
+		announcementQueue:               newAnnouncementMultiQueue(),
 		announcementQueueEntryCountByIP: make(map[string]int),
 		announcementQueueRateLimiters: lrucache.NewWithLRU(
 			0,
 			time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
 			matcherRateLimiterMaxCacheEntries),
 
-		offerQueue:               deque.New[*offerEntry](),
+		offerQueue:               list.New(),
 		offerQueueEntryCountByIP: make(map[string]int),
 		offerQueueRateLimiters: lrucache.NewWithLRU(
 			0,
@@ -406,6 +409,10 @@ func (m *Matcher) Stop() {
 // with a returned offer or ctx is done. The caller must not mutate the
 // announcement or its properties after calling Announce.
 //
+// Announce assumes that the ctx.Deadline for each call is monotonically
+// increasing and that the deadline can be used as part of selecting the next
+// nearest-to-expire announcement.
+//
 // The offer is sent to the proxy by the broker, and then the proxy sends its
 // answer back to the broker, which calls Answer with that value.
 //
@@ -416,6 +423,19 @@ func (m *Matcher) Announce(
 	proxyIP string,
 	proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
 
+	// An announcement must specify exactly one compartment ID, of one type,
+	// common or personal. This is currently a limitation of the multi-queue
+	// implementation; see comment in announcementMultiQueue.enqueue.
+	compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
+	if len(compartmentIDs) == 0 {
+		compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
+	} else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
+		return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
+	}
+	if len(compartmentIDs) != 1 {
+		return nil, nil, errors.TraceNew("unexpected compartment ID count")
+	}
+
 	announcementEntry := &announcementEntry{
 		ctx:          ctx,
 		limitIP:      getRateLimitIP(proxyIP),
@@ -434,7 +454,7 @@ func (m *Matcher) Announce(
 
 	select {
 	case <-ctx.Done():
-		m.removeAnnouncementEntry(announcementEntry)
+		m.removeAnnouncementEntry(true, announcementEntry)
 		return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
 
 	case clientOffer = <-announcementEntry.offerChan:
@@ -458,6 +478,18 @@ func (m *Matcher) Offer(
 	clientIP string,
 	clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
 
+	// An offer must specify at least one compartment ID, and may only specify
+	// one type, common or personal, of compartment IDs.
+	compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
+	if len(compartmentIDs) == 0 {
+		compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
+	} else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
+		return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
+	}
+	if len(compartmentIDs) < 1 {
+		return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
+	}
+
 	offerEntry := &offerEntry{
 		ctx:        ctx,
 		limitIP:    getRateLimitIP(clientIP),
@@ -476,7 +508,7 @@ func (m *Matcher) Offer(
 
 	select {
 	case <-ctx.Done():
-		m.removeOfferEntry(offerEntry)
+		m.removeOfferEntry(true, offerEntry)
 
 		// TODO: also remove any pendingAnswers entry? The entry TTL is set to
 		// the Offer ctx, the client request, timeout, so it will eventually
@@ -589,44 +621,45 @@ func (m *Matcher) matchAllOffers() {
 	// TODO: consider matching one offer, then releasing the locks to allow
 	// more announcements to be enqueued, then continuing to match.
 
-	i := 0
-	end := m.offerQueue.Len()
+	nextOffer := m.offerQueue.Front()
+	offerIndex := -1
 
-	for i < end && m.announcementQueue.Len() > 0 {
+	for nextOffer != nil && m.announcementQueue.getLen() > 0 {
 
-		offerEntry := m.offerQueue.At(i)
+		offerIndex += 1
+
+		// nextOffer.Next must be invoked before any removeOfferEntry since
+		// container/list.remove clears list.Element.next.
+		offer := nextOffer
+		nextOffer = nextOffer.Next()
+
+		offerEntry := offer.Value.(*offerEntry)
 
 		// Skip and remove this offer if its deadline has already passed.
 		// There is no signal to the awaiting Offer function, as it will exit
 		// based on the same ctx.
 
 		if offerEntry.ctx.Err() != nil {
-			m.removeOfferEntryByIndex(i)
-			end -= 1
+			m.removeOfferEntry(false, offerEntry)
 			continue
 		}
 
-		j, ok := m.matchOffer(offerEntry)
-		if !ok {
-
-			// No match, so leave this offer in place in the queue and move to
-			// the next.
-
-			i++
+		announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
+		if announcementEntry == nil {
 			continue
 		}
 
-		// Get the matched announcement entry.
-
-		announcementEntry := m.announcementQueue.At(j)
-
 		// Record match metrics.
 
+		// The index metrics predate the announcement multi-queue; now, with
+		// the multi-queue, announcement_index is how many announce entries
+		// were inspected before matching.
+
 		matchMetrics := &MatchMetrics{
-			OfferMatchIndex:        i,
+			OfferMatchIndex:        offerIndex,
 			OfferQueueSize:         m.offerQueue.Len(),
-			AnnouncementMatchIndex: j,
-			AnnouncementQueueSize:  m.announcementQueue.Len(),
+			AnnouncementMatchIndex: announcementMatchIndex,
+			AnnouncementQueueSize:  m.announcementQueue.getLen(),
 		}
 
 		offerEntry.matchMetrics.Store(matchMetrics)
@@ -639,6 +672,8 @@ func (m *Matcher) matchAllOffers() {
 		// entry is set to the matched Offer call's ctx, as the answer is
 		// only useful as long as the client is still waiting.
 
+		m.removeAnnouncementEntry(false, announcementEntry)
+
 		expiry := lrucache.DefaultExpiration
 		deadline, ok := offerEntry.ctx.Deadline()
 		if ok {
@@ -659,24 +694,22 @@ func (m *Matcher) matchAllOffers() {
 
 		announcementEntry.offerChan <- offerEntry.offer
 
-		m.removeAnnouncementEntryByIndex(j)
-
 		// Remove the matched offer from the queue and match the next offer,
 		// now first in the queue.
 
-		m.removeOfferEntryByIndex(i)
-
-		end -= 1
+		m.removeOfferEntry(false, offerEntry)
 	}
 }
 
-func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
+func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 	// Assumes the caller has the queue mutexed locked.
 
-	// Check each announcement in turn, and select a match. There is an
-	// implicit preference for older proxy announcements, sooner to timeout,
-	// at the front of the queue.
+	// Check each candidate announcement in turn, and select a match. There is
+	// an implicit preference for older proxy announcements, sooner to
+	// timeout, at the front of the enqueued announcements.
+	// announcementMultiQueue.startMatching skips to the first matching
+	// compartment ID(s).
 	//
 	// Limitation: since this logic matches each enqueued client in turn, it will
 	// only make the optimal NAT match for the oldest enqueued client vs. all
@@ -692,56 +725,58 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
 
 	offerProperties := &offerEntry.offer.Properties
 
+	// Assumes the caller checks that offer specifies either personal
+	// compartment IDs or common compartment IDs, but not both.
+	isCommonCompartments := false
+	compartmentIDs := offerProperties.PersonalCompartmentIDs
+	if len(compartmentIDs) == 0 {
+		isCommonCompartments = true
+		compartmentIDs = offerProperties.CommonCompartmentIDs
+	}
+	if len(compartmentIDs) == 0 {
+		return nil, -1
+	}
+
+	matchIterator := m.announcementQueue.startMatching(
+		isCommonCompartments, compartmentIDs)
+
 	// Use the NAT traversal type counters to check if there's any preferred
 	// NAT match for this offer in the announcement queue. When there is, we
 	// will search beyond the first announcement.
 
+	unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
+		matchIterator.getNATCounts()
+
 	existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
-		m.announcementsUnlimitedNATCount > 0,
-		m.announcementsPartiallyLimitedNATCount > 0,
-		m.announcementsStrictlyLimitedNATCount > 0)
+		unlimitedNATCount > 0,
+		partiallyLimitedNATCount > 0,
+		strictlyLimitedNATCount > 0)
 
-	bestMatch := -1
+	var bestMatch *announcementEntry
+	bestMatchIndex := -1
 	bestMatchNAT := false
-	bestMatchCompartment := false
-
-	end := m.announcementQueue.Len()
 
-	// TODO: add queue indexing to facilitate skipping ahead to a matching
-	// personal compartment ID, if any, when personal-only matching is
-	// required. Personal matching may often require near-full queue scans
-	// when looking for a match. Common compartment matching may also benefit
-	// from indexing, although with a handful of common compartment IDs more
-	// or less uniformly distributed, frequent long scans are not expected in
-	// practise.
+	candidateIndex := -1
+	for {
 
-	for i := 0; i < end; i++ {
+		announcementEntry := matchIterator.getNext()
+		if announcementEntry == nil {
+			break
+		}
 
-		announcementEntry := m.announcementQueue.At(i)
+		candidateIndex += 1
 
 		// Skip and remove this announcement if its deadline has already
 		// passed. There is no signal to the awaiting Announce function, as
 		// it will exit based on the same ctx.
 
 		if announcementEntry.ctx.Err() != nil {
-			m.removeAnnouncementEntryByIndex(i)
-			end -= 1
+			m.removeAnnouncementEntry(false, announcementEntry)
 			continue
 		}
 
 		announcementProperties := &announcementEntry.announcement.Properties
 
-		// There must be a compartment match. If there is a personal
-		// compartment match, this match will be preferred.
-
-		matchCommonCompartment := HaveCommonIDs(
-			announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
-		matchPersonalCompartment := HaveCommonIDs(
-			announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
-		if !matchCommonCompartment && !matchPersonalCompartment {
-			continue
-		}
-
 		// Disallow matching the same country and ASN, except for personal
 		// compartment ID matches.
 		//
@@ -749,7 +784,7 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
 		// have no circumvention benefit. For personal matching, the user may
 		// wish to hop through their own or their friend's proxy regardless.
 
-		if !matchPersonalCompartment &&
+		if isCommonCompartments &&
 			!GetAllowCommonASNMatching() &&
 			(offerProperties.GeoIPData.Country ==
 				announcementProperties.GeoIPData.Country &&
@@ -766,49 +801,29 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
 		matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
 
 		// At this point, the candidate is a match. Determine if this is a new
-		// best match.
-
-		if bestMatch == -1 {
+		// best match, either if there was no previous match, or this is a
+		// better NAT match.
 
-			// This is a match, and there was no previous match, so it becomes
-			// the provisional best match.
+		if bestMatch == nil || (!bestMatchNAT && matchNAT) {
 
-			bestMatch = i
+			bestMatch = announcementEntry
+			bestMatchIndex = candidateIndex
 			bestMatchNAT = matchNAT
-			bestMatchCompartment = matchPersonalCompartment
-
-		} else if !bestMatchNAT && matchNAT {
-
-			// If there was a previous best match which was not a preferred
-			// NAT match, this becomes the new best match. The preferred NAT
-			// match is prioritized over personal compartment matching.
-
-			bestMatch = i
-			bestMatchNAT = true
-			bestMatchCompartment = matchPersonalCompartment
-
-		} else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
 
-			// If there was a previous best match which was not a personal
-			// compartment match, and as long as this match doesn't undo a
-			// better NAT match, this becomes the new best match.
-
-			bestMatch = i
-			bestMatchNAT = matchNAT
-			bestMatchCompartment = true
 		}
 
-		// Stop as soon as we have the best possible match.
+		// Stop as soon as we have the best possible match, or have reached
+		// the probe limit for preferred NAT matches.
+
+		if bestMatch != nil && (bestMatchNAT ||
+			!existsPreferredNATMatch ||
+			candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
 
-		if (bestMatchNAT || !existsPreferredNATMatch) &&
-			(matchPersonalCompartment ||
-				m.announcementsPersonalCompartmentalizedCount == 0 ||
-				len(offerProperties.PersonalCompartmentIDs) == 0) {
 			break
 		}
 	}
 
-	return bestMatch, bestMatch != -1
+	return bestMatch, bestMatchIndex
 }
 
 // MatcherLimitError is the error type returned by Announce or Offer when the
@@ -903,7 +918,7 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
 	defer m.announcementQueueMutex.Unlock()
 
 	// Ensure the queue doesn't grow larger than the max size.
-	if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
+	if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
 		return errors.TraceNew("queue full")
 	}
 
@@ -916,11 +931,20 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
 		return errors.Trace(err)
 	}
 
-	m.announcementQueue.PushBack(announcementEntry)
+	// announcementEntry.queueReference should be uninitialized.
+	// announcementMultiQueue.enqueue sets queueReference to be used for
+	// efficient dequeuing.
 
-	m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
+	if announcementEntry.queueReference.entry != nil {
+		return errors.TraceNew("unexpected queue reference")
+	}
 
-	m.adjustAnnouncementCounts(announcementEntry, 1)
+	err = m.announcementQueue.enqueue(announcementEntry)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
 
 	select {
 	case m.matchSignal <- struct{}{}:
@@ -930,20 +954,27 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
 	return nil
 }
 
-func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
+func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
 
-	m.announcementQueueMutex.Lock()
-	defer m.announcementQueueMutex.Unlock()
+	// In the aborting case, the queue isn't already locked. Otherwise, assume
+	// it is locked.
+	if aborting {
+		m.announcementQueueMutex.Lock()
+		defer m.announcementQueueMutex.Unlock()
+	}
 
-	found := false
-	for i := 0; i < m.announcementQueue.Len(); i++ {
-		if m.announcementQueue.At(i) == announcementEntry {
-			m.removeAnnouncementEntryByIndex(i)
-			found = true
-			break
+	found := announcementEntry.queueReference.dequeue()
+
+	if found {
+		// Adjust entry counts by peer IP, used to enforce
+		// matcherAnnouncementQueueMaxEntriesPerIP.
+		m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
+		if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
+			delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
 		}
 	}
-	if !found {
+
+	if aborting && !found {
 
 		// The Announce call is aborting and taking its entry back out of the
 		// queue. If the entry is not found in the queue, then a concurrent
@@ -964,45 +995,6 @@ func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry)
 	}
 }
 
-func (m *Matcher) removeAnnouncementEntryByIndex(i int) {
-
-	// Assumes s.announcementQueueMutex lock is held.
-
-	announcementEntry := m.announcementQueue.At(i)
-
-	// This should be only direct call to Remove, as following adjustments
-	// must always be made when removing.
-	m.announcementQueue.Remove(i)
-
-	// Adjust entry counts by peer IP, used to enforce
-	// matcherAnnouncementQueueMaxEntriesPerIP.
-	m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
-	if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
-		delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
-	}
-
-	m.adjustAnnouncementCounts(announcementEntry, -1)
-}
-
-func (m *Matcher) adjustAnnouncementCounts(
-	announcementEntry *announcementEntry, delta int) {
-
-	// Assumes s.announcementQueueMutex lock is held.
-
-	if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
-		m.announcementsPersonalCompartmentalizedCount += delta
-	}
-
-	switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
-	case NATTraversalUnlimited:
-		m.announcementsUnlimitedNATCount += delta
-	case NATTraversalPartiallyLimited:
-		m.announcementsPartiallyLimitedNATCount += delta
-	case NATTraversalStrictlyLimited:
-		m.announcementsStrictlyLimitedNATCount += delta
-	}
-}
-
 func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
 
 	m.offerQueueMutex.Lock()
@@ -1022,7 +1014,14 @@ func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
 		return errors.Trace(err)
 	}
 
-	m.offerQueue.PushBack(offerEntry)
+	// offerEntry.queueReference should be uninitialized and is set here to be
+	// used for efficient dequeuing.
+
+	if offerEntry.queueReference != nil {
+		return errors.TraceNew("unexpected queue reference")
+	}
+
+	offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
 
 	m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
 
@@ -1034,28 +1033,22 @@ func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
 	return nil
 }
 
-func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
-
-	m.offerQueueMutex.Lock()
-	defer m.offerQueueMutex.Unlock()
+func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
 
-	for i := 0; i < m.offerQueue.Len(); i++ {
-		if m.offerQueue.At(i) == offerEntry {
-			m.removeOfferEntryByIndex(i)
-			break
-		}
+	// In the aborting case, the queue isn't already locked. Otherwise, assume
+	// it is locked.
+	if aborting {
+		m.offerQueueMutex.Lock()
+		defer m.offerQueueMutex.Unlock()
 	}
-}
-
-func (m *Matcher) removeOfferEntryByIndex(i int) {
 
-	// Assumes s.offerQueueMutex lock is held.
+	if offerEntry.queueReference == nil {
+		return
+	}
 
-	offerEntry := m.offerQueue.At(i)
+	m.offerQueue.Remove(offerEntry.queueReference)
 
-	// This should be only direct call to Remove, as following adjustments
-	// must always be made when removing.
-	m.offerQueue.Remove(i)
+	offerEntry.queueReference = nil
 
 	// Adjust entry counts by peer IP, used to enforce
 	// matcherOfferQueueMaxEntriesPerIP.
@@ -1087,3 +1080,260 @@ func getRateLimitIP(strIP string) string {
 	// or /56, so rate limit by /56.
 	return IP.Mask(net.CIDRMask(56, 128)).String()
 }
+
+// announcementMultiQueue is a set of announcement queues, one per common or
+// personal compartment ID, providing efficient iteration over announcements
+// matching a specified list of compartment IDs. announcementMultiQueue and
+// its underlying data structures are not safe for concurrent access.
+type announcementMultiQueue struct {
+	// commonCompartmentQueues and personalCompartmentQueues map a
+	// compartment ID to its queue. enqueue creates a queue the first time an
+	// announcement arrives for a given ID.
+	commonCompartmentQueues   map[ID]*announcementCompartmentQueue
+	personalCompartmentQueues map[ID]*announcementCompartmentQueue
+	// totalEntries is the number of enqueued announcements across all
+	// compartment queues; it is incremented by enqueue, decremented by
+	// announcementQueueReference.dequeue, and reported by getLen.
+	totalEntries              int
+}
+
+// announcementCompartmentQueue is a single compartment queue within an
+// announcementMultiQueue. The queue is implemented using a doubly-linked
+// list, which provides efficient insert and mid-queue dequeue operations.
+// The announcementCompartmentQueue also records NAT type stats for enqueued
+// announcements, which are used, when matching, to determine when better NAT
+// matches may be possible.
+type announcementCompartmentQueue struct {
+	// entries holds *announcementEntry values; enqueue appends to the back,
+	// so the front is the earliest enqueued entry.
+	entries                  *list.List
+	// The NAT counts tally the enqueued entries by the result of
+	// Properties.EffectiveNATType().Traversal(). enqueue increments and
+	// announcementQueueReference.dequeue decrements the matching counter.
+	unlimitedNATCount        int
+	partiallyLimitedNATCount int
+	strictlyLimitedNATCount  int
+}
+
+// announcementMatchIterator represents the state of an iteration over a
+// subset of announcementMultiQueue compartment queues. Concurrent
+// announcementMatchIterators are not supported.
+type announcementMatchIterator struct {
+	// multiQueue is the queue that startMatching was invoked on, and
+	// isCommonCompartments records which of its queue maps was consulted.
+	multiQueue           *announcementMultiQueue
+	isCommonCompartments bool
+	// compartmentQueues, compartmentIDs, and nextEntries are parallel
+	// slices populated by startMatching, with one index per requested
+	// compartment ID that has a queue. nextEntries[i] is the next unvisited
+	// element of compartmentQueues[i], or nil once that queue is exhausted.
+	compartmentQueues    []*announcementCompartmentQueue
+	compartmentIDs       []ID
+	nextEntries          []*list.Element
+}
+
+// announcementQueueReference represents the queue position for a given
+// announcement entry, and provides an efficient dequeue operation.
+type announcementQueueReference struct {
+	// multiQueue is the owning multi-queue, whose totalEntries is adjusted
+	// on dequeue.
+	multiQueue       *announcementMultiQueue
+	// compartmentQueue is the specific queue holding the entry, whose list
+	// and NAT counts are adjusted on dequeue.
+	compartmentQueue *announcementCompartmentQueue
+	// entry is the list element within compartmentQueue.entries. A nil
+	// entry is treated by dequeue as already dequeued, and by
+	// addAnnouncementEntry as not yet enqueued.
+	entry            *list.Element
+}
+
+// newAnnouncementMultiQueue returns an empty announcementMultiQueue with
+// both the common and personal compartment queue maps allocated.
+func newAnnouncementMultiQueue() *announcementMultiQueue {
+	multiQueue := new(announcementMultiQueue)
+	multiQueue.commonCompartmentQueues = make(map[ID]*announcementCompartmentQueue)
+	multiQueue.personalCompartmentQueues = make(map[ID]*announcementCompartmentQueue)
+	return multiQueue
+}
+
+// getLen returns the total number of announcement entries currently
+// enqueued across all compartment queues.
+func (q *announcementMultiQueue) getLen() int {
+	return q.totalEntries
+}
+
+// enqueue appends announcementEntry to the back of the queue for its single
+// compartment ID, creating that queue on first use, and records the
+// resulting position in announcementEntry.queueReference. enqueue returns an
+// error, and enqueues nothing, unless the announcement specifies exactly one
+// compartment ID across its common and personal compartment ID lists.
+func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
+
+	// Assumes announcementEntry is not already enqueued.
+
+	// Limitation: an announcement entry may carry only one compartment ID,
+	// either common or personal. For common compartments, the broker
+	// currently assigns a single common compartment ID per proxy
+	// announcement. For personal compartments, there is currently no use
+	// case for a proxy announcing under multiple personal compartment IDs.
+	//
+	// Lifting this limitation would require the dequeue operation to remove
+	// an announcement entry from multiple announcementCompartmentQueues.
+
+	properties := &announcementEntry.announcement.Properties
+
+	if len(properties.CommonCompartmentIDs)+len(properties.PersonalCompartmentIDs) != 1 {
+		return errors.TraceNew("announcement must specify exactly one compartment ID")
+	}
+
+	// Exactly one of the two lists is non-empty at this point; select that
+	// list and its corresponding set of queues.
+
+	queuesByID, specifiedIDs := q.personalCompartmentQueues, properties.PersonalCompartmentIDs
+	if len(properties.CommonCompartmentIDs) > 0 {
+		queuesByID, specifiedIDs = q.commonCompartmentQueues, properties.CommonCompartmentIDs
+	}
+	compartmentID := specifiedIDs[0]
+
+	compartmentQueue, exists := queuesByID[compartmentID]
+	if !exists {
+		compartmentQueue = &announcementCompartmentQueue{entries: list.New()}
+		queuesByID[compartmentID] = compartmentQueue
+	}
+
+	// Tally the NAT type; matching consults these counts to decide whether
+	// inspecting more queue entries could yield a better NAT match.
+
+	switch properties.EffectiveNATType().Traversal() {
+	case NATTraversalUnlimited:
+		compartmentQueue.unlimitedNATCount += 1
+	case NATTraversalPartiallyLimited:
+		compartmentQueue.partiallyLimitedNATCount += 1
+	case NATTraversalStrictlyLimited:
+		compartmentQueue.strictlyLimitedNATCount += 1
+	}
+
+	q.totalEntries += 1
+
+	// Retain the list position so that the entry can later be dequeued
+	// without scanning the queue.
+
+	announcementEntry.queueReference = announcementQueueReference{
+		multiQueue:       q,
+		compartmentQueue: compartmentQueue,
+		entry:            compartmentQueue.entries.PushBack(announcementEntry),
+	}
+
+	return nil
+}
+
+// dequeue removes the referenced announcement entry from its compartment
+// queue, reverses the NAT type count recorded by enqueue, and decrements the
+// multi-queue's total entry count. dequeue returns false, and changes
+// nothing, if the reference was never enqueued or is already dequeued.
+//
+// dequeue has a pointer receiver so that clearing r.entry marks the
+// reference stored in the announcementEntry itself. With a value receiver,
+// only a copy is cleared, and a repeated dequeue call -- for example, an
+// aborting Announce following a concurrent match or expiry sweep -- would
+// decrement the counts a second time and incorrectly report the entry as
+// found.
+func (r *announcementQueueReference) dequeue() bool {
+
+	if r.entry == nil {
+		// Already dequeued.
+		return false
+	}
+
+	announcementEntry := r.entry.Value.(*announcementEntry)
+
+	// Reverse the NAT type counts.
+	switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
+	case NATTraversalUnlimited:
+		r.compartmentQueue.unlimitedNATCount -= 1
+	case NATTraversalPartiallyLimited:
+		r.compartmentQueue.partiallyLimitedNATCount -= 1
+	case NATTraversalStrictlyLimited:
+		r.compartmentQueue.strictlyLimitedNATCount -= 1
+	}
+
+	r.compartmentQueue.entries.Remove(r.entry)
+
+	r.multiQueue.totalEntries -= 1
+
+	// Mark as dequeued. This must modify the stored reference, not a copy,
+	// for the guard above to take effect on subsequent calls.
+	r.entry = nil
+
+	return true
+}
+
+// startMatching returns an iterator over the compartment queues, common or
+// personal as selected by isCommonCompartments, that exist for the given
+// compartmentIDs. Compartment IDs with no queue are omitted from the
+// iterator.
+func (q *announcementMultiQueue) startMatching(
+	isCommonCompartments bool,
+	compartmentIDs []ID) *announcementMatchIterator {
+
+	queuesByID := q.personalCompartmentQueues
+	if isCommonCompartments {
+		queuesByID = q.commonCompartmentQueues
+	}
+
+	iter := &announcementMatchIterator{
+		multiQueue:           q,
+		isCommonCompartments: isCommonCompartments,
+	}
+
+	// Collect the matching compartment queues and position the iteration at
+	// the front of each. This takes one map lookup per input compartment ID,
+	// so the cost is linear in len(compartmentIDs).
+
+	for _, compartmentID := range compartmentIDs {
+		compartmentQueue, exists := queuesByID[compartmentID]
+		if !exists {
+			continue
+		}
+		iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
+		iter.compartmentIDs = append(iter.compartmentIDs, compartmentID)
+		iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
+	}
+
+	return iter
+}
+
+// getNATCounts returns the unlimited, partially limited, and strictly
+// limited NAT counts, in that order, each summed across all of the
+// iterator's compartment queues.
+//
+// A potential future enhancement would be to provide per-queue NAT counts
+// or NAT type indexing in order to quickly find preferred NAT matches.
+func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
+
+	var unlimited, partiallyLimited, strictlyLimited int
+
+	for i := range iter.compartmentQueues {
+		compartmentQueue := iter.compartmentQueues[i]
+		unlimited += compartmentQueue.unlimitedNATCount
+		partiallyLimited += compartmentQueue.partiallyLimitedNATCount
+		strictlyLimited += compartmentQueue.strictlyLimitedNATCount
+	}
+
+	return unlimited, partiallyLimited, strictlyLimited
+}
+
+// getNext returns the next announcement entry candidate, or nil when every
+// matching compartment queue is exhausted. Candidates are taken from the
+// fronts of the matching queues, selecting the queue whose pending entry has
+// the earliest ctx deadline.
+//
+// The caller should invoke announcementEntry.queueReference.dequeue when the
+// candidate is selected. dequeue may be called on any getNext return value
+// without disrupting the iteration state; however, dequeue calls for
+// arbitrary other queue entries are not supported during iteration.
+// Iteration and dequeue should all be performed with a lock held over the
+// entire announcementMultiQueue, and with only one announcementMatchIterator
+// in use at a time.
+func (iter *announcementMatchIterator) getNext() *announcementEntry {
+
+	// Assumes announcements are enqueued in announcementEntry.ctx.Deadline
+	// order.
+
+	// Scan the pending entry of each matching queue and keep the one with
+	// the earliest deadline. The scan is linear in the number of matching
+	// compartment queues, which is expected to be no more than
+	// maxCompartmentIDs (a limit enforced on client offers outside of this
+	// function).
+	//
+	// A potential future enhancement is to add more iterator state to track
+	// which queue holds the next oldest entry for the following getNext
+	// call.
+
+	var oldest *announcementEntry
+	oldestIndex := -1
+
+	for i := range iter.compartmentQueues {
+
+		element := iter.nextEntries[i]
+		if element == nil {
+			// This queue is exhausted.
+			continue
+		}
+
+		candidate := element.Value.(*announcementEntry)
+
+		if oldest == nil {
+			oldest, oldestIndex = candidate, i
+			continue
+		}
+
+		// Replace the current selection only when both entries have
+		// deadlines and the candidate's is strictly earlier.
+		candidateDeadline, candidateOk := candidate.ctx.Deadline()
+		oldestDeadline, oldestOk := oldest.ctx.Deadline()
+		if candidateOk && oldestOk && candidateDeadline.Before(oldestDeadline) {
+			oldest, oldestIndex = candidate, i
+		}
+	}
+
+	if oldestIndex == -1 {
+		return nil
+	}
+
+	// Advance the selected queue now, before the caller has any opportunity
+	// to dequeue the returned entry, since container/list.remove clears
+	// list.Element.next.
+	iter.nextEntries[oldestIndex] = iter.nextEntries[oldestIndex].Next()
+
+	return oldest
+}

+ 417 - 89
psiphon/common/inproxy/matcher_test.go

@@ -22,6 +22,7 @@ package inproxy
 import (
 	"context"
 	"fmt"
+	"runtime/debug"
 	"strings"
 	"sync"
 	"testing"
@@ -173,13 +174,17 @@ func runTestMatcher() error {
 
 	proxyResultChan := make(chan error)
 
-	go proxyFunc(proxyResultChan, proxyIP, &MatchProperties{}, 1*time.Microsecond, nil, true)
+	matchProperties := &MatchProperties{
+		CommonCompartmentIDs: []ID{makeID()},
+	}
+
+	go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
 
 	err = <-proxyResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
 		return errors.Tracef("unexpected result: %v", err)
 	}
-	if m.announcementQueue.Len() != 0 {
+	if m.announcementQueue.getLen() != 0 {
 		return errors.TraceNew("unexpected queue size")
 	}
 
@@ -191,16 +196,16 @@ func runTestMatcher() error {
 	maxEntriesProxyResultChan := make(chan error, maxEntries)
 
 	// fill the queue with max entries for one IP; the first one will timeout sooner
-	go proxyFunc(maxEntriesProxyResultChan, proxyIP, &MatchProperties{}, 10*time.Millisecond, nil, true)
+	go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
 	for i := 0; i < maxEntries-1; i++ {
-		go proxyFunc(maxEntriesProxyResultChan, proxyIP, &MatchProperties{}, 100*time.Millisecond, nil, true)
+		go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
 	}
 
 	// await goroutines filling queue
 	for {
 		time.Sleep(10 * time.Microsecond)
 		m.announcementQueueMutex.Lock()
-		queueLen := m.announcementQueue.Len()
+		queueLen := m.announcementQueue.getLen()
 		m.announcementQueueMutex.Unlock()
 		if queueLen == maxEntries {
 			break
@@ -208,7 +213,7 @@ func runTestMatcher() error {
 	}
 
 	// the next enqueue should fail with "max entries"
-	go proxyFunc(proxyResultChan, proxyIP, &MatchProperties{}, 10*time.Millisecond, nil, true)
+	go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
 	err = <-proxyResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -221,7 +226,7 @@ func runTestMatcher() error {
 	}
 
 	// now another enqueue succeeds as expected
-	go proxyFunc(proxyResultChan, proxyIP, &MatchProperties{}, 10*time.Millisecond, nil, true)
+	go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
 	err = <-proxyResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -239,7 +244,7 @@ func runTestMatcher() error {
 
 	clientResultChan := make(chan error)
 
-	go clientFunc(clientResultChan, clientIP, &MatchProperties{}, 1*time.Microsecond)
+	go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
 
 	err = <-clientResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
@@ -257,9 +262,9 @@ func runTestMatcher() error {
 	maxEntriesClientResultChan := make(chan error, maxEntries)
 
 	// fill the queue with max entries for one IP; the first one will timeout sooner
-	go clientFunc(maxEntriesClientResultChan, clientIP, &MatchProperties{}, 10*time.Millisecond)
+	go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
 	for i := 0; i < maxEntries-1; i++ {
-		go clientFunc(maxEntriesClientResultChan, clientIP, &MatchProperties{}, 100*time.Millisecond)
+		go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
 	}
 
 	// await goroutines filling queue
@@ -275,7 +280,7 @@ func runTestMatcher() error {
 	}
 
 	// enqueue should fail with "max entries"
-	go clientFunc(clientResultChan, clientIP, &MatchProperties{}, 10*time.Millisecond)
+	go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
 	err = <-clientResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -288,7 +293,7 @@ func runTestMatcher() error {
 	}
 
 	// now another enqueue succeeds as expected
-	go clientFunc(clientResultChan, clientIP, &MatchProperties{}, 10*time.Millisecond)
+	go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
 	err = <-clientResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -318,7 +323,7 @@ func runTestMatcher() error {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
-			proxyFunc(maxEntriesProxyResultChan, proxyIP, &MatchProperties{}, 1*time.Microsecond, nil, true)
+			proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
 		}()
 	}
 
@@ -328,7 +333,7 @@ func runTestMatcher() error {
 	waitGroup.Wait()
 
 	// the next enqueue should fail with "rate exceeded"
-	go proxyFunc(proxyResultChan, proxyIP, &MatchProperties{}, 10*time.Millisecond, nil, true)
+	go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
 	err = <-proxyResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -344,14 +349,14 @@ func runTestMatcher() error {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
-			clientFunc(maxEntriesClientResultChan, clientIP, &MatchProperties{}, 1*time.Microsecond)
+			clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
 		}()
 	}
 
 	waitGroup.Wait()
 
 	// enqueue should fail with "rate exceeded"
-	go clientFunc(clientResultChan, clientIP, &MatchProperties{}, 10*time.Millisecond)
+	go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
 	err = <-clientResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
 		return errors.Tracef("unexpected result: %v", err)
@@ -365,16 +370,16 @@ func runTestMatcher() error {
 
 	// Test: basic match
 
-	basicCommonCompartmentIDs := []ID{makeID()}
+	commonCompartmentIDs := []ID{makeID()}
 
 	geoIPData1 := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C1", ASN: "A1"},
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	geoIPData2 := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C2", ASN: "A2"},
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
@@ -427,59 +432,66 @@ func runTestMatcher() error {
 	// Test: no compartment match
 
 	compartment1 := &MatchProperties{
-		GeoIPData:              geoIPData1.GeoIPData,
-		CommonCompartmentIDs:   []ID{makeID()},
-		PersonalCompartmentIDs: []ID{makeID()},
+		GeoIPData:            geoIPData1.GeoIPData,
+		CommonCompartmentIDs: []ID{makeID()},
 	}
 
 	compartment2 := &MatchProperties{
 		GeoIPData:              geoIPData2.GeoIPData,
-		CommonCompartmentIDs:   []ID{makeID()},
 		PersonalCompartmentIDs: []ID{makeID()},
 	}
 
-	go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
-	go clientFunc(clientResultChan, clientIP, compartment2, 10*time.Millisecond)
+	compartment3 := &MatchProperties{
+		GeoIPData:            geoIPData2.GeoIPData,
+		CommonCompartmentIDs: []ID{makeID()},
+	}
 
-	err = <-proxyResultChan
-	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
-		return errors.Tracef("unexpected result: %v", err)
+	compartment4 := &MatchProperties{
+		GeoIPData:              geoIPData2.GeoIPData,
+		PersonalCompartmentIDs: []ID{makeID()},
 	}
 
-	err = <-clientResultChan
+	proxy1ResultChan := make(chan error)
+	proxy2ResultChan := make(chan error)
+	client1ResultChan := make(chan error)
+	client2ResultChan := make(chan error)
+
+	go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
+	go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
+	go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
+	go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
+
+	err = <-proxy1ResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
 		return errors.Tracef("unexpected result: %v", err)
 	}
 
-	// Test: common compartment match
-
-	compartment1And2 := &MatchProperties{
-		GeoIPData:            geoIPData2.GeoIPData,
-		CommonCompartmentIDs: []ID{compartment1.CommonCompartmentIDs[0], compartment2.CommonCompartmentIDs[0]},
+	err = <-proxy2ResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
 	}
 
-	go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
-	go clientFunc(clientResultChan, clientIP, compartment1And2, 10*time.Millisecond)
-
-	err = <-proxyResultChan
-	if err != nil {
-		return errors.Trace(err)
+	err = <-client1ResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
 	}
 
-	err = <-clientResultChan
-	if err != nil {
-		return errors.Trace(err)
+	err = <-client2ResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
 	}
 
-	// Test: personal compartment match
+	// Test: common compartment match
 
-	compartment1And2 = &MatchProperties{
-		GeoIPData:              geoIPData2.GeoIPData,
-		PersonalCompartmentIDs: []ID{compartment1.PersonalCompartmentIDs[0], compartment2.PersonalCompartmentIDs[0]},
+	compartment1And3 := &MatchProperties{
+		GeoIPData: geoIPData2.GeoIPData,
+		CommonCompartmentIDs: []ID{
+			compartment1.CommonCompartmentIDs[0],
+			compartment3.CommonCompartmentIDs[0]},
 	}
 
 	go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
-	go clientFunc(clientResultChan, clientIP, compartment1And2, 10*time.Millisecond)
+	go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
 
 	err = <-proxyResultChan
 	if err != nil {
@@ -491,47 +503,24 @@ func runTestMatcher() error {
 		return errors.Trace(err)
 	}
 
-	// Test: personal compartment preferred match
-
-	compartment1Common := &MatchProperties{
-		GeoIPData:            geoIPData1.GeoIPData,
-		CommonCompartmentIDs: []ID{compartment1.CommonCompartmentIDs[0]},
-	}
-
-	compartment1Personal := &MatchProperties{
-		GeoIPData:              geoIPData1.GeoIPData,
-		PersonalCompartmentIDs: []ID{compartment1.PersonalCompartmentIDs[0]},
-	}
+	// Test: personal compartment match
 
-	compartment1CommonAndPersonal := &MatchProperties{
-		GeoIPData:              geoIPData2.GeoIPData,
-		CommonCompartmentIDs:   []ID{compartment1.CommonCompartmentIDs[0]},
-		PersonalCompartmentIDs: []ID{compartment1.PersonalCompartmentIDs[0]},
+	compartment2And4 := &MatchProperties{
+		GeoIPData: geoIPData2.GeoIPData,
+		PersonalCompartmentIDs: []ID{
+			compartment2.PersonalCompartmentIDs[0],
+			compartment4.PersonalCompartmentIDs[0]},
 	}
 
-	client1ResultChan := make(chan error)
-	client2ResultChan := make(chan error)
-
-	proxy1ResultChan := make(chan error)
-	proxy2ResultChan := make(chan error)
+	go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
+	go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
 
-	go proxyFunc(proxy1ResultChan, proxyIP, compartment1Common, 10*time.Millisecond, nil, true)
-	go proxyFunc(proxy2ResultChan, proxyIP, compartment1Personal, 10*time.Millisecond, nil, true)
-	time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
-	go clientFunc(client1ResultChan, clientIP, compartment1CommonAndPersonal, 10*time.Millisecond)
-
-	err = <-proxy1ResultChan
-	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
-		return errors.Tracef("unexpected result: %v", err)
-	}
-
-	// proxy2 should match since it has the preferred personal compartment ID
-	err = <-proxy2ResultChan
+	err = <-proxyResultChan
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	err = <-client1ResultChan
+	err = <-clientResultChan
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -556,31 +545,31 @@ func runTestMatcher() error {
 	client1Properties := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C1", ASN: "A1"},
 		NATType:              NATTypeFullCone,
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	client2Properties := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C2", ASN: "A2"},
 		NATType:              NATTypeSymmetric,
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	proxy1Properties := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C3", ASN: "A3"},
 		NATType:              NATTypeNone,
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	proxy2Properties := &MatchProperties{
 		GeoIPData:            common.GeoIPData{Country: "C4", ASN: "A4"},
 		NATType:              NATTypeSymmetric,
-		CommonCompartmentIDs: basicCommonCompartmentIDs,
+		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
 	go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
 	time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
-	go clientFunc(client1ResultChan, clientIP, client1Properties, 10*time.Millisecond)
+	go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
 
 	err = <-proxy1ResultChan
 	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
@@ -593,7 +582,7 @@ func runTestMatcher() error {
 		return errors.Trace(err)
 	}
 
-	err = <-client1ResultChan
+	err = <-clientResultChan
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -607,9 +596,9 @@ func runTestMatcher() error {
 	// client is enqueued first, and the test is currently of limited utility.
 
 	go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
-	time.Sleep(5 * time.Millisecond) // Hack to client is enqueued
+	time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
 	go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
-	time.Sleep(5 * time.Millisecond) // Hack to client is enqueued
+	time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
 	go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
 
 	err = <-proxy1ResultChan
@@ -676,3 +665,342 @@ func randomIPAddress() string {
 		prng.Range(0, 255),
 		prng.Range(0, 255))
 }
+
+// TestMatcherMultiQueue runs runTestMatcherMultiQueue and reports any
+// returned error as a test failure.
+func TestMatcherMultiQueue(t *testing.T) {
+	err := runTestMatcherMultiQueue()
+	if err != nil {
+		// Use t.Error, not t.Errorf: the error text is data and must not be
+		// interpreted as a format string.
+		t.Error(errors.Trace(err).Error())
+	}
+}
+
+// runTestMatcherMultiQueue exercises announcementMultiQueue directly:
+// enqueue validation, per-compartment queue bookkeeping, deadline-ordered
+// iteration over matching queues, and dequeue. It returns a non-nil error
+// describing the first failed expectation, or nil when all checks pass.
+func runTestMatcherMultiQueue() error {
+
+	q := newAnnouncementMultiQueue()
+
+	// Test: invalid compartment IDs
+
+	// An announcement with no compartment IDs must be rejected.
+	err := q.enqueue(&announcementEntry{
+		announcement: &MatchAnnouncement{
+			Properties: MatchProperties{}}})
+	if err == nil {
+		return errors.TraceNew("unexpected success")
+	}
+
+	// An announcement with both common and personal compartment IDs must be
+	// rejected.
+	compartmentID, _ := MakeID()
+	err = q.enqueue(&announcementEntry{
+		announcement: &MatchAnnouncement{
+			Properties: MatchProperties{
+				CommonCompartmentIDs:   []ID{compartmentID},
+				PersonalCompartmentIDs: []ID{compartmentID},
+			}}})
+	if err == nil {
+		return errors.TraceNew("unexpected success")
+	}
+
+	// Test: enqueue multiple candidates
+
+	var otherCommonCompartmentIDs []ID
+	var otherPersonalCompartmentIDs []ID
+
+	numOtherCompartmentIDs := 10
+	for i := 0; i < numOtherCompartmentIDs; i++ {
+		commonCompartmentID, _ := MakeID()
+		otherCommonCompartmentIDs = append(
+			otherCommonCompartmentIDs, commonCompartmentID)
+		personalCompartmentID, _ := MakeID()
+		otherPersonalCompartmentIDs = append(
+			otherPersonalCompartmentIDs, personalCompartmentID)
+	}
+
+	// Fill the queue with non-matching entries, spread evenly across the
+	// other compartment IDs, with increasing deadlines. These valid
+	// enqueues must all succeed.
+	numOtherEntries := 10000
+	for i := 0; i < numOtherEntries; i++ {
+		ctx, cancel := context.WithDeadline(
+			context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
+		// Deferred to function return, so the contexts are not cancelled
+		// while the test is still using the queue.
+		defer cancel()
+		err := q.enqueue(&announcementEntry{
+			ctx: ctx,
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{
+					CommonCompartmentIDs: []ID{
+						otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
+					NATType: NATTypeSymmetric,
+				}}})
+		if err != nil {
+			return errors.Trace(err)
+		}
+		err = q.enqueue(&announcementEntry{
+			ctx: ctx,
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{
+					PersonalCompartmentIDs: []ID{
+						otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
+					NATType: NATTypeSymmetric,
+				}}})
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	// Enqueue one entry per matching compartment ID. Deadlines increase with
+	// i, so expectedMatches is in the order the iterator should yield them.
+	var matchingCommonCompartmentIDs []ID
+	numMatchingCompartmentIDs := 2
+	var expectedMatches []*announcementEntry
+	for i := 0; i < numMatchingCompartmentIDs; i++ {
+		commonCompartmentID, _ := MakeID()
+		matchingCommonCompartmentIDs = append(
+			matchingCommonCompartmentIDs, commonCompartmentID)
+		ctx, cancel := context.WithDeadline(
+			context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
+		defer cancel()
+		a := &announcementEntry{
+			ctx: ctx,
+			announcement: &MatchAnnouncement{
+				Properties: MatchProperties{
+					// Each entry carries exactly its own compartment ID; a
+					// fresh slice avoids aliasing the growing ID list.
+					CommonCompartmentIDs: []ID{commonCompartmentID},
+					NATType:              NATTypeNone,
+				}}}
+		expectedMatches = append(expectedMatches, a)
+		err := q.enqueue(a)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	// Test: inspect queue state
+
+	if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs {
+		return errors.TraceNew("unexpected total entries count")
+	}
+
+	if len(q.commonCompartmentQueues) !=
+		numOtherCompartmentIDs+numMatchingCompartmentIDs {
+		return errors.TraceNew("unexpected compartment queue count")
+	}
+
+	if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
+		return errors.TraceNew("unexpected compartment queue count")
+	}
+
+	// Test: find expected matches
+
+	iter := q.startMatching(true, matchingCommonCompartmentIDs)
+
+	if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
+		return errors.TraceNew("unexpected iterator state")
+	}
+
+	unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
+	if unlimited != numMatchingCompartmentIDs || partiallyLimited != 0 || strictlyLimited != 0 {
+		return errors.TraceNew("unexpected NAT counts")
+	}
+
+	// The entry with the earliest deadline must be returned first.
+	match := iter.getNext()
+	if match == nil {
+		return errors.TraceNew("unexpected missing match")
+	}
+	if match != expectedMatches[0] {
+		return errors.TraceNew("unexpected match")
+	}
+
+	// The first dequeue succeeds; a repeated dequeue reports that the entry
+	// was already removed.
+	if !match.queueReference.dequeue() {
+		return errors.TraceNew("unexpected already dequeued")
+	}
+
+	if match.queueReference.dequeue() {
+		return errors.TraceNew("unexpected not already dequeued")
+	}
+
+	// Restart matching: only one matching compartment queue should remain.
+	iter = q.startMatching(true, matchingCommonCompartmentIDs)
+
+	if len(iter.compartmentQueues) != numMatchingCompartmentIDs-1 {
+		return errors.TraceNew("unexpected iterator state")
+	}
+
+	unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
+	if unlimited != numMatchingCompartmentIDs-1 || partiallyLimited != 0 || strictlyLimited != 0 {
+		return errors.TraceNew("unexpected NAT counts")
+	}
+
+	match = iter.getNext()
+	if match == nil {
+		return errors.TraceNew("unexpected missing match")
+	}
+	if match != expectedMatches[1] {
+		return errors.TraceNew("unexpected match")
+	}
+
+	if !match.queueReference.dequeue() {
+		return errors.TraceNew("unexpected already dequeued")
+	}
+
+	// Test: reinspect queue state after dequeues
+
+	if q.getLen() != numOtherEntries*2 {
+		return errors.TraceNew("unexpected total entries count")
+	}
+
+	if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
+		return errors.TraceNew("unexpected compartment queue count")
+	}
+
+	if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
+		return errors.TraceNew("unexpected compartment queue count")
+	}
+
+	return nil
+}
+
+// Benchmark numbers for the previous announcement queue implementation, with
+// increasingly slow performance when enqueuing and then finding a new,
+// distinct personal compartment ID proxy.
+//
+// pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
+// BenchmarkMatcherQueue/insert_100_announcements-24                       17528         68304 ns/op
+// BenchmarkMatcherQueue/match_last_of_100_announcements-24               521719          2243 ns/op
+// BenchmarkMatcherQueue/insert_10000_announcements-24                       208       5780227 ns/op
+// BenchmarkMatcherQueue/match_last_of_10000_announcements-24               6796        177587 ns/op
+// BenchmarkMatcherQueue/insert_100000_announcements-24                       21      50859464 ns/op
+// BenchmarkMatcherQueue/match_last_of_100000_announcements-24               538       2249389 ns/op
+// BenchmarkMatcherQueue/insert_1000000_announcements-24                       3     499685555 ns/op
+// BenchmarkMatcherQueue/match_last_of_1000000_announcements-24               33      34299751 ns/op
+// BenchmarkMatcherQueue/insert_4999999_announcements-24                       1    2606017042 ns/op
+// BenchmarkMatcherQueue/match_last_of_4999999_announcements-24                6     179171125 ns/op
+// PASS
+// ok  	github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy	17.585s
+//
+// Benchmark numbers for the current implementation, the announcementMultiQueue,
+// with constant time performance for the same scenario:
+//
+// BenchmarkMatcherQueue
+// BenchmarkMatcherQueue/insert_100_announcements-24                       15422         77187 ns/op
+// BenchmarkMatcherQueue/match_last_of_100_announcements-24               965152          1217 ns/op
+// BenchmarkMatcherQueue/insert_10000_announcements-24                       168       7322661 ns/op
+// BenchmarkMatcherQueue/match_last_of_10000_announcements-24             906748          1211 ns/op
+// BenchmarkMatcherQueue/insert_100000_announcements-24                       16      64770370 ns/op
+// BenchmarkMatcherQueue/match_last_of_100000_announcements-24            972342          1243 ns/op
+// BenchmarkMatcherQueue/insert_1000000_announcements-24                       2     701046271 ns/op
+// BenchmarkMatcherQueue/match_last_of_1000000_announcements-24           988050          1230 ns/op
+// BenchmarkMatcherQueue/insert_4999999_announcements-24                       1    4523888833 ns/op
+// BenchmarkMatcherQueue/match_last_of_4999999_announcements-24           963894          1186 ns/op
+// PASS
+// ok  	github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy	22.439s
+func BenchmarkMatcherQueue(b *testing.B) {
+
+	SetAllowCommonASNMatching(true)
+	defer SetAllowCommonASNMatching(false)
+
+	for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
+
+		debug.FreeOSMemory()
+
+		// m is created by the insert sub-benchmark and then reused by the
+		// match sub-benchmark for the same size.
+		var m *Matcher
+
+		commonCompartmentID, _ := MakeID()
+
+		b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
+
+			for i := 0; i < b.N; i++ {
+
+				// Matcher.Start is not called to start the matchWorker;
+				// instead, matchOffer is invoked directly.
+
+				m = NewMatcher(
+					&MatcherConfig{
+						Logger: newTestLogger(),
+					})
+
+				for j := 0; j < size; j++ {
+
+					// Each announcement randomly uses either a new, distinct
+					// personal compartment ID or the shared common
+					// compartment ID.
+					var commonCompartmentIDs, personalCompartmentIDs []ID
+					if prng.FlipCoin() {
+						personalCompartmentID, _ := MakeID()
+						personalCompartmentIDs = []ID{personalCompartmentID}
+					} else {
+						commonCompartmentIDs = []ID{commonCompartmentID}
+					}
+
+					entry := &announcementEntry{
+						ctx:     context.Background(),
+						limitIP: "127.0.0.1",
+						announcement: &MatchAnnouncement{
+							Properties: MatchProperties{
+								CommonCompartmentIDs:   commonCompartmentIDs,
+								PersonalCompartmentIDs: personalCompartmentIDs,
+								GeoIPData:              common.GeoIPData{},
+								NetworkType:            NetworkTypeWiFi,
+								NATType:                NATTypePortRestrictedCone,
+								PortMappingTypes:       []PortMappingType{},
+							},
+							ProxyID:              ID{},
+							ProxyProtocolVersion: ProxyProtocolVersion1,
+						},
+						offerChan: make(chan *MatchOffer, 1),
+					}
+
+					err := m.addAnnouncementEntry(entry)
+					if err != nil {
+						// b.Fatal, not b.Fatalf: the error text must not be
+						// interpreted as a format string.
+						b.Fatal(errors.Trace(err).Error())
+					}
+				}
+			}
+		})
+
+		b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
+
+			// m is only populated when the insert sub-benchmark above has
+			// run; it is skipped when excluded by a -bench filter.
+			if m == nil {
+				b.Skip("matcher not populated; run the insert benchmark as well")
+			}
+
+			queueSize := m.announcementQueue.getLen()
+			if queueSize != size {
+				b.Fatal(errors.Tracef("unexpected queue size: %d", queueSize).Error())
+			}
+
+			for i := 0; i < b.N; i++ {
+
+				// Enqueue a proxy with a new, distinct personal compartment
+				// ID behind the existing entries, then match it with an
+				// offer specifying the same compartment ID.
+				personalCompartmentID, _ := MakeID()
+
+				entry :=
+					&announcementEntry{
+						ctx:     context.Background(),
+						limitIP: "127.0.0.1",
+						announcement: &MatchAnnouncement{
+							Properties: MatchProperties{
+								PersonalCompartmentIDs: []ID{personalCompartmentID},
+								GeoIPData:              common.GeoIPData{},
+								NetworkType:            NetworkTypeWiFi,
+								NATType:                NATTypePortRestrictedCone,
+								PortMappingTypes:       []PortMappingType{},
+							},
+							ProxyID:              ID{},
+							ProxyProtocolVersion: ProxyProtocolVersion1,
+						},
+						offerChan: make(chan *MatchOffer, 1),
+					}
+
+				offer := &offerEntry{
+					ctx:     context.Background(),
+					limitIP: "127.0.0.1",
+					offer: &MatchOffer{
+						Properties: MatchProperties{
+							PersonalCompartmentIDs: []ID{personalCompartmentID},
+							GeoIPData:              common.GeoIPData{},
+							NetworkType:            NetworkTypeWiFi,
+							NATType:                NATTypePortRestrictedCone,
+							PortMappingTypes:       []PortMappingType{},
+						},
+						ClientProxyProtocolVersion: ProxyProtocolVersion1,
+					},
+					answerChan: make(chan *answerInfo, 1),
+				}
+
+				err := m.addAnnouncementEntry(entry)
+				if err != nil {
+					b.Fatal(errors.Trace(err).Error())
+				}
+
+				match, _ := m.matchOffer(offer)
+				if match == nil {
+					b.Fatal(errors.TraceNew("unexpected no match").Error())
+				}
+
+				// Remove the matched entry so the queue returns to its
+				// original size for the next iteration.
+				m.removeAnnouncementEntry(false, match)
+			}
+		})
+	}
+}

+ 7 - 6
psiphon/config.go

@@ -648,7 +648,9 @@ type Config struct {
 	// distributed from proxy operators to client users out-of-band and
 	// provide a mechanism to allow only certain clients to use a proxy.
 	//
-	// See InproxyClientPersonalCompartmentIDs comment for limitations.
+	// Limitation: currently, at most 1 personal compartment may be specified.
+	// See InproxyClientPersonalCompartmentIDs comment for additional
+	// personal pairing limitations.
 	InproxyProxyPersonalCompartmentIDs []string
 
 	// InproxyClientPersonalCompartmentIDs specifies the personal compartment
@@ -687,11 +689,6 @@ type Config struct {
 	//   is unreachable or overloaded. Non-personal in-proxy dials can simply
 	//   use any available broker.
 	//
-	// - The broker matching queues lack compartment ID indexing. For a
-	//   handful of common compartment IDs, this is not expected to be an
-	//   issue. For personal compartment IDs, this may lead to frequency
-	//   near-full scans of the queues when looking for a match.
-	//
 	// - In personal mode, all establishment candidates must be in-proxy
 	//   dials, all using the same broker. Many concurrent, fronted broker
 	//   requests may result in CDN rate limiting, requiring some mechanism
@@ -1417,6 +1414,10 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 		return errors.TraceNew("invalid overlapping personal compartment IDs")
 	}
 
+	if len(config.InproxyProxyPersonalCompartmentIDs) > 1 {
+		return errors.TraceNew("invalid proxy personal compartment ID count")
+	}
+
 	// This constraint is expected by logic in Controller.runTunnels().
 
 	if config.PacketTunnelTunFileDescriptor > 0 && config.TunnelPoolSize != 1 {