| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000 |
- /*
- * Copyright (c) 2023, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package inproxy
- import (
- "context"
- "net"
- "sync"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- lrucache "github.com/cognusion/go-cache-lru"
- "github.com/gammazero/deque"
- "github.com/juju/ratelimit"
- "github.com/pion/webrtc/v3"
- )
// Fixed limits for the matcher queues, the pending answers cache, and the
// per-IP rate limiter caches.
//
// TTLs should be aligned with STUN hole punch lifetimes.
const (
	// Maximum number of entries allowed in each matching queue.
	matcherAnnouncementQueueMaxSize = 5_000_000
	matcherOfferQueueMaxSize        = 5_000_000

	// Default TTL and maximum entry count for the pending answers cache.
	matcherPendingAnswersTTL     = 30 * time.Second
	matcherPendingAnswersMaxSize = 100_000

	// Reap frequency, in seconds, and maximum entry count for the rate
	// limiter caches.
	matcherRateLimiterReapHistoryFrequencySeconds = 300
	matcherRateLimiterMaxCacheEntries             = 1_000_000
)
- // Matcher matches proxy announcements with client offers. Matcher also
- // coordinates pending proxy answers and routes answers to the awaiting
- // client offer handler.
- //
- // Matching prioritizes selecting the oldest announcments and client offers,
- // as they are closest to timing out.
- //
- // The client and proxy must supply matching personal or common compartment
- // IDs. Personal compartment matching is preferred. Common compartments are
- // managed by Psiphon and can be obtained via a tactics parameter or via an
- // OSL embedding.
- //
- // A client may opt form personal-only matching by not supplying any common
- // compartment IDs.
- //
- // Matching prefers to pair proxies and clients in a way that maximizes total
- // possible matches. For a client or proxy with less-limited NAT traversal, a
- // pairing with more-limited NAT traversal is preferred; and vice versa.
- // Candidates with unknown NAT types and mobile network types are assumed to
- // have the most limited NAT traversal capability.
- //
- // Preferred matchings take priority over announcment age.
- //
- // The client and proxy will not match if they are in the same country and
- // ASN, as it's assumed that doesn't provide any blocking circumvention
- // benefit. Disallowing proxies in certain blocked countries is handled at a
- // higher level; any such proxies should not be enqueued for matching.
- type Matcher struct {
- config *MatcherConfig
- runMutex sync.Mutex
- runContext context.Context
- stopRunning context.CancelFunc
- waitGroup *sync.WaitGroup
- // The announcement queue is implicitly sorted by announcement age. The
- // count fields are used to skip searching deeper into the queue for
- // preferred matches.
- // TODO: replace queue and counts with an indexed, in-memory database?
- announcementQueueMutex sync.Mutex
- announcementQueue *deque.Deque[*announcementEntry]
- announcementQueueEntryCountByIP map[string]int
- announcementQueueRateLimiters *lrucache.Cache
- announcementLimitEntryCount int
- announcementRateLimitQuantity int
- announcementRateLimitInterval time.Duration
- announcementNonlimitedProxyIDs map[ID]struct{}
- announcementsPersonalCompartmentalizedCount int
- announcementsUnlimitedNATCount int
- announcementsPartiallyLimitedNATCount int
- announcementsStrictlyLimitedNATCount int
- // The offer queue is also implicitly sorted by offer age. Both an offer
- // and announcement queue are required since either announcements or
- // offers can arrive while there are no available pairings.
- offerQueueMutex sync.Mutex
- offerQueue *deque.Deque[*offerEntry]
- offerQueueEntryCountByIP map[string]int
- offerQueueRateLimiters *lrucache.Cache
- offerLimitEntryCount int
- offerRateLimitQuantity int
- offerRateLimitInterval time.Duration
- matchSignal chan struct{}
- pendingAnswers *lrucache.Cache
- }
- // MatchProperties specifies the compartment, GeoIP, and network topology
- // matching roperties of clients and proxies.
- type MatchProperties struct {
- CommonCompartmentIDs []ID
- PersonalCompartmentIDs []ID
- GeoIPData common.GeoIPData
- NetworkType NetworkType
- NATType NATType
- PortMappingTypes PortMappingTypes
- }
- // EffectiveNATType combines the set of network properties into an effective
- // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
- // traversal is assumed. When NAT type is unknown and the network type is
- // mobile, CGNAT with limited NAT traversal is assumed.
- func (p *MatchProperties) EffectiveNATType() NATType {
- if p.PortMappingTypes.Available() {
- return NATTypePortMapping
- }
- // TODO: can a peer have limited NAT travseral for IPv4 and also have a
- // publicly reachable IPv6 ICE host candidate? If so, change the
- // effective NAT type? Depends on whether the matched peer can use IPv6.
- if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
- return NATTypeMobileNetwork
- }
- return p.NATType
- }
- // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
- // matching given the types of pairing candidates available.
- func (p *MatchProperties) ExistsPreferredNATMatch(
- unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
- return p.EffectiveNATType().ExistsPreferredMatch(
- unlimitedNAT, partiallyLimitedNAT, limitedNAT)
- }
- // IsPreferredNATMatch indicates whether the peer candidate is a preferred
- // NAT matching.
- func (p *MatchProperties) IsPreferredNATMatch(
- peerMatchProperties *MatchProperties) bool {
- return p.EffectiveNATType().IsPreferredMatch(
- peerMatchProperties.EffectiveNATType())
- }
- // IsPersonalCompartmentalized indicates whether the candidate has personal
- // compartment IDs.
- func (p *MatchProperties) IsPersonalCompartmentalized() bool {
- return len(p.PersonalCompartmentIDs) > 0
- }
- // MatchAnnouncement is a proxy announcement to be queued for matching.
- type MatchAnnouncement struct {
- Properties MatchProperties
- ProxyID ID
- ConnectionID ID
- ProxyProtocolVersion int32
- }
- // MatchOffer is a client offer to be queued for matching.
- type MatchOffer struct {
- Properties MatchProperties
- ClientProxyProtocolVersion int32
- ClientOfferSDP webrtc.SessionDescription
- ClientRootObfuscationSecret ObfuscationSecret
- DoDTLSRandomization bool
- TrafficShapingParameters *DataChannelTrafficShapingParameters
- NetworkProtocol NetworkProtocol
- DestinationAddress string
- DestinationServerID string
- }
- // MatchAnswer is a proxy answer, the proxy's follow up to a matched
- // announcement, to be routed to the awaiting client offer.
- type MatchAnswer struct {
- ProxyIP string
- ProxyID ID
- ConnectionID ID
- SelectedProxyProtocolVersion int32
- ProxyAnswerSDP webrtc.SessionDescription
- }
- // announcementEntry is an announcement queue entry, an announcement with its
- // associated lifetime context and signaling channel.
- type announcementEntry struct {
- ctx context.Context
- limitIP string
- announcement *MatchAnnouncement
- offerChan chan *MatchOffer
- }
- // offerEntry is an offer queue entry, an offer with its associated lifetime
- // context and signaling channel.
- type offerEntry struct {
- ctx context.Context
- limitIP string
- offer *MatchOffer
- answerChan chan *answerInfo
- }
- // answerInfo is an answer and its associated announcement.
- type answerInfo struct {
- announcement *MatchAnnouncement
- answer *MatchAnswer
- }
- // pendingAnswer represents an answer that is expected to arrive from a
- // proxy.
- type pendingAnswer struct {
- announcement *MatchAnnouncement
- answerChan chan *answerInfo
- }
- // MatcherConfig specifies the configuration for a matcher.
- type MatcherConfig struct {
- // Logger is used to log events.
- Logger common.Logger
- // Accouncement queue limits.
- AnnouncementLimitEntryCount int
- AnnouncementRateLimitQuantity int
- AnnouncementRateLimitInterval time.Duration
- AnnouncementNonlimitedProxyIDs []ID
- // Offer queue limits.
- OfferLimitEntryCount int
- OfferRateLimitQuantity int
- OfferRateLimitInterval time.Duration
- }
- // NewMatcher creates a new Matcher.
- func NewMatcher(config *MatcherConfig) *Matcher {
- m := &Matcher{
- config: config,
- waitGroup: new(sync.WaitGroup),
- announcementQueue: deque.New[*announcementEntry](),
- announcementQueueEntryCountByIP: make(map[string]int),
- announcementQueueRateLimiters: lrucache.NewWithLRU(
- 0,
- time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
- matcherRateLimiterMaxCacheEntries),
- offerQueue: deque.New[*offerEntry](),
- offerQueueEntryCountByIP: make(map[string]int),
- offerQueueRateLimiters: lrucache.NewWithLRU(
- 0,
- time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
- matcherRateLimiterMaxCacheEntries),
- matchSignal: make(chan struct{}, 1),
- // matcherPendingAnswersTTL is not configurable; it supplies a default
- // that is expected to be ignored when each entry's TTL is set to the
- // Offer ctx timeout.
- pendingAnswers: lrucache.NewWithLRU(
- matcherPendingAnswersTTL,
- 1*time.Minute,
- matcherPendingAnswersMaxSize),
- }
- m.SetLimits(
- config.AnnouncementLimitEntryCount,
- config.AnnouncementRateLimitQuantity,
- config.AnnouncementRateLimitInterval,
- config.AnnouncementNonlimitedProxyIDs,
- config.OfferLimitEntryCount,
- config.OfferRateLimitQuantity,
- config.OfferRateLimitInterval)
- return m
- }
- // SetLimits sets new queue limits, replacing the previous configuration.
- // Existing, cached rate limiters retain their existing rate limit state. New
- // entries will use the new quantity/interval configuration. In addition,
- // currently enqueued items may exceed any new, lower maximum entry count
- // until naturally dequeued.
- func (m *Matcher) SetLimits(
- announcementLimitEntryCount int,
- announcementRateLimitQuantity int,
- announcementRateLimitInterval time.Duration,
- announcementNonlimitedProxyIDs []ID,
- offerLimitEntryCount int,
- offerRateLimitQuantity int,
- offerRateLimitInterval time.Duration) {
- nonlimitedProxyIDs := make(map[ID]struct{})
- for _, proxyID := range announcementNonlimitedProxyIDs {
- nonlimitedProxyIDs[proxyID] = struct{}{}
- }
- m.announcementQueueMutex.Lock()
- m.announcementLimitEntryCount = announcementLimitEntryCount
- m.announcementRateLimitQuantity = announcementRateLimitQuantity
- m.announcementRateLimitInterval = announcementRateLimitInterval
- m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
- m.announcementQueueMutex.Unlock()
- m.offerQueueMutex.Lock()
- m.offerLimitEntryCount = offerLimitEntryCount
- m.offerRateLimitQuantity = offerRateLimitQuantity
- m.offerRateLimitInterval = offerRateLimitInterval
- m.offerQueueMutex.Unlock()
- }
- // Start starts running the Matcher. The Matcher runs a goroutine which
- // matches announcements and offers.
- func (m *Matcher) Start() error {
- m.runMutex.Lock()
- defer m.runMutex.Unlock()
- if m.runContext != nil {
- return errors.TraceNew("already running")
- }
- m.runContext, m.stopRunning = context.WithCancel(context.Background())
- m.waitGroup.Add(1)
- go func() {
- defer m.waitGroup.Done()
- m.matchWorker(m.runContext)
- }()
- return nil
- }
- // Stop stops running the Matcher and its worker goroutine.
- //
- // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
- // can get enqueued during and after a Stop call. Stop is intended more for a
- // full broker shutdown, where this won't be a concern.
- func (m *Matcher) Stop() {
- m.runMutex.Lock()
- defer m.runMutex.Unlock()
- m.stopRunning()
- m.waitGroup.Wait()
- m.runContext, m.stopRunning = nil, nil
- }
- // Announce enqueues the proxy announcement and blocks until it is matched
- // with a returned offer or ctx is done. The caller must not mutate the
- // announcement or its properties after calling Announce.
- //
- // The offer is sent to the proxy by the broker, and then the proxy sends its
- // answer back to the broker, which calls Answer with that value.
- func (m *Matcher) Announce(
- ctx context.Context,
- proxyIP string,
- proxyAnnouncement *MatchAnnouncement) (*MatchOffer, error) {
- announcementEntry := &announcementEntry{
- ctx: ctx,
- limitIP: getRateLimitIP(proxyIP),
- announcement: proxyAnnouncement,
- offerChan: make(chan *MatchOffer, 1),
- }
- err := m.addAnnouncementEntry(announcementEntry)
- if err != nil {
- return nil, errors.Trace(err)
- }
- // Await client offer.
- var clientOffer *MatchOffer
- select {
- case <-ctx.Done():
- m.removeAnnouncementEntry(announcementEntry)
- return nil, errors.Trace(ctx.Err())
- case clientOffer = <-announcementEntry.offerChan:
- }
- return clientOffer, nil
- }
- // Offer enqueues the client offer and blocks until it is matched with a
- // returned announcement or ctx is done. The caller must not mutate the offer
- // or its properties after calling Announce.
- //
- // The answer is returned to the client by the broker, and the WebRTC
- // connection is dialed. The original announcement is also returned, so its
- // match properties can be logged.
- func (m *Matcher) Offer(
- ctx context.Context,
- clientIP string,
- clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, error) {
- offerEntry := &offerEntry{
- ctx: ctx,
- limitIP: getRateLimitIP(clientIP),
- offer: clientOffer,
- answerChan: make(chan *answerInfo, 1),
- }
- err := m.addOfferEntry(offerEntry)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- // Await proxy answer.
- var proxyAnswerInfo *answerInfo
- select {
- case <-ctx.Done():
- m.removeOfferEntry(offerEntry)
- // TODO: also remove any pendingAnswers entry? The entry TTL is set to
- // the Offer ctx, the client request, timeout, so it will eventually
- // get removed. But a client may abort its request earlier than the
- // timeout.
- return nil, nil, errors.Trace(ctx.Err())
- case proxyAnswerInfo = <-offerEntry.answerChan:
- }
- if proxyAnswerInfo == nil {
- // nil will be delivered to the channel when either the proxy
- // announcment request concurrently timed out, or the answer
- // indicated a proxy error, or the answer did not arrive in time.
- return nil, nil, errors.TraceNew("no answer")
- }
- // This is a sanity check and not expected to fail.
- if !proxyAnswerInfo.answer.ConnectionID.Equal(
- proxyAnswerInfo.announcement.ConnectionID) {
- return nil, nil, errors.TraceNew("unexpected connection ID")
- }
- return proxyAnswerInfo.answer, proxyAnswerInfo.announcement, nil
- }
- // Answer delivers an answer from the proxy for a previously matched offer.
- // The ProxyID and ConnectionID must correspond to the original announcement.
- // The caller must not mutate the answer after calling Answer. Answer does
- // not block.
- //
- // The answer is returned to the awaiting Offer call and sent to the matched
- // client.
- func (m *Matcher) Answer(
- proxyAnswer *MatchAnswer) error {
- key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
- pendingAnswerValue, ok := m.pendingAnswers.Get(key)
- if !ok {
- // The client is no longer awaiting the response.
- return errors.TraceNew("no client")
- }
- m.pendingAnswers.Delete(key)
- pendingAnswer := pendingAnswerValue.(*pendingAnswer)
- pendingAnswer.answerChan <- &answerInfo{
- announcement: pendingAnswer.announcement,
- answer: proxyAnswer,
- }
- return nil
- }
- // AnswerError delivers a failed answer indication from the proxy to an
- // awaiting offer. The ProxyID and ConnectionID must correspond to the
- // original announcement.
- //
- // The failure indication is returned to the awaiting Offer call and sent to
- // the matched client.
- func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
- key := m.pendingAnswerKey(proxyID, connectionID)
- pendingAnswerValue, ok := m.pendingAnswers.Get(key)
- if !ok {
- // The client is no longer awaiting the response.
- return
- }
- m.pendingAnswers.Delete(key)
- // Closing the channel delivers nil, a failed indicator, to any receiver.
- close(pendingAnswerValue.(*pendingAnswer).answerChan)
- }
- // matchWorker is the matching worker goroutine. It idles until signaled that
- // a queue item has been added, and then runs a full matching pass.
- func (m *Matcher) matchWorker(ctx context.Context) {
- for {
- select {
- case <-m.matchSignal:
- m.matchAllOffers()
- case <-ctx.Done():
- return
- }
- }
- }
- // matchAllOffers iterates over the queues, making all possible matches.
- func (m *Matcher) matchAllOffers() {
- m.announcementQueueMutex.Lock()
- defer m.announcementQueueMutex.Unlock()
- m.offerQueueMutex.Lock()
- defer m.offerQueueMutex.Unlock()
- // Take each offer in turn, and select an announcement match. There is an
- // implicit preference for older client offers, sooner to timeout, at the
- // front of the queue.
- // TODO: consider matching one offer, then releasing the locks to allow
- // more announcements to be enqueued, then continuing to match.
- i := 0
- end := m.offerQueue.Len()
- for i < end && m.announcementQueue.Len() > 0 {
- offerEntry := m.offerQueue.At(i)
- // Skip and remove this offer if its deadline has already passed.
- // There is no signal to the awaiting Offer function, as it will exit
- // based on the same ctx.
- if offerEntry.ctx.Err() != nil {
- m.removeOfferEntryByIndex(i)
- end -= 1
- continue
- }
- j, ok := m.matchOffer(offerEntry)
- if !ok {
- // No match, so leave this offer in place in the queue and move to
- // the next.
- i++
- continue
- }
- if m.config.Logger.IsLogLevelDebug() {
- m.config.Logger.WithTraceFields(common.LogFields{
- "match_index": j,
- "offer_queue_size": m.offerQueue.Len(),
- "announcement_queue_size": m.announcementQueue.Len(),
- }).Debug("match metrics")
- }
- // Remove the matched announcement from the queue. Send the offer to
- // the announcment entry's offerChan, which will deliver it to the
- // blocked Announce call. Add a pending answers entry to await the
- // proxy's follow up Answer call. The TTL for the pending answer
- // entry is set to the matched Offer call's ctx, as the answer is
- // only useful as long as the client is still waiting.
- announcementEntry := m.announcementQueue.At(j)
- expiry := lrucache.DefaultExpiration
- deadline, ok := offerEntry.ctx.Deadline()
- if ok {
- expiry = time.Until(deadline)
- }
- key := m.pendingAnswerKey(
- announcementEntry.announcement.ProxyID,
- announcementEntry.announcement.ConnectionID)
- m.pendingAnswers.Set(
- key,
- &pendingAnswer{
- announcement: announcementEntry.announcement,
- answerChan: offerEntry.answerChan,
- },
- expiry)
- announcementEntry.offerChan <- offerEntry.offer
- m.removeAnnouncementEntryByIndex(j)
- // Remove the matched offer from the queue and match the next offer,
- // now first in the queue.
- m.removeOfferEntryByIndex(i)
- end -= 1
- }
- }
- func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
- // Assumes the caller has the queue mutexed locked.
- // Check each announcement in turn, and select a match. There is an
- // implicit preference for older proxy announcments, sooner to timeout, at the
- // front of the queue.
- //
- // Limitation: since this logic matches each enqueued client in turn, it will
- // only make the optimal NAT match for the oldest enqueued client vs. all
- // proxies, and not do optimal N x M matching for all clients and all proxies.
- //
- // Future matching enhancements could include more sophisticated GeoIP
- // rules, such as a configuration encoding knowledge of an ASN's NAT
- // type, or preferred client/proxy country/ASN matches.
- offerProperties := &offerEntry.offer.Properties
- // Use the NAT traversal type counters to check if there's any preferred
- // NAT match for this offer in the announcement queue. When there is, we
- // will search beyond the first announcement.
- existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
- m.announcementsUnlimitedNATCount > 0,
- m.announcementsPartiallyLimitedNATCount > 0,
- m.announcementsStrictlyLimitedNATCount > 0)
- bestMatch := -1
- bestMatchNAT := false
- bestMatchCompartment := false
- end := m.announcementQueue.Len()
- for i := 0; i < end; i++ {
- announcementEntry := m.announcementQueue.At(i)
- // Skip and remove this announcement if its deadline has already
- // passed. There is no signal to the awaiting Announce function, as
- // it will exit based on the same ctx.
- if announcementEntry.ctx.Err() != nil {
- m.removeAnnouncementEntryByIndex(i)
- end -= 1
- continue
- }
- announcementProperties := &announcementEntry.announcement.Properties
- // There must be a compartment match. If there is a personal
- // compartment match, this match will be preferred.
- matchCommonCompartment := HaveCommonIDs(
- announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
- matchPersonalCompartment := HaveCommonIDs(
- announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
- if !matchCommonCompartment && !matchPersonalCompartment {
- continue
- }
- // Disallow matching the same country and ASN, except for personal
- // compartment ID matches.
- //
- // For common matching, hopping through the same ISP is assumed to
- // have no circumvention benefit. For personal matching, the user may
- // wish to hop their their own or their friend's proxy regardless.
- if !matchPersonalCompartment &&
- !GetAllowCommonASNMatching() &&
- (offerProperties.GeoIPData.Country ==
- announcementProperties.GeoIPData.Country &&
- offerProperties.GeoIPData.ASN ==
- announcementProperties.GeoIPData.ASN) {
- continue
- }
- // Check if this is a preferred NAT match. Ultimately, a match may be
- // made with potentially incompatible NATs, but the client/proxy
- // reported NAT types may be incorrect or unknown; the client will
- // often skip NAT discovery.
- matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
- // At this point, the candidate is a match. Determine if this is a new
- // best match.
- if bestMatch == -1 {
- // This is a match, and there was no previous match, so it becomes
- // the provisional best match.
- bestMatch = i
- bestMatchNAT = matchNAT
- bestMatchCompartment = matchPersonalCompartment
- } else if !bestMatchNAT && matchNAT {
- // If there was a previous best match which was not a preferred
- // NAT match, this becomes the new best match. The preferred NAT
- // match is prioritized over personal compartment matching.
- bestMatch = i
- bestMatchNAT = true
- bestMatchCompartment = matchPersonalCompartment
- } else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
- // If there was a previous best match which was not a personal
- // compartment match, and as long as this match doesn't undo a
- // better NAT match, this becomes the new best match.
- bestMatch = i
- bestMatchNAT = matchNAT
- bestMatchCompartment = true
- }
- // Stop as soon as we have the best possible match.
- if (bestMatchNAT || !existsPreferredNATMatch) &&
- (matchPersonalCompartment || m.announcementsPersonalCompartmentalizedCount == 0) {
- break
- }
- }
- return bestMatch, bestMatch != -1
- }
- func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
- // Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
- var entryCountByIP map[string]int
- var queueRateLimiters *lrucache.Cache
- var limitEntryCount int
- var quantity int
- var interval time.Duration
- if isAnnouncement {
- // Skip limit checks for non-limited proxies.
- if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
- return nil
- }
- entryCountByIP = m.announcementQueueEntryCountByIP
- queueRateLimiters = m.announcementQueueRateLimiters
- limitEntryCount = m.announcementLimitEntryCount
- quantity = m.announcementRateLimitQuantity
- interval = m.announcementRateLimitInterval
- } else {
- entryCountByIP = m.offerQueueEntryCountByIP
- queueRateLimiters = m.offerQueueRateLimiters
- limitEntryCount = m.offerLimitEntryCount
- quantity = m.offerRateLimitQuantity
- interval = m.offerRateLimitInterval
- }
- // The rate limit is checked first, before the max count check, to ensure
- // that the rate limit state is updated regardless of the max count check
- // outcome.
- if quantity > 0 && interval > 0 {
- var rateLimiter *ratelimit.Bucket
- entry, ok := queueRateLimiters.Get(limitIP)
- if ok {
- rateLimiter = entry.(*ratelimit.Bucket)
- } else {
- rateLimiter = ratelimit.NewBucketWithQuantum(
- interval, int64(quantity), int64(quantity))
- queueRateLimiters.Set(
- limitIP, rateLimiter, interval)
- }
- if rateLimiter.TakeAvailable(1) < 1 {
- return errors.TraceNew("rate exceeded for IP")
- }
- }
- if limitEntryCount > 0 {
- entryCount, ok := entryCountByIP[limitIP]
- if ok && entryCount >= limitEntryCount {
- return errors.TraceNew("max entries for IP")
- }
- }
- return nil
- }
- func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
- m.announcementQueueMutex.Lock()
- defer m.announcementQueueMutex.Unlock()
- // Ensure the queue doesn't grow larger than the max size.
- if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
- return errors.TraceNew("queue full")
- }
- // Ensure no single peer IP can enqueue a large number of entries or
- // rapidly enqueue beyond the configured rate.
- isAnnouncement := true
- err := m.applyLimits(
- isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
- if err != nil {
- return errors.Trace(err)
- }
- m.announcementQueue.PushBack(announcementEntry)
- m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
- m.adjustAnnouncementCounts(announcementEntry, 1)
- select {
- case m.matchSignal <- struct{}{}:
- default:
- }
- return nil
- }
- func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
- m.announcementQueueMutex.Lock()
- defer m.announcementQueueMutex.Unlock()
- found := false
- for i := 0; i < m.announcementQueue.Len(); i++ {
- if m.announcementQueue.At(i) == announcementEntry {
- m.removeAnnouncementEntryByIndex(i)
- found = true
- break
- }
- }
- if !found {
- // The Announce call is aborting and taking its entry back out of the
- // queue. If the entry is not found in the queue, then a concurrent
- // Offer has matched the announcement. So check for the pending
- // answer corresponding to the announcement and remove it and deliver
- // a failure signal to the waiting Offer, so the client doesn't wait
- // longer than necessary.
- key := m.pendingAnswerKey(
- announcementEntry.announcement.ProxyID,
- announcementEntry.announcement.ConnectionID)
- pendingAnswerValue, ok := m.pendingAnswers.Get(key)
- if ok {
- close(pendingAnswerValue.(*pendingAnswer).answerChan)
- m.pendingAnswers.Delete(key)
- }
- }
- }
- func (m *Matcher) removeAnnouncementEntryByIndex(i int) {
- // Assumes s.announcementQueueMutex lock is held.
- announcementEntry := m.announcementQueue.At(i)
- // This should be only direct call to Remove, as following adjustments
- // must always be made when removing.
- m.announcementQueue.Remove(i)
- // Adjust entry counts by peer IP, used to enforce
- // matcherAnnouncementQueueMaxEntriesPerIP.
- m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
- if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
- delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
- }
- m.adjustAnnouncementCounts(announcementEntry, -1)
- }
- func (m *Matcher) adjustAnnouncementCounts(
- announcementEntry *announcementEntry, delta int) {
- // Assumes s.announcementQueueMutex lock is held.
- if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
- m.announcementsPersonalCompartmentalizedCount += delta
- }
- switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
- case NATTraversalUnlimited:
- m.announcementsUnlimitedNATCount += delta
- case NATTraversalPartiallyLimited:
- m.announcementsPartiallyLimitedNATCount += delta
- case NATTraversalStrictlyLimited:
- m.announcementsStrictlyLimitedNATCount += delta
- }
- }
- func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
- m.offerQueueMutex.Lock()
- defer m.offerQueueMutex.Unlock()
- // Ensure the queue doesn't grow larger than the max size.
- if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
- return errors.TraceNew("queue full")
- }
- // Ensure no single peer IP can enqueue a large number of entries or
- // rapidly enqueue beyond the configured rate.
- isAnnouncement := false
- err := m.applyLimits(
- isAnnouncement, offerEntry.limitIP, ID{})
- if err != nil {
- return errors.Trace(err)
- }
- m.offerQueue.PushBack(offerEntry)
- m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
- select {
- case m.matchSignal <- struct{}{}:
- default:
- }
- return nil
- }
- func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
- m.offerQueueMutex.Lock()
- defer m.offerQueueMutex.Unlock()
- for i := 0; i < m.offerQueue.Len(); i++ {
- if m.offerQueue.At(i) == offerEntry {
- m.removeOfferEntryByIndex(i)
- break
- }
- }
- }
- func (m *Matcher) removeOfferEntryByIndex(i int) {
- // Assumes s.offerQueueMutex lock is held.
- offerEntry := m.offerQueue.At(i)
- // This should be only direct call to Remove, as following adjustments
- // must always be made when removing.
- m.offerQueue.Remove(i)
- // Adjust entry counts by peer IP, used to enforce
- // matcherOfferQueueMaxEntriesPerIP.
- m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
- if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
- delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
- }
- }
- func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
- // The pending answer lookup key is used to associate announcements and
- // subsequent answers. While the client learns the ConnectionID, only the
- // proxy knows the ProxyID component, so only the correct proxy can match
- // an answer to an announcement. The ConnectionID component is necessary
- // as a proxy may have multiple, concurrent pending answers.
- return string(proxyID[:]) + string(connectionID[:])
- }
// getRateLimitIP returns the key used for per-IP limits for the given IP
// address string. IPv4 addresses, and inputs that do not parse as an IP
// address, are returned unchanged. IPv6 addresses are truncated to their /56
// prefix.
func getRateLimitIP(strIP string) string {

	parsedIP := net.ParseIP(strIP)

	isIPv6 := parsedIP != nil && parsedIP.To4() == nil
	if !isIPv6 {
		return strIP
	}

	// With IPv6, individual users or sites are commonly allocated a /64 or
	// /56, so rate limit by /56.
	prefixMask := net.CIDRMask(56, 8*net.IPv6len)
	return parsedIP.Mask(prefixMask).String()
}
|