matcher.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. "golang.org/x/time/rate"
  32. )
  // Matcher sizing and timing parameters.
  //
  // TTLs should be aligned with STUN hole punch lifetimes.
  const (
  	// Bounds for the announcement and offer queues.
  	// NOTE(review): enforcement is not visible in this part of the file.
  	matcherAnnouncementQueueMaxSize = 5000000
  	matcherOfferQueueMaxSize        = 5000000

  	// Default expiration and maximum entry count used by NewMatcher when
  	// constructing the pending answers cache.
  	matcherPendingAnswersTTL     = 30 * time.Second
  	matcherPendingAnswersMaxSize = 100000

  	// NOTE(review): presumably caps how many candidates are probed when
  	// searching for a preferred NAT match; confirm against matchOffer.
  	matcherMaxPreferredNATProbe = 100

  	// Parameters used by NewMatcher when constructing the per-queue rate
  	// limiter caches; the frequency is expressed in seconds.
  	matcherRateLimiterReapHistoryFrequencySeconds = 300
  	matcherRateLimiterMaxCacheEntries             = 1000000
  )
  43. // Matcher matches proxy announcements with client offers. Matcher also
  44. // coordinates pending proxy answers and routes answers to the awaiting
  45. // client offer handler.
  46. //
  47. // Matching prioritizes selecting the oldest announcements and client offers,
  48. // as they are closest to timing out.
  49. //
  50. // The client and proxy must supply matching personal or common compartment
  51. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  52. // tactics parameter or via an OSL embedding. Each proxy announcement or
  53. // client offer may specify only one compartment ID type, either common or
  54. // personal.
  55. //
  56. // Matching prefers to pair proxies and clients in a way that maximizes total
  57. // possible matches. For a client or proxy with less-limited NAT traversal, a
  58. // pairing with more-limited NAT traversal is preferred; and vice versa.
  59. // Candidates with unknown NAT types and mobile network types are assumed to
  60. // have the most limited NAT traversal capability.
  61. //
  62. // Preferred matchings take priority over announcement age.
  63. //
  64. // The client and proxy will not match if they are in the same country and
  65. // ASN, as it's assumed that doesn't provide any blocking circumvention
  66. // benefit. Disallowing proxies in certain blocked countries is handled at a
  67. // higher level; any such proxies should not be enqueued for matching.
  68. type Matcher struct {
  69. config *MatcherConfig
  70. runMutex sync.Mutex
  71. runContext context.Context
  72. stopRunning context.CancelFunc
  73. waitGroup *sync.WaitGroup
  74. // The announcement queue is implicitly sorted by announcement age. The
  75. // count fields are used to skip searching deeper into the queue for
  76. // preferred matches.
  77. // TODO: replace queue and counts with an indexed, in-memory database?
  78. announcementQueueMutex sync.Mutex
  79. announcementQueue *announcementMultiQueue
  80. announcementQueueEntryCountByIP map[string]int
  81. announcementQueueRateLimiters *lrucache.Cache
  82. announcementLimitEntryCount int
  83. announcementRateLimitQuantity int
  84. announcementRateLimitInterval time.Duration
  85. announcementNonlimitedProxyIDs map[ID]struct{}
  86. // The offer queue is also implicitly sorted by offer age. Both an offer
  87. // and announcement queue are required since either announcements or
  88. // offers can arrive while there are no available pairings.
  89. offerQueueMutex sync.Mutex
  90. offerQueue *list.List
  91. offerQueueEntryCountByIP map[string]int
  92. offerQueueRateLimiters *lrucache.Cache
  93. offerLimitEntryCount int
  94. offerRateLimitQuantity int
  95. offerRateLimitInterval time.Duration
  96. matchSignal chan struct{}
  97. pendingAnswers *lrucache.Cache
  98. }
  99. // MatchProperties specifies the compartment, GeoIP, and network topology
  100. // matching roperties of clients and proxies.
  101. type MatchProperties struct {
  102. CommonCompartmentIDs []ID
  103. PersonalCompartmentIDs []ID
  104. GeoIPData common.GeoIPData
  105. NetworkType NetworkType
  106. NATType NATType
  107. PortMappingTypes PortMappingTypes
  108. }
  109. // EffectiveNATType combines the set of network properties into an effective
  110. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  111. // traversal is assumed. When NAT type is unknown and the network type is
  112. // mobile, CGNAT with limited NAT traversal is assumed.
  113. func (p *MatchProperties) EffectiveNATType() NATType {
  114. if p.PortMappingTypes.Available() {
  115. return NATTypePortMapping
  116. }
  117. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  118. // publicly reachable IPv6 ICE host candidate? If so, change the
  119. // effective NAT type? Depends on whether the matched peer can use IPv6.
  120. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  121. return NATTypeMobileNetwork
  122. }
  123. return p.NATType
  124. }
  125. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  126. // matching given the types of pairing candidates available.
  127. func (p *MatchProperties) ExistsPreferredNATMatch(
  128. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  129. return p.EffectiveNATType().ExistsPreferredMatch(
  130. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  131. }
  132. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  133. // NAT matching.
  134. func (p *MatchProperties) IsPreferredNATMatch(
  135. peerMatchProperties *MatchProperties) bool {
  136. return p.EffectiveNATType().IsPreferredMatch(
  137. peerMatchProperties.EffectiveNATType())
  138. }
  139. // MatchAnnouncement is a proxy announcement to be queued for matching.
  140. type MatchAnnouncement struct {
  141. Properties MatchProperties
  142. ProxyID ID
  143. ConnectionID ID
  144. ProxyProtocolVersion int32
  145. }
  146. // MatchOffer is a client offer to be queued for matching.
  147. type MatchOffer struct {
  148. Properties MatchProperties
  149. ClientProxyProtocolVersion int32
  150. ClientOfferSDP WebRTCSessionDescription
  151. ClientRootObfuscationSecret ObfuscationSecret
  152. DoDTLSRandomization bool
  153. TrafficShapingParameters *DataChannelTrafficShapingParameters
  154. NetworkProtocol NetworkProtocol
  155. DestinationAddress string
  156. DestinationServerID string
  157. }
  158. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  159. // announcement, to be routed to the awaiting client offer.
  160. type MatchAnswer struct {
  161. ProxyIP string
  162. ProxyID ID
  163. ConnectionID ID
  164. SelectedProxyProtocolVersion int32
  165. ProxyAnswerSDP WebRTCSessionDescription
  166. }
  // MatchMetrics records statistics about the match queue state at the time a
  // match is made.
  type MatchMetrics struct {
  	// Position of the matched offer in the offer queue, and the offer queue
  	// length, when the match was made.
  	OfferMatchIndex int
  	OfferQueueSize  int

  	// Number of announcement entries inspected before matching, and the
  	// announcement queue length, when the match was made.
  	AnnouncementMatchIndex int
  	AnnouncementQueueSize  int
  }
  175. // GetMetrics converts MatchMetrics to loggable fields.
  176. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  177. if metrics == nil {
  178. return nil
  179. }
  180. return common.LogFields{
  181. "offer_match_index": metrics.OfferMatchIndex,
  182. "offer_queue_size": metrics.OfferQueueSize,
  183. "announcement_match_index": metrics.AnnouncementMatchIndex,
  184. "announcement_queue_size": metrics.AnnouncementQueueSize,
  185. }
  186. }
  187. // announcementEntry is an announcement queue entry, an announcement with its
  188. // associated lifetime context and signaling channel.
  189. type announcementEntry struct {
  190. ctx context.Context
  191. limitIP string
  192. announcement *MatchAnnouncement
  193. offerChan chan *MatchOffer
  194. matchMetrics atomic.Value
  195. // queueReference is initialized by addAnnouncementEntry, and used to
  196. // efficiently dequeue the entry.
  197. queueReference announcementQueueReference
  198. }
  199. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  200. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  201. return matchMetrics
  202. }
  203. // offerEntry is an offer queue entry, an offer with its associated lifetime
  204. // context and signaling channel.
  205. type offerEntry struct {
  206. ctx context.Context
  207. limitIP string
  208. offer *MatchOffer
  209. answerChan chan *answerInfo
  210. matchMetrics atomic.Value
  211. // queueReference is initialized by addOfferEntry, and used to efficiently
  212. // dequeue the entry.
  213. queueReference *list.Element
  214. }
  215. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  216. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  217. return matchMetrics
  218. }
  219. // answerInfo is an answer and its associated announcement.
  220. type answerInfo struct {
  221. announcement *MatchAnnouncement
  222. answer *MatchAnswer
  223. }
  224. // pendingAnswer represents an answer that is expected to arrive from a
  225. // proxy.
  226. type pendingAnswer struct {
  227. announcement *MatchAnnouncement
  228. answerChan chan *answerInfo
  229. }
  230. // MatcherConfig specifies the configuration for a matcher.
  231. type MatcherConfig struct {
  232. // Logger is used to log events.
  233. Logger common.Logger
  234. // Accouncement queue limits.
  235. AnnouncementLimitEntryCount int
  236. AnnouncementRateLimitQuantity int
  237. AnnouncementRateLimitInterval time.Duration
  238. AnnouncementNonlimitedProxyIDs []ID
  239. // Offer queue limits.
  240. OfferLimitEntryCount int
  241. OfferRateLimitQuantity int
  242. OfferRateLimitInterval time.Duration
  243. }
  244. // NewMatcher creates a new Matcher.
  245. func NewMatcher(config *MatcherConfig) *Matcher {
  246. m := &Matcher{
  247. config: config,
  248. waitGroup: new(sync.WaitGroup),
  249. announcementQueue: newAnnouncementMultiQueue(),
  250. announcementQueueEntryCountByIP: make(map[string]int),
  251. announcementQueueRateLimiters: lrucache.NewWithLRU(
  252. 0,
  253. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  254. matcherRateLimiterMaxCacheEntries),
  255. offerQueue: list.New(),
  256. offerQueueEntryCountByIP: make(map[string]int),
  257. offerQueueRateLimiters: lrucache.NewWithLRU(
  258. 0,
  259. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  260. matcherRateLimiterMaxCacheEntries),
  261. matchSignal: make(chan struct{}, 1),
  262. // matcherPendingAnswersTTL is not configurable; it supplies a default
  263. // that is expected to be ignored when each entry's TTL is set to the
  264. // Offer ctx timeout.
  265. pendingAnswers: lrucache.NewWithLRU(
  266. matcherPendingAnswersTTL,
  267. 1*time.Minute,
  268. matcherPendingAnswersMaxSize),
  269. }
  270. m.SetLimits(
  271. config.AnnouncementLimitEntryCount,
  272. config.AnnouncementRateLimitQuantity,
  273. config.AnnouncementRateLimitInterval,
  274. config.AnnouncementNonlimitedProxyIDs,
  275. config.OfferLimitEntryCount,
  276. config.OfferRateLimitQuantity,
  277. config.OfferRateLimitInterval)
  278. return m
  279. }
  280. // SetLimits sets new queue limits, replacing the previous configuration.
  281. // Existing, cached rate limiters retain their existing rate limit state. New
  282. // entries will use the new quantity/interval configuration. In addition,
  283. // currently enqueued items may exceed any new, lower maximum entry count
  284. // until naturally dequeued.
  285. func (m *Matcher) SetLimits(
  286. announcementLimitEntryCount int,
  287. announcementRateLimitQuantity int,
  288. announcementRateLimitInterval time.Duration,
  289. announcementNonlimitedProxyIDs []ID,
  290. offerLimitEntryCount int,
  291. offerRateLimitQuantity int,
  292. offerRateLimitInterval time.Duration) {
  293. nonlimitedProxyIDs := make(map[ID]struct{})
  294. for _, proxyID := range announcementNonlimitedProxyIDs {
  295. nonlimitedProxyIDs[proxyID] = struct{}{}
  296. }
  297. m.announcementQueueMutex.Lock()
  298. m.announcementLimitEntryCount = announcementLimitEntryCount
  299. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  300. m.announcementRateLimitInterval = announcementRateLimitInterval
  301. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  302. m.announcementQueueMutex.Unlock()
  303. m.offerQueueMutex.Lock()
  304. m.offerLimitEntryCount = offerLimitEntryCount
  305. m.offerRateLimitQuantity = offerRateLimitQuantity
  306. m.offerRateLimitInterval = offerRateLimitInterval
  307. m.offerQueueMutex.Unlock()
  308. }
  309. // Start starts running the Matcher. The Matcher runs a goroutine which
  310. // matches announcements and offers.
  311. func (m *Matcher) Start() error {
  312. m.runMutex.Lock()
  313. defer m.runMutex.Unlock()
  314. if m.runContext != nil {
  315. return errors.TraceNew("already running")
  316. }
  317. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  318. m.waitGroup.Add(1)
  319. go func() {
  320. defer m.waitGroup.Done()
  321. m.matchWorker(m.runContext)
  322. }()
  323. return nil
  324. }
  325. // Stop stops running the Matcher and its worker goroutine.
  326. //
  327. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  328. // can get enqueued during and after a Stop call. Stop is intended more for a
  329. // full broker shutdown, where this won't be a concern.
  330. func (m *Matcher) Stop() {
  331. m.runMutex.Lock()
  332. defer m.runMutex.Unlock()
  333. m.stopRunning()
  334. m.waitGroup.Wait()
  335. m.runContext, m.stopRunning = nil, nil
  336. }
  337. // Announce enqueues the proxy announcement and blocks until it is matched
  338. // with a returned offer or ctx is done. The caller must not mutate the
  339. // announcement or its properties after calling Announce.
  340. //
  341. // Announce assumes that the ctx.Deadline for each call is monotonically
  342. // increasing and that the deadline can be used as part of selecting the next
  343. // nearest-to-expire announcement.
  344. //
  345. // The offer is sent to the proxy by the broker, and then the proxy sends its
  346. // answer back to the broker, which calls Answer with that value.
  347. //
  348. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  349. // match is made, even if there is a later error.
  350. func (m *Matcher) Announce(
  351. ctx context.Context,
  352. proxyIP string,
  353. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  354. // An announcement must specify exactly one compartment ID, of one type,
  355. // common or personal. The limit of one is currently a limitation of the
  356. // multi-queue implementation; see comment in
  357. // announcementMultiQueue.enqueue.
  358. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  359. if len(compartmentIDs) == 0 {
  360. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  361. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  362. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  363. }
  364. if len(compartmentIDs) != 1 {
  365. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  366. }
  367. announcementEntry := &announcementEntry{
  368. ctx: ctx,
  369. limitIP: getRateLimitIP(proxyIP),
  370. announcement: proxyAnnouncement,
  371. offerChan: make(chan *MatchOffer, 1),
  372. }
  373. err := m.addAnnouncementEntry(announcementEntry)
  374. if err != nil {
  375. return nil, nil, errors.Trace(err)
  376. }
  377. // Await client offer.
  378. var clientOffer *MatchOffer
  379. select {
  380. case <-ctx.Done():
  381. m.removeAnnouncementEntry(true, announcementEntry)
  382. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  383. case clientOffer = <-announcementEntry.offerChan:
  384. }
  385. return clientOffer, announcementEntry.getMatchMetrics(), nil
  386. }
  387. // Offer enqueues the client offer and blocks until it is matched with a
  388. // returned announcement or ctx is done. The caller must not mutate the offer
  389. // or its properties after calling Announce.
  390. //
  391. // The answer is returned to the client by the broker, and the WebRTC
  392. // connection is dialed. The original announcement is also returned, so its
  393. // match properties can be logged.
  394. //
  395. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  396. // match is made, even if there is a later error.
  397. func (m *Matcher) Offer(
  398. ctx context.Context,
  399. clientIP string,
  400. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  401. // An offer must specify at least one compartment ID, and may only specify
  402. // one type, common or personal, of compartment IDs.
  403. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  404. if len(compartmentIDs) == 0 {
  405. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  406. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  407. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  408. }
  409. if len(compartmentIDs) < 1 {
  410. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  411. }
  412. offerEntry := &offerEntry{
  413. ctx: ctx,
  414. limitIP: getRateLimitIP(clientIP),
  415. offer: clientOffer,
  416. answerChan: make(chan *answerInfo, 1),
  417. }
  418. err := m.addOfferEntry(offerEntry)
  419. if err != nil {
  420. return nil, nil, nil, errors.Trace(err)
  421. }
  422. // Await proxy answer.
  423. var proxyAnswerInfo *answerInfo
  424. select {
  425. case <-ctx.Done():
  426. m.removeOfferEntry(true, offerEntry)
  427. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  428. // the Offer ctx, the client request, timeout, so it will eventually
  429. // get removed. But a client may abort its request earlier than the
  430. // timeout.
  431. return nil, nil,
  432. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  433. case proxyAnswerInfo = <-offerEntry.answerChan:
  434. }
  435. if proxyAnswerInfo == nil {
  436. // nil will be delivered to the channel when either the proxy
  437. // announcement request concurrently timed out, or the answer
  438. // indicated a proxy error, or the answer did not arrive in time.
  439. return nil, nil,
  440. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  441. }
  442. // This is a sanity check and not expected to fail.
  443. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  444. proxyAnswerInfo.announcement.ConnectionID) {
  445. return nil, nil,
  446. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  447. }
  448. return proxyAnswerInfo.answer,
  449. proxyAnswerInfo.announcement,
  450. offerEntry.getMatchMetrics(),
  451. nil
  452. }
  453. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  454. // announcement identified by the specified proxy ID and connection ID and
  455. // returns whether the announcement has personal compartment IDs, indicating
  456. // personal pairing mode.
  457. //
  458. // If no pending answer is found, an error is returned.
  459. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  460. proxyID ID, connectionID ID) (bool, error) {
  461. key := m.pendingAnswerKey(proxyID, connectionID)
  462. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  463. if !ok {
  464. // The input IDs don't correspond to a pending answer, or the client
  465. // is no longer awaiting the response.
  466. return false, errors.TraceNew("no pending answer")
  467. }
  468. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  469. hasPersonalCompartmentIDs := len(
  470. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  471. return hasPersonalCompartmentIDs, nil
  472. }
  473. // Answer delivers an answer from the proxy for a previously matched offer.
  474. // The ProxyID and ConnectionID must correspond to the original announcement.
  475. // The caller must not mutate the answer after calling Answer. Answer does
  476. // not block.
  477. //
  478. // The answer is returned to the awaiting Offer call and sent to the matched
  479. // client.
  480. func (m *Matcher) Answer(
  481. proxyAnswer *MatchAnswer) error {
  482. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  483. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  484. if !ok {
  485. // The input IDs don't correspond to a pending answer, or the client
  486. // is no longer awaiting the response.
  487. return errors.TraceNew("no pending answer")
  488. }
  489. m.pendingAnswers.Delete(key)
  490. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  491. pendingAnswer.answerChan <- &answerInfo{
  492. announcement: pendingAnswer.announcement,
  493. answer: proxyAnswer,
  494. }
  495. return nil
  496. }
  497. // AnswerError delivers a failed answer indication from the proxy to an
  498. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  499. // original announcement.
  500. //
  501. // The failure indication is returned to the awaiting Offer call and sent to
  502. // the matched client.
  503. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  504. key := m.pendingAnswerKey(proxyID, connectionID)
  505. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  506. if !ok {
  507. // The client is no longer awaiting the response.
  508. return
  509. }
  510. m.pendingAnswers.Delete(key)
  511. // Closing the channel delivers nil, a failed indicator, to any receiver.
  512. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  513. }
  514. // matchWorker is the matching worker goroutine. It idles until signaled that
  515. // a queue item has been added, and then runs a full matching pass.
  516. func (m *Matcher) matchWorker(ctx context.Context) {
  517. for {
  518. select {
  519. case <-m.matchSignal:
  520. m.matchAllOffers()
  521. case <-ctx.Done():
  522. return
  523. }
  524. }
  525. }
  526. // matchAllOffers iterates over the queues, making all possible matches.
  527. func (m *Matcher) matchAllOffers() {
  528. m.announcementQueueMutex.Lock()
  529. defer m.announcementQueueMutex.Unlock()
  530. m.offerQueueMutex.Lock()
  531. defer m.offerQueueMutex.Unlock()
  532. // Take each offer in turn, and select an announcement match. There is an
  533. // implicit preference for older client offers, sooner to timeout, at the
  534. // front of the queue.
  535. // TODO: consider matching one offer, then releasing the locks to allow
  536. // more announcements to be enqueued, then continuing to match.
  537. nextOffer := m.offerQueue.Front()
  538. offerIndex := -1
  539. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  540. offerIndex += 1
  541. // nextOffer.Next must be invoked before any removeOfferEntry since
  542. // container/list.remove clears list.Element.next.
  543. offer := nextOffer
  544. nextOffer = nextOffer.Next()
  545. offerEntry := offer.Value.(*offerEntry)
  546. // Skip and remove this offer if its deadline has already passed.
  547. // There is no signal to the awaiting Offer function, as it will exit
  548. // based on the same ctx.
  549. if offerEntry.ctx.Err() != nil {
  550. m.removeOfferEntry(false, offerEntry)
  551. continue
  552. }
  553. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  554. if announcementEntry == nil {
  555. continue
  556. }
  557. // Record match metrics.
  558. // The index metrics predate the announcement multi-queue; now, with
  559. // the multi-queue, announcement_index is how many announce entries
  560. // were inspected before matching.
  561. matchMetrics := &MatchMetrics{
  562. OfferMatchIndex: offerIndex,
  563. OfferQueueSize: m.offerQueue.Len(),
  564. AnnouncementMatchIndex: announcementMatchIndex,
  565. AnnouncementQueueSize: m.announcementQueue.getLen(),
  566. }
  567. offerEntry.matchMetrics.Store(matchMetrics)
  568. announcementEntry.matchMetrics.Store(matchMetrics)
  569. // Remove the matched announcement from the queue. Send the offer to
  570. // the announcement entry's offerChan, which will deliver it to the
  571. // blocked Announce call. Add a pending answers entry to await the
  572. // proxy's follow up Answer call. The TTL for the pending answer
  573. // entry is set to the matched Offer call's ctx, as the answer is
  574. // only useful as long as the client is still waiting.
  575. m.removeAnnouncementEntry(false, announcementEntry)
  576. expiry := lrucache.DefaultExpiration
  577. deadline, ok := offerEntry.ctx.Deadline()
  578. if ok {
  579. expiry = time.Until(deadline)
  580. }
  581. key := m.pendingAnswerKey(
  582. announcementEntry.announcement.ProxyID,
  583. announcementEntry.announcement.ConnectionID)
  584. m.pendingAnswers.Set(
  585. key,
  586. &pendingAnswer{
  587. announcement: announcementEntry.announcement,
  588. answerChan: offerEntry.answerChan,
  589. },
  590. expiry)
  591. announcementEntry.offerChan <- offerEntry.offer
  592. // Remove the matched offer from the queue and match the next offer,
  593. // now first in the queue.
  594. m.removeOfferEntry(false, offerEntry)
  595. }
  596. }
  597. func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
  598. // Assumes the caller has the queue mutexed locked.
  599. // Check each candidate announcement in turn, and select a match. There is
  600. // an implicit preference for older proxy announcements, sooner to
  601. // timeout, at the front of the enqueued announcements.
  602. // announcementMultiQueue.startMatching skips to the first matching
  603. // compartment ID(s).
  604. //
  605. // Limitation: since this logic matches each enqueued client in turn, it will
  606. // only make the optimal NAT match for the oldest enqueued client vs. all
  607. // proxies, and not do optimal N x M matching for all clients and all proxies.
  608. //
  609. // Future matching enhancements could include more sophisticated GeoIP
  610. // rules, such as a configuration encoding knowledge of an ASN's NAT
  611. // type, or preferred client/proxy country/ASN matches.
  612. // TODO: match supported protocol versions. Currently, all announces and
  613. // offers must specify ProxyProtocolVersion1, so there's no protocol
  614. // version match logic.
  615. offerProperties := &offerEntry.offer.Properties
  616. // Assumes the caller checks that offer specifies either personal
  617. // compartment IDs or common compartment IDs, but not both.
  618. isCommonCompartments := false
  619. compartmentIDs := offerProperties.PersonalCompartmentIDs
  620. if len(compartmentIDs) == 0 {
  621. isCommonCompartments = true
  622. compartmentIDs = offerProperties.CommonCompartmentIDs
  623. }
  624. if len(compartmentIDs) == 0 {
  625. return nil, -1
  626. }
  627. matchIterator := m.announcementQueue.startMatching(
  628. isCommonCompartments, compartmentIDs)
  629. // Use the NAT traversal type counters to check if there's any preferred
  630. // NAT match for this offer in the announcement queue. When there is, we
  631. // will search beyond the first announcement.
  632. unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
  633. matchIterator.getNATCounts()
  634. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  635. unlimitedNATCount > 0,
  636. partiallyLimitedNATCount > 0,
  637. strictlyLimitedNATCount > 0)
  638. var bestMatch *announcementEntry
  639. bestMatchIndex := -1
  640. bestMatchNAT := false
  641. candidateIndex := -1
  642. for {
  643. announcementEntry := matchIterator.getNext()
  644. if announcementEntry == nil {
  645. break
  646. }
  647. candidateIndex += 1
  648. // Skip and remove this announcement if its deadline has already
  649. // passed. There is no signal to the awaiting Announce function, as
  650. // it will exit based on the same ctx.
  651. if announcementEntry.ctx.Err() != nil {
  652. m.removeAnnouncementEntry(false, announcementEntry)
  653. continue
  654. }
  655. announcementProperties := &announcementEntry.announcement.Properties
  656. // Disallow matching the same country and ASN, except for personal
  657. // compartment ID matches.
  658. //
  659. // For common matching, hopping through the same ISP is assumed to
  660. // have no circumvention benefit. For personal matching, the user may
  661. // wish to hop their their own or their friend's proxy regardless.
  662. if isCommonCompartments &&
  663. !GetAllowCommonASNMatching() &&
  664. (offerProperties.GeoIPData.Country ==
  665. announcementProperties.GeoIPData.Country &&
  666. offerProperties.GeoIPData.ASN ==
  667. announcementProperties.GeoIPData.ASN) {
  668. continue
  669. }
  670. // Check if this is a preferred NAT match. Ultimately, a match may be
  671. // made with potentially incompatible NATs, but the client/proxy
  672. // reported NAT types may be incorrect or unknown; the client will
  673. // often skip NAT discovery.
  674. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  675. // At this point, the candidate is a match. Determine if this is a new
  676. // best match, either if there was no previous match, or this is a
  677. // better NAT match.
  678. if bestMatch == nil || (!bestMatchNAT && matchNAT) {
  679. bestMatch = announcementEntry
  680. bestMatchIndex = candidateIndex
  681. bestMatchNAT = matchNAT
  682. }
  683. // Stop as soon as we have the best possible match, or have reached
  684. // the probe limit for preferred NAT matches.
  685. if bestMatch != nil && (bestMatchNAT ||
  686. !existsPreferredNATMatch ||
  687. candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
  688. break
  689. }
  690. }
  691. return bestMatch, bestMatchIndex
  692. }
  693. // MatcherLimitError is the error type returned by Announce or Offer when the
  694. // caller has exceeded configured queue entry or rate limits.
  695. type MatcherLimitError struct {
  696. err error
  697. }
  698. func NewMatcherLimitError(err error) *MatcherLimitError {
  699. return &MatcherLimitError{err: err}
  700. }
  701. func (e MatcherLimitError) Error() string {
  702. return e.err.Error()
  703. }
  704. func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  705. // Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
  706. var entryCountByIP map[string]int
  707. var queueRateLimiters *lrucache.Cache
  708. var limitEntryCount int
  709. var quantity int
  710. var interval time.Duration
  711. if isAnnouncement {
  712. // Skip limit checks for non-limited proxies.
  713. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  714. return nil
  715. }
  716. entryCountByIP = m.announcementQueueEntryCountByIP
  717. queueRateLimiters = m.announcementQueueRateLimiters
  718. limitEntryCount = m.announcementLimitEntryCount
  719. quantity = m.announcementRateLimitQuantity
  720. interval = m.announcementRateLimitInterval
  721. } else {
  722. entryCountByIP = m.offerQueueEntryCountByIP
  723. queueRateLimiters = m.offerQueueRateLimiters
  724. limitEntryCount = m.offerLimitEntryCount
  725. quantity = m.offerRateLimitQuantity
  726. interval = m.offerRateLimitInterval
  727. }
  728. // The rate limit is checked first, before the max count check, to ensure
  729. // that the rate limit state is updated regardless of the max count check
  730. // outcome.
  731. if quantity > 0 && interval > 0 {
  732. var rateLimiter *rate.Limiter
  733. entry, ok := queueRateLimiters.Get(limitIP)
  734. if ok {
  735. rateLimiter = entry.(*rate.Limiter)
  736. } else {
  737. limit := float64(quantity) / interval.Seconds()
  738. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  739. queueRateLimiters.Set(
  740. limitIP, rateLimiter, interval)
  741. }
  742. if !rateLimiter.Allow() {
  743. return errors.Trace(
  744. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  745. }
  746. }
  747. if limitEntryCount > 0 {
  748. // Limitation: non-limited proxy ID entries are counted in
  749. // entryCountByIP. If both a limited and non-limited proxy ingress
  750. // from the same limitIP, then the non-limited entries will count
  751. // against the limited proxy's limitEntryCount.
  752. entryCount, ok := entryCountByIP[limitIP]
  753. if ok && entryCount >= limitEntryCount {
  754. return errors.Trace(
  755. NewMatcherLimitError(std_errors.New("max entries for IP")))
  756. }
  757. }
  758. return nil
  759. }
  760. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  761. m.announcementQueueMutex.Lock()
  762. defer m.announcementQueueMutex.Unlock()
  763. // Ensure the queue doesn't grow larger than the max size.
  764. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  765. return errors.TraceNew("queue full")
  766. }
  767. // Ensure no single peer IP can enqueue a large number of entries or
  768. // rapidly enqueue beyond the configured rate.
  769. isAnnouncement := true
  770. err := m.applyLimits(
  771. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  772. if err != nil {
  773. return errors.Trace(err)
  774. }
  775. // announcementEntry.queueReference should be uninitialized.
  776. // announcementMultiQueue.enqueue sets queueReference to be used for
  777. // efficient dequeuing.
  778. if announcementEntry.queueReference.entry != nil {
  779. return errors.TraceNew("unexpected queue reference")
  780. }
  781. err = m.announcementQueue.enqueue(announcementEntry)
  782. if err != nil {
  783. return errors.Trace(err)
  784. }
  785. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  786. select {
  787. case m.matchSignal <- struct{}{}:
  788. default:
  789. }
  790. return nil
  791. }
  792. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  793. // In the aborting case, the queue isn't already locked. Otherise, assume
  794. // it is locked.
  795. if aborting {
  796. m.announcementQueueMutex.Lock()
  797. defer m.announcementQueueMutex.Unlock()
  798. }
  799. found := announcementEntry.queueReference.dequeue()
  800. if found {
  801. // Adjust entry counts by peer IP, used to enforce
  802. // matcherAnnouncementQueueMaxEntriesPerIP.
  803. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  804. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  805. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  806. }
  807. }
  808. if aborting && !found {
  809. // The Announce call is aborting and taking its entry back out of the
  810. // queue. If the entry is not found in the queue, then a concurrent
  811. // Offer has matched the announcement. So check for the pending
  812. // answer corresponding to the announcement and remove it and deliver
  813. // a failure signal to the waiting Offer, so the client doesn't wait
  814. // longer than necessary.
  815. key := m.pendingAnswerKey(
  816. announcementEntry.announcement.ProxyID,
  817. announcementEntry.announcement.ConnectionID)
  818. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  819. if ok {
  820. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  821. m.pendingAnswers.Delete(key)
  822. }
  823. }
  824. }
  825. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  826. m.offerQueueMutex.Lock()
  827. defer m.offerQueueMutex.Unlock()
  828. // Ensure the queue doesn't grow larger than the max size.
  829. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  830. return errors.TraceNew("queue full")
  831. }
  832. // Ensure no single peer IP can enqueue a large number of entries or
  833. // rapidly enqueue beyond the configured rate.
  834. isAnnouncement := false
  835. err := m.applyLimits(
  836. isAnnouncement, offerEntry.limitIP, ID{})
  837. if err != nil {
  838. return errors.Trace(err)
  839. }
  840. // offerEntry.queueReference should be uninitialized and is set here to be
  841. // used for efficient dequeuing.
  842. if offerEntry.queueReference != nil {
  843. return errors.TraceNew("unexpected queue reference")
  844. }
  845. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  846. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  847. select {
  848. case m.matchSignal <- struct{}{}:
  849. default:
  850. }
  851. return nil
  852. }
  853. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  854. // In the aborting case, the queue isn't already locked. Otherise, assume
  855. // it is locked.
  856. if aborting {
  857. m.offerQueueMutex.Lock()
  858. defer m.offerQueueMutex.Unlock()
  859. }
  860. if offerEntry.queueReference == nil {
  861. return
  862. }
  863. m.offerQueue.Remove(offerEntry.queueReference)
  864. offerEntry.queueReference = nil
  865. // Adjust entry counts by peer IP, used to enforce
  866. // matcherOfferQueueMaxEntriesPerIP.
  867. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  868. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  869. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  870. }
  871. }
  872. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  873. // The pending answer lookup key is used to associate announcements and
  874. // subsequent answers. While the client learns the ConnectionID, only the
  875. // proxy knows the ProxyID component, so only the correct proxy can match
  876. // an answer to an announcement. The ConnectionID component is necessary
  877. // as a proxy may have multiple, concurrent pending answers.
  878. return string(proxyID[:]) + string(connectionID[:])
  879. }
  880. func getRateLimitIP(strIP string) string {
  881. IP := net.ParseIP(strIP)
  882. if IP == nil || IP.To4() != nil {
  883. return strIP
  884. }
  885. // With IPv6, individual users or sites are users commonly allocated a /64
  886. // or /56, so rate limit by /56.
  887. return IP.Mask(net.CIDRMask(56, 128)).String()
  888. }
// announcementMultiQueue is a set of announcement queues, one per common or
// personal compartment ID, providing efficient iteration over announcements
// matching a specified list of compartment IDs. announcementMultiQueue and
// its underlying data structures are not safe for concurrent access.
type announcementMultiQueue struct {

	// commonCompartmentQueues holds the queue for each common compartment
	// ID. Queues are created on demand by enqueue.
	commonCompartmentQueues map[ID]*announcementCompartmentQueue

	// personalCompartmentQueues holds the queue for each personal
	// compartment ID. Queues are created on demand by enqueue.
	personalCompartmentQueues map[ID]*announcementCompartmentQueue

	// totalEntries is the entry count across all compartment queues; it is
	// incremented by enqueue, decremented by dequeue, and reported by
	// getLen.
	totalEntries int
}
// announcementCompartmentQueue is a single compartment queue within an
// announcementMultiQueue. The queue is implemented using a doubly-linked
// list, which provides efficient insert and mid-queue dequeue operations.
// The announcementCompartmentQueue also records NAT type stats for enqueued
// announcements, which are used, when matching, to determine when better NAT
// matches may be possible.
type announcementCompartmentQueue struct {

	// entries is the list of enqueued *announcementEntry values; enqueue
	// appends to the back.
	entries *list.List

	// The following counters record how many entries in this queue have
	// each NAT traversal classification. They are incremented in enqueue
	// and decremented in dequeue, based on the announcement's
	// EffectiveNATType().Traversal() value.
	unlimitedNATCount        int
	partiallyLimitedNATCount int
	strictlyLimitedNATCount  int
}
// announcementMatchIterator represents the state of an iteration over a
// subset of announcementMultiQueue compartment queues. Concurrent
// announcementMatchIterators are not supported.
type announcementMatchIterator struct {

	// multiQueue is the announcementMultiQueue that created this iterator.
	multiQueue *announcementMultiQueue

	// isCommonCompartments records whether the iteration is over common
	// (true) or personal (false) compartment queues.
	isCommonCompartments bool

	// compartmentQueues, compartmentIDs, and nextEntries are parallel
	// slices populated together by startMatching, with one element per
	// requested compartment ID that has an existing queue.
	compartmentQueues []*announcementCompartmentQueue
	compartmentIDs    []ID

	// nextEntries[i] is the next list element to consider from
	// compartmentQueues[i], or nil once that queue is exhausted.
	nextEntries []*list.Element
}
// announcementQueueReference represents the queue position for a given
// announcement entry, and provides an efficient dequeue operation.
type announcementQueueReference struct {

	// multiQueue is the multi-queue whose totalEntries count includes the
	// entry.
	multiQueue *announcementMultiQueue

	// compartmentQueue is the compartment queue whose list contains the
	// entry.
	compartmentQueue *announcementCompartmentQueue

	// entry is the list element holding the announcement entry. A nil
	// entry is treated as not enqueued: addAnnouncementEntry requires it to
	// be nil before enqueuing, and dequeue returns false when it is nil.
	entry *list.Element
}
  927. func newAnnouncementMultiQueue() *announcementMultiQueue {
  928. return &announcementMultiQueue{
  929. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  930. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  931. }
  932. }
  933. func (q *announcementMultiQueue) getLen() int {
  934. return q.totalEntries
  935. }
  936. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  937. // Assumes announcementEntry not already enueued.
  938. // Limitation: only one compartment ID, either common or personal, is
  939. // supported per announcement entry. In the common compartment case, the
  940. // broker currently assigns only one common compartment ID per proxy
  941. // announcement. In the personal compartment case, there is currently no
  942. // use case for allowing a proxy to announce under multiple personal
  943. // compartment IDs.
  944. //
  945. // To overcome this limitation, the dequeue operation would need to be
  946. // able to remove an announcement entry from multiple
  947. // announcementCompartmentQueues.
  948. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  949. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  950. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  951. return errors.TraceNew("announcement must specify exactly one compartment ID")
  952. }
  953. var compartmentID ID
  954. var compartmentQueues map[ID]*announcementCompartmentQueue
  955. if len(commonCompartmentIDs) > 0 {
  956. compartmentID = commonCompartmentIDs[0]
  957. compartmentQueues = q.commonCompartmentQueues
  958. } else {
  959. compartmentID = personalCompartmentIDs[0]
  960. compartmentQueues = q.personalCompartmentQueues
  961. }
  962. compartmentQueue, ok := compartmentQueues[compartmentID]
  963. if !ok {
  964. compartmentQueue = &announcementCompartmentQueue{
  965. entries: list.New(),
  966. }
  967. compartmentQueues[compartmentID] = compartmentQueue
  968. }
  969. entry := compartmentQueue.entries.PushBack(announcementEntry)
  970. // Update the NAT type counts which are used to determine if a better NAT
  971. // match may be made by inspecting more announcement queue entries.
  972. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  973. case NATTraversalUnlimited:
  974. compartmentQueue.unlimitedNATCount += 1
  975. case NATTraversalPartiallyLimited:
  976. compartmentQueue.partiallyLimitedNATCount += 1
  977. case NATTraversalStrictlyLimited:
  978. compartmentQueue.strictlyLimitedNATCount += 1
  979. }
  980. q.totalEntries += 1
  981. announcementEntry.queueReference = announcementQueueReference{
  982. multiQueue: q,
  983. compartmentQueue: compartmentQueue,
  984. entry: entry,
  985. }
  986. return nil
  987. }
  988. // announcementQueueReference returns false if the item is already dequeued.
  989. func (r announcementQueueReference) dequeue() bool {
  990. if r.entry == nil {
  991. // Already dequeued.
  992. return false
  993. }
  994. announcementEntry := r.entry.Value.(*announcementEntry)
  995. // Reverse the NAT type counts.
  996. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  997. case NATTraversalUnlimited:
  998. r.compartmentQueue.unlimitedNATCount -= 1
  999. case NATTraversalPartiallyLimited:
  1000. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1001. case NATTraversalStrictlyLimited:
  1002. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1003. }
  1004. r.compartmentQueue.entries.Remove(r.entry)
  1005. r.multiQueue.totalEntries -= 1
  1006. // Mark as dequeued.
  1007. r.entry = nil
  1008. return true
  1009. }
  1010. func (q *announcementMultiQueue) startMatching(
  1011. isCommonCompartments bool,
  1012. compartmentIDs []ID) *announcementMatchIterator {
  1013. iter := &announcementMatchIterator{
  1014. multiQueue: q,
  1015. isCommonCompartments: isCommonCompartments,
  1016. }
  1017. // Find the matching compartment queues and initialize iteration over
  1018. // those queues. Building the set of matching queues is a linear time
  1019. // operation, bounded by the length of compartmentIDs (no more than
  1020. // maxCompartmentIDs, as enforced in
  1021. // ClientOfferRequest.ValidateAndGetLogFields).
  1022. compartmentQueues := q.commonCompartmentQueues
  1023. if !isCommonCompartments {
  1024. compartmentQueues = q.personalCompartmentQueues
  1025. }
  1026. for _, ID := range compartmentIDs {
  1027. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1028. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1029. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1030. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1031. }
  1032. }
  1033. return iter
  1034. }
  1035. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1036. // Return the count of NAT types across all matchable compartment queues.
  1037. //
  1038. // A potential future enhancement would be to provide per-queue NAT counts
  1039. // or NAT type indexing in order to quickly find preferred NAT matches.
  1040. unlimitedNATCount := 0
  1041. partiallyLimitedNATCount := 0
  1042. strictlyLimitedNATCount := 0
  1043. for _, compartmentQueue := range iter.compartmentQueues {
  1044. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1045. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1046. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1047. }
  1048. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1049. }
  1050. // announcementMatchIterator returns the next announcement entry candidate in
  1051. // compartment queue FIFO order, selecting the queue with the oldest head
  1052. // item.
  1053. //
  1054. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1055. // candidate is selected. dequeue may be called on any getNext return value
  1056. // without disrupting the iteration state; however,
  1057. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1058. // are not supported during iteration. Iteration and dequeue should all be
  1059. // performed with a lock over the entire announcementMultiQueue, and with
  1060. // only one concurrent announcementMatchIterator.
  1061. func (iter *announcementMatchIterator) getNext() *announcementEntry {
  1062. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1063. // order.
  1064. // Select the oldest item, by deadline, from all the candidate queue head
  1065. // items. This operation is linear in the number of matching compartment
  1066. // ID queues, which is currently bounded by This is a linear time
  1067. // operation, bounded by the length of matching compartment IDs (no more
  1068. // than maxCompartmentIDs, as enforced in
  1069. // ClientOfferRequest.ValidateAndGetLogFields).
  1070. //
  1071. // A potential future enhancement is to add more iterator state to track
  1072. // which queue has the next oldest time to select on the following
  1073. // getNext call.
  1074. var selectedCandidate *announcementEntry
  1075. selectedIndex := -1
  1076. for i := 0; i < len(iter.compartmentQueues); i++ {
  1077. if iter.nextEntries[i] == nil {
  1078. continue
  1079. }
  1080. if selectedCandidate == nil {
  1081. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1082. selectedIndex = i
  1083. } else {
  1084. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1085. deadline, deadlineOk := candidate.ctx.Deadline()
  1086. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1087. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1088. selectedCandidate = candidate
  1089. selectedIndex = i
  1090. }
  1091. }
  1092. }
  1093. // Advance the selected queue to the next element. This must be done
  1094. // before any dequeue call, since container/list.remove clears
  1095. // list.Element.next.
  1096. if selectedIndex != -1 {
  1097. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1098. }
  1099. return selectedCandidate
  1100. }