matcher.go 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. "golang.org/x/time/rate"
  32. )
  // Matcher queue, cache, and probe limits.
  //
  // TTLs should be aligned with STUN hole punch lifetimes.
  const (
  	// Size caps for the announcement and offer queues.
  	// NOTE(review): enforcement happens outside this section of the file.
  	matcherAnnouncementQueueMaxSize = 5_000_000
  	matcherOfferQueueMaxSize        = 5_000_000

  	// Default TTL and entry cap for the pending answers cache created in
  	// NewMatcher.
  	matcherPendingAnswersTTL     = 30 * time.Second
  	matcherPendingAnswersMaxSize = 100_000

  	// Probe limits. NOTE(review): presumably bounds on how many queued
  	// announcements are inspected per offer; confirm against matchOffer.
  	matcherMaxPreferredNATProbe = 100
  	matcherMaxProbe             = 1000

  	// Reap frequency, in seconds, and entry cap for the per-queue rate
  	// limiter caches created in NewMatcher.
  	matcherRateLimiterReapHistoryFrequencySeconds = 300
  	matcherRateLimiterMaxCacheEntries             = 1_000_000
  )
  44. // Matcher matches proxy announcements with client offers. Matcher also
  45. // coordinates pending proxy answers and routes answers to the awaiting
  46. // client offer handler.
  47. //
  48. // Matching prioritizes selecting the oldest announcements and client offers,
  49. // as they are closest to timing out.
  50. //
  51. // The client and proxy must supply matching personal or common compartment
  52. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  53. // tactics parameter or via an OSL embedding. Each proxy announcement or
  54. // client offer may specify only one compartment ID type, either common or
  55. // personal.
  56. //
  57. // Matching prefers to pair proxies and clients in a way that maximizes total
  58. // possible matches. For a client or proxy with less-limited NAT traversal, a
  59. // pairing with more-limited NAT traversal is preferred; and vice versa.
  60. // Candidates with unknown NAT types and mobile network types are assumed to
  61. // have the most limited NAT traversal capability.
  62. //
  63. // Preferred matchings take priority over announcement age.
  64. //
  65. // The client and proxy will not match if they are in the same country and
  66. // ASN, as it's assumed that doesn't provide any blocking circumvention
  67. // benefit. Disallowing proxies in certain blocked countries is handled at a
  68. // higher level; any such proxies should not be enqueued for matching.
  69. type Matcher struct {
  70. config *MatcherConfig
  71. runMutex sync.Mutex
  72. runContext context.Context
  73. stopRunning context.CancelFunc
  74. waitGroup *sync.WaitGroup
  75. // The announcement queue is implicitly sorted by announcement age. The
  76. // count fields are used to skip searching deeper into the queue for
  77. // preferred matches.
  78. // TODO: replace queue and counts with an indexed, in-memory database?
  79. announcementQueueMutex sync.Mutex
  80. announcementQueue *announcementMultiQueue
  81. announcementQueueEntryCountByIP map[string]int
  82. announcementQueueRateLimiters *lrucache.Cache
  83. announcementLimitEntryCount int
  84. announcementRateLimitQuantity int
  85. announcementRateLimitInterval time.Duration
  86. announcementNonlimitedProxyIDs map[ID]struct{}
  87. // The offer queue is also implicitly sorted by offer age. Both an offer
  88. // and announcement queue are required since either announcements or
  89. // offers can arrive while there are no available pairings.
  90. offerQueueMutex sync.Mutex
  91. offerQueue *list.List
  92. offerQueueEntryCountByIP map[string]int
  93. offerQueueRateLimiters *lrucache.Cache
  94. offerLimitEntryCount int
  95. offerRateLimitQuantity int
  96. offerRateLimitInterval time.Duration
  97. matchSignal chan struct{}
  98. pendingAnswers *lrucache.Cache
  99. }
  100. // MatchProperties specifies the compartment, GeoIP, and network topology
  101. // matching properties of clients and proxies.
  102. type MatchProperties struct {
  103. IsPriority bool
  104. ProtocolVersion int32
  105. CommonCompartmentIDs []ID
  106. PersonalCompartmentIDs []ID
  107. GeoIPData common.GeoIPData
  108. NetworkType NetworkType
  109. NATType NATType
  110. PortMappingTypes PortMappingTypes
  111. }
  112. // EffectiveNATType combines the set of network properties into an effective
  113. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  114. // traversal is assumed. When NAT type is unknown and the network type is
  115. // mobile, CGNAT with limited NAT traversal is assumed.
  116. func (p *MatchProperties) EffectiveNATType() NATType {
  117. if p.PortMappingTypes.Available() {
  118. return NATTypePortMapping
  119. }
  120. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  121. // publicly reachable IPv6 ICE host candidate? If so, change the
  122. // effective NAT type? Depends on whether the matched peer can use IPv6.
  123. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  124. return NATTypeMobileNetwork
  125. }
  126. return p.NATType
  127. }
  128. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  129. // matching given the types of pairing candidates available.
  130. func (p *MatchProperties) ExistsPreferredNATMatch(
  131. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  132. return p.EffectiveNATType().ExistsPreferredMatch(
  133. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  134. }
  135. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  136. // NAT matching.
  137. func (p *MatchProperties) IsPreferredNATMatch(
  138. peerMatchProperties *MatchProperties) bool {
  139. return p.EffectiveNATType().IsPreferredMatch(
  140. peerMatchProperties.EffectiveNATType())
  141. }
  142. // MatchAnnouncement is a proxy announcement to be queued for matching.
  143. type MatchAnnouncement struct {
  144. Properties MatchProperties
  145. ProxyID ID
  146. ProxyMetrics *ProxyMetrics
  147. ConnectionID ID
  148. }
  149. // MatchOffer is a client offer to be queued for matching.
  150. type MatchOffer struct {
  151. Properties MatchProperties
  152. ClientOfferSDP WebRTCSessionDescription
  153. ClientRootObfuscationSecret ObfuscationSecret
  154. DoDTLSRandomization bool
  155. UseMediaStreams bool
  156. TrafficShapingParameters *TrafficShapingParameters
  157. NetworkProtocol NetworkProtocol
  158. DestinationAddress string
  159. DestinationServerID string
  160. }
  161. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  162. // announcement, to be routed to the awaiting client offer.
  163. type MatchAnswer struct {
  164. ProxyIP string
  165. ProxyID ID
  166. ConnectionID ID
  167. ProxyAnswerSDP WebRTCSessionDescription
  168. }
  // MatchMetrics captures statistics about the state of the match queues at
  // the moment a match is made.
  type MatchMetrics struct {
  	// Position of the matched offer in the offer queue, and the offer
  	// queue length, as observed by the matching pass.
  	OfferMatchIndex int
  	OfferQueueSize  int

  	// Announcement index reported for the match, and the announcement
  	// queue length, as observed by the matching pass.
  	AnnouncementMatchIndex int
  	AnnouncementQueueSize  int
  }
  177. // GetMetrics converts MatchMetrics to loggable fields.
  178. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  179. if metrics == nil {
  180. return nil
  181. }
  182. return common.LogFields{
  183. "offer_match_index": metrics.OfferMatchIndex,
  184. "offer_queue_size": metrics.OfferQueueSize,
  185. "announcement_match_index": metrics.AnnouncementMatchIndex,
  186. "announcement_queue_size": metrics.AnnouncementQueueSize,
  187. }
  188. }
  189. // announcementEntry is an announcement queue entry, an announcement with its
  190. // associated lifetime context and signaling channel.
  191. type announcementEntry struct {
  192. ctx context.Context
  193. limitIP string
  194. announcement *MatchAnnouncement
  195. offerChan chan *MatchOffer
  196. matchMetrics atomic.Value
  197. // queueReference is initialized by addAnnouncementEntry, and used to
  198. // efficiently dequeue the entry.
  199. queueReference announcementQueueReference
  200. }
  201. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  202. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  203. return matchMetrics
  204. }
  205. // offerEntry is an offer queue entry, an offer with its associated lifetime
  206. // context and signaling channel.
  207. type offerEntry struct {
  208. ctx context.Context
  209. limitIP string
  210. offer *MatchOffer
  211. answerChan chan *answerInfo
  212. matchMetrics atomic.Value
  213. // queueReference is initialized by addOfferEntry, and used to efficiently
  214. // dequeue the entry.
  215. queueReference *list.Element
  216. }
  217. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  218. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  219. return matchMetrics
  220. }
  221. // answerInfo is an answer and its associated announcement.
  222. type answerInfo struct {
  223. announcement *MatchAnnouncement
  224. answer *MatchAnswer
  225. }
  226. // pendingAnswer represents an answer that is expected to arrive from a
  227. // proxy.
  228. type pendingAnswer struct {
  229. announcement *MatchAnnouncement
  230. answerChan chan *answerInfo
  231. }
  232. // MatcherConfig specifies the configuration for a matcher.
  233. type MatcherConfig struct {
  234. // Logger is used to log events.
  235. Logger common.Logger
  236. // Announcement queue limits.
  237. AnnouncementLimitEntryCount int
  238. AnnouncementRateLimitQuantity int
  239. AnnouncementRateLimitInterval time.Duration
  240. AnnouncementNonlimitedProxyIDs []ID
  241. // Offer queue limits.
  242. OfferLimitEntryCount int
  243. OfferRateLimitQuantity int
  244. OfferRateLimitInterval time.Duration
  245. // Broker process load limit state callback. See Broker.Config.
  246. IsLoadLimiting func() bool
  247. }
  248. // NewMatcher creates a new Matcher.
  249. func NewMatcher(config *MatcherConfig) *Matcher {
  250. m := &Matcher{
  251. config: config,
  252. waitGroup: new(sync.WaitGroup),
  253. announcementQueue: newAnnouncementMultiQueue(),
  254. announcementQueueEntryCountByIP: make(map[string]int),
  255. announcementQueueRateLimiters: lrucache.NewWithLRU(
  256. 0,
  257. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  258. matcherRateLimiterMaxCacheEntries),
  259. offerQueue: list.New(),
  260. offerQueueEntryCountByIP: make(map[string]int),
  261. offerQueueRateLimiters: lrucache.NewWithLRU(
  262. 0,
  263. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  264. matcherRateLimiterMaxCacheEntries),
  265. matchSignal: make(chan struct{}, 1),
  266. // matcherPendingAnswersTTL is not configurable; it supplies a default
  267. // that is expected to be ignored when each entry's TTL is set to the
  268. // Offer ctx timeout.
  269. pendingAnswers: lrucache.NewWithLRU(
  270. matcherPendingAnswersTTL,
  271. 1*time.Minute,
  272. matcherPendingAnswersMaxSize),
  273. }
  274. m.SetLimits(
  275. config.AnnouncementLimitEntryCount,
  276. config.AnnouncementRateLimitQuantity,
  277. config.AnnouncementRateLimitInterval,
  278. config.AnnouncementNonlimitedProxyIDs,
  279. config.OfferLimitEntryCount,
  280. config.OfferRateLimitQuantity,
  281. config.OfferRateLimitInterval)
  282. return m
  283. }
  284. // SetLimits sets new queue limits, replacing the previous configuration.
  285. // Existing, cached rate limiters retain their existing rate limit state. New
  286. // entries will use the new quantity/interval configuration. In addition,
  287. // currently enqueued items may exceed any new, lower maximum entry count
  288. // until naturally dequeued.
  289. func (m *Matcher) SetLimits(
  290. announcementLimitEntryCount int,
  291. announcementRateLimitQuantity int,
  292. announcementRateLimitInterval time.Duration,
  293. announcementNonlimitedProxyIDs []ID,
  294. offerLimitEntryCount int,
  295. offerRateLimitQuantity int,
  296. offerRateLimitInterval time.Duration) {
  297. nonlimitedProxyIDs := make(map[ID]struct{})
  298. for _, proxyID := range announcementNonlimitedProxyIDs {
  299. nonlimitedProxyIDs[proxyID] = struct{}{}
  300. }
  301. m.announcementQueueMutex.Lock()
  302. m.announcementLimitEntryCount = announcementLimitEntryCount
  303. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  304. m.announcementRateLimitInterval = announcementRateLimitInterval
  305. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  306. m.announcementQueueMutex.Unlock()
  307. m.offerQueueMutex.Lock()
  308. m.offerLimitEntryCount = offerLimitEntryCount
  309. m.offerRateLimitQuantity = offerRateLimitQuantity
  310. m.offerRateLimitInterval = offerRateLimitInterval
  311. m.offerQueueMutex.Unlock()
  312. }
  313. // Start starts running the Matcher. The Matcher runs a goroutine which
  314. // matches announcements and offers.
  315. func (m *Matcher) Start() error {
  316. m.runMutex.Lock()
  317. defer m.runMutex.Unlock()
  318. if m.runContext != nil {
  319. return errors.TraceNew("already running")
  320. }
  321. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  322. m.waitGroup.Add(1)
  323. go func() {
  324. defer m.waitGroup.Done()
  325. m.matchWorker(m.runContext)
  326. }()
  327. return nil
  328. }
  329. // Stop stops running the Matcher and its worker goroutine.
  330. //
  331. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  332. // can get enqueued during and after a Stop call. Stop is intended more for a
  333. // full broker shutdown, where this won't be a concern.
  334. func (m *Matcher) Stop() {
  335. m.runMutex.Lock()
  336. defer m.runMutex.Unlock()
  337. m.stopRunning()
  338. m.waitGroup.Wait()
  339. m.runContext, m.stopRunning = nil, nil
  340. }
  341. // Announce enqueues the proxy announcement and blocks until it is matched
  342. // with a returned offer or ctx is done. The caller must not mutate the
  343. // announcement or its properties after calling Announce.
  344. //
  345. // Announce assumes that the ctx.Deadline for each call is monotonically
  346. // increasing and that the deadline can be used as part of selecting the next
  347. // nearest-to-expire announcement.
  348. //
  349. // The offer is sent to the proxy by the broker, and then the proxy sends its
  350. // answer back to the broker, which calls Answer with that value.
  351. //
  352. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  353. // match is made, even if there is a later error.
  354. func (m *Matcher) Announce(
  355. ctx context.Context,
  356. proxyIP string,
  357. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  358. // An announcement must specify exactly one compartment ID, of one type,
  359. // common or personal. The limit of one is currently a limitation of the
  360. // multi-queue implementation; see comment in
  361. // announcementMultiQueue.enqueue.
  362. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  363. if len(compartmentIDs) == 0 {
  364. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  365. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  366. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  367. }
  368. if len(compartmentIDs) != 1 {
  369. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  370. }
  371. isAnnouncement := true
  372. err := m.applyLoadLimit(isAnnouncement)
  373. if err != nil {
  374. return nil, nil, errors.Trace(err)
  375. }
  376. announcementEntry := &announcementEntry{
  377. ctx: ctx,
  378. limitIP: getRateLimitIP(proxyIP),
  379. announcement: proxyAnnouncement,
  380. offerChan: make(chan *MatchOffer, 1),
  381. }
  382. err = m.addAnnouncementEntry(announcementEntry)
  383. if err != nil {
  384. return nil, nil, errors.Trace(err)
  385. }
  386. // Await client offer.
  387. var clientOffer *MatchOffer
  388. select {
  389. case <-ctx.Done():
  390. m.removeAnnouncementEntry(true, announcementEntry)
  391. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  392. case clientOffer = <-announcementEntry.offerChan:
  393. }
  394. return clientOffer, announcementEntry.getMatchMetrics(), nil
  395. }
  396. // Offer enqueues the client offer and blocks until it is matched with a
  397. // returned announcement or ctx is done. The caller must not mutate the offer
  398. // or its properties after calling Announce.
  399. //
  400. // The answer is returned to the client by the broker, and the WebRTC
  401. // connection is dialed. The original announcement is also returned, so its
  402. // match properties can be logged.
  403. //
  404. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  405. // match is made, even if there is a later error.
  406. func (m *Matcher) Offer(
  407. ctx context.Context,
  408. clientIP string,
  409. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  410. // An offer must specify at least one compartment ID, and may only specify
  411. // one type, common or personal, of compartment IDs.
  412. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  413. if len(compartmentIDs) == 0 {
  414. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  415. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  416. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  417. }
  418. if len(compartmentIDs) < 1 {
  419. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  420. }
  421. isAnnouncement := false
  422. err := m.applyLoadLimit(isAnnouncement)
  423. if err != nil {
  424. return nil, nil, nil, errors.Trace(err)
  425. }
  426. offerEntry := &offerEntry{
  427. ctx: ctx,
  428. limitIP: getRateLimitIP(clientIP),
  429. offer: clientOffer,
  430. answerChan: make(chan *answerInfo, 1),
  431. }
  432. err = m.addOfferEntry(offerEntry)
  433. if err != nil {
  434. return nil, nil, nil, errors.Trace(err)
  435. }
  436. // Await proxy answer.
  437. var proxyAnswerInfo *answerInfo
  438. select {
  439. case <-ctx.Done():
  440. m.removeOfferEntry(true, offerEntry)
  441. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  442. // the Offer ctx, the client request, timeout, so it will eventually
  443. // get removed. But a client may abort its request earlier than the
  444. // timeout.
  445. return nil, nil,
  446. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  447. case proxyAnswerInfo = <-offerEntry.answerChan:
  448. }
  449. if proxyAnswerInfo == nil {
  450. // nil will be delivered to the channel when either the proxy
  451. // announcement request concurrently timed out, or the answer
  452. // indicated a proxy error, or the answer did not arrive in time.
  453. return nil, nil,
  454. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  455. }
  456. // This is a sanity check and not expected to fail.
  457. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  458. proxyAnswerInfo.announcement.ConnectionID) {
  459. return nil, nil,
  460. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  461. }
  462. return proxyAnswerInfo.answer,
  463. proxyAnswerInfo.announcement,
  464. offerEntry.getMatchMetrics(),
  465. nil
  466. }
  467. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  468. // announcement identified by the specified proxy ID and connection ID and
  469. // returns whether the announcement has personal compartment IDs, indicating
  470. // personal pairing mode.
  471. //
  472. // If no pending answer is found, an error is returned.
  473. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  474. proxyID ID, connectionID ID) (bool, error) {
  475. key := m.pendingAnswerKey(proxyID, connectionID)
  476. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  477. if !ok {
  478. // The input IDs don't correspond to a pending answer, or the client
  479. // is no longer awaiting the response.
  480. return false, errors.TraceNew("no pending answer")
  481. }
  482. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  483. hasPersonalCompartmentIDs := len(
  484. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  485. return hasPersonalCompartmentIDs, nil
  486. }
  487. // Answer delivers an answer from the proxy for a previously matched offer.
  488. // The ProxyID and ConnectionID must correspond to the original announcement.
  489. // The caller must not mutate the answer after calling Answer. Answer does
  490. // not block.
  491. //
  492. // The answer is returned to the awaiting Offer call and sent to the matched
  493. // client.
  494. func (m *Matcher) Answer(
  495. proxyAnswer *MatchAnswer) error {
  496. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  497. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  498. if !ok {
  499. // The input IDs don't correspond to a pending answer, or the client
  500. // is no longer awaiting the response.
  501. return errors.TraceNew("no pending answer")
  502. }
  503. m.pendingAnswers.Delete(key)
  504. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  505. pendingAnswer.answerChan <- &answerInfo{
  506. announcement: pendingAnswer.announcement,
  507. answer: proxyAnswer,
  508. }
  509. return nil
  510. }
  511. // AnswerError delivers a failed answer indication from the proxy to an
  512. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  513. // original announcement.
  514. //
  515. // The failure indication is returned to the awaiting Offer call and sent to
  516. // the matched client.
  517. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  518. key := m.pendingAnswerKey(proxyID, connectionID)
  519. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  520. if !ok {
  521. // The client is no longer awaiting the response.
  522. return
  523. }
  524. m.pendingAnswers.Delete(key)
  525. // Closing the channel delivers nil, a failed indicator, to any receiver.
  526. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  527. }
  528. // matchWorker is the matching worker goroutine. It idles until signaled that
  529. // a queue item has been added, and then runs a full matching pass.
  530. func (m *Matcher) matchWorker(ctx context.Context) {
  531. for {
  532. select {
  533. case <-m.matchSignal:
  534. m.matchAllOffers()
  535. case <-ctx.Done():
  536. return
  537. }
  538. }
  539. }
  540. // matchAllOffers iterates over the queues, making all possible matches.
  541. func (m *Matcher) matchAllOffers() {
  542. m.announcementQueueMutex.Lock()
  543. defer m.announcementQueueMutex.Unlock()
  544. m.offerQueueMutex.Lock()
  545. defer m.offerQueueMutex.Unlock()
  546. // Take each offer in turn, and select an announcement match. There is an
  547. // implicit preference for older client offers, sooner to timeout, at the
  548. // front of the queue.
  549. // TODO: consider matching one offer, then releasing the locks to allow
  550. // more announcements to be enqueued, then continuing to match.
  551. nextOffer := m.offerQueue.Front()
  552. offerIndex := -1
  553. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  554. offerIndex += 1
  555. // nextOffer.Next must be invoked before any removeOfferEntry since
  556. // container/list.remove clears list.Element.next.
  557. offer := nextOffer
  558. nextOffer = nextOffer.Next()
  559. offerEntry := offer.Value.(*offerEntry)
  560. // Skip and remove this offer if its deadline has already passed.
  561. // There is no signal to the awaiting Offer function, as it will exit
  562. // based on the same ctx.
  563. if offerEntry.ctx.Err() != nil {
  564. m.removeOfferEntry(false, offerEntry)
  565. continue
  566. }
  567. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  568. if announcementEntry == nil {
  569. continue
  570. }
  571. // Record match metrics.
  572. // The index metrics predate the announcement multi-queue; now, with
  573. // the multi-queue, announcement_index is how many announce entries
  574. // were inspected before matching.
  575. matchMetrics := &MatchMetrics{
  576. OfferMatchIndex: offerIndex,
  577. OfferQueueSize: m.offerQueue.Len(),
  578. AnnouncementMatchIndex: announcementMatchIndex,
  579. AnnouncementQueueSize: m.announcementQueue.getLen(),
  580. }
  581. offerEntry.matchMetrics.Store(matchMetrics)
  582. announcementEntry.matchMetrics.Store(matchMetrics)
  583. // Remove the matched announcement from the queue. Send the offer to
  584. // the announcement entry's offerChan, which will deliver it to the
  585. // blocked Announce call. Add a pending answers entry to await the
  586. // proxy's follow up Answer call. The TTL for the pending answer
  587. // entry is set to the matched Offer call's ctx, as the answer is
  588. // only useful as long as the client is still waiting.
  589. m.removeAnnouncementEntry(false, announcementEntry)
  590. expiry := lrucache.DefaultExpiration
  591. deadline, ok := offerEntry.ctx.Deadline()
  592. if ok {
  593. expiry = time.Until(deadline)
  594. }
  595. key := m.pendingAnswerKey(
  596. announcementEntry.announcement.ProxyID,
  597. announcementEntry.announcement.ConnectionID)
  598. m.pendingAnswers.Set(
  599. key,
  600. &pendingAnswer{
  601. announcement: announcementEntry.announcement,
  602. answerChan: offerEntry.answerChan,
  603. },
  604. expiry)
  605. announcementEntry.offerChan <- offerEntry.offer
  606. // Remove the matched offer from the queue and match the next offer,
  607. // now first in the queue.
  608. m.removeOfferEntry(false, offerEntry)
  609. }
  610. }
  611. func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
  612. // Assumes the caller has the queue mutexes locked.
  613. // Check each candidate announcement in turn, and select a match. There is
  614. // an implicit preference for older proxy announcements, sooner to
  615. // timeout, at the front of the enqueued announcements.
  616. // announcementMultiQueue.startMatching skips to the first matching
  617. // compartment ID(s).
  618. //
  619. // Limitation: since this logic matches each enqueued client in turn, it will
  620. // only make the optimal NAT match for the oldest enqueued client vs. all
  621. // proxies, and not do optimal N x M matching for all clients and all proxies.
  622. //
  623. // Future matching enhancements could include more sophisticated GeoIP
  624. // rules, such as a configuration encoding knowledge of an ASN's NAT
  625. // type, or preferred client/proxy country/ASN matches.
  626. offerProperties := &offerEntry.offer.Properties
  627. // Assumes the caller checks that offer specifies either personal
  628. // compartment IDs or common compartment IDs, but not both.
  629. isCommonCompartments := false
  630. compartmentIDs := offerProperties.PersonalCompartmentIDs
  631. if len(compartmentIDs) == 0 {
  632. isCommonCompartments = true
  633. compartmentIDs = offerProperties.CommonCompartmentIDs
  634. }
  635. if len(compartmentIDs) == 0 {
  636. return nil, -1
  637. }
  638. matchIterator := m.announcementQueue.startMatching(
  639. isCommonCompartments, compartmentIDs)
  640. // Use the NAT traversal type counters to check if there's any preferred
  641. // NAT match for this offer in the announcement queue. When there is, we
  642. // will search beyond the first announcement.
  643. unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
  644. matchIterator.getNATCounts()
  645. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  646. unlimitedNATCount > 0,
  647. partiallyLimitedNATCount > 0,
  648. strictlyLimitedNATCount > 0)
  649. // TODO: add an ExistsCompatibleProtocolVersionMatch check?
  650. //
  651. // Currently, searching for protocol version support that doesn't exist
  652. // may be mitigated by limiting, through tactics, client protocol options
  653. // selection; using the proxy protocol version in PrioritizeProxy; and,
  654. // ultimately, increasing MinimumProxyProtocolVersion.
  655. var bestMatch *announcementEntry
  656. bestMatchIndex := -1
  657. bestMatchIsPriority := false
  658. bestMatchNAT := false
  659. // matcherMaxProbe limits the linear search through the announcement queue
  660. // to find a match. Currently, the queue implementation provides
  661. // constant-time lookup for matching compartment IDs. Other matching
  662. // aspects may require iterating over the queue items, including the
  663. // strict same-country and ASN constraint and protocol version
  664. // compatibility constraint. Best NAT match is not a strict constraint
  665. // and uses a shorter search limit, matcherMaxPreferredNATProbe.
  666. candidateIndex := -1
  667. for candidateIndex <= matcherMaxProbe {
  668. announcementEntry, isPriority := matchIterator.getNext()
  669. if announcementEntry == nil {
  670. break
  671. }
  672. if !isPriority && bestMatchIsPriority {
  673. // There is a priority match, but it wasn't bestMatchNAT and we
  674. // continued to iterate. Now that isPriority is false, we're past the
  675. // end of the priority items, so stop looking for any best NAT match
  676. // and return the previous priority match. When there are zero
  677. // priority items to begin with, this case should not be hit.
  678. break
  679. }
  680. candidateIndex += 1
  681. // Skip and remove this announcement if its deadline has already
  682. // passed. There is no signal to the awaiting Announce function, as
  683. // it will exit based on the same ctx.
  684. if announcementEntry.ctx.Err() != nil {
  685. m.removeAnnouncementEntry(false, announcementEntry)
  686. continue
  687. }
  688. announcementProperties := &announcementEntry.announcement.Properties
  689. // Don't match unless the proxy announcement, client offer, and the
  690. // client's selected protocol options are compatible. UseMediaStreams
  691. // requires at least ProtocolVersion2.
  692. _, ok := negotiateProtocolVersion(
  693. announcementProperties.ProtocolVersion,
  694. offerProperties.ProtocolVersion,
  695. offerEntry.offer.UseMediaStreams)
  696. if !ok {
  697. continue
  698. }
  699. // Disallow matching the same country and ASN, except for personal
  700. // compartment ID matches.
  701. //
  702. // For common matching, hopping through the same ISP is assumed to
  703. // have no circumvention benefit. For personal matching, the user may
  704. // wish to hop their their own or their friend's proxy regardless.
  705. if isCommonCompartments &&
  706. !GetAllowCommonASNMatching() &&
  707. (offerProperties.GeoIPData.Country ==
  708. announcementProperties.GeoIPData.Country &&
  709. offerProperties.GeoIPData.ASN ==
  710. announcementProperties.GeoIPData.ASN) {
  711. continue
  712. }
  713. // Check if this is a preferred NAT match. Ultimately, a match may be
  714. // made with potentially incompatible NATs, but the client/proxy
  715. // reported NAT types may be incorrect or unknown; the client will
  716. // often skip NAT discovery.
  717. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  718. // At this point, the candidate is a match. Determine if this is a new
  719. // best match, either if there was no previous match, or this is a
  720. // better NAT match.
  721. if bestMatch == nil || (!bestMatchNAT && matchNAT) {
  722. bestMatch = announcementEntry
  723. bestMatchIndex = candidateIndex
  724. bestMatchIsPriority = isPriority
  725. bestMatchNAT = matchNAT
  726. }
  727. // Stop as soon as we have the best possible match, or have reached
  728. // the probe limit for preferred NAT matches.
  729. if bestMatch != nil && (bestMatchNAT ||
  730. !existsPreferredNATMatch ||
  731. candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
  732. break
  733. }
  734. }
  735. return bestMatch, bestMatchIndex
  736. }
  737. // applyLoadLimit checks if the broker process is in the load limiting state
  738. // and, in order to reduce load, determines if new proxy announces or client
  739. // offers should be rejected immediately instead of enqueued.
  740. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  741. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  742. return nil
  743. }
  744. // Acquire the queue locks only when in the load limit state, and in the
  745. // same order as matchAllOffers.
  746. m.announcementQueueMutex.Lock()
  747. defer m.announcementQueueMutex.Unlock()
  748. m.offerQueueMutex.Lock()
  749. defer m.offerQueueMutex.Unlock()
  750. announcementLen := m.announcementQueue.getLen()
  751. offerLen := m.offerQueue.Len()
  752. // When the load limit had been reached, and assuming the broker process
  753. // is running only an in-proxy broker, it's likely, in practise, that
  754. // only one of the two queues has hundreds of thousands of entries while
  755. // the other has few, and there are no matches clearing the queue.
  756. //
  757. // Instead of simply rejecting all enqueue requests, allow the request
  758. // type, announce or offer, that is in shorter supply as these are likely
  759. // to match and draw down the larger queue. This attempts to make
  760. // productive use of enqueued items, and also attempts to avoid simply
  761. // emptying both queues -- as will happen in any case due to timeouts --
  762. // and then have the same larger queue refill again after the load limit
  763. // state exits.
  764. //
  765. // This approach assumes some degree of slack in available system memory
  766. // and CPU in the load limiting state, similar to how the tunnel server
  767. // continues to operate existing tunnels in the same state.
  768. //
  769. // The heuristic below of allowing when less than half the size of the
  770. // larger queue puts a cap on the amount the shorter queue can continue
  771. // to grow in the load limiting state, in the worst case.
  772. //
  773. // Limitation: in some scenarios that are expected to be rare, it can
  774. // happen that allowed requests don't result in a match and memory
  775. // consumption continues to grow, leading to a broker process OOM kill.
  776. var allow bool
  777. if isAnnouncement {
  778. allow = announcementLen < offerLen/2
  779. } else {
  780. allow = offerLen < announcementLen/2
  781. }
  782. if allow {
  783. return nil
  784. }
  785. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  786. // MatcherLimitError results in a Response.Limited error response, which
  787. // causes a proxy to back off and a client to abort its dial; but in
  788. // neither case is the broker client reset. The error returned here will
  789. // result in a fast 404 response to the proxy or client, which will
  790. // instead trigger a broker client reset, and a chance of moving to a
  791. // different broker that is not overloaded.
  792. //
  793. // Limitation: the 404 response won't be distinguishable, in client or
  794. // proxy diagnostics, from other error conditions.
  795. //
  796. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  797. // use use log a distinct error and also ensure that it doesn't reselect
  798. // the same broker again in the broker client reset random selection.
  799. return errors.TraceNew("load limited")
  800. }
  // MatcherLimitError is the error type returned by Announce or Offer when the
// caller has exceeded configured queue entry or rate limits. It wraps the
// underlying error and reports that error's message.
type MatcherLimitError struct {
	err error
}

// NewMatcherLimitError returns a new MatcherLimitError wrapping err.
func NewMatcherLimitError(err error) *MatcherLimitError {
	limitErr := new(MatcherLimitError)
	limitErr.err = err
	return limitErr
}

// Error implements the error interface by returning the message of the
// wrapped error.
func (e MatcherLimitError) Error() string {
	message := e.err.Error()
	return message
}
  812. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  813. // address -- rate limits and queue entry limits.
  814. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  815. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  816. var entryCountByIP map[string]int
  817. var queueRateLimiters *lrucache.Cache
  818. var limitEntryCount int
  819. var quantity int
  820. var interval time.Duration
  821. if isAnnouncement {
  822. // Skip limit checks for non-limited proxies.
  823. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  824. return nil
  825. }
  826. entryCountByIP = m.announcementQueueEntryCountByIP
  827. queueRateLimiters = m.announcementQueueRateLimiters
  828. limitEntryCount = m.announcementLimitEntryCount
  829. quantity = m.announcementRateLimitQuantity
  830. interval = m.announcementRateLimitInterval
  831. } else {
  832. entryCountByIP = m.offerQueueEntryCountByIP
  833. queueRateLimiters = m.offerQueueRateLimiters
  834. limitEntryCount = m.offerLimitEntryCount
  835. quantity = m.offerRateLimitQuantity
  836. interval = m.offerRateLimitInterval
  837. }
  838. // The rate limit is checked first, before the max count check, to ensure
  839. // that the rate limit state is updated regardless of the max count check
  840. // outcome.
  841. if quantity > 0 && interval > 0 {
  842. var rateLimiter *rate.Limiter
  843. entry, ok := queueRateLimiters.Get(limitIP)
  844. if ok {
  845. rateLimiter = entry.(*rate.Limiter)
  846. } else {
  847. limit := float64(quantity) / interval.Seconds()
  848. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  849. queueRateLimiters.Set(
  850. limitIP, rateLimiter, interval)
  851. }
  852. if !rateLimiter.Allow() {
  853. return errors.Trace(
  854. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  855. }
  856. }
  857. if limitEntryCount > 0 {
  858. // Limitation: non-limited proxy ID entries are counted in
  859. // entryCountByIP. If both a limited and non-limited proxy ingress
  860. // from the same limitIP, then the non-limited entries will count
  861. // against the limited proxy's limitEntryCount.
  862. entryCount, ok := entryCountByIP[limitIP]
  863. if ok && entryCount >= limitEntryCount {
  864. return errors.Trace(
  865. NewMatcherLimitError(std_errors.New("max entries for IP")))
  866. }
  867. }
  868. return nil
  869. }
  870. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  871. m.announcementQueueMutex.Lock()
  872. defer m.announcementQueueMutex.Unlock()
  873. // Ensure the queue doesn't grow larger than the max size.
  874. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  875. return errors.TraceNew("queue full")
  876. }
  877. // Ensure no single peer IP can enqueue a large number of entries or
  878. // rapidly enqueue beyond the configured rate.
  879. isAnnouncement := true
  880. err := m.applyIPLimits(
  881. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  882. if err != nil {
  883. return errors.Trace(err)
  884. }
  885. // announcementEntry.queueReference should be uninitialized.
  886. // announcementMultiQueue.enqueue sets queueReference to be used for
  887. // efficient dequeuing.
  888. if announcementEntry.queueReference.entry != nil {
  889. return errors.TraceNew("unexpected queue reference")
  890. }
  891. err = m.announcementQueue.enqueue(announcementEntry)
  892. if err != nil {
  893. return errors.Trace(err)
  894. }
  895. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  896. select {
  897. case m.matchSignal <- struct{}{}:
  898. default:
  899. }
  900. return nil
  901. }
  902. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  903. // In the aborting case, the queue isn't already locked. Otherwise, assume
  904. // it is locked.
  905. if aborting {
  906. m.announcementQueueMutex.Lock()
  907. defer m.announcementQueueMutex.Unlock()
  908. }
  909. found := announcementEntry.queueReference.dequeue()
  910. if found {
  911. // Adjust entry counts by peer IP, used to enforce
  912. // matcherAnnouncementQueueMaxEntriesPerIP.
  913. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  914. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  915. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  916. }
  917. }
  918. if aborting && !found {
  919. // The Announce call is aborting and taking its entry back out of the
  920. // queue. If the entry is not found in the queue, then a concurrent
  921. // Offer has matched the announcement. So check for the pending
  922. // answer corresponding to the announcement and remove it and deliver
  923. // a failure signal to the waiting Offer, so the client doesn't wait
  924. // longer than necessary.
  925. key := m.pendingAnswerKey(
  926. announcementEntry.announcement.ProxyID,
  927. announcementEntry.announcement.ConnectionID)
  928. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  929. if ok {
  930. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  931. m.pendingAnswers.Delete(key)
  932. }
  933. }
  934. }
  935. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  936. m.offerQueueMutex.Lock()
  937. defer m.offerQueueMutex.Unlock()
  938. // Ensure the queue doesn't grow larger than the max size.
  939. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  940. return errors.TraceNew("queue full")
  941. }
  942. // Ensure no single peer IP can enqueue a large number of entries or
  943. // rapidly enqueue beyond the configured rate.
  944. isAnnouncement := false
  945. err := m.applyIPLimits(
  946. isAnnouncement, offerEntry.limitIP, ID{})
  947. if err != nil {
  948. return errors.Trace(err)
  949. }
  950. // offerEntry.queueReference should be uninitialized and is set here to be
  951. // used for efficient dequeuing.
  952. if offerEntry.queueReference != nil {
  953. return errors.TraceNew("unexpected queue reference")
  954. }
  955. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  956. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  957. select {
  958. case m.matchSignal <- struct{}{}:
  959. default:
  960. }
  961. return nil
  962. }
  963. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  964. // In the aborting case, the queue isn't already locked. Otherise, assume
  965. // it is locked.
  966. if aborting {
  967. m.offerQueueMutex.Lock()
  968. defer m.offerQueueMutex.Unlock()
  969. }
  970. if offerEntry.queueReference == nil {
  971. return
  972. }
  973. m.offerQueue.Remove(offerEntry.queueReference)
  974. offerEntry.queueReference = nil
  975. // Adjust entry counts by peer IP, used to enforce
  976. // matcherOfferQueueMaxEntriesPerIP.
  977. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  978. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  979. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  980. }
  981. }
  982. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  983. // The pending answer lookup key is used to associate announcements and
  984. // subsequent answers. While the client learns the ConnectionID, only the
  985. // proxy knows the ProxyID component, so only the correct proxy can match
  986. // an answer to an announcement. The ConnectionID component is necessary
  987. // as a proxy may have multiple, concurrent pending answers.
  988. return string(proxyID[:]) + string(connectionID[:])
  989. }
  // getRateLimitIP returns the key under which strIP is rate limited. Inputs
// that do not parse as an IP address, and IPv4 addresses (including
// IPv4-mapped forms, for which To4 is non-nil), are returned unchanged. Any
// other IPv6 address is reduced to its /56 prefix and returned in canonical
// string form.
func getRateLimitIP(strIP string) string {

	parsed := net.ParseIP(strIP)
	isIPv6 := parsed != nil && parsed.To4() == nil
	if !isIPv6 {
		return strIP
	}

	// With IPv6, individual users or sites are commonly allocated a /64 or
	// a /56, so rate limit by /56.
	const ipv6PrefixBits = 56
	prefixMask := net.CIDRMask(ipv6PrefixBits, 8*net.IPv6len)
	return parsed.Mask(prefixMask).String()
}
  999. // announcementMultiQueue is a set of announcement queues, one per common or
  1000. // personal compartment ID, providing efficient iteration over announcements
  1001. // matching a specified list of compartment IDs. announcementMultiQueue and
  1002. // its underlying data structures are not safe for concurrent access.
  1003. type announcementMultiQueue struct {
  1004. priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
  1005. commonCompartmentQueues map[ID]*announcementCompartmentQueue
  1006. personalCompartmentQueues map[ID]*announcementCompartmentQueue
  1007. totalEntries int
  1008. }
  1009. // announcementCompartmentQueue is a single compartment queue within an
  1010. // announcementMultiQueue. The queue is implemented using a doubly-linked
  1011. // list, which provides efficient insert and mid-queue dequeue operations.
  1012. // The announcementCompartmentQueue also records NAT type stats for enqueued
  1013. // announcements, which are used, when matching, to determine when better NAT
  1014. // matches may be possible.
  1015. type announcementCompartmentQueue struct {
  1016. isCommonCompartment bool
  1017. isPriority bool
  1018. compartmentID ID
  1019. entries *list.List
  1020. unlimitedNATCount int
  1021. partiallyLimitedNATCount int
  1022. strictlyLimitedNATCount int
  1023. }
  1024. // announcementMatchIterator represents the state of an iteration over a
  1025. // subset of announcementMultiQueue compartment queues. Concurrent
  1026. // announcementMatchIterators are not supported.
  1027. type announcementMatchIterator struct {
  1028. multiQueue *announcementMultiQueue
  1029. compartmentQueues []*announcementCompartmentQueue
  1030. compartmentIDs []ID
  1031. nextEntries []*list.Element
  1032. }
  1033. // announcementQueueReference represents the queue position for a given
  1034. // announcement entry, and provides an efficient dequeue operation.
  1035. type announcementQueueReference struct {
  1036. multiQueue *announcementMultiQueue
  1037. compartmentQueue *announcementCompartmentQueue
  1038. entry *list.Element
  1039. }
  1040. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1041. return &announcementMultiQueue{
  1042. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1043. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1044. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1045. }
  1046. }
  1047. func (q *announcementMultiQueue) getLen() int {
  1048. return q.totalEntries
  1049. }
  1050. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1051. // Assumes announcementEntry not already enueued.
  1052. // Limitation: only one compartment ID, either common or personal, is
  1053. // supported per announcement entry. In the common compartment case, the
  1054. // broker currently assigns only one common compartment ID per proxy
  1055. // announcement. In the personal compartment case, there is currently no
  1056. // use case for allowing a proxy to announce under multiple personal
  1057. // compartment IDs.
  1058. //
  1059. // To overcome this limitation, the dequeue operation would need to be
  1060. // able to remove an announcement entry from multiple
  1061. // announcementCompartmentQueues.
  1062. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1063. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1064. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1065. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1066. }
  1067. isPriority := announcementEntry.announcement.Properties.IsPriority
  1068. isCommonCompartment := true
  1069. var compartmentID ID
  1070. var compartmentQueues map[ID]*announcementCompartmentQueue
  1071. if len(commonCompartmentIDs) > 0 {
  1072. compartmentID = commonCompartmentIDs[0]
  1073. compartmentQueues = q.commonCompartmentQueues
  1074. if isPriority {
  1075. compartmentQueues = q.priorityCommonCompartmentQueues
  1076. }
  1077. } else {
  1078. isCommonCompartment = false
  1079. compartmentID = personalCompartmentIDs[0]
  1080. compartmentQueues = q.personalCompartmentQueues
  1081. if isPriority {
  1082. return errors.TraceNew("priority not supported for personal compartments")
  1083. }
  1084. }
  1085. compartmentQueue, ok := compartmentQueues[compartmentID]
  1086. if !ok {
  1087. compartmentQueue = &announcementCompartmentQueue{
  1088. isCommonCompartment: isCommonCompartment,
  1089. isPriority: isPriority,
  1090. compartmentID: compartmentID,
  1091. entries: list.New(),
  1092. }
  1093. compartmentQueues[compartmentID] = compartmentQueue
  1094. }
  1095. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1096. // Update the NAT type counts which are used to determine if a better NAT
  1097. // match may be made by inspecting more announcement queue entries.
  1098. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1099. case NATTraversalUnlimited:
  1100. compartmentQueue.unlimitedNATCount += 1
  1101. case NATTraversalPartiallyLimited:
  1102. compartmentQueue.partiallyLimitedNATCount += 1
  1103. case NATTraversalStrictlyLimited:
  1104. compartmentQueue.strictlyLimitedNATCount += 1
  1105. }
  1106. q.totalEntries += 1
  1107. announcementEntry.queueReference = announcementQueueReference{
  1108. multiQueue: q,
  1109. compartmentQueue: compartmentQueue,
  1110. entry: entry,
  1111. }
  1112. return nil
  1113. }
  1114. // announcementQueueReference returns false if the item is already dequeued.
  1115. func (r *announcementQueueReference) dequeue() bool {
  1116. if r.entry == nil {
  1117. // Already dequeued.
  1118. return false
  1119. }
  1120. announcementEntry := r.entry.Value.(*announcementEntry)
  1121. // Reverse the NAT type counts.
  1122. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1123. case NATTraversalUnlimited:
  1124. r.compartmentQueue.unlimitedNATCount -= 1
  1125. case NATTraversalPartiallyLimited:
  1126. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1127. case NATTraversalStrictlyLimited:
  1128. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1129. }
  1130. r.compartmentQueue.entries.Remove(r.entry)
  1131. if r.compartmentQueue.entries.Len() == 0 {
  1132. // Remove empty compartment queue.
  1133. queues := r.multiQueue.personalCompartmentQueues
  1134. if r.compartmentQueue.isCommonCompartment {
  1135. if r.compartmentQueue.isPriority {
  1136. queues = r.multiQueue.priorityCommonCompartmentQueues
  1137. } else {
  1138. queues = r.multiQueue.commonCompartmentQueues
  1139. }
  1140. }
  1141. delete(queues, r.compartmentQueue.compartmentID)
  1142. }
  1143. r.multiQueue.totalEntries -= 1
  1144. // Mark as dequeued.
  1145. r.entry = nil
  1146. return true
  1147. }
  1148. func (q *announcementMultiQueue) startMatching(
  1149. isCommonCompartments bool,
  1150. compartmentIDs []ID) *announcementMatchIterator {
  1151. iter := &announcementMatchIterator{
  1152. multiQueue: q,
  1153. }
  1154. // Find the matching compartment queues and initialize iteration over
  1155. // those queues. Building the set of matching queues is a linear time
  1156. // operation, bounded by the length of compartmentIDs (no more than
  1157. // maxCompartmentIDs, as enforced in
  1158. // ClientOfferRequest.ValidateAndGetLogFields).
  1159. // Priority queues, when in use, must all be added to the beginning of
  1160. // iter.compartmentQueues in order to ensure that the iteration logic in
  1161. // getNext visits all priority items first.
  1162. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1163. if isCommonCompartments {
  1164. compartmentQueuesList = append(
  1165. compartmentQueuesList,
  1166. q.priorityCommonCompartmentQueues,
  1167. q.commonCompartmentQueues)
  1168. } else {
  1169. compartmentQueuesList = append(
  1170. compartmentQueuesList,
  1171. q.personalCompartmentQueues)
  1172. }
  1173. for _, compartmentQueues := range compartmentQueuesList {
  1174. for _, ID := range compartmentIDs {
  1175. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1176. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1177. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1178. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1179. }
  1180. }
  1181. }
  1182. return iter
  1183. }
  1184. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1185. // Return the count of NAT types across all matchable compartment queues.
  1186. //
  1187. // A potential future enhancement would be to provide per-queue NAT counts
  1188. // or NAT type indexing in order to quickly find preferred NAT matches.
  1189. unlimitedNATCount := 0
  1190. partiallyLimitedNATCount := 0
  1191. strictlyLimitedNATCount := 0
  1192. for _, compartmentQueue := range iter.compartmentQueues {
  1193. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1194. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1195. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1196. }
  1197. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1198. }
  1199. // announcementMatchIterator returns the next announcement entry candidate in
  1200. // compartment queue FIFO order, selecting the queue with the oldest head
  1201. // item.
  1202. //
  1203. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1204. // candidate is selected. dequeue may be called on any getNext return value
  1205. // without disrupting the iteration state; however,
  1206. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1207. // are not supported during iteration. Iteration and dequeue should all be
  1208. // performed with a lock over the entire announcementMultiQueue, and with
  1209. // only one concurrent announcementMatchIterator.
  1210. //
  1211. // getNext returns a nil *announcementEntry when there are no more items.
  1212. // getNext also returns an isPriority flag, indicating the announcement is a
  1213. // priority candidate. All priority candidates are guaranteed to be returned
  1214. // before any non-priority candidates.
  1215. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1216. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1217. // order. Also assumes that any priority queues are all at the front of
  1218. // iter.compartmentQueues.
  1219. // Select the oldest item, by deadline, from all the candidate queue head
  1220. // items. This operation is linear in the number of matching compartment
  1221. // ID queues, which is currently bounded by the length of matching
  1222. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1223. // ClientOfferRequest.ValidateAndGetLogFields).
  1224. //
  1225. // When there are priority candidates, they are selected first, regardless
  1226. // of the deadlines of non-priority candidates. Multiple priority
  1227. // candidates are processed in FIFO deadline order.
  1228. //
  1229. // A potential future enhancement is to add more iterator state to track
  1230. // which queue has the next oldest time to select on the following
  1231. // getNext call. Another potential enhancement is to remove fully
  1232. // consumed queues from compartmentQueues/compartmentIDs/nextEntries.
  1233. var selectedCandidate *announcementEntry
  1234. selectedIndex := -1
  1235. selectedPriority := false
  1236. for i := 0; i < len(iter.compartmentQueues); i++ {
  1237. if iter.nextEntries[i] == nil {
  1238. continue
  1239. }
  1240. isPriority := iter.compartmentQueues[i].isPriority
  1241. if selectedPriority && !isPriority {
  1242. // Ignore older of non-priority entries when there are priority
  1243. // candidates.
  1244. break
  1245. }
  1246. if selectedCandidate == nil {
  1247. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1248. selectedIndex = i
  1249. selectedPriority = isPriority
  1250. } else {
  1251. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1252. deadline, deadlineOk := candidate.ctx.Deadline()
  1253. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1254. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1255. selectedCandidate = candidate
  1256. selectedIndex = i
  1257. selectedPriority = isPriority
  1258. }
  1259. }
  1260. }
  1261. // Advance the selected queue to the next element. This must be done
  1262. // before any dequeue call, since container/list.remove clears
  1263. // list.Element.next.
  1264. if selectedIndex != -1 {
  1265. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1266. }
  1267. return selectedCandidate, selectedPriority
  1268. }