matcher.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. "golang.org/x/time/rate"
  32. )
  // Matcher sizing and timing parameters. TTLs should be aligned with STUN
  // hole punch lifetimes.
  const (
  	// Queue size limits; presumably enforced when entries are enqueued --
  	// the enforcing code is outside this section.
  	matcherAnnouncementQueueMaxSize = 5_000_000
  	matcherOfferQueueMaxSize        = 5_000_000

  	// Default TTL and maximum size of the pending answers cache.
  	matcherPendingAnswersTTL     = 30 * time.Second
  	matcherPendingAnswersMaxSize = 100_000

  	// NOTE(review): the name suggests a cap on how many announcements are
  	// probed for a preferred NAT match; confirm against matchOffer.
  	matcherMaxPreferredNATProbe = 100

  	// Cleanup interval, in seconds, and maximum size of the per-IP rate
  	// limiter caches.
  	matcherRateLimiterReapHistoryFrequencySeconds = 300
  	matcherRateLimiterMaxCacheEntries             = 1_000_000
  )
  43. // Matcher matches proxy announcements with client offers. Matcher also
  44. // coordinates pending proxy answers and routes answers to the awaiting
  45. // client offer handler.
  46. //
  47. // Matching prioritizes selecting the oldest announcements and client offers,
  48. // as they are closest to timing out.
  49. //
  50. // The client and proxy must supply matching personal or common compartment
  51. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  52. // tactics parameter or via an OSL embedding. Each proxy announcement or
  53. // client offer may specify only one compartment ID type, either common or
  54. // personal.
  55. //
  56. // Matching prefers to pair proxies and clients in a way that maximizes total
  57. // possible matches. For a client or proxy with less-limited NAT traversal, a
  58. // pairing with more-limited NAT traversal is preferred; and vice versa.
  59. // Candidates with unknown NAT types and mobile network types are assumed to
  60. // have the most limited NAT traversal capability.
  61. //
  62. // Preferred matchings take priority over announcement age.
  63. //
  64. // The client and proxy will not match if they are in the same country and
  65. // ASN, as it's assumed that doesn't provide any blocking circumvention
  66. // benefit. Disallowing proxies in certain blocked countries is handled at a
  67. // higher level; any such proxies should not be enqueued for matching.
  68. type Matcher struct {
  69. config *MatcherConfig
  70. runMutex sync.Mutex
  71. runContext context.Context
  72. stopRunning context.CancelFunc
  73. waitGroup *sync.WaitGroup
  74. // The announcement queue is implicitly sorted by announcement age. The
  75. // count fields are used to skip searching deeper into the queue for
  76. // preferred matches.
  77. // TODO: replace queue and counts with an indexed, in-memory database?
  78. announcementQueueMutex sync.Mutex
  79. announcementQueue *announcementMultiQueue
  80. announcementQueueEntryCountByIP map[string]int
  81. announcementQueueRateLimiters *lrucache.Cache
  82. announcementLimitEntryCount int
  83. announcementRateLimitQuantity int
  84. announcementRateLimitInterval time.Duration
  85. announcementNonlimitedProxyIDs map[ID]struct{}
  86. // The offer queue is also implicitly sorted by offer age. Both an offer
  87. // and announcement queue are required since either announcements or
  88. // offers can arrive while there are no available pairings.
  89. offerQueueMutex sync.Mutex
  90. offerQueue *list.List
  91. offerQueueEntryCountByIP map[string]int
  92. offerQueueRateLimiters *lrucache.Cache
  93. offerLimitEntryCount int
  94. offerRateLimitQuantity int
  95. offerRateLimitInterval time.Duration
  96. matchSignal chan struct{}
  97. pendingAnswers *lrucache.Cache
  98. }
  99. // MatchProperties specifies the compartment, GeoIP, and network topology
  100. // matching roperties of clients and proxies.
  101. type MatchProperties struct {
  102. IsPriority bool
  103. CommonCompartmentIDs []ID
  104. PersonalCompartmentIDs []ID
  105. GeoIPData common.GeoIPData
  106. NetworkType NetworkType
  107. NATType NATType
  108. PortMappingTypes PortMappingTypes
  109. }
  110. // EffectiveNATType combines the set of network properties into an effective
  111. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  112. // traversal is assumed. When NAT type is unknown and the network type is
  113. // mobile, CGNAT with limited NAT traversal is assumed.
  114. func (p *MatchProperties) EffectiveNATType() NATType {
  115. if p.PortMappingTypes.Available() {
  116. return NATTypePortMapping
  117. }
  118. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  119. // publicly reachable IPv6 ICE host candidate? If so, change the
  120. // effective NAT type? Depends on whether the matched peer can use IPv6.
  121. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  122. return NATTypeMobileNetwork
  123. }
  124. return p.NATType
  125. }
  126. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  127. // matching given the types of pairing candidates available.
  128. func (p *MatchProperties) ExistsPreferredNATMatch(
  129. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  130. return p.EffectiveNATType().ExistsPreferredMatch(
  131. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  132. }
  133. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  134. // NAT matching.
  135. func (p *MatchProperties) IsPreferredNATMatch(
  136. peerMatchProperties *MatchProperties) bool {
  137. return p.EffectiveNATType().IsPreferredMatch(
  138. peerMatchProperties.EffectiveNATType())
  139. }
  140. // MatchAnnouncement is a proxy announcement to be queued for matching.
  141. type MatchAnnouncement struct {
  142. Properties MatchProperties
  143. ProxyID ID
  144. ProxyMetrics *ProxyMetrics
  145. ConnectionID ID
  146. }
  147. // MatchOffer is a client offer to be queued for matching.
  148. type MatchOffer struct {
  149. Properties MatchProperties
  150. ClientProxyProtocolVersion int32
  151. ClientOfferSDP WebRTCSessionDescription
  152. ClientRootObfuscationSecret ObfuscationSecret
  153. DoDTLSRandomization bool
  154. TrafficShapingParameters *DataChannelTrafficShapingParameters
  155. NetworkProtocol NetworkProtocol
  156. DestinationAddress string
  157. DestinationServerID string
  158. }
  159. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  160. // announcement, to be routed to the awaiting client offer.
  161. type MatchAnswer struct {
  162. ProxyIP string
  163. ProxyID ID
  164. ConnectionID ID
  165. SelectedProxyProtocolVersion int32
  166. ProxyAnswerSDP WebRTCSessionDescription
  167. }
  // MatchMetrics records statistics about the match queue state at the time a
  // match is made.
  type MatchMetrics struct {
  	// Offer queue position of the matched offer, and the offer queue size.
  	OfferMatchIndex int
  	OfferQueueSize  int

  	// Index reported for the matched announcement, and the announcement
  	// queue size.
  	AnnouncementMatchIndex int
  	AnnouncementQueueSize  int
  }
  176. // GetMetrics converts MatchMetrics to loggable fields.
  177. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  178. if metrics == nil {
  179. return nil
  180. }
  181. return common.LogFields{
  182. "offer_match_index": metrics.OfferMatchIndex,
  183. "offer_queue_size": metrics.OfferQueueSize,
  184. "announcement_match_index": metrics.AnnouncementMatchIndex,
  185. "announcement_queue_size": metrics.AnnouncementQueueSize,
  186. }
  187. }
  188. // announcementEntry is an announcement queue entry, an announcement with its
  189. // associated lifetime context and signaling channel.
  190. type announcementEntry struct {
  191. ctx context.Context
  192. limitIP string
  193. announcement *MatchAnnouncement
  194. offerChan chan *MatchOffer
  195. matchMetrics atomic.Value
  196. // queueReference is initialized by addAnnouncementEntry, and used to
  197. // efficiently dequeue the entry.
  198. queueReference announcementQueueReference
  199. }
  200. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  201. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  202. return matchMetrics
  203. }
  204. // offerEntry is an offer queue entry, an offer with its associated lifetime
  205. // context and signaling channel.
  206. type offerEntry struct {
  207. ctx context.Context
  208. limitIP string
  209. offer *MatchOffer
  210. answerChan chan *answerInfo
  211. matchMetrics atomic.Value
  212. // queueReference is initialized by addOfferEntry, and used to efficiently
  213. // dequeue the entry.
  214. queueReference *list.Element
  215. }
  216. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  217. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  218. return matchMetrics
  219. }
  220. // answerInfo is an answer and its associated announcement.
  221. type answerInfo struct {
  222. announcement *MatchAnnouncement
  223. answer *MatchAnswer
  224. }
  225. // pendingAnswer represents an answer that is expected to arrive from a
  226. // proxy.
  227. type pendingAnswer struct {
  228. announcement *MatchAnnouncement
  229. answerChan chan *answerInfo
  230. }
  231. // MatcherConfig specifies the configuration for a matcher.
  232. type MatcherConfig struct {
  233. // Logger is used to log events.
  234. Logger common.Logger
  235. // Announcement queue limits.
  236. AnnouncementLimitEntryCount int
  237. AnnouncementRateLimitQuantity int
  238. AnnouncementRateLimitInterval time.Duration
  239. AnnouncementNonlimitedProxyIDs []ID
  240. // Offer queue limits.
  241. OfferLimitEntryCount int
  242. OfferRateLimitQuantity int
  243. OfferRateLimitInterval time.Duration
  244. // Broker process load limit state callback. See Broker.Config.
  245. IsLoadLimiting func() bool
  246. }
  247. // NewMatcher creates a new Matcher.
  248. func NewMatcher(config *MatcherConfig) *Matcher {
  249. m := &Matcher{
  250. config: config,
  251. waitGroup: new(sync.WaitGroup),
  252. announcementQueue: newAnnouncementMultiQueue(),
  253. announcementQueueEntryCountByIP: make(map[string]int),
  254. announcementQueueRateLimiters: lrucache.NewWithLRU(
  255. 0,
  256. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  257. matcherRateLimiterMaxCacheEntries),
  258. offerQueue: list.New(),
  259. offerQueueEntryCountByIP: make(map[string]int),
  260. offerQueueRateLimiters: lrucache.NewWithLRU(
  261. 0,
  262. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  263. matcherRateLimiterMaxCacheEntries),
  264. matchSignal: make(chan struct{}, 1),
  265. // matcherPendingAnswersTTL is not configurable; it supplies a default
  266. // that is expected to be ignored when each entry's TTL is set to the
  267. // Offer ctx timeout.
  268. pendingAnswers: lrucache.NewWithLRU(
  269. matcherPendingAnswersTTL,
  270. 1*time.Minute,
  271. matcherPendingAnswersMaxSize),
  272. }
  273. m.SetLimits(
  274. config.AnnouncementLimitEntryCount,
  275. config.AnnouncementRateLimitQuantity,
  276. config.AnnouncementRateLimitInterval,
  277. config.AnnouncementNonlimitedProxyIDs,
  278. config.OfferLimitEntryCount,
  279. config.OfferRateLimitQuantity,
  280. config.OfferRateLimitInterval)
  281. return m
  282. }
  283. // SetLimits sets new queue limits, replacing the previous configuration.
  284. // Existing, cached rate limiters retain their existing rate limit state. New
  285. // entries will use the new quantity/interval configuration. In addition,
  286. // currently enqueued items may exceed any new, lower maximum entry count
  287. // until naturally dequeued.
  288. func (m *Matcher) SetLimits(
  289. announcementLimitEntryCount int,
  290. announcementRateLimitQuantity int,
  291. announcementRateLimitInterval time.Duration,
  292. announcementNonlimitedProxyIDs []ID,
  293. offerLimitEntryCount int,
  294. offerRateLimitQuantity int,
  295. offerRateLimitInterval time.Duration) {
  296. nonlimitedProxyIDs := make(map[ID]struct{})
  297. for _, proxyID := range announcementNonlimitedProxyIDs {
  298. nonlimitedProxyIDs[proxyID] = struct{}{}
  299. }
  300. m.announcementQueueMutex.Lock()
  301. m.announcementLimitEntryCount = announcementLimitEntryCount
  302. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  303. m.announcementRateLimitInterval = announcementRateLimitInterval
  304. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  305. m.announcementQueueMutex.Unlock()
  306. m.offerQueueMutex.Lock()
  307. m.offerLimitEntryCount = offerLimitEntryCount
  308. m.offerRateLimitQuantity = offerRateLimitQuantity
  309. m.offerRateLimitInterval = offerRateLimitInterval
  310. m.offerQueueMutex.Unlock()
  311. }
  312. // Start starts running the Matcher. The Matcher runs a goroutine which
  313. // matches announcements and offers.
  314. func (m *Matcher) Start() error {
  315. m.runMutex.Lock()
  316. defer m.runMutex.Unlock()
  317. if m.runContext != nil {
  318. return errors.TraceNew("already running")
  319. }
  320. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  321. m.waitGroup.Add(1)
  322. go func() {
  323. defer m.waitGroup.Done()
  324. m.matchWorker(m.runContext)
  325. }()
  326. return nil
  327. }
  328. // Stop stops running the Matcher and its worker goroutine.
  329. //
  330. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  331. // can get enqueued during and after a Stop call. Stop is intended more for a
  332. // full broker shutdown, where this won't be a concern.
  333. func (m *Matcher) Stop() {
  334. m.runMutex.Lock()
  335. defer m.runMutex.Unlock()
  336. m.stopRunning()
  337. m.waitGroup.Wait()
  338. m.runContext, m.stopRunning = nil, nil
  339. }
  340. // Announce enqueues the proxy announcement and blocks until it is matched
  341. // with a returned offer or ctx is done. The caller must not mutate the
  342. // announcement or its properties after calling Announce.
  343. //
  344. // Announce assumes that the ctx.Deadline for each call is monotonically
  345. // increasing and that the deadline can be used as part of selecting the next
  346. // nearest-to-expire announcement.
  347. //
  348. // The offer is sent to the proxy by the broker, and then the proxy sends its
  349. // answer back to the broker, which calls Answer with that value.
  350. //
  351. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  352. // match is made, even if there is a later error.
  353. func (m *Matcher) Announce(
  354. ctx context.Context,
  355. proxyIP string,
  356. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  357. // An announcement must specify exactly one compartment ID, of one type,
  358. // common or personal. The limit of one is currently a limitation of the
  359. // multi-queue implementation; see comment in
  360. // announcementMultiQueue.enqueue.
  361. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  362. if len(compartmentIDs) == 0 {
  363. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  364. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  365. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  366. }
  367. if len(compartmentIDs) != 1 {
  368. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  369. }
  370. isAnnouncement := true
  371. err := m.applyLoadLimit(isAnnouncement)
  372. if err != nil {
  373. return nil, nil, errors.Trace(err)
  374. }
  375. announcementEntry := &announcementEntry{
  376. ctx: ctx,
  377. limitIP: getRateLimitIP(proxyIP),
  378. announcement: proxyAnnouncement,
  379. offerChan: make(chan *MatchOffer, 1),
  380. }
  381. err = m.addAnnouncementEntry(announcementEntry)
  382. if err != nil {
  383. return nil, nil, errors.Trace(err)
  384. }
  385. // Await client offer.
  386. var clientOffer *MatchOffer
  387. select {
  388. case <-ctx.Done():
  389. m.removeAnnouncementEntry(true, announcementEntry)
  390. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  391. case clientOffer = <-announcementEntry.offerChan:
  392. }
  393. return clientOffer, announcementEntry.getMatchMetrics(), nil
  394. }
  395. // Offer enqueues the client offer and blocks until it is matched with a
  396. // returned announcement or ctx is done. The caller must not mutate the offer
  397. // or its properties after calling Announce.
  398. //
  399. // The answer is returned to the client by the broker, and the WebRTC
  400. // connection is dialed. The original announcement is also returned, so its
  401. // match properties can be logged.
  402. //
  403. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  404. // match is made, even if there is a later error.
  405. func (m *Matcher) Offer(
  406. ctx context.Context,
  407. clientIP string,
  408. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  409. // An offer must specify at least one compartment ID, and may only specify
  410. // one type, common or personal, of compartment IDs.
  411. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  412. if len(compartmentIDs) == 0 {
  413. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  414. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  415. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  416. }
  417. if len(compartmentIDs) < 1 {
  418. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  419. }
  420. isAnnouncement := false
  421. err := m.applyLoadLimit(isAnnouncement)
  422. if err != nil {
  423. return nil, nil, nil, errors.Trace(err)
  424. }
  425. offerEntry := &offerEntry{
  426. ctx: ctx,
  427. limitIP: getRateLimitIP(clientIP),
  428. offer: clientOffer,
  429. answerChan: make(chan *answerInfo, 1),
  430. }
  431. err = m.addOfferEntry(offerEntry)
  432. if err != nil {
  433. return nil, nil, nil, errors.Trace(err)
  434. }
  435. // Await proxy answer.
  436. var proxyAnswerInfo *answerInfo
  437. select {
  438. case <-ctx.Done():
  439. m.removeOfferEntry(true, offerEntry)
  440. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  441. // the Offer ctx, the client request, timeout, so it will eventually
  442. // get removed. But a client may abort its request earlier than the
  443. // timeout.
  444. return nil, nil,
  445. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  446. case proxyAnswerInfo = <-offerEntry.answerChan:
  447. }
  448. if proxyAnswerInfo == nil {
  449. // nil will be delivered to the channel when either the proxy
  450. // announcement request concurrently timed out, or the answer
  451. // indicated a proxy error, or the answer did not arrive in time.
  452. return nil, nil,
  453. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  454. }
  455. // This is a sanity check and not expected to fail.
  456. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  457. proxyAnswerInfo.announcement.ConnectionID) {
  458. return nil, nil,
  459. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  460. }
  461. return proxyAnswerInfo.answer,
  462. proxyAnswerInfo.announcement,
  463. offerEntry.getMatchMetrics(),
  464. nil
  465. }
  466. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  467. // announcement identified by the specified proxy ID and connection ID and
  468. // returns whether the announcement has personal compartment IDs, indicating
  469. // personal pairing mode.
  470. //
  471. // If no pending answer is found, an error is returned.
  472. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  473. proxyID ID, connectionID ID) (bool, error) {
  474. key := m.pendingAnswerKey(proxyID, connectionID)
  475. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  476. if !ok {
  477. // The input IDs don't correspond to a pending answer, or the client
  478. // is no longer awaiting the response.
  479. return false, errors.TraceNew("no pending answer")
  480. }
  481. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  482. hasPersonalCompartmentIDs := len(
  483. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  484. return hasPersonalCompartmentIDs, nil
  485. }
  486. // Answer delivers an answer from the proxy for a previously matched offer.
  487. // The ProxyID and ConnectionID must correspond to the original announcement.
  488. // The caller must not mutate the answer after calling Answer. Answer does
  489. // not block.
  490. //
  491. // The answer is returned to the awaiting Offer call and sent to the matched
  492. // client.
  493. func (m *Matcher) Answer(
  494. proxyAnswer *MatchAnswer) error {
  495. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  496. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  497. if !ok {
  498. // The input IDs don't correspond to a pending answer, or the client
  499. // is no longer awaiting the response.
  500. return errors.TraceNew("no pending answer")
  501. }
  502. m.pendingAnswers.Delete(key)
  503. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  504. pendingAnswer.answerChan <- &answerInfo{
  505. announcement: pendingAnswer.announcement,
  506. answer: proxyAnswer,
  507. }
  508. return nil
  509. }
  510. // AnswerError delivers a failed answer indication from the proxy to an
  511. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  512. // original announcement.
  513. //
  514. // The failure indication is returned to the awaiting Offer call and sent to
  515. // the matched client.
  516. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  517. key := m.pendingAnswerKey(proxyID, connectionID)
  518. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  519. if !ok {
  520. // The client is no longer awaiting the response.
  521. return
  522. }
  523. m.pendingAnswers.Delete(key)
  524. // Closing the channel delivers nil, a failed indicator, to any receiver.
  525. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  526. }
  527. // matchWorker is the matching worker goroutine. It idles until signaled that
  528. // a queue item has been added, and then runs a full matching pass.
  529. func (m *Matcher) matchWorker(ctx context.Context) {
  530. for {
  531. select {
  532. case <-m.matchSignal:
  533. m.matchAllOffers()
  534. case <-ctx.Done():
  535. return
  536. }
  537. }
  538. }
  539. // matchAllOffers iterates over the queues, making all possible matches.
  540. func (m *Matcher) matchAllOffers() {
  541. m.announcementQueueMutex.Lock()
  542. defer m.announcementQueueMutex.Unlock()
  543. m.offerQueueMutex.Lock()
  544. defer m.offerQueueMutex.Unlock()
  545. // Take each offer in turn, and select an announcement match. There is an
  546. // implicit preference for older client offers, sooner to timeout, at the
  547. // front of the queue.
  548. // TODO: consider matching one offer, then releasing the locks to allow
  549. // more announcements to be enqueued, then continuing to match.
  550. nextOffer := m.offerQueue.Front()
  551. offerIndex := -1
  552. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  553. offerIndex += 1
  554. // nextOffer.Next must be invoked before any removeOfferEntry since
  555. // container/list.remove clears list.Element.next.
  556. offer := nextOffer
  557. nextOffer = nextOffer.Next()
  558. offerEntry := offer.Value.(*offerEntry)
  559. // Skip and remove this offer if its deadline has already passed.
  560. // There is no signal to the awaiting Offer function, as it will exit
  561. // based on the same ctx.
  562. if offerEntry.ctx.Err() != nil {
  563. m.removeOfferEntry(false, offerEntry)
  564. continue
  565. }
  566. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  567. if announcementEntry == nil {
  568. continue
  569. }
  570. // Record match metrics.
  571. // The index metrics predate the announcement multi-queue; now, with
  572. // the multi-queue, announcement_index is how many announce entries
  573. // were inspected before matching.
  574. matchMetrics := &MatchMetrics{
  575. OfferMatchIndex: offerIndex,
  576. OfferQueueSize: m.offerQueue.Len(),
  577. AnnouncementMatchIndex: announcementMatchIndex,
  578. AnnouncementQueueSize: m.announcementQueue.getLen(),
  579. }
  580. offerEntry.matchMetrics.Store(matchMetrics)
  581. announcementEntry.matchMetrics.Store(matchMetrics)
  582. // Remove the matched announcement from the queue. Send the offer to
  583. // the announcement entry's offerChan, which will deliver it to the
  584. // blocked Announce call. Add a pending answers entry to await the
  585. // proxy's follow up Answer call. The TTL for the pending answer
  586. // entry is set to the matched Offer call's ctx, as the answer is
  587. // only useful as long as the client is still waiting.
  588. m.removeAnnouncementEntry(false, announcementEntry)
  589. expiry := lrucache.DefaultExpiration
  590. deadline, ok := offerEntry.ctx.Deadline()
  591. if ok {
  592. expiry = time.Until(deadline)
  593. }
  594. key := m.pendingAnswerKey(
  595. announcementEntry.announcement.ProxyID,
  596. announcementEntry.announcement.ConnectionID)
  597. m.pendingAnswers.Set(
  598. key,
  599. &pendingAnswer{
  600. announcement: announcementEntry.announcement,
  601. answerChan: offerEntry.answerChan,
  602. },
  603. expiry)
  604. announcementEntry.offerChan <- offerEntry.offer
  605. // Remove the matched offer from the queue and match the next offer,
  606. // now first in the queue.
  607. m.removeOfferEntry(false, offerEntry)
  608. }
  609. }
  610. func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
  611. // Assumes the caller has the queue mutexed locked.
  612. // Check each candidate announcement in turn, and select a match. There is
  613. // an implicit preference for older proxy announcements, sooner to
  614. // timeout, at the front of the enqueued announcements.
  615. // announcementMultiQueue.startMatching skips to the first matching
  616. // compartment ID(s).
  617. //
  618. // Limitation: since this logic matches each enqueued client in turn, it will
  619. // only make the optimal NAT match for the oldest enqueued client vs. all
  620. // proxies, and not do optimal N x M matching for all clients and all proxies.
  621. //
  622. // Future matching enhancements could include more sophisticated GeoIP
  623. // rules, such as a configuration encoding knowledge of an ASN's NAT
  624. // type, or preferred client/proxy country/ASN matches.
  625. // TODO: match supported protocol versions. Currently, all announces and
  626. // offers must specify ProxyProtocolVersion1, so there's no protocol
  627. // version match logic.
  628. offerProperties := &offerEntry.offer.Properties
  629. // Assumes the caller checks that offer specifies either personal
  630. // compartment IDs or common compartment IDs, but not both.
  631. isCommonCompartments := false
  632. compartmentIDs := offerProperties.PersonalCompartmentIDs
  633. if len(compartmentIDs) == 0 {
  634. isCommonCompartments = true
  635. compartmentIDs = offerProperties.CommonCompartmentIDs
  636. }
  637. if len(compartmentIDs) == 0 {
  638. return nil, -1
  639. }
  640. matchIterator := m.announcementQueue.startMatching(
  641. isCommonCompartments, compartmentIDs)
  642. // Use the NAT traversal type counters to check if there's any preferred
  643. // NAT match for this offer in the announcement queue. When there is, we
  644. // will search beyond the first announcement.
  645. unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
  646. matchIterator.getNATCounts()
  647. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  648. unlimitedNATCount > 0,
  649. partiallyLimitedNATCount > 0,
  650. strictlyLimitedNATCount > 0)
  651. var bestMatch *announcementEntry
  652. bestMatchIndex := -1
  653. bestMatchIsPriority := false
  654. bestMatchNAT := false
  655. candidateIndex := -1
  656. for {
  657. announcementEntry, isPriority := matchIterator.getNext()
  658. if announcementEntry == nil {
  659. break
  660. }
  661. if !isPriority && bestMatchIsPriority {
  662. // There is a priority match, but it wasn't bestMatchNAT and we
  663. // continued to iterate. Now that isPriority is false, we're past the
  664. // end of the priority items, so stop looking for any best NAT match
  665. // and return the previous priority match. When there are zero
  666. // priority items to begin with, this case should not be hit.
  667. break
  668. }
  669. candidateIndex += 1
  670. // Skip and remove this announcement if its deadline has already
  671. // passed. There is no signal to the awaiting Announce function, as
  672. // it will exit based on the same ctx.
  673. if announcementEntry.ctx.Err() != nil {
  674. m.removeAnnouncementEntry(false, announcementEntry)
  675. continue
  676. }
  677. announcementProperties := &announcementEntry.announcement.Properties
  678. // Disallow matching the same country and ASN, except for personal
  679. // compartment ID matches.
  680. //
  681. // For common matching, hopping through the same ISP is assumed to
  682. // have no circumvention benefit. For personal matching, the user may
  683. // wish to hop their their own or their friend's proxy regardless.
  684. if isCommonCompartments &&
  685. !GetAllowCommonASNMatching() &&
  686. (offerProperties.GeoIPData.Country ==
  687. announcementProperties.GeoIPData.Country &&
  688. offerProperties.GeoIPData.ASN ==
  689. announcementProperties.GeoIPData.ASN) {
  690. continue
  691. }
  692. // Check if this is a preferred NAT match. Ultimately, a match may be
  693. // made with potentially incompatible NATs, but the client/proxy
  694. // reported NAT types may be incorrect or unknown; the client will
  695. // often skip NAT discovery.
  696. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  697. // At this point, the candidate is a match. Determine if this is a new
  698. // best match, either if there was no previous match, or this is a
  699. // better NAT match.
  700. if bestMatch == nil || (!bestMatchNAT && matchNAT) {
  701. bestMatch = announcementEntry
  702. bestMatchIndex = candidateIndex
  703. bestMatchIsPriority = isPriority
  704. bestMatchNAT = matchNAT
  705. }
  706. // Stop as soon as we have the best possible match, or have reached
  707. // the probe limit for preferred NAT matches.
  708. if bestMatch != nil && (bestMatchNAT ||
  709. !existsPreferredNATMatch ||
  710. candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
  711. break
  712. }
  713. }
  714. return bestMatch, bestMatchIndex
  715. }
  716. // applyLoadLimit checks if the broker process is in the load limiting state
  717. // and, in order to reduce load, determines if new proxy announces or client
  718. // offers should be rejected immediately instead of enqueued.
  719. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  720. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  721. return nil
  722. }
  723. // Acquire the queue locks only when in the load limit state, and in the
  724. // same order as matchAllOffers.
  725. m.announcementQueueMutex.Lock()
  726. defer m.announcementQueueMutex.Unlock()
  727. m.offerQueueMutex.Lock()
  728. defer m.offerQueueMutex.Unlock()
  729. announcementLen := m.announcementQueue.getLen()
  730. offerLen := m.offerQueue.Len()
  731. // When the load limit had been reached, and assuming the broker process
  732. // is running only an in-proxy broker, it's likely, in practise, that
  733. // only one of the two queues has hundreds of thousands of entries while
  734. // the other has few, and there are no matches clearing the queue.
  735. //
  736. // Instead of simply rejecting all enqueue requests, allow the request
  737. // type, announce or offer, that is in shorter supply as these are likely
  738. // to match and draw down the larger queue. This attempts to make
  739. // productive use of enqueued items, and also attempts to avoid simply
  740. // emptying both queues -- as will happen in any case due to timeouts --
  741. // and then have the same larger queue refill again after the load limit
  742. // state exits.
  743. //
  744. // This approach assumes some degree of slack in available system memory
  745. // and CPU in the load limiting state, similar to how the tunnel server
  746. // continues to operate existing tunnels in the same state.
  747. //
  748. // The heuristic below of allowing when less than half the size of the
  749. // larger queue puts a cap on the amount the shorter queue can continue
  750. // to grow in the load limiting state, in the worst case.
  751. //
  752. // Limitation: in some scenarios that are expected to be rare, it can
  753. // happen that allowed requests don't result in a match and memory
  754. // consumption continues to grow, leading to a broker process OOM kill.
  755. var allow bool
  756. if isAnnouncement {
  757. allow = announcementLen < offerLen/2
  758. } else {
  759. allow = offerLen < announcementLen/2
  760. }
  761. if allow {
  762. return nil
  763. }
  764. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  765. // MatcherLimitError results in a Response.Limited error response, which
  766. // causes a proxy to back off and a client to abort its dial; but in
  767. // neither case is the broker client reset. The error returned here will
  768. // result in a fast 404 response to the proxy or client, which will
  769. // instead trigger a broker client reset, and a chance of moving to a
  770. // different broker that is not overloaded.
  771. //
  772. // Limitation: the 404 response won't be distinguishable, in client or
  773. // proxy diagnostics, from other error conditions.
  774. //
  775. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  776. // use use log a distinct error and also ensure that it doesn't reselect
  777. // the same broker again in the broker client reset random selection.
  778. return errors.TraceNew("load limited")
  779. }
  780. // MatcherLimitError is the error type returned by Announce or Offer when the
  781. // caller has exceeded configured queue entry or rate limits.
  782. type MatcherLimitError struct {
  783. err error
  784. }
  785. func NewMatcherLimitError(err error) *MatcherLimitError {
  786. return &MatcherLimitError{err: err}
  787. }
  788. func (e MatcherLimitError) Error() string {
  789. return e.err.Error()
  790. }
  791. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  792. // address -- rate limits and queue entry limits.
  793. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  794. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  795. var entryCountByIP map[string]int
  796. var queueRateLimiters *lrucache.Cache
  797. var limitEntryCount int
  798. var quantity int
  799. var interval time.Duration
  800. if isAnnouncement {
  801. // Skip limit checks for non-limited proxies.
  802. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  803. return nil
  804. }
  805. entryCountByIP = m.announcementQueueEntryCountByIP
  806. queueRateLimiters = m.announcementQueueRateLimiters
  807. limitEntryCount = m.announcementLimitEntryCount
  808. quantity = m.announcementRateLimitQuantity
  809. interval = m.announcementRateLimitInterval
  810. } else {
  811. entryCountByIP = m.offerQueueEntryCountByIP
  812. queueRateLimiters = m.offerQueueRateLimiters
  813. limitEntryCount = m.offerLimitEntryCount
  814. quantity = m.offerRateLimitQuantity
  815. interval = m.offerRateLimitInterval
  816. }
  817. // The rate limit is checked first, before the max count check, to ensure
  818. // that the rate limit state is updated regardless of the max count check
  819. // outcome.
  820. if quantity > 0 && interval > 0 {
  821. var rateLimiter *rate.Limiter
  822. entry, ok := queueRateLimiters.Get(limitIP)
  823. if ok {
  824. rateLimiter = entry.(*rate.Limiter)
  825. } else {
  826. limit := float64(quantity) / interval.Seconds()
  827. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  828. queueRateLimiters.Set(
  829. limitIP, rateLimiter, interval)
  830. }
  831. if !rateLimiter.Allow() {
  832. return errors.Trace(
  833. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  834. }
  835. }
  836. if limitEntryCount > 0 {
  837. // Limitation: non-limited proxy ID entries are counted in
  838. // entryCountByIP. If both a limited and non-limited proxy ingress
  839. // from the same limitIP, then the non-limited entries will count
  840. // against the limited proxy's limitEntryCount.
  841. entryCount, ok := entryCountByIP[limitIP]
  842. if ok && entryCount >= limitEntryCount {
  843. return errors.Trace(
  844. NewMatcherLimitError(std_errors.New("max entries for IP")))
  845. }
  846. }
  847. return nil
  848. }
  849. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  850. m.announcementQueueMutex.Lock()
  851. defer m.announcementQueueMutex.Unlock()
  852. // Ensure the queue doesn't grow larger than the max size.
  853. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  854. return errors.TraceNew("queue full")
  855. }
  856. // Ensure no single peer IP can enqueue a large number of entries or
  857. // rapidly enqueue beyond the configured rate.
  858. isAnnouncement := true
  859. err := m.applyIPLimits(
  860. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  861. if err != nil {
  862. return errors.Trace(err)
  863. }
  864. // announcementEntry.queueReference should be uninitialized.
  865. // announcementMultiQueue.enqueue sets queueReference to be used for
  866. // efficient dequeuing.
  867. if announcementEntry.queueReference.entry != nil {
  868. return errors.TraceNew("unexpected queue reference")
  869. }
  870. err = m.announcementQueue.enqueue(announcementEntry)
  871. if err != nil {
  872. return errors.Trace(err)
  873. }
  874. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  875. select {
  876. case m.matchSignal <- struct{}{}:
  877. default:
  878. }
  879. return nil
  880. }
  881. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  882. // In the aborting case, the queue isn't already locked. Otherwise, assume
  883. // it is locked.
  884. if aborting {
  885. m.announcementQueueMutex.Lock()
  886. defer m.announcementQueueMutex.Unlock()
  887. }
  888. found := announcementEntry.queueReference.dequeue()
  889. if found {
  890. // Adjust entry counts by peer IP, used to enforce
  891. // matcherAnnouncementQueueMaxEntriesPerIP.
  892. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  893. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  894. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  895. }
  896. }
  897. if aborting && !found {
  898. // The Announce call is aborting and taking its entry back out of the
  899. // queue. If the entry is not found in the queue, then a concurrent
  900. // Offer has matched the announcement. So check for the pending
  901. // answer corresponding to the announcement and remove it and deliver
  902. // a failure signal to the waiting Offer, so the client doesn't wait
  903. // longer than necessary.
  904. key := m.pendingAnswerKey(
  905. announcementEntry.announcement.ProxyID,
  906. announcementEntry.announcement.ConnectionID)
  907. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  908. if ok {
  909. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  910. m.pendingAnswers.Delete(key)
  911. }
  912. }
  913. }
  914. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  915. m.offerQueueMutex.Lock()
  916. defer m.offerQueueMutex.Unlock()
  917. // Ensure the queue doesn't grow larger than the max size.
  918. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  919. return errors.TraceNew("queue full")
  920. }
  921. // Ensure no single peer IP can enqueue a large number of entries or
  922. // rapidly enqueue beyond the configured rate.
  923. isAnnouncement := false
  924. err := m.applyIPLimits(
  925. isAnnouncement, offerEntry.limitIP, ID{})
  926. if err != nil {
  927. return errors.Trace(err)
  928. }
  929. // offerEntry.queueReference should be uninitialized and is set here to be
  930. // used for efficient dequeuing.
  931. if offerEntry.queueReference != nil {
  932. return errors.TraceNew("unexpected queue reference")
  933. }
  934. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  935. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  936. select {
  937. case m.matchSignal <- struct{}{}:
  938. default:
  939. }
  940. return nil
  941. }
  942. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  943. // In the aborting case, the queue isn't already locked. Otherise, assume
  944. // it is locked.
  945. if aborting {
  946. m.offerQueueMutex.Lock()
  947. defer m.offerQueueMutex.Unlock()
  948. }
  949. if offerEntry.queueReference == nil {
  950. return
  951. }
  952. m.offerQueue.Remove(offerEntry.queueReference)
  953. offerEntry.queueReference = nil
  954. // Adjust entry counts by peer IP, used to enforce
  955. // matcherOfferQueueMaxEntriesPerIP.
  956. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  957. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  958. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  959. }
  960. }
  961. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  962. // The pending answer lookup key is used to associate announcements and
  963. // subsequent answers. While the client learns the ConnectionID, only the
  964. // proxy knows the ProxyID component, so only the correct proxy can match
  965. // an answer to an announcement. The ConnectionID component is necessary
  966. // as a proxy may have multiple, concurrent pending answers.
  967. return string(proxyID[:]) + string(connectionID[:])
  968. }
  969. func getRateLimitIP(strIP string) string {
  970. IP := net.ParseIP(strIP)
  971. if IP == nil || IP.To4() != nil {
  972. return strIP
  973. }
  974. // With IPv6, individual users or sites are users commonly allocated a /64
  975. // or /56, so rate limit by /56.
  976. return IP.Mask(net.CIDRMask(56, 128)).String()
  977. }
  978. // announcementMultiQueue is a set of announcement queues, one per common or
  979. // personal compartment ID, providing efficient iteration over announcements
  980. // matching a specified list of compartment IDs. announcementMultiQueue and
  981. // its underlying data structures are not safe for concurrent access.
  982. type announcementMultiQueue struct {
  983. priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
  984. commonCompartmentQueues map[ID]*announcementCompartmentQueue
  985. personalCompartmentQueues map[ID]*announcementCompartmentQueue
  986. totalEntries int
  987. }
  988. // announcementCompartmentQueue is a single compartment queue within an
  989. // announcementMultiQueue. The queue is implemented using a doubly-linked
  990. // list, which provides efficient insert and mid-queue dequeue operations.
  991. // The announcementCompartmentQueue also records NAT type stats for enqueued
  992. // announcements, which are used, when matching, to determine when better NAT
  993. // matches may be possible.
  994. type announcementCompartmentQueue struct {
  995. isCommonCompartment bool
  996. isPriority bool
  997. compartmentID ID
  998. entries *list.List
  999. unlimitedNATCount int
  1000. partiallyLimitedNATCount int
  1001. strictlyLimitedNATCount int
  1002. }
  1003. // announcementMatchIterator represents the state of an iteration over a
  1004. // subset of announcementMultiQueue compartment queues. Concurrent
  1005. // announcementMatchIterators are not supported.
  1006. type announcementMatchIterator struct {
  1007. multiQueue *announcementMultiQueue
  1008. compartmentQueues []*announcementCompartmentQueue
  1009. compartmentIDs []ID
  1010. nextEntries []*list.Element
  1011. }
  1012. // announcementQueueReference represents the queue position for a given
  1013. // announcement entry, and provides an efficient dequeue operation.
  1014. type announcementQueueReference struct {
  1015. multiQueue *announcementMultiQueue
  1016. compartmentQueue *announcementCompartmentQueue
  1017. entry *list.Element
  1018. }
  1019. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1020. return &announcementMultiQueue{
  1021. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1022. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1023. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1024. }
  1025. }
  1026. func (q *announcementMultiQueue) getLen() int {
  1027. return q.totalEntries
  1028. }
  1029. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1030. // Assumes announcementEntry not already enueued.
  1031. // Limitation: only one compartment ID, either common or personal, is
  1032. // supported per announcement entry. In the common compartment case, the
  1033. // broker currently assigns only one common compartment ID per proxy
  1034. // announcement. In the personal compartment case, there is currently no
  1035. // use case for allowing a proxy to announce under multiple personal
  1036. // compartment IDs.
  1037. //
  1038. // To overcome this limitation, the dequeue operation would need to be
  1039. // able to remove an announcement entry from multiple
  1040. // announcementCompartmentQueues.
  1041. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1042. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1043. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1044. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1045. }
  1046. isPriority := announcementEntry.announcement.Properties.IsPriority
  1047. isCommonCompartment := true
  1048. var compartmentID ID
  1049. var compartmentQueues map[ID]*announcementCompartmentQueue
  1050. if len(commonCompartmentIDs) > 0 {
  1051. compartmentID = commonCompartmentIDs[0]
  1052. compartmentQueues = q.commonCompartmentQueues
  1053. if isPriority {
  1054. compartmentQueues = q.priorityCommonCompartmentQueues
  1055. }
  1056. } else {
  1057. isCommonCompartment = false
  1058. compartmentID = personalCompartmentIDs[0]
  1059. compartmentQueues = q.personalCompartmentQueues
  1060. if isPriority {
  1061. return errors.TraceNew("priority not supported for personal compartments")
  1062. }
  1063. }
  1064. compartmentQueue, ok := compartmentQueues[compartmentID]
  1065. if !ok {
  1066. compartmentQueue = &announcementCompartmentQueue{
  1067. isCommonCompartment: isCommonCompartment,
  1068. isPriority: isPriority,
  1069. compartmentID: compartmentID,
  1070. entries: list.New(),
  1071. }
  1072. compartmentQueues[compartmentID] = compartmentQueue
  1073. }
  1074. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1075. // Update the NAT type counts which are used to determine if a better NAT
  1076. // match may be made by inspecting more announcement queue entries.
  1077. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1078. case NATTraversalUnlimited:
  1079. compartmentQueue.unlimitedNATCount += 1
  1080. case NATTraversalPartiallyLimited:
  1081. compartmentQueue.partiallyLimitedNATCount += 1
  1082. case NATTraversalStrictlyLimited:
  1083. compartmentQueue.strictlyLimitedNATCount += 1
  1084. }
  1085. q.totalEntries += 1
  1086. announcementEntry.queueReference = announcementQueueReference{
  1087. multiQueue: q,
  1088. compartmentQueue: compartmentQueue,
  1089. entry: entry,
  1090. }
  1091. return nil
  1092. }
  1093. // announcementQueueReference returns false if the item is already dequeued.
  1094. func (r *announcementQueueReference) dequeue() bool {
  1095. if r.entry == nil {
  1096. // Already dequeued.
  1097. return false
  1098. }
  1099. announcementEntry := r.entry.Value.(*announcementEntry)
  1100. // Reverse the NAT type counts.
  1101. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1102. case NATTraversalUnlimited:
  1103. r.compartmentQueue.unlimitedNATCount -= 1
  1104. case NATTraversalPartiallyLimited:
  1105. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1106. case NATTraversalStrictlyLimited:
  1107. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1108. }
  1109. r.compartmentQueue.entries.Remove(r.entry)
  1110. if r.compartmentQueue.entries.Len() == 0 {
  1111. // Remove empty compartment queue.
  1112. queues := r.multiQueue.personalCompartmentQueues
  1113. if r.compartmentQueue.isCommonCompartment {
  1114. if r.compartmentQueue.isPriority {
  1115. queues = r.multiQueue.priorityCommonCompartmentQueues
  1116. } else {
  1117. queues = r.multiQueue.commonCompartmentQueues
  1118. }
  1119. }
  1120. delete(queues, r.compartmentQueue.compartmentID)
  1121. }
  1122. r.multiQueue.totalEntries -= 1
  1123. // Mark as dequeued.
  1124. r.entry = nil
  1125. return true
  1126. }
  1127. func (q *announcementMultiQueue) startMatching(
  1128. isCommonCompartments bool,
  1129. compartmentIDs []ID) *announcementMatchIterator {
  1130. iter := &announcementMatchIterator{
  1131. multiQueue: q,
  1132. }
  1133. // Find the matching compartment queues and initialize iteration over
  1134. // those queues. Building the set of matching queues is a linear time
  1135. // operation, bounded by the length of compartmentIDs (no more than
  1136. // maxCompartmentIDs, as enforced in
  1137. // ClientOfferRequest.ValidateAndGetLogFields).
  1138. // Priority queues, when in use, must all be added to the beginning of
  1139. // iter.compartmentQueues in order to ensure that the iteration logic in
  1140. // getNext visits all priority items first.
  1141. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1142. if isCommonCompartments {
  1143. compartmentQueuesList = append(
  1144. compartmentQueuesList,
  1145. q.priorityCommonCompartmentQueues,
  1146. q.commonCompartmentQueues)
  1147. } else {
  1148. compartmentQueuesList = append(
  1149. compartmentQueuesList,
  1150. q.personalCompartmentQueues)
  1151. }
  1152. for _, compartmentQueues := range compartmentQueuesList {
  1153. for _, ID := range compartmentIDs {
  1154. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1155. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1156. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1157. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1158. }
  1159. }
  1160. }
  1161. return iter
  1162. }
  1163. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1164. // Return the count of NAT types across all matchable compartment queues.
  1165. //
  1166. // A potential future enhancement would be to provide per-queue NAT counts
  1167. // or NAT type indexing in order to quickly find preferred NAT matches.
  1168. unlimitedNATCount := 0
  1169. partiallyLimitedNATCount := 0
  1170. strictlyLimitedNATCount := 0
  1171. for _, compartmentQueue := range iter.compartmentQueues {
  1172. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1173. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1174. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1175. }
  1176. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1177. }
  1178. // announcementMatchIterator returns the next announcement entry candidate in
  1179. // compartment queue FIFO order, selecting the queue with the oldest head
  1180. // item.
  1181. //
  1182. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1183. // candidate is selected. dequeue may be called on any getNext return value
  1184. // without disrupting the iteration state; however,
  1185. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1186. // are not supported during iteration. Iteration and dequeue should all be
  1187. // performed with a lock over the entire announcementMultiQueue, and with
  1188. // only one concurrent announcementMatchIterator.
  1189. //
  1190. // getNext returns a nil *announcementEntry when there are no more items.
  1191. // getNext also returns an isPriority flag, indicating the announcement is a
  1192. // priority candidate. All priority candidates are guaranteed to be returned
  1193. // before any non-priority candidates.
  1194. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1195. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1196. // order. Also assumes that any priority queues are all at the front of
  1197. // iter.compartmentQueues.
  1198. // Select the oldest item, by deadline, from all the candidate queue head
  1199. // items. This operation is linear in the number of matching compartment
  1200. // ID queues, which is currently bounded by the length of matching
  1201. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1202. // ClientOfferRequest.ValidateAndGetLogFields).
  1203. //
  1204. // When there are priority candidates, they are selected first, regardless
  1205. // of the deadlines of non-priority candidates. Multiple priority
  1206. // candidates are processed in FIFO deadline order.
  1207. //
  1208. // A potential future enhancement is to add more iterator state to track
  1209. // which queue has the next oldest time to select on the following
  1210. // getNext call. Another potential enhancement is to remove fully
  1211. // consumed queues from compartmentQueues/compartmentIDs/nextEntries.
  1212. var selectedCandidate *announcementEntry
  1213. selectedIndex := -1
  1214. selectedPriority := false
  1215. for i := 0; i < len(iter.compartmentQueues); i++ {
  1216. if iter.nextEntries[i] == nil {
  1217. continue
  1218. }
  1219. isPriority := iter.compartmentQueues[i].isPriority
  1220. if selectedPriority && !isPriority {
  1221. // Ignore older of non-priority entries when there are priority
  1222. // candidates.
  1223. break
  1224. }
  1225. if selectedCandidate == nil {
  1226. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1227. selectedIndex = i
  1228. selectedPriority = isPriority
  1229. } else {
  1230. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1231. deadline, deadlineOk := candidate.ctx.Deadline()
  1232. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1233. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1234. selectedCandidate = candidate
  1235. selectedIndex = i
  1236. selectedPriority = isPriority
  1237. }
  1238. }
  1239. }
  1240. // Advance the selected queue to the next element. This must be done
  1241. // before any dequeue call, since container/list.remove clears
  1242. // list.Element.next.
  1243. if selectedIndex != -1 {
  1244. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1245. }
  1246. return selectedCandidate, selectedPriority
  1247. }