|
|
@@ -19,6 +19,7 @@
|
|
|
package inproxy
|
|
|
|
|
|
import (
|
|
|
+ "container/list"
|
|
|
"context"
|
|
|
std_errors "errors"
|
|
|
"net"
|
|
|
@@ -29,7 +30,6 @@ import (
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
|
|
|
lrucache "github.com/cognusion/go-cache-lru"
|
|
|
- "github.com/gammazero/deque"
|
|
|
"golang.org/x/time/rate"
|
|
|
)
|
|
|
|
|
|
@@ -40,6 +40,7 @@ const (
|
|
|
matcherOfferQueueMaxSize = 5000000
|
|
|
matcherPendingAnswersTTL = 30 * time.Second
|
|
|
matcherPendingAnswersMaxSize = 100000
|
|
|
+ matcherMaxPreferredNATProbe = 100
|
|
|
|
|
|
matcherRateLimiterReapHistoryFrequencySeconds = 300
|
|
|
matcherRateLimiterMaxCacheEntries = 1000000
|
|
|
@@ -53,12 +54,10 @@ const (
|
|
|
// as they are closest to timing out.
|
|
|
//
|
|
|
// The client and proxy must supply matching personal or common compartment
|
|
|
-// IDs. Personal compartment matching is preferred. Common compartments are
|
|
|
-// managed by Psiphon and can be obtained via a tactics parameter or via an
|
|
|
-// OSL embedding.
|
|
|
-//
|
|
|
-// A client may opt form personal-only matching by not supplying any common
|
|
|
-// compartment IDs.
|
|
|
+// IDs. Common compartments are managed by Psiphon and can be obtained via a
|
|
|
+// tactics parameter or via an OSL embedding. Each proxy announcement or
|
|
|
+// client offer may specify only one compartment ID type, either common or
|
|
|
+// personal.
|
|
|
//
|
|
|
// Matching prefers to pair proxies and clients in a way that maximizes total
|
|
|
// possible matches. For a client or proxy with less-limited NAT traversal, a
|
|
|
@@ -86,25 +85,21 @@ type Matcher struct {
|
|
|
|
|
|
// TODO: replace queue and counts with an indexed, in-memory database?
|
|
|
|
|
|
- announcementQueueMutex sync.Mutex
|
|
|
- announcementQueue *deque.Deque[*announcementEntry]
|
|
|
- announcementQueueEntryCountByIP map[string]int
|
|
|
- announcementQueueRateLimiters *lrucache.Cache
|
|
|
- announcementLimitEntryCount int
|
|
|
- announcementRateLimitQuantity int
|
|
|
- announcementRateLimitInterval time.Duration
|
|
|
- announcementNonlimitedProxyIDs map[ID]struct{}
|
|
|
- announcementsPersonalCompartmentalizedCount int
|
|
|
- announcementsUnlimitedNATCount int
|
|
|
- announcementsPartiallyLimitedNATCount int
|
|
|
- announcementsStrictlyLimitedNATCount int
|
|
|
+ announcementQueueMutex sync.Mutex
|
|
|
+ announcementQueue *announcementMultiQueue
|
|
|
+ announcementQueueEntryCountByIP map[string]int
|
|
|
+ announcementQueueRateLimiters *lrucache.Cache
|
|
|
+ announcementLimitEntryCount int
|
|
|
+ announcementRateLimitQuantity int
|
|
|
+ announcementRateLimitInterval time.Duration
|
|
|
+ announcementNonlimitedProxyIDs map[ID]struct{}
|
|
|
|
|
|
// The offer queue is also implicitly sorted by offer age. Both an offer
|
|
|
// and announcement queue are required since either announcements or
|
|
|
// offers can arrive while there are no available pairings.
|
|
|
|
|
|
offerQueueMutex sync.Mutex
|
|
|
- offerQueue *deque.Deque[*offerEntry]
|
|
|
+ offerQueue *list.List
|
|
|
offerQueueEntryCountByIP map[string]int
|
|
|
offerQueueRateLimiters *lrucache.Cache
|
|
|
offerLimitEntryCount int
|
|
|
@@ -166,12 +161,6 @@ func (p *MatchProperties) IsPreferredNATMatch(
|
|
|
peerMatchProperties.EffectiveNATType())
|
|
|
}
|
|
|
|
|
|
-// IsPersonalCompartmentalized indicates whether the candidate has personal
|
|
|
-// compartment IDs.
|
|
|
-func (p *MatchProperties) IsPersonalCompartmentalized() bool {
|
|
|
- return len(p.PersonalCompartmentIDs) > 0
|
|
|
-}
|
|
|
-
|
|
|
// MatchAnnouncement is a proxy announcement to be queued for matching.
|
|
|
type MatchAnnouncement struct {
|
|
|
Properties MatchProperties
|
|
|
@@ -233,6 +222,10 @@ type announcementEntry struct {
|
|
|
announcement *MatchAnnouncement
|
|
|
offerChan chan *MatchOffer
|
|
|
matchMetrics atomic.Value
|
|
|
+
|
|
|
+ // queueReference is initialized by addAnnouncementEntry, and used to
|
|
|
+ // efficiently dequeue the entry.
|
|
|
+ queueReference announcementQueueReference
|
|
|
}
|
|
|
|
|
|
func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
|
|
|
@@ -248,6 +241,10 @@ type offerEntry struct {
|
|
|
offer *MatchOffer
|
|
|
answerChan chan *answerInfo
|
|
|
matchMetrics atomic.Value
|
|
|
+
|
|
|
+ // queueReference is initialized by addOfferEntry, and used to efficiently
|
|
|
+ // dequeue the entry.
|
|
|
+ queueReference *list.Element
|
|
|
}
|
|
|
|
|
|
func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
|
|
|
@@ -294,14 +291,14 @@ func NewMatcher(config *MatcherConfig) *Matcher {
|
|
|
|
|
|
waitGroup: new(sync.WaitGroup),
|
|
|
|
|
|
- announcementQueue: deque.New[*announcementEntry](),
|
|
|
+ announcementQueue: newAnnouncementMultiQueue(),
|
|
|
announcementQueueEntryCountByIP: make(map[string]int),
|
|
|
announcementQueueRateLimiters: lrucache.NewWithLRU(
|
|
|
0,
|
|
|
time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
|
|
|
matcherRateLimiterMaxCacheEntries),
|
|
|
|
|
|
- offerQueue: deque.New[*offerEntry](),
|
|
|
+ offerQueue: list.New(),
|
|
|
offerQueueEntryCountByIP: make(map[string]int),
|
|
|
offerQueueRateLimiters: lrucache.NewWithLRU(
|
|
|
0,
|
|
|
@@ -406,6 +403,10 @@ func (m *Matcher) Stop() {
|
|
|
// with a returned offer or ctx is done. The caller must not mutate the
|
|
|
// announcement or its properties after calling Announce.
|
|
|
//
|
|
|
+// Announce assumes that the ctx.Deadline for each call is monotonically
|
|
|
+// increasing and that the deadline can be used as part of selecting the next
|
|
|
+// nearest-to-expire announcement.
|
|
|
+//
|
|
|
// The offer is sent to the proxy by the broker, and then the proxy sends its
|
|
|
// answer back to the broker, which calls Answer with that value.
|
|
|
//
|
|
|
@@ -416,6 +417,20 @@ func (m *Matcher) Announce(
|
|
|
proxyIP string,
|
|
|
proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
|
|
|
|
|
|
+ // An announcement must specify exactly one compartment ID, of one type,
|
|
|
+ // common or personal. The limit of one is currently a limitation of the
|
|
|
+ // multi-queue implementation; see comment in
|
|
|
+ // announcementMultiQueue.enqueue.
|
|
|
+ compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
|
|
|
+ if len(compartmentIDs) == 0 {
|
|
|
+ compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
|
|
|
+ } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
|
|
|
+ return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
|
|
|
+ }
|
|
|
+ if len(compartmentIDs) != 1 {
|
|
|
+ return nil, nil, errors.TraceNew("unexpected compartment ID count")
|
|
|
+ }
|
|
|
+
|
|
|
announcementEntry := &announcementEntry{
|
|
|
ctx: ctx,
|
|
|
limitIP: getRateLimitIP(proxyIP),
|
|
|
@@ -434,7 +449,7 @@ func (m *Matcher) Announce(
|
|
|
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
- m.removeAnnouncementEntry(announcementEntry)
|
|
|
+ m.removeAnnouncementEntry(true, announcementEntry)
|
|
|
return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
|
|
|
|
|
|
case clientOffer = <-announcementEntry.offerChan:
|
|
|
@@ -458,6 +473,18 @@ func (m *Matcher) Offer(
|
|
|
clientIP string,
|
|
|
clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
|
|
|
|
|
|
+ // An offer must specify at least one compartment ID, and may only specify
|
|
|
+ // one type, common or personal, of compartment IDs.
|
|
|
+ compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
|
|
|
+ if len(compartmentIDs) == 0 {
|
|
|
+ compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
|
|
|
+ } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
|
|
|
+ return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
|
|
|
+ }
|
|
|
+ if len(compartmentIDs) < 1 {
|
|
|
+ return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
|
|
|
+ }
|
|
|
+
|
|
|
offerEntry := &offerEntry{
|
|
|
ctx: ctx,
|
|
|
limitIP: getRateLimitIP(clientIP),
|
|
|
@@ -476,7 +503,7 @@ func (m *Matcher) Offer(
|
|
|
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
- m.removeOfferEntry(offerEntry)
|
|
|
+ m.removeOfferEntry(true, offerEntry)
|
|
|
|
|
|
// TODO: also remove any pendingAnswers entry? The entry TTL is set to
|
|
|
// the Offer ctx, the client request, timeout, so it will eventually
|
|
|
@@ -511,6 +538,31 @@ func (m *Matcher) Offer(
|
|
|
nil
|
|
|
}
|
|
|
|
|
|
+// AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
|
|
|
+// announcement identified by the specified proxy ID and connection ID and
|
|
|
+// returns whether the announcement has personal compartment IDs, indicating
|
|
|
+// personal pairing mode.
|
|
|
+//
|
|
|
+// If no pending answer is found, an error is returned.
|
|
|
+func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
|
|
|
+ proxyID ID, connectionID ID) (bool, error) {
|
|
|
+
|
|
|
+ key := m.pendingAnswerKey(proxyID, connectionID)
|
|
|
+ pendingAnswerValue, ok := m.pendingAnswers.Get(key)
|
|
|
+ if !ok {
|
|
|
+ // The input IDs don't correspond to a pending answer, or the client
|
|
|
+ // is no longer awaiting the response.
|
|
|
+ return false, errors.TraceNew("no pending answer")
|
|
|
+ }
|
|
|
+
|
|
|
+ pendingAnswer := pendingAnswerValue.(*pendingAnswer)
|
|
|
+
|
|
|
+ hasPersonalCompartmentIDs := len(
|
|
|
+ pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
|
|
|
+
|
|
|
+ return hasPersonalCompartmentIDs, nil
|
|
|
+}
|
|
|
+
|
|
|
// Answer delivers an answer from the proxy for a previously matched offer.
|
|
|
// The ProxyID and ConnectionID must correspond to the original announcement.
|
|
|
// The caller must not mutate the answer after calling Answer. Answer does
|
|
|
@@ -524,8 +576,9 @@ func (m *Matcher) Answer(
|
|
|
key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
|
|
|
pendingAnswerValue, ok := m.pendingAnswers.Get(key)
|
|
|
if !ok {
|
|
|
- // The client is no longer awaiting the response.
|
|
|
- return errors.TraceNew("no client")
|
|
|
+ // The input IDs don't correspond to a pending answer, or the client
|
|
|
+ // is no longer awaiting the response.
|
|
|
+ return errors.TraceNew("no pending answer")
|
|
|
}
|
|
|
|
|
|
m.pendingAnswers.Delete(key)
|
|
|
@@ -589,44 +642,45 @@ func (m *Matcher) matchAllOffers() {
|
|
|
// TODO: consider matching one offer, then releasing the locks to allow
|
|
|
// more announcements to be enqueued, then continuing to match.
|
|
|
|
|
|
- i := 0
|
|
|
- end := m.offerQueue.Len()
|
|
|
+ nextOffer := m.offerQueue.Front()
|
|
|
+ offerIndex := -1
|
|
|
+
|
|
|
+ for nextOffer != nil && m.announcementQueue.getLen() > 0 {
|
|
|
+
|
|
|
+ offerIndex += 1
|
|
|
|
|
|
- for i < end && m.announcementQueue.Len() > 0 {
|
|
|
+ // nextOffer.Next must be invoked before any removeOfferEntry since
|
|
|
+ // container/list.remove clears list.Element.next.
|
|
|
+ offer := nextOffer
|
|
|
+ nextOffer = nextOffer.Next()
|
|
|
|
|
|
- offerEntry := m.offerQueue.At(i)
|
|
|
+ offerEntry := offer.Value.(*offerEntry)
|
|
|
|
|
|
// Skip and remove this offer if its deadline has already passed.
|
|
|
// There is no signal to the awaiting Offer function, as it will exit
|
|
|
// based on the same ctx.
|
|
|
|
|
|
if offerEntry.ctx.Err() != nil {
|
|
|
- m.removeOfferEntryByIndex(i)
|
|
|
- end -= 1
|
|
|
+ m.removeOfferEntry(false, offerEntry)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- j, ok := m.matchOffer(offerEntry)
|
|
|
- if !ok {
|
|
|
-
|
|
|
- // No match, so leave this offer in place in the queue and move to
|
|
|
- // the next.
|
|
|
-
|
|
|
- i++
|
|
|
+ announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
|
|
|
+ if announcementEntry == nil {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // Get the matched announcement entry.
|
|
|
-
|
|
|
- announcementEntry := m.announcementQueue.At(j)
|
|
|
-
|
|
|
// Record match metrics.
|
|
|
|
|
|
+ // The index metrics predate the announcement multi-queue; now, with
|
|
|
+ // the multi-queue, announcement_index is how many announce entries
|
|
|
+ // were inspected before matching.
|
|
|
+
|
|
|
matchMetrics := &MatchMetrics{
|
|
|
- OfferMatchIndex: i,
|
|
|
+ OfferMatchIndex: offerIndex,
|
|
|
OfferQueueSize: m.offerQueue.Len(),
|
|
|
- AnnouncementMatchIndex: j,
|
|
|
- AnnouncementQueueSize: m.announcementQueue.Len(),
|
|
|
+ AnnouncementMatchIndex: announcementMatchIndex,
|
|
|
+ AnnouncementQueueSize: m.announcementQueue.getLen(),
|
|
|
}
|
|
|
|
|
|
offerEntry.matchMetrics.Store(matchMetrics)
|
|
|
@@ -639,6 +693,8 @@ func (m *Matcher) matchAllOffers() {
|
|
|
// entry is set to the matched Offer call's ctx, as the answer is
|
|
|
// only useful as long as the client is still waiting.
|
|
|
|
|
|
+ m.removeAnnouncementEntry(false, announcementEntry)
|
|
|
+
|
|
|
expiry := lrucache.DefaultExpiration
|
|
|
deadline, ok := offerEntry.ctx.Deadline()
|
|
|
if ok {
|
|
|
@@ -659,24 +715,22 @@ func (m *Matcher) matchAllOffers() {
|
|
|
|
|
|
announcementEntry.offerChan <- offerEntry.offer
|
|
|
|
|
|
- m.removeAnnouncementEntryByIndex(j)
|
|
|
-
|
|
|
// Remove the matched offer from the queue and match the next offer,
|
|
|
// now first in the queue.
|
|
|
|
|
|
- m.removeOfferEntryByIndex(i)
|
|
|
-
|
|
|
- end -= 1
|
|
|
+ m.removeOfferEntry(false, offerEntry)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
|
|
|
+func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
|
|
|
|
|
|
// Assumes the caller has the queue mutexed locked.
|
|
|
|
|
|
- // Check each announcement in turn, and select a match. There is an
|
|
|
- // implicit preference for older proxy announcements, sooner to timeout,
|
|
|
- // at the front of the queue.
|
|
|
+ // Check each candidate announcement in turn, and select a match. There is
|
|
|
+ // an implicit preference for older proxy announcements, sooner to
|
|
|
+ // timeout, at the front of the enqueued announcements.
|
|
|
+ // announcementMultiQueue.startMatching skips to the first matching
|
|
|
+ // compartment ID(s).
|
|
|
//
|
|
|
// Limitation: since this logic matches each enqueued client in turn, it will
|
|
|
// only make the optimal NAT match for the oldest enqueued client vs. all
|
|
|
@@ -692,56 +746,58 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
|
|
|
|
|
|
offerProperties := &offerEntry.offer.Properties
|
|
|
|
|
|
+ // Assumes the caller checks that offer specifies either personal
|
|
|
+ // compartment IDs or common compartment IDs, but not both.
|
|
|
+ isCommonCompartments := false
|
|
|
+ compartmentIDs := offerProperties.PersonalCompartmentIDs
|
|
|
+ if len(compartmentIDs) == 0 {
|
|
|
+ isCommonCompartments = true
|
|
|
+ compartmentIDs = offerProperties.CommonCompartmentIDs
|
|
|
+ }
|
|
|
+ if len(compartmentIDs) == 0 {
|
|
|
+ return nil, -1
|
|
|
+ }
|
|
|
+
|
|
|
+ matchIterator := m.announcementQueue.startMatching(
|
|
|
+ isCommonCompartments, compartmentIDs)
|
|
|
+
|
|
|
// Use the NAT traversal type counters to check if there's any preferred
|
|
|
// NAT match for this offer in the announcement queue. When there is, we
|
|
|
// will search beyond the first announcement.
|
|
|
|
|
|
+ unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
|
|
|
+ matchIterator.getNATCounts()
|
|
|
+
|
|
|
existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
|
|
|
- m.announcementsUnlimitedNATCount > 0,
|
|
|
- m.announcementsPartiallyLimitedNATCount > 0,
|
|
|
- m.announcementsStrictlyLimitedNATCount > 0)
|
|
|
+ unlimitedNATCount > 0,
|
|
|
+ partiallyLimitedNATCount > 0,
|
|
|
+ strictlyLimitedNATCount > 0)
|
|
|
|
|
|
- bestMatch := -1
|
|
|
+ var bestMatch *announcementEntry
|
|
|
+ bestMatchIndex := -1
|
|
|
bestMatchNAT := false
|
|
|
- bestMatchCompartment := false
|
|
|
-
|
|
|
- end := m.announcementQueue.Len()
|
|
|
|
|
|
- // TODO: add queue indexing to facilitate skipping ahead to a matching
|
|
|
- // personal compartment ID, if any, when personal-only matching is
|
|
|
- // required. Personal matching may often require near-full queue scans
|
|
|
- // when looking for a match. Common compartment matching may also benefit
|
|
|
- // from indexing, although with a handful of common compartment IDs more
|
|
|
- // or less uniformly distributed, frequent long scans are not expected in
|
|
|
- // practise.
|
|
|
+ candidateIndex := -1
|
|
|
+ for {
|
|
|
|
|
|
- for i := 0; i < end; i++ {
|
|
|
+ announcementEntry := matchIterator.getNext()
|
|
|
+ if announcementEntry == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
|
|
|
- announcementEntry := m.announcementQueue.At(i)
|
|
|
+ candidateIndex += 1
|
|
|
|
|
|
// Skip and remove this announcement if its deadline has already
|
|
|
// passed. There is no signal to the awaiting Announce function, as
|
|
|
// it will exit based on the same ctx.
|
|
|
|
|
|
if announcementEntry.ctx.Err() != nil {
|
|
|
- m.removeAnnouncementEntryByIndex(i)
|
|
|
- end -= 1
|
|
|
+ m.removeAnnouncementEntry(false, announcementEntry)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
announcementProperties := &announcementEntry.announcement.Properties
|
|
|
|
|
|
- // There must be a compartment match. If there is a personal
|
|
|
- // compartment match, this match will be preferred.
|
|
|
-
|
|
|
- matchCommonCompartment := HaveCommonIDs(
|
|
|
- announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
|
|
|
- matchPersonalCompartment := HaveCommonIDs(
|
|
|
- announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
|
|
|
- if !matchCommonCompartment && !matchPersonalCompartment {
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
// Disallow matching the same country and ASN, except for personal
|
|
|
// compartment ID matches.
|
|
|
//
|
|
|
@@ -749,7 +805,7 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
|
|
|
// have no circumvention benefit. For personal matching, the user may
|
|
|
// wish to hop their their own or their friend's proxy regardless.
|
|
|
|
|
|
- if !matchPersonalCompartment &&
|
|
|
+ if isCommonCompartments &&
|
|
|
!GetAllowCommonASNMatching() &&
|
|
|
(offerProperties.GeoIPData.Country ==
|
|
|
announcementProperties.GeoIPData.Country &&
|
|
|
@@ -766,49 +822,29 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
|
|
|
matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
|
|
|
|
|
|
// At this point, the candidate is a match. Determine if this is a new
|
|
|
- // best match.
|
|
|
+ // best match, either if there was no previous match, or this is a
|
|
|
+ // better NAT match.
|
|
|
|
|
|
- if bestMatch == -1 {
|
|
|
+ if bestMatch == nil || (!bestMatchNAT && matchNAT) {
|
|
|
|
|
|
- // This is a match, and there was no previous match, so it becomes
|
|
|
- // the provisional best match.
|
|
|
-
|
|
|
- bestMatch = i
|
|
|
+ bestMatch = announcementEntry
|
|
|
+ bestMatchIndex = candidateIndex
|
|
|
bestMatchNAT = matchNAT
|
|
|
- bestMatchCompartment = matchPersonalCompartment
|
|
|
-
|
|
|
- } else if !bestMatchNAT && matchNAT {
|
|
|
-
|
|
|
- // If there was a previous best match which was not a preferred
|
|
|
- // NAT match, this becomes the new best match. The preferred NAT
|
|
|
- // match is prioritized over personal compartment matching.
|
|
|
-
|
|
|
- bestMatch = i
|
|
|
- bestMatchNAT = true
|
|
|
- bestMatchCompartment = matchPersonalCompartment
|
|
|
-
|
|
|
- } else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
|
|
|
-
|
|
|
- // If there was a previous best match which was not a personal
|
|
|
- // compartment match, and as long as this match doesn't undo a
|
|
|
- // better NAT match, this becomes the new best match.
|
|
|
|
|
|
- bestMatch = i
|
|
|
- bestMatchNAT = matchNAT
|
|
|
- bestMatchCompartment = true
|
|
|
}
|
|
|
|
|
|
- // Stop as soon as we have the best possible match.
|
|
|
+ // Stop as soon as we have the best possible match, or have reached
|
|
|
+ // the probe limit for preferred NAT matches.
|
|
|
+
|
|
|
+ if bestMatch != nil && (bestMatchNAT ||
|
|
|
+ !existsPreferredNATMatch ||
|
|
|
+ candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
|
|
|
|
|
|
- if (bestMatchNAT || !existsPreferredNATMatch) &&
|
|
|
- (matchPersonalCompartment ||
|
|
|
- m.announcementsPersonalCompartmentalizedCount == 0 ||
|
|
|
- len(offerProperties.PersonalCompartmentIDs) == 0) {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return bestMatch, bestMatch != -1
|
|
|
+ return bestMatch, bestMatchIndex
|
|
|
}
|
|
|
|
|
|
// MatcherLimitError is the error type returned by Announce or Offer when the
|
|
|
@@ -903,7 +939,7 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
|
|
|
defer m.announcementQueueMutex.Unlock()
|
|
|
|
|
|
// Ensure the queue doesn't grow larger than the max size.
|
|
|
- if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
|
|
|
+ if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
|
|
|
return errors.TraceNew("queue full")
|
|
|
}
|
|
|
|
|
|
@@ -916,11 +952,20 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
|
|
|
return errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- m.announcementQueue.PushBack(announcementEntry)
|
|
|
+ // announcementEntry.queueReference should be uninitialized.
|
|
|
+ // announcementMultiQueue.enqueue sets queueReference to be used for
|
|
|
+ // efficient dequeuing.
|
|
|
|
|
|
- m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
|
|
|
+ if announcementEntry.queueReference.entry != nil {
|
|
|
+ return errors.TraceNew("unexpected queue reference")
|
|
|
+ }
|
|
|
+
|
|
|
+ err = m.announcementQueue.enqueue(announcementEntry)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Trace(err)
|
|
|
+ }
|
|
|
|
|
|
- m.adjustAnnouncementCounts(announcementEntry, 1)
|
|
|
+ m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
|
|
|
|
|
|
select {
|
|
|
case m.matchSignal <- struct{}{}:
|
|
|
@@ -930,20 +975,27 @@ func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) err
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
|
|
|
+func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
|
|
|
|
|
|
- m.announcementQueueMutex.Lock()
|
|
|
- defer m.announcementQueueMutex.Unlock()
|
|
|
+ // In the aborting case, the queue isn't already locked. Otherise, assume
|
|
|
+ // it is locked.
|
|
|
+ if aborting {
|
|
|
+ m.announcementQueueMutex.Lock()
|
|
|
+ defer m.announcementQueueMutex.Unlock()
|
|
|
+ }
|
|
|
|
|
|
- found := false
|
|
|
- for i := 0; i < m.announcementQueue.Len(); i++ {
|
|
|
- if m.announcementQueue.At(i) == announcementEntry {
|
|
|
- m.removeAnnouncementEntryByIndex(i)
|
|
|
- found = true
|
|
|
- break
|
|
|
+ found := announcementEntry.queueReference.dequeue()
|
|
|
+
|
|
|
+ if found {
|
|
|
+ // Adjust entry counts by peer IP, used to enforce
|
|
|
+ // matcherAnnouncementQueueMaxEntriesPerIP.
|
|
|
+ m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
|
|
|
+ if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
|
|
|
+ delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
|
|
|
}
|
|
|
}
|
|
|
- if !found {
|
|
|
+
|
|
|
+ if aborting && !found {
|
|
|
|
|
|
// The Announce call is aborting and taking its entry back out of the
|
|
|
// queue. If the entry is not found in the queue, then a concurrent
|
|
|
@@ -964,45 +1016,6 @@ func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (m *Matcher) removeAnnouncementEntryByIndex(i int) {
|
|
|
-
|
|
|
- // Assumes s.announcementQueueMutex lock is held.
|
|
|
-
|
|
|
- announcementEntry := m.announcementQueue.At(i)
|
|
|
-
|
|
|
- // This should be only direct call to Remove, as following adjustments
|
|
|
- // must always be made when removing.
|
|
|
- m.announcementQueue.Remove(i)
|
|
|
-
|
|
|
- // Adjust entry counts by peer IP, used to enforce
|
|
|
- // matcherAnnouncementQueueMaxEntriesPerIP.
|
|
|
- m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
|
|
|
- if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
|
|
|
- delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
|
|
|
- }
|
|
|
-
|
|
|
- m.adjustAnnouncementCounts(announcementEntry, -1)
|
|
|
-}
|
|
|
-
|
|
|
-func (m *Matcher) adjustAnnouncementCounts(
|
|
|
- announcementEntry *announcementEntry, delta int) {
|
|
|
-
|
|
|
- // Assumes s.announcementQueueMutex lock is held.
|
|
|
-
|
|
|
- if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
|
|
|
- m.announcementsPersonalCompartmentalizedCount += delta
|
|
|
- }
|
|
|
-
|
|
|
- switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
|
|
|
- case NATTraversalUnlimited:
|
|
|
- m.announcementsUnlimitedNATCount += delta
|
|
|
- case NATTraversalPartiallyLimited:
|
|
|
- m.announcementsPartiallyLimitedNATCount += delta
|
|
|
- case NATTraversalStrictlyLimited:
|
|
|
- m.announcementsStrictlyLimitedNATCount += delta
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
|
|
|
|
|
|
m.offerQueueMutex.Lock()
|
|
|
@@ -1022,7 +1035,14 @@ func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
|
|
|
return errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- m.offerQueue.PushBack(offerEntry)
|
|
|
+ // offerEntry.queueReference should be uninitialized and is set here to be
|
|
|
+ // used for efficient dequeuing.
|
|
|
+
|
|
|
+ if offerEntry.queueReference != nil {
|
|
|
+ return errors.TraceNew("unexpected queue reference")
|
|
|
+ }
|
|
|
+
|
|
|
+ offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
|
|
|
|
|
|
m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
|
|
|
|
|
|
@@ -1034,28 +1054,22 @@ func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
|
|
|
-
|
|
|
- m.offerQueueMutex.Lock()
|
|
|
- defer m.offerQueueMutex.Unlock()
|
|
|
+func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
|
|
|
|
|
|
- for i := 0; i < m.offerQueue.Len(); i++ {
|
|
|
- if m.offerQueue.At(i) == offerEntry {
|
|
|
- m.removeOfferEntryByIndex(i)
|
|
|
- break
|
|
|
- }
|
|
|
+ // In the aborting case, the queue isn't already locked. Otherise, assume
|
|
|
+ // it is locked.
|
|
|
+ if aborting {
|
|
|
+ m.offerQueueMutex.Lock()
|
|
|
+ defer m.offerQueueMutex.Unlock()
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-func (m *Matcher) removeOfferEntryByIndex(i int) {
|
|
|
-
|
|
|
- // Assumes s.offerQueueMutex lock is held.
|
|
|
+ if offerEntry.queueReference == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- offerEntry := m.offerQueue.At(i)
|
|
|
+ m.offerQueue.Remove(offerEntry.queueReference)
|
|
|
|
|
|
- // This should be only direct call to Remove, as following adjustments
|
|
|
- // must always be made when removing.
|
|
|
- m.offerQueue.Remove(i)
|
|
|
+ offerEntry.queueReference = nil
|
|
|
|
|
|
// Adjust entry counts by peer IP, used to enforce
|
|
|
// matcherOfferQueueMaxEntriesPerIP.
|
|
|
@@ -1087,3 +1101,260 @@ func getRateLimitIP(strIP string) string {
|
|
|
// or /56, so rate limit by /56.
|
|
|
return IP.Mask(net.CIDRMask(56, 128)).String()
|
|
|
}
|
|
|
+
|
|
|
+// announcementMultiQueue is a set of announcement queues, one per common or
|
|
|
+// personal compartment ID, providing efficient iteration over announcements
|
|
|
+// matching a specified list of compartment IDs. announcementMultiQueue and
|
|
|
+// its underlying data structures are not safe for concurrent access.
|
|
|
+type announcementMultiQueue struct {
|
|
|
+ commonCompartmentQueues map[ID]*announcementCompartmentQueue
|
|
|
+ personalCompartmentQueues map[ID]*announcementCompartmentQueue
|
|
|
+ totalEntries int
|
|
|
+}
|
|
|
+
|
|
|
+// announcementCompartmentQueue is a single compartment queue within an
|
|
|
+// announcementMultiQueue. The queue is implemented using a doubly-linked
|
|
|
+// list, which provides efficient insert and mid-queue dequeue operations.
|
|
|
+// The announcementCompartmentQueue also records NAT type stats for enqueued
|
|
|
+// announcements, which are used, when matching, to determine when better NAT
|
|
|
+// matches may be possible.
|
|
|
+type announcementCompartmentQueue struct {
|
|
|
+ entries *list.List
|
|
|
+ unlimitedNATCount int
|
|
|
+ partiallyLimitedNATCount int
|
|
|
+ strictlyLimitedNATCount int
|
|
|
+}
|
|
|
+
|
|
|
+// announcementMatchIterator represents the state of an iteration over a
|
|
|
+// subset of announcementMultiQueue compartment queues. Concurrent
|
|
|
+// announcementMatchIterators are not supported.
|
|
|
+type announcementMatchIterator struct {
|
|
|
+ multiQueue *announcementMultiQueue
|
|
|
+ isCommonCompartments bool
|
|
|
+ compartmentQueues []*announcementCompartmentQueue
|
|
|
+ compartmentIDs []ID
|
|
|
+ nextEntries []*list.Element
|
|
|
+}
|
|
|
+
|
|
|
+// announcementQueueReference represents the queue position for a given
|
|
|
+// announcement entry, and provides an efficient dequeue operation.
|
|
|
+type announcementQueueReference struct {
|
|
|
+ multiQueue *announcementMultiQueue
|
|
|
+ compartmentQueue *announcementCompartmentQueue
|
|
|
+ entry *list.Element
|
|
|
+}
|
|
|
+
|
|
|
+func newAnnouncementMultiQueue() *announcementMultiQueue {
|
|
|
+ return &announcementMultiQueue{
|
|
|
+ commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
|
|
|
+ personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (q *announcementMultiQueue) getLen() int {
|
|
|
+ return q.totalEntries
|
|
|
+}
|
|
|
+
|
|
|
+func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
|
|
|
+
|
|
|
+ // Assumes announcementEntry not already enueued.
|
|
|
+
|
|
|
+ // Limitation: only one compartment ID, either common or personal, is
|
|
|
+ // supported per announcement entry. In the common compartment case, the
|
|
|
+ // broker currently assigns only one common compartment ID per proxy
|
|
|
+ // announcement. In the personal compartment case, there is currently no
|
|
|
+ // use case for allowing a proxy to announce under multiple personal
|
|
|
+ // compartment IDs.
|
|
|
+ //
|
|
|
+ // To overcome this limitation, the dequeue operation would need to be
|
|
|
+ // able to remove an announcement entry from multiple
|
|
|
+ // announcementCompartmentQueues.
|
|
|
+
|
|
|
+ commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
|
|
|
+ personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
|
|
|
+
|
|
|
+ if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
|
|
|
+ return errors.TraceNew("announcement must specify exactly one compartment ID")
|
|
|
+ }
|
|
|
+
|
|
|
+ var compartmentID ID
|
|
|
+ var compartmentQueues map[ID]*announcementCompartmentQueue
|
|
|
+ if len(commonCompartmentIDs) > 0 {
|
|
|
+ compartmentID = commonCompartmentIDs[0]
|
|
|
+ compartmentQueues = q.commonCompartmentQueues
|
|
|
+ } else {
|
|
|
+ compartmentID = personalCompartmentIDs[0]
|
|
|
+ compartmentQueues = q.personalCompartmentQueues
|
|
|
+ }
|
|
|
+
|
|
|
+ compartmentQueue, ok := compartmentQueues[compartmentID]
|
|
|
+ if !ok {
|
|
|
+ compartmentQueue = &announcementCompartmentQueue{
|
|
|
+ entries: list.New(),
|
|
|
+ }
|
|
|
+ compartmentQueues[compartmentID] = compartmentQueue
|
|
|
+ }
|
|
|
+
|
|
|
+ entry := compartmentQueue.entries.PushBack(announcementEntry)
|
|
|
+
|
|
|
+ // Update the NAT type counts which are used to determine if a better NAT
|
|
|
+ // match may be made by inspecting more announcement queue entries.
|
|
|
+
|
|
|
+ switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
|
|
|
+ case NATTraversalUnlimited:
|
|
|
+ compartmentQueue.unlimitedNATCount += 1
|
|
|
+ case NATTraversalPartiallyLimited:
|
|
|
+ compartmentQueue.partiallyLimitedNATCount += 1
|
|
|
+ case NATTraversalStrictlyLimited:
|
|
|
+ compartmentQueue.strictlyLimitedNATCount += 1
|
|
|
+ }
|
|
|
+
|
|
|
+ q.totalEntries += 1
|
|
|
+
|
|
|
+ announcementEntry.queueReference = announcementQueueReference{
|
|
|
+ multiQueue: q,
|
|
|
+ compartmentQueue: compartmentQueue,
|
|
|
+ entry: entry,
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// announcementQueueReference returns false if the item is already dequeued.
|
|
|
+func (r announcementQueueReference) dequeue() bool {
|
|
|
+
|
|
|
+ if r.entry == nil {
|
|
|
+ // Already dequeued.
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ announcementEntry := r.entry.Value.(*announcementEntry)
|
|
|
+
|
|
|
+ // Reverse the NAT type counts.
|
|
|
+ switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
|
|
|
+ case NATTraversalUnlimited:
|
|
|
+ r.compartmentQueue.unlimitedNATCount -= 1
|
|
|
+ case NATTraversalPartiallyLimited:
|
|
|
+ r.compartmentQueue.partiallyLimitedNATCount -= 1
|
|
|
+ case NATTraversalStrictlyLimited:
|
|
|
+ r.compartmentQueue.strictlyLimitedNATCount -= 1
|
|
|
+ }
|
|
|
+
|
|
|
+ r.compartmentQueue.entries.Remove(r.entry)
|
|
|
+
|
|
|
+ r.multiQueue.totalEntries -= 1
|
|
|
+
|
|
|
+ // Mark as dequeued.
|
|
|
+ r.entry = nil
|
|
|
+
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func (q *announcementMultiQueue) startMatching(
|
|
|
+ isCommonCompartments bool,
|
|
|
+ compartmentIDs []ID) *announcementMatchIterator {
|
|
|
+
|
|
|
+ iter := &announcementMatchIterator{
|
|
|
+ multiQueue: q,
|
|
|
+ isCommonCompartments: isCommonCompartments,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Find the matching compartment queues and initialize iteration over
|
|
|
+ // those queues. Building the set of matching queues is a linear time
|
|
|
+ // operation, bounded by the length of compartmentIDs (no more than
|
|
|
+ // maxCompartmentIDs, as enforced in
|
|
|
+ // ClientOfferRequest.ValidateAndGetLogFields).
|
|
|
+
|
|
|
+ compartmentQueues := q.commonCompartmentQueues
|
|
|
+ if !isCommonCompartments {
|
|
|
+ compartmentQueues = q.personalCompartmentQueues
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, ID := range compartmentIDs {
|
|
|
+ if compartmentQueue, ok := compartmentQueues[ID]; ok {
|
|
|
+ iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
|
|
|
+ iter.compartmentIDs = append(iter.compartmentIDs, ID)
|
|
|
+ iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return iter
|
|
|
+}
|
|
|
+
|
|
|
+func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
|
|
|
+
|
|
|
+ // Return the count of NAT types across all matchable compartment queues.
|
|
|
+ //
|
|
|
+ // A potential future enhancement would be to provide per-queue NAT counts
|
|
|
+ // or NAT type indexing in order to quickly find preferred NAT matches.
|
|
|
+
|
|
|
+ unlimitedNATCount := 0
|
|
|
+ partiallyLimitedNATCount := 0
|
|
|
+ strictlyLimitedNATCount := 0
|
|
|
+
|
|
|
+ for _, compartmentQueue := range iter.compartmentQueues {
|
|
|
+ unlimitedNATCount += compartmentQueue.unlimitedNATCount
|
|
|
+ partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
|
|
|
+ strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
|
|
|
+ }
|
|
|
+
|
|
|
+ return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
|
|
|
+}
|
|
|
+
|
|
|
+// announcementMatchIterator returns the next announcement entry candidate in
|
|
|
+// compartment queue FIFO order, selecting the queue with the oldest head
|
|
|
+// item.
|
|
|
+//
|
|
|
+// The caller should invoke announcementEntry.queueReference.dequeue when the
|
|
|
+// candidate is selected. dequeue may be called on any getNext return value
|
|
|
+// without disrupting the iteration state; however,
|
|
|
+// announcementEntry.queueReference.dequeue calls for arbitrary queue entries
|
|
|
+// are not supported during iteration. Iteration and dequeue should all be
|
|
|
+// performed with a lock over the entire announcementMultiQueue, and with
|
|
|
+// only one concurrent announcementMatchIterator.
|
|
|
+func (iter *announcementMatchIterator) getNext() *announcementEntry {
|
|
|
+
|
|
|
+ // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
|
|
|
+ // order.
|
|
|
+
|
|
|
+ // Select the oldest item, by deadline, from all the candidate queue head
|
|
|
+ // items. This operation is linear in the number of matching compartment
|
|
|
+ // ID queues, which is currently bounded by This is a linear time
|
|
|
+ // operation, bounded by the length of matching compartment IDs (no more
|
|
|
+ // than maxCompartmentIDs, as enforced in
|
|
|
+ // ClientOfferRequest.ValidateAndGetLogFields).
|
|
|
+ //
|
|
|
+ // A potential future enhancement is to add more iterator state to track
|
|
|
+ // which queue has the next oldest time to select on the following
|
|
|
+ // getNext call.
|
|
|
+
|
|
|
+ var selectedCandidate *announcementEntry
|
|
|
+ selectedIndex := -1
|
|
|
+
|
|
|
+ for i := 0; i < len(iter.compartmentQueues); i++ {
|
|
|
+ if iter.nextEntries[i] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if selectedCandidate == nil {
|
|
|
+ selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
|
|
|
+ selectedIndex = i
|
|
|
+ } else {
|
|
|
+ candidate := iter.nextEntries[i].Value.(*announcementEntry)
|
|
|
+ deadline, deadlineOk := candidate.ctx.Deadline()
|
|
|
+ selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
|
|
|
+ if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
|
|
|
+ selectedCandidate = candidate
|
|
|
+ selectedIndex = i
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Advance the selected queue to the next element. This must be done
|
|
|
+ // before any dequeue call, since container/list.remove clears
|
|
|
+ // list.Element.next.
|
|
|
+ if selectedIndex != -1 {
|
|
|
+ iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
|
|
|
+ }
|
|
|
+
|
|
|
+ return selectedCandidate
|
|
|
+}
|