matcher.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. )
  32. // TTLs should be aligned with STUN hole punch lifetimes.
  33. const (
  34. matcherAnnouncementQueueMaxSize = 5000000
  35. matcherOfferQueueMaxSize = 5000000
  36. matcherPendingAnswersTTL = 30 * time.Second
  37. matcherPendingAnswersMaxSize = 5000000
  38. matcherMaxPreferredNATProbe = 100
  39. matcherMaxProbe = 1000
  40. )
  41. // Matcher matches proxy announcements with client offers. Matcher also
  42. // coordinates pending proxy answers and routes answers to the awaiting
  43. // client offer handler.
  44. //
  45. // Matching prioritizes selecting the oldest announcements and client offers,
  46. // as they are closest to timing out.
  47. //
  48. // The client and proxy must supply matching personal or common compartment
  49. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  50. // tactics parameter or via an OSL embedding. Each proxy announcement or
  51. // client offer may specify only one compartment ID type, either common or
  52. // personal.
  53. //
  54. // Matching prefers to pair proxies and clients in a way that maximizes total
  55. // possible matches. For a client or proxy with less-limited NAT traversal, a
  56. // pairing with more-limited NAT traversal is preferred; and vice versa.
  57. // Candidates with unknown NAT types and mobile network types are assumed to
  58. // have the most limited NAT traversal capability.
  59. //
  60. // Preferred matchings take priority over announcement age.
  61. //
  62. // The client and proxy will not match if they are in the same country and
  63. // ASN, as it's assumed that doesn't provide any blocking circumvention
  64. // benefit. Disallowing proxies in certain blocked countries is handled at a
  65. // higher level; any such proxies should not be enqueued for matching.
  66. type Matcher struct {
  67. config *MatcherConfig
  68. runMutex sync.Mutex
  69. runContext context.Context
  70. stopRunning context.CancelFunc
  71. waitGroup *sync.WaitGroup
  72. // The announcement queue is implicitly sorted by announcement age. The
  73. // count fields are used to skip searching deeper into the queue for
  74. // preferred matches.
  75. // TODO: replace queue and counts with an indexed, in-memory database?
  76. announcementQueueMutex sync.Mutex
  77. announcementQueue *announcementMultiQueue
  78. announcementQueueEntryCountByIP map[string]int
  79. announcementQueueRateLimiters *lrucache.Cache
  80. announcementLimitEntryCount int
  81. announcementRateLimitQuantity int
  82. announcementRateLimitInterval time.Duration
  83. announcementNonlimitedProxyIDs map[ID]struct{}
  84. // The offer queue is also implicitly sorted by offer age. Both an offer
  85. // and announcement queue are required since either announcements or
  86. // offers can arrive while there are no available pairings.
  87. offerQueueMutex sync.Mutex
  88. offerQueue *list.List
  89. offerQueueEntryCountByIP map[string]int
  90. offerQueueRateLimiters *lrucache.Cache
  91. offerLimitEntryCount int
  92. offerRateLimitQuantity int
  93. offerRateLimitInterval time.Duration
  94. matchSignal chan struct{}
  95. pendingAnswers *lrucache.Cache
  96. }
  97. // MatcherConfig specifies the configuration for a matcher.
  98. type MatcherConfig struct {
  99. // Logger is used to log events.
  100. Logger common.Logger
  101. // Announcement queue limits.
  102. AnnouncementLimitEntryCount int
  103. AnnouncementRateLimitQuantity int
  104. AnnouncementRateLimitInterval time.Duration
  105. AnnouncementNonlimitedProxyIDs []ID
  106. // Offer queue limits.
  107. OfferLimitEntryCount int
  108. OfferRateLimitQuantity int
  109. OfferRateLimitInterval time.Duration
  110. // Proxy quality state.
  111. ProxyQualityState *ProxyQualityState
  112. // Broker process load limit state callback. See BrokerConfig.
  113. IsLoadLimiting func() bool
  114. // Proxy/client allow match callback. See BrokerConfig.
  115. AllowMatch func(common.GeoIPData, common.GeoIPData) bool
  116. }
  117. // MatchProperties specifies the compartment, GeoIP, and network topology
  118. // matching properties of clients and proxies.
  119. type MatchProperties struct {
  120. IsPriority bool
  121. ProtocolVersion int32
  122. CommonCompartmentIDs []ID
  123. PersonalCompartmentIDs []ID
  124. GeoIPData common.GeoIPData
  125. NetworkType NetworkType
  126. NATType NATType
  127. PortMappingTypes PortMappingTypes
  128. }
  129. // EffectiveNATType combines the set of network properties into an effective
  130. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  131. // traversal is assumed. When NAT type is unknown and the network type is
  132. // mobile, CGNAT with limited NAT traversal is assumed.
  133. func (p *MatchProperties) EffectiveNATType() NATType {
  134. if p.PortMappingTypes.Available() {
  135. return NATTypePortMapping
  136. }
  137. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  138. // publicly reachable IPv6 ICE host candidate? If so, change the
  139. // effective NAT type? Depends on whether the matched peer can use IPv6.
  140. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  141. return NATTypeMobileNetwork
  142. }
  143. return p.NATType
  144. }
  145. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  146. // matching given the types of pairing candidates available.
  147. func (p *MatchProperties) ExistsPreferredNATMatch(
  148. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  149. return p.EffectiveNATType().ExistsPreferredMatch(
  150. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  151. }
  152. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  153. // NAT matching.
  154. func (p *MatchProperties) IsPreferredNATMatch(
  155. peerMatchProperties *MatchProperties) bool {
  156. return p.EffectiveNATType().IsPreferredMatch(
  157. peerMatchProperties.EffectiveNATType())
  158. }
  159. // MatchAnnouncement is a proxy announcement to be queued for matching.
  160. type MatchAnnouncement struct {
  161. Properties MatchProperties
  162. ProxyID ID
  163. ProxyMetrics *ProxyMetrics
  164. ConnectionID ID
  165. }
  166. // MatchOffer is a client offer to be queued for matching.
  167. type MatchOffer struct {
  168. Properties MatchProperties
  169. ClientOfferSDP WebRTCSessionDescription
  170. ClientRootObfuscationSecret ObfuscationSecret
  171. DoDTLSRandomization bool
  172. UseMediaStreams bool
  173. TrafficShapingParameters *TrafficShapingParameters
  174. NetworkProtocol NetworkProtocol
  175. DestinationAddress string
  176. DestinationServerID string
  177. }
  178. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  179. // announcement, to be routed to the awaiting client offer.
  180. type MatchAnswer struct {
  181. ProxyIP string
  182. ProxyID ID
  183. ConnectionID ID
  184. ProxyAnswerSDP WebRTCSessionDescription
  185. }
  // MatchMetrics captures statistics about the state of the match queues at
  // the moment a match is made.
  type MatchMetrics struct {
  	OfferMatchIndex int
  	OfferQueueSize  int

  	AnnouncementMatchIndex int
  	AnnouncementQueueSize  int

  	PendingAnswersSize int
  }
  195. // GetMetrics converts MatchMetrics to loggable fields.
  196. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  197. if metrics == nil {
  198. return nil
  199. }
  200. return common.LogFields{
  201. "offer_match_index": metrics.OfferMatchIndex,
  202. "offer_queue_size": metrics.OfferQueueSize,
  203. "announcement_match_index": metrics.AnnouncementMatchIndex,
  204. "announcement_queue_size": metrics.AnnouncementQueueSize,
  205. "pending_answers_size": metrics.PendingAnswersSize,
  206. }
  207. }
  208. // announcementEntry is an announcement queue entry, an announcement with its
  209. // associated lifetime context and signaling channel.
  210. type announcementEntry struct {
  211. ctx context.Context
  212. limitIP string
  213. announcement *MatchAnnouncement
  214. offerChan chan *MatchOffer
  215. matchMetrics atomic.Value
  216. // queueReference is initialized by addAnnouncementEntry, and used to
  217. // efficiently dequeue the entry.
  218. queueReference announcementQueueReference
  219. }
  220. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  221. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  222. return matchMetrics
  223. }
  224. // offerEntry is an offer queue entry, an offer with its associated lifetime
  225. // context and signaling channel.
  226. type offerEntry struct {
  227. ctx context.Context
  228. limitIP string
  229. offer *MatchOffer
  230. answerChan chan *answerInfo
  231. matchMetrics atomic.Value
  232. // queueReference is initialized by addOfferEntry, and used to efficiently
  233. // dequeue the entry.
  234. queueReference *list.Element
  235. }
  236. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  237. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  238. return matchMetrics
  239. }
  240. // answerInfo is an answer and its associated announcement.
  241. type answerInfo struct {
  242. announcement *MatchAnnouncement
  243. answer *MatchAnswer
  244. }
  245. // pendingAnswer represents an answer that is expected to arrive from a
  246. // proxy.
  247. type pendingAnswer struct {
  248. announcement *MatchAnnouncement
  249. answerChan chan *answerInfo
  250. }
  251. // NewMatcher creates a new Matcher.
  252. func NewMatcher(config *MatcherConfig) *Matcher {
  253. m := &Matcher{
  254. config: config,
  255. waitGroup: new(sync.WaitGroup),
  256. announcementQueue: newAnnouncementMultiQueue(),
  257. announcementQueueEntryCountByIP: make(map[string]int),
  258. announcementQueueRateLimiters: lrucache.NewWithLRU(
  259. 0,
  260. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  261. brokerRateLimiterMaxCacheEntries),
  262. offerQueue: list.New(),
  263. offerQueueEntryCountByIP: make(map[string]int),
  264. offerQueueRateLimiters: lrucache.NewWithLRU(
  265. 0,
  266. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  267. brokerRateLimiterMaxCacheEntries),
  268. matchSignal: make(chan struct{}, 1),
  269. // matcherPendingAnswersTTL is not configurable; it supplies a default
  270. // that is expected to be ignored when each entry's TTL is set to the
  271. // Offer ctx timeout.
  272. pendingAnswers: lrucache.NewWithLRU(
  273. matcherPendingAnswersTTL,
  274. 1*time.Minute,
  275. matcherPendingAnswersMaxSize),
  276. }
  277. m.SetLimits(
  278. config.AnnouncementLimitEntryCount,
  279. config.AnnouncementRateLimitQuantity,
  280. config.AnnouncementRateLimitInterval,
  281. config.AnnouncementNonlimitedProxyIDs,
  282. config.OfferLimitEntryCount,
  283. config.OfferRateLimitQuantity,
  284. config.OfferRateLimitInterval)
  285. return m
  286. }
  287. // SetLimits sets new queue limits, replacing the previous configuration.
  288. // Existing, cached rate limiters retain their existing rate limit state. New
  289. // entries will use the new quantity/interval configuration. In addition,
  290. // currently enqueued items may exceed any new, lower maximum entry count
  291. // until naturally dequeued.
  292. func (m *Matcher) SetLimits(
  293. announcementLimitEntryCount int,
  294. announcementRateLimitQuantity int,
  295. announcementRateLimitInterval time.Duration,
  296. announcementNonlimitedProxyIDs []ID,
  297. offerLimitEntryCount int,
  298. offerRateLimitQuantity int,
  299. offerRateLimitInterval time.Duration) {
  300. nonlimitedProxyIDs := make(map[ID]struct{})
  301. for _, proxyID := range announcementNonlimitedProxyIDs {
  302. nonlimitedProxyIDs[proxyID] = struct{}{}
  303. }
  304. m.announcementQueueMutex.Lock()
  305. m.announcementLimitEntryCount = announcementLimitEntryCount
  306. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  307. m.announcementRateLimitInterval = announcementRateLimitInterval
  308. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  309. m.announcementQueueMutex.Unlock()
  310. m.offerQueueMutex.Lock()
  311. m.offerLimitEntryCount = offerLimitEntryCount
  312. m.offerRateLimitQuantity = offerRateLimitQuantity
  313. m.offerRateLimitInterval = offerRateLimitInterval
  314. m.offerQueueMutex.Unlock()
  315. }
  316. // Start starts running the Matcher. The Matcher runs a goroutine which
  317. // matches announcements and offers.
  318. func (m *Matcher) Start() error {
  319. m.runMutex.Lock()
  320. defer m.runMutex.Unlock()
  321. if m.runContext != nil {
  322. return errors.TraceNew("already running")
  323. }
  324. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  325. m.waitGroup.Add(1)
  326. go func() {
  327. defer m.waitGroup.Done()
  328. m.matchWorker(m.runContext)
  329. }()
  330. return nil
  331. }
  332. // Stop stops running the Matcher and its worker goroutine.
  333. //
  334. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  335. // can get enqueued during and after a Stop call. Stop is intended more for a
  336. // full broker shutdown, where this won't be a concern.
  337. func (m *Matcher) Stop() {
  338. m.runMutex.Lock()
  339. defer m.runMutex.Unlock()
  340. m.stopRunning()
  341. m.waitGroup.Wait()
  342. m.runContext, m.stopRunning = nil, nil
  343. }
  344. // Announce enqueues the proxy announcement and blocks until it is matched
  345. // with a returned offer or ctx is done. The caller must not mutate the
  346. // announcement or its properties after calling Announce.
  347. //
  348. // Announce assumes that the ctx.Deadline for each call is monotonically
  349. // increasing and that the deadline can be used as part of selecting the next
  350. // nearest-to-expire announcement.
  351. //
  352. // The offer is sent to the proxy by the broker, and then the proxy sends its
  353. // answer back to the broker, which calls Answer with that value.
  354. //
  355. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  356. // match is made, even if there is a later error.
  357. func (m *Matcher) Announce(
  358. ctx context.Context,
  359. proxyIP string,
  360. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  361. // An announcement must specify exactly one compartment ID, of one type,
  362. // common or personal. The limit of one is currently a limitation of the
  363. // multi-queue implementation; see comment in
  364. // announcementMultiQueue.enqueue.
  365. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  366. if len(compartmentIDs) == 0 {
  367. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  368. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  369. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  370. }
  371. if len(compartmentIDs) != 1 {
  372. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  373. }
  374. isAnnouncement := true
  375. err := m.applyLoadLimit(isAnnouncement)
  376. if err != nil {
  377. return nil, nil, errors.Trace(err)
  378. }
  379. announcementEntry := &announcementEntry{
  380. ctx: ctx,
  381. limitIP: getRateLimitIP(proxyIP),
  382. announcement: proxyAnnouncement,
  383. offerChan: make(chan *MatchOffer, 1),
  384. }
  385. err = m.addAnnouncementEntry(announcementEntry)
  386. if err != nil {
  387. return nil, nil, errors.Trace(err)
  388. }
  389. // Await client offer.
  390. var clientOffer *MatchOffer
  391. select {
  392. case <-ctx.Done():
  393. m.removeAnnouncementEntry(true, announcementEntry)
  394. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  395. case clientOffer = <-announcementEntry.offerChan:
  396. }
  397. return clientOffer, announcementEntry.getMatchMetrics(), nil
  398. }
  399. // Offer enqueues the client offer and blocks until it is matched with a
  400. // returned announcement or ctx is done. The caller must not mutate the offer
  401. // or its properties after calling Announce.
  402. //
  403. // The answer is returned to the client by the broker, and the WebRTC
  404. // connection is dialed. The original announcement is also returned, so its
  405. // match properties can be logged.
  406. //
  407. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  408. // match is made, even if there is a later error.
  409. func (m *Matcher) Offer(
  410. ctx context.Context,
  411. clientIP string,
  412. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  413. // An offer must specify at least one compartment ID, and may only specify
  414. // one type, common or personal, of compartment IDs.
  415. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  416. if len(compartmentIDs) == 0 {
  417. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  418. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  419. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  420. }
  421. if len(compartmentIDs) < 1 {
  422. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  423. }
  424. isAnnouncement := false
  425. err := m.applyLoadLimit(isAnnouncement)
  426. if err != nil {
  427. return nil, nil, nil, errors.Trace(err)
  428. }
  429. offerEntry := &offerEntry{
  430. ctx: ctx,
  431. limitIP: getRateLimitIP(clientIP),
  432. offer: clientOffer,
  433. answerChan: make(chan *answerInfo, 1),
  434. }
  435. err = m.addOfferEntry(offerEntry)
  436. if err != nil {
  437. return nil, nil, nil, errors.Trace(err)
  438. }
  439. // Await proxy answer.
  440. var proxyAnswerInfo *answerInfo
  441. select {
  442. case <-ctx.Done():
  443. m.removeOfferEntry(true, offerEntry)
  444. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  445. // the Offer ctx, the client request, timeout, so it will eventually
  446. // get removed. But a client may abort its request earlier than the
  447. // timeout.
  448. return nil, nil,
  449. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  450. case proxyAnswerInfo = <-offerEntry.answerChan:
  451. }
  452. if proxyAnswerInfo == nil {
  453. // nil will be delivered to the channel when either the proxy
  454. // announcement request concurrently timed out, or the answer
  455. // indicated a proxy error, or the answer did not arrive in time.
  456. return nil, nil,
  457. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  458. }
  459. // This is a sanity check and not expected to fail.
  460. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  461. proxyAnswerInfo.announcement.ConnectionID) {
  462. return nil, nil,
  463. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  464. }
  465. return proxyAnswerInfo.answer,
  466. proxyAnswerInfo.announcement,
  467. offerEntry.getMatchMetrics(),
  468. nil
  469. }
  470. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  471. // announcement identified by the specified proxy ID and connection ID and
  472. // returns whether the announcement has personal compartment IDs, indicating
  473. // personal pairing mode.
  474. //
  475. // If no pending answer is found, an error is returned.
  476. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  477. proxyID ID, connectionID ID) (bool, error) {
  478. key := m.pendingAnswerKey(proxyID, connectionID)
  479. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  480. if !ok {
  481. // The input IDs don't correspond to a pending answer, or the client
  482. // is no longer awaiting the response.
  483. return false, errors.TraceNew("no pending answer")
  484. }
  485. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  486. hasPersonalCompartmentIDs := len(
  487. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  488. return hasPersonalCompartmentIDs, nil
  489. }
  490. // Answer delivers an answer from the proxy for a previously matched offer.
  491. // The ProxyID and ConnectionID must correspond to the original announcement.
  492. // The caller must not mutate the answer after calling Answer. Answer does
  493. // not block.
  494. //
  495. // The answer is returned to the awaiting Offer call and sent to the matched
  496. // client.
  497. func (m *Matcher) Answer(
  498. proxyAnswer *MatchAnswer) error {
  499. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  500. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  501. if !ok {
  502. // The input IDs don't correspond to a pending answer, or the client
  503. // is no longer awaiting the response.
  504. return errors.TraceNew("no pending answer")
  505. }
  506. m.pendingAnswers.Delete(key)
  507. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  508. pendingAnswer.answerChan <- &answerInfo{
  509. announcement: pendingAnswer.announcement,
  510. answer: proxyAnswer,
  511. }
  512. return nil
  513. }
  514. // AnswerError delivers a failed answer indication from the proxy to an
  515. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  516. // original announcement.
  517. //
  518. // The failure indication is returned to the awaiting Offer call and sent to
  519. // the matched client.
  520. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  521. key := m.pendingAnswerKey(proxyID, connectionID)
  522. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  523. if !ok {
  524. // The client is no longer awaiting the response.
  525. return
  526. }
  527. m.pendingAnswers.Delete(key)
  528. // Closing the channel delivers nil, a failed indicator, to any receiver.
  529. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  530. }
  531. // matchWorker is the matching worker goroutine. It idles until signaled that
  532. // a queue item has been added, and then runs a full matching pass.
  533. func (m *Matcher) matchWorker(ctx context.Context) {
  534. for {
  535. select {
  536. case <-m.matchSignal:
  537. m.matchAllOffers()
  538. case <-ctx.Done():
  539. return
  540. }
  541. }
  542. }
  543. // matchAllOffers iterates over the queues, making all possible matches.
  544. func (m *Matcher) matchAllOffers() {
  545. m.announcementQueueMutex.Lock()
  546. defer m.announcementQueueMutex.Unlock()
  547. m.offerQueueMutex.Lock()
  548. defer m.offerQueueMutex.Unlock()
  549. // Take each offer in turn, and select an announcement match. There is an
  550. // implicit preference for older client offers, sooner to timeout, at the
  551. // front of the queue.
  552. // TODO: consider matching one offer, then releasing the locks to allow
  553. // more announcements to be enqueued, then continuing to match.
  554. nextOffer := m.offerQueue.Front()
  555. offerIndex := -1
  556. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  557. offerIndex += 1
  558. // nextOffer.Next must be invoked before any removeOfferEntry since
  559. // container/list.remove clears list.Element.next.
  560. offer := nextOffer
  561. nextOffer = nextOffer.Next()
  562. offerEntry := offer.Value.(*offerEntry)
  563. // Skip and remove this offer if its deadline has already passed.
  564. // There is no signal to the awaiting Offer function, as it will exit
  565. // based on the same ctx.
  566. if offerEntry.ctx.Err() != nil {
  567. m.removeOfferEntry(false, offerEntry)
  568. continue
  569. }
  570. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  571. if announcementEntry == nil {
  572. continue
  573. }
  574. // Record match metrics.
  575. // The index metrics predate the announcement multi-queue; now, with
  576. // the multi-queue, announcement_index is how many announce entries
  577. // were inspected before matching.
  578. matchMetrics := &MatchMetrics{
  579. OfferMatchIndex: offerIndex,
  580. OfferQueueSize: m.offerQueue.Len(),
  581. AnnouncementMatchIndex: announcementMatchIndex,
  582. AnnouncementQueueSize: m.announcementQueue.getLen(),
  583. PendingAnswersSize: m.pendingAnswers.ItemCount(),
  584. }
  585. offerEntry.matchMetrics.Store(matchMetrics)
  586. announcementEntry.matchMetrics.Store(matchMetrics)
  587. // Remove the matched announcement from the queue. Send the offer to
  588. // the announcement entry's offerChan, which will deliver it to the
  589. // blocked Announce call. Add a pending answers entry to await the
  590. // proxy's follow up Answer call. The TTL for the pending answer
  591. // entry is set to the matched Offer call's ctx, as the answer is
  592. // only useful as long as the client is still waiting.
  593. m.removeAnnouncementEntry(false, announcementEntry)
  594. expiry := lrucache.DefaultExpiration
  595. deadline, ok := offerEntry.ctx.Deadline()
  596. if ok {
  597. expiry = time.Until(deadline)
  598. }
  599. key := m.pendingAnswerKey(
  600. announcementEntry.announcement.ProxyID,
  601. announcementEntry.announcement.ConnectionID)
  602. m.pendingAnswers.Set(
  603. key,
  604. &pendingAnswer{
  605. announcement: announcementEntry.announcement,
  606. answerChan: offerEntry.answerChan,
  607. },
  608. expiry)
  609. announcementEntry.offerChan <- offerEntry.offer
  610. // Remove the matched offer from the queue and match the next offer,
  611. // now first in the queue.
  612. m.removeOfferEntry(false, offerEntry)
  613. }
  614. }
  615. func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
  616. // Assumes the caller has the queue mutexes locked.
  617. // Check each candidate announcement in turn, and select a match. There is
  618. // an implicit preference for older proxy announcements, sooner to
  619. // timeout, at the front of the enqueued announcements.
  620. // announcementMultiQueue.startMatching skips to the first matching
  621. // compartment ID(s).
  622. //
  623. // Limitation: since this logic matches each enqueued client in turn, it will
  624. // only make the optimal NAT match for the oldest enqueued client vs. all
  625. // proxies, and not do optimal N x M matching for all clients and all proxies.
  626. //
  627. // Future matching enhancements could include more sophisticated GeoIP
  628. // rules, such as a configuration encoding knowledge of an ASN's NAT
  629. // type, or preferred client/proxy country/ASN matches.
  630. offerProperties := &offerEntry.offer.Properties
  631. // Assumes the caller checks that offer specifies either personal
  632. // compartment IDs or common compartment IDs, but not both.
  633. isCommonCompartments := false
  634. compartmentIDs := offerProperties.PersonalCompartmentIDs
  635. if len(compartmentIDs) == 0 {
  636. isCommonCompartments = true
  637. compartmentIDs = offerProperties.CommonCompartmentIDs
  638. }
  639. if len(compartmentIDs) == 0 {
  640. return nil, -1
  641. }
  642. matchIterator := m.announcementQueue.startMatching(
  643. isCommonCompartments, compartmentIDs)
  644. // Use the NAT traversal type counters to check if there's any preferred
  645. // NAT match for this offer in the announcement queue. When there is, we
  646. // will search beyond the first announcement.
  647. unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
  648. matchIterator.getNATCounts()
  649. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  650. unlimitedNATCount > 0,
  651. partiallyLimitedNATCount > 0,
  652. strictlyLimitedNATCount > 0)
  653. // TODO: add an ExistsCompatibleProtocolVersionMatch check?
  654. //
  655. // Currently, searching for protocol version support that doesn't exist
  656. // may be mitigated by limiting, through tactics, client protocol options
  657. // selection; using the proxy protocol version in PrioritizeProxy; and,
  658. // ultimately, increasing MinimumProxyProtocolVersion.
  659. var bestMatch *announcementEntry
  660. bestMatchIndex := -1
  661. bestMatchIsPriority := false
  662. bestMatchNAT := false
  663. // matcherMaxProbe limits the linear search through the announcement queue
  664. // to find a match. Currently, the queue implementation provides
  665. // constant-time lookup for matching compartment IDs. Other matching
  666. // aspects may require iterating over the queue items, including the
  667. // strict same-country and ASN constraint and protocol version
  668. // compatibility constraint. Best NAT match is not a strict constraint
  669. // and uses a shorter search limit, matcherMaxPreferredNATProbe.
  670. candidateIndex := -1
  671. for candidateIndex <= matcherMaxProbe {
  672. announcementEntry, isPriority := matchIterator.getNext()
  673. if announcementEntry == nil {
  674. break
  675. }
  676. if !isPriority && bestMatchIsPriority {
  677. // There is a priority match, but it wasn't bestMatchNAT and we
  678. // continued to iterate. Now that isPriority is false, we're past the
  679. // end of the priority items, so stop looking for any best NAT match
  680. // and return the previous priority match. When there are zero
  681. // priority items to begin with, this case should not be hit.
  682. break
  683. }
  684. candidateIndex += 1
  685. // Skip and remove this announcement if its deadline has already
  686. // passed. There is no signal to the awaiting Announce function, as
  687. // it will exit based on the same ctx.
  688. if announcementEntry.ctx.Err() != nil {
  689. m.removeAnnouncementEntry(false, announcementEntry)
  690. continue
  691. }
  692. announcementProperties := &announcementEntry.announcement.Properties
  693. // Don't match unless the proxy announcement, client offer, and the
  694. // client's selected protocol options are compatible. UseMediaStreams
  695. // requires at least ProtocolVersion2.
  696. _, ok := negotiateProtocolVersion(
  697. announcementProperties.ProtocolVersion,
  698. offerProperties.ProtocolVersion,
  699. offerEntry.offer.UseMediaStreams)
  700. if !ok {
  701. continue
  702. }
  703. // Disallow matching the same country and ASN, or GeoIP combinations
  704. // prohibited by the AllowMatch callback, except for personal
  705. // compartment ID matches.
  706. //
  707. // For common matching, hopping through the same ISP is assumed to
  708. // have no circumvention benefit. For personal matching, the user may
  709. // wish to hop their their own or their friend's proxy regardless.
  710. if isCommonCompartments {
  711. if !GetAllowCommonASNMatching() &&
  712. (offerProperties.GeoIPData.Country ==
  713. announcementProperties.GeoIPData.Country &&
  714. offerProperties.GeoIPData.ASN ==
  715. announcementProperties.GeoIPData.ASN) {
  716. continue
  717. }
  718. if !m.config.AllowMatch(
  719. announcementProperties.GeoIPData,
  720. offerProperties.GeoIPData) {
  721. continue
  722. }
  723. }
  724. // Check if this is a preferred NAT match. Ultimately, a match may be
  725. // made with potentially incompatible NATs, but the client/proxy
  726. // reported NAT types may be incorrect or unknown; the client will
  727. // often skip NAT discovery.
  728. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  729. // Use proxy ASN quality as an alternative to preferred NAT matches.
  730. //
  731. // The NAT matching logic depends on RFC5780 NAT discovery test
  732. // results, which may not be entirely accurate, and may not be
  733. // available in the first place, especially if skipped for clients,
  734. // which is the default.
  735. //
  736. // Proxy ASN quality leverages the quality data, provided by servers,
  737. // indicating that the particular proxy recently relayed a successful
  738. // tunnel for some client in the given ASN. When this quality data is
  739. // present, NAT compatibility is assumed, with the caveat that the
  740. // client device and immediate router may not be the same.
  741. //
  742. // Limitations:
  743. // - existsPreferredNATMatch doesn't reflect existence of matching
  744. // proxy ASN quality, so the NAT match probe can end prematurely.
  745. // - IsPreferredNATMatch currently takes precedence over proxy ASN
  746. // quality.
  747. if !matchNAT && isPriority {
  748. matchNAT = m.config.ProxyQualityState.HasQuality(
  749. announcementEntry.announcement.ProxyID,
  750. announcementEntry.announcement.Properties.GeoIPData.ASN,
  751. offerProperties.GeoIPData.ASN)
  752. }
  753. // At this point, the candidate is a match. Determine if this is a new
  754. // best match, either if there was no previous match, or this is a
  755. // better NAT match.
  756. if bestMatch == nil || (!bestMatchNAT && matchNAT) {
  757. bestMatch = announcementEntry
  758. bestMatchIndex = candidateIndex
  759. bestMatchIsPriority = isPriority
  760. bestMatchNAT = matchNAT
  761. }
  762. // Stop as soon as we have the best possible match, or have reached
  763. // the probe limit for preferred NAT matches.
  764. if bestMatch != nil && (bestMatchNAT ||
  765. !existsPreferredNATMatch ||
  766. candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
  767. break
  768. }
  769. }
  770. return bestMatch, bestMatchIndex
  771. }
  772. // applyLoadLimit checks if the broker process is in the load limiting state
  773. // and, in order to reduce load, determines if new proxy announces or client
  774. // offers should be rejected immediately instead of enqueued.
  775. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  776. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  777. return nil
  778. }
  779. // Acquire the queue locks only when in the load limit state, and in the
  780. // same order as matchAllOffers.
  781. m.announcementQueueMutex.Lock()
  782. defer m.announcementQueueMutex.Unlock()
  783. m.offerQueueMutex.Lock()
  784. defer m.offerQueueMutex.Unlock()
  785. announcementLen := m.announcementQueue.getLen()
  786. offerLen := m.offerQueue.Len()
  787. // When the load limit had been reached, and assuming the broker process
  788. // is running only an in-proxy broker, it's likely, in practise, that
  789. // only one of the two queues has hundreds of thousands of entries while
  790. // the other has few, and there are no matches clearing the queue.
  791. //
  792. // Instead of simply rejecting all enqueue requests, allow the request
  793. // type, announce or offer, that is in shorter supply as these are likely
  794. // to match and draw down the larger queue. This attempts to make
  795. // productive use of enqueued items, and also attempts to avoid simply
  796. // emptying both queues -- as will happen in any case due to timeouts --
  797. // and then have the same larger queue refill again after the load limit
  798. // state exits.
  799. //
  800. // This approach assumes some degree of slack in available system memory
  801. // and CPU in the load limiting state, similar to how the tunnel server
  802. // continues to operate existing tunnels in the same state.
  803. //
  804. // The heuristic below of allowing when less than half the size of the
  805. // larger queue puts a cap on the amount the shorter queue can continue
  806. // to grow in the load limiting state, in the worst case.
  807. //
  808. // Limitation: in some scenarios that are expected to be rare, it can
  809. // happen that allowed requests don't result in a match and memory
  810. // consumption continues to grow, leading to a broker process OOM kill.
  811. var allow bool
  812. if isAnnouncement {
  813. allow = announcementLen < offerLen/2
  814. } else {
  815. allow = offerLen < announcementLen/2
  816. }
  817. if allow {
  818. return nil
  819. }
  820. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  821. // MatcherLimitError results in a Response.Limited error response, which
  822. // causes a proxy to back off and a client to abort its dial; but in
  823. // neither case is the broker client reset. The error returned here will
  824. // result in a fast 404 response to the proxy or client, which will
  825. // instead trigger a broker client reset, and a chance of moving to a
  826. // different broker that is not overloaded.
  827. //
  828. // Limitation: the 404 response won't be distinguishable, in client or
  829. // proxy diagnostics, from other error conditions.
  830. //
  831. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  832. // use use log a distinct error and also ensure that it doesn't reselect
  833. // the same broker again in the broker client reset random selection.
  834. return errors.TraceNew("load limited")
  835. }
  // MatcherLimitError is the error type returned by Announce or Offer when the
  // caller has exceeded configured queue entry or rate limits.
  type MatcherLimitError struct {
  	// err is the wrapped error describing which limit was exceeded.
  	err error
  }

  // NewMatcherLimitError wraps err in a newly allocated MatcherLimitError.
  func NewMatcherLimitError(err error) *MatcherLimitError {
  	limitError := new(MatcherLimitError)
  	limitError.err = err
  	return limitError
  }

  // Error implements the error interface by returning the wrapped error's
  // message unchanged.
  func (e MatcherLimitError) Error() string {
  	wrapped := e.err
  	return wrapped.Error()
  }
  847. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  848. // address -- rate limits and queue entry limits.
  849. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  850. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  851. var entryCountByIP map[string]int
  852. var queueRateLimiters *lrucache.Cache
  853. var limitEntryCount int
  854. var quantity int
  855. var interval time.Duration
  856. if isAnnouncement {
  857. // Skip limit checks for non-limited proxies.
  858. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  859. return nil
  860. }
  861. entryCountByIP = m.announcementQueueEntryCountByIP
  862. queueRateLimiters = m.announcementQueueRateLimiters
  863. limitEntryCount = m.announcementLimitEntryCount
  864. quantity = m.announcementRateLimitQuantity
  865. interval = m.announcementRateLimitInterval
  866. } else {
  867. entryCountByIP = m.offerQueueEntryCountByIP
  868. queueRateLimiters = m.offerQueueRateLimiters
  869. limitEntryCount = m.offerLimitEntryCount
  870. quantity = m.offerRateLimitQuantity
  871. interval = m.offerRateLimitInterval
  872. }
  873. // The rate limit is checked first, before the max count check, to ensure
  874. // that the rate limit state is updated regardless of the max count check
  875. // outcome.
  876. err := brokerRateLimit(
  877. queueRateLimiters,
  878. limitIP,
  879. quantity,
  880. interval)
  881. if err != nil {
  882. return errors.Trace(NewMatcherLimitError(err))
  883. }
  884. if limitEntryCount > 0 {
  885. // Limitation: non-limited proxy ID entries are counted in
  886. // entryCountByIP. If both a limited and non-limited proxy ingress
  887. // from the same limitIP, then the non-limited entries will count
  888. // against the limited proxy's limitEntryCount.
  889. entryCount, ok := entryCountByIP[limitIP]
  890. if ok && entryCount >= limitEntryCount {
  891. return errors.Trace(
  892. NewMatcherLimitError(std_errors.New("max entries for IP")))
  893. }
  894. }
  895. return nil
  896. }
  897. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  898. m.announcementQueueMutex.Lock()
  899. defer m.announcementQueueMutex.Unlock()
  900. // Ensure the queue doesn't grow larger than the max size.
  901. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  902. return errors.TraceNew("queue full")
  903. }
  904. // Ensure no single peer IP can enqueue a large number of entries or
  905. // rapidly enqueue beyond the configured rate.
  906. isAnnouncement := true
  907. err := m.applyIPLimits(
  908. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  909. if err != nil {
  910. return errors.Trace(err)
  911. }
  912. // announcementEntry.queueReference should be uninitialized.
  913. // announcementMultiQueue.enqueue sets queueReference to be used for
  914. // efficient dequeuing.
  915. if announcementEntry.queueReference.entry != nil {
  916. return errors.TraceNew("unexpected queue reference")
  917. }
  918. err = m.announcementQueue.enqueue(announcementEntry)
  919. if err != nil {
  920. return errors.Trace(err)
  921. }
  922. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  923. select {
  924. case m.matchSignal <- struct{}{}:
  925. default:
  926. }
  927. return nil
  928. }
  929. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  930. // In the aborting case, the queue isn't already locked. Otherwise, assume
  931. // it is locked.
  932. if aborting {
  933. m.announcementQueueMutex.Lock()
  934. defer m.announcementQueueMutex.Unlock()
  935. }
  936. found := announcementEntry.queueReference.dequeue()
  937. if found {
  938. // Adjust entry counts by peer IP, used to enforce
  939. // matcherAnnouncementQueueMaxEntriesPerIP.
  940. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  941. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  942. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  943. }
  944. }
  945. if aborting && !found {
  946. // The Announce call is aborting and taking its entry back out of the
  947. // queue. If the entry is not found in the queue, then a concurrent
  948. // Offer has matched the announcement. So check for the pending
  949. // answer corresponding to the announcement and remove it and deliver
  950. // a failure signal to the waiting Offer, so the client doesn't wait
  951. // longer than necessary.
  952. key := m.pendingAnswerKey(
  953. announcementEntry.announcement.ProxyID,
  954. announcementEntry.announcement.ConnectionID)
  955. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  956. if ok {
  957. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  958. m.pendingAnswers.Delete(key)
  959. }
  960. }
  961. }
  962. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  963. m.offerQueueMutex.Lock()
  964. defer m.offerQueueMutex.Unlock()
  965. // Ensure the queue doesn't grow larger than the max size.
  966. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  967. return errors.TraceNew("queue full")
  968. }
  969. // Ensure no single peer IP can enqueue a large number of entries or
  970. // rapidly enqueue beyond the configured rate.
  971. isAnnouncement := false
  972. err := m.applyIPLimits(
  973. isAnnouncement, offerEntry.limitIP, ID{})
  974. if err != nil {
  975. return errors.Trace(err)
  976. }
  977. // offerEntry.queueReference should be uninitialized and is set here to be
  978. // used for efficient dequeuing.
  979. if offerEntry.queueReference != nil {
  980. return errors.TraceNew("unexpected queue reference")
  981. }
  982. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  983. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  984. select {
  985. case m.matchSignal <- struct{}{}:
  986. default:
  987. }
  988. return nil
  989. }
  990. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  991. // In the aborting case, the queue isn't already locked. Otherise, assume
  992. // it is locked.
  993. if aborting {
  994. m.offerQueueMutex.Lock()
  995. defer m.offerQueueMutex.Unlock()
  996. }
  997. if offerEntry.queueReference == nil {
  998. return
  999. }
  1000. m.offerQueue.Remove(offerEntry.queueReference)
  1001. offerEntry.queueReference = nil
  1002. // Adjust entry counts by peer IP, used to enforce
  1003. // matcherOfferQueueMaxEntriesPerIP.
  1004. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  1005. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  1006. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  1007. }
  1008. }
  1009. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  1010. // The pending answer lookup key is used to associate announcements and
  1011. // subsequent answers. While the client learns the ConnectionID, only the
  1012. // proxy knows the ProxyID component, so only the correct proxy can match
  1013. // an answer to an announcement. The ConnectionID component is necessary
  1014. // as a proxy may have multiple, concurrent pending answers.
  1015. return string(proxyID[:]) + string(connectionID[:])
  1016. }
  // getRateLimitIP maps a peer IP address string to the string used for rate
  // limiting. IPv4 addresses, and inputs that do not parse as an IP address,
  // are returned unchanged. IPv6 addresses are reduced to their /56 prefix.
  func getRateLimitIP(strIP string) string {

  	parsed := net.ParseIP(strIP)

  	isIPv6 := parsed != nil && parsed.To4() == nil
  	if !isIPv6 {
  		return strIP
  	}

  	// With IPv6, individual users or sites are commonly allocated a /64 or
  	// /56, so rate limit by /56.
  	prefixMask := net.CIDRMask(56, 128)
  	return parsed.Mask(prefixMask).String()
  }
  1026. // announcementMultiQueue is a set of announcement queues, one per common or
  1027. // personal compartment ID, providing efficient iteration over announcements
  1028. // matching a specified list of compartment IDs. announcementMultiQueue and
  1029. // its underlying data structures are not safe for concurrent access.
  1030. type announcementMultiQueue struct {
  1031. priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
  1032. commonCompartmentQueues map[ID]*announcementCompartmentQueue
  1033. personalCompartmentQueues map[ID]*announcementCompartmentQueue
  1034. totalEntries int
  1035. }
  1036. // announcementCompartmentQueue is a single compartment queue within an
  1037. // announcementMultiQueue. The queue is implemented using a doubly-linked
  1038. // list, which provides efficient insert and mid-queue dequeue operations.
  1039. // The announcementCompartmentQueue also records NAT type stats for enqueued
  1040. // announcements, which are used, when matching, to determine when better NAT
  1041. // matches may be possible.
  1042. type announcementCompartmentQueue struct {
  1043. isCommonCompartment bool
  1044. isPriority bool
  1045. compartmentID ID
  1046. entries *list.List
  1047. unlimitedNATCount int
  1048. partiallyLimitedNATCount int
  1049. strictlyLimitedNATCount int
  1050. }
  1051. // announcementMatchIterator represents the state of an iteration over a
  1052. // subset of announcementMultiQueue compartment queues. Concurrent
  1053. // announcementMatchIterators are not supported.
  1054. type announcementMatchIterator struct {
  1055. multiQueue *announcementMultiQueue
  1056. compartmentQueues []*announcementCompartmentQueue
  1057. compartmentIDs []ID
  1058. nextEntries []*list.Element
  1059. }
  1060. // announcementQueueReference represents the queue position for a given
  1061. // announcement entry, and provides an efficient dequeue operation.
  1062. type announcementQueueReference struct {
  1063. multiQueue *announcementMultiQueue
  1064. compartmentQueue *announcementCompartmentQueue
  1065. entry *list.Element
  1066. }
  1067. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1068. return &announcementMultiQueue{
  1069. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1070. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1071. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1072. }
  1073. }
  1074. func (q *announcementMultiQueue) getLen() int {
  1075. return q.totalEntries
  1076. }
  1077. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1078. // Assumes announcementEntry not already enueued.
  1079. // Limitation: only one compartment ID, either common or personal, is
  1080. // supported per announcement entry. In the common compartment case, the
  1081. // broker currently assigns only one common compartment ID per proxy
  1082. // announcement. In the personal compartment case, there is currently no
  1083. // use case for allowing a proxy to announce under multiple personal
  1084. // compartment IDs.
  1085. //
  1086. // To overcome this limitation, the dequeue operation would need to be
  1087. // able to remove an announcement entry from multiple
  1088. // announcementCompartmentQueues.
  1089. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1090. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1091. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1092. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1093. }
  1094. isPriority := announcementEntry.announcement.Properties.IsPriority
  1095. isCommonCompartment := true
  1096. var compartmentID ID
  1097. var compartmentQueues map[ID]*announcementCompartmentQueue
  1098. if len(commonCompartmentIDs) > 0 {
  1099. compartmentID = commonCompartmentIDs[0]
  1100. compartmentQueues = q.commonCompartmentQueues
  1101. if isPriority {
  1102. compartmentQueues = q.priorityCommonCompartmentQueues
  1103. }
  1104. } else {
  1105. isCommonCompartment = false
  1106. compartmentID = personalCompartmentIDs[0]
  1107. compartmentQueues = q.personalCompartmentQueues
  1108. if isPriority {
  1109. return errors.TraceNew("priority not supported for personal compartments")
  1110. }
  1111. }
  1112. compartmentQueue, ok := compartmentQueues[compartmentID]
  1113. if !ok {
  1114. compartmentQueue = &announcementCompartmentQueue{
  1115. isCommonCompartment: isCommonCompartment,
  1116. isPriority: isPriority,
  1117. compartmentID: compartmentID,
  1118. entries: list.New(),
  1119. }
  1120. compartmentQueues[compartmentID] = compartmentQueue
  1121. }
  1122. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1123. // Update the NAT type counts which are used to determine if a better NAT
  1124. // match may be made by inspecting more announcement queue entries.
  1125. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1126. case NATTraversalUnlimited:
  1127. compartmentQueue.unlimitedNATCount += 1
  1128. case NATTraversalPartiallyLimited:
  1129. compartmentQueue.partiallyLimitedNATCount += 1
  1130. case NATTraversalStrictlyLimited:
  1131. compartmentQueue.strictlyLimitedNATCount += 1
  1132. }
  1133. q.totalEntries += 1
  1134. announcementEntry.queueReference = announcementQueueReference{
  1135. multiQueue: q,
  1136. compartmentQueue: compartmentQueue,
  1137. entry: entry,
  1138. }
  1139. return nil
  1140. }
  1141. // announcementQueueReference returns false if the item is already dequeued.
  1142. func (r *announcementQueueReference) dequeue() bool {
  1143. if r.entry == nil {
  1144. // Already dequeued.
  1145. return false
  1146. }
  1147. announcementEntry := r.entry.Value.(*announcementEntry)
  1148. // Reverse the NAT type counts.
  1149. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1150. case NATTraversalUnlimited:
  1151. r.compartmentQueue.unlimitedNATCount -= 1
  1152. case NATTraversalPartiallyLimited:
  1153. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1154. case NATTraversalStrictlyLimited:
  1155. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1156. }
  1157. r.compartmentQueue.entries.Remove(r.entry)
  1158. if r.compartmentQueue.entries.Len() == 0 {
  1159. // Remove empty compartment queue.
  1160. queues := r.multiQueue.personalCompartmentQueues
  1161. if r.compartmentQueue.isCommonCompartment {
  1162. if r.compartmentQueue.isPriority {
  1163. queues = r.multiQueue.priorityCommonCompartmentQueues
  1164. } else {
  1165. queues = r.multiQueue.commonCompartmentQueues
  1166. }
  1167. }
  1168. delete(queues, r.compartmentQueue.compartmentID)
  1169. }
  1170. r.multiQueue.totalEntries -= 1
  1171. // Mark as dequeued.
  1172. r.entry = nil
  1173. return true
  1174. }
  1175. func (q *announcementMultiQueue) startMatching(
  1176. isCommonCompartments bool,
  1177. compartmentIDs []ID) *announcementMatchIterator {
  1178. iter := &announcementMatchIterator{
  1179. multiQueue: q,
  1180. }
  1181. // Find the matching compartment queues and initialize iteration over
  1182. // those queues. Building the set of matching queues is a linear time
  1183. // operation, bounded by the length of compartmentIDs (no more than
  1184. // maxCompartmentIDs, as enforced in
  1185. // ClientOfferRequest.ValidateAndGetLogFields).
  1186. // Priority queues, when in use, must all be added to the beginning of
  1187. // iter.compartmentQueues in order to ensure that the iteration logic in
  1188. // getNext visits all priority items first.
  1189. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1190. if isCommonCompartments {
  1191. compartmentQueuesList = append(
  1192. compartmentQueuesList,
  1193. q.priorityCommonCompartmentQueues,
  1194. q.commonCompartmentQueues)
  1195. } else {
  1196. compartmentQueuesList = append(
  1197. compartmentQueuesList,
  1198. q.personalCompartmentQueues)
  1199. }
  1200. for _, compartmentQueues := range compartmentQueuesList {
  1201. for _, ID := range compartmentIDs {
  1202. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1203. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1204. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1205. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1206. }
  1207. }
  1208. }
  1209. return iter
  1210. }
  1211. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1212. // Return the count of NAT types across all matchable compartment queues.
  1213. //
  1214. // A potential future enhancement would be to provide per-queue NAT counts
  1215. // or NAT type indexing in order to quickly find preferred NAT matches.
  1216. unlimitedNATCount := 0
  1217. partiallyLimitedNATCount := 0
  1218. strictlyLimitedNATCount := 0
  1219. for _, compartmentQueue := range iter.compartmentQueues {
  1220. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1221. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1222. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1223. }
  1224. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1225. }
  1226. // announcementMatchIterator returns the next announcement entry candidate in
  1227. // compartment queue FIFO order, selecting the queue with the oldest head
  1228. // item.
  1229. //
  1230. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1231. // candidate is selected. dequeue may be called on any getNext return value
  1232. // without disrupting the iteration state; however,
  1233. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1234. // are not supported during iteration. Iteration and dequeue should all be
  1235. // performed with a lock over the entire announcementMultiQueue, and with
  1236. // only one concurrent announcementMatchIterator.
  1237. //
  1238. // getNext returns a nil *announcementEntry when there are no more items.
  1239. // getNext also returns an isPriority flag, indicating the announcement is a
  1240. // priority candidate. All priority candidates are guaranteed to be returned
  1241. // before any non-priority candidates.
  1242. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1243. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1244. // order. Also assumes that any priority queues are all at the front of
  1245. // iter.compartmentQueues.
  1246. // Select the oldest item, by deadline, from all the candidate queue head
  1247. // items. This operation is linear in the number of matching compartment
  1248. // ID queues, which is currently bounded by the length of matching
  1249. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1250. // ClientOfferRequest.ValidateAndGetLogFields).
  1251. //
  1252. // When there are priority candidates, they are selected first, regardless
  1253. // of the deadlines of non-priority candidates. Multiple priority
  1254. // candidates are processed in FIFO deadline order.
  1255. //
  1256. // A potential future enhancement is to add more iterator state to track
  1257. // which queue has the next oldest time to select on the following
  1258. // getNext call. Another potential enhancement is to remove fully
  1259. // consumed queues from compartmentQueues/compartmentIDs/nextEntries.
  1260. var selectedCandidate *announcementEntry
  1261. selectedIndex := -1
  1262. selectedPriority := false
  1263. for i := 0; i < len(iter.compartmentQueues); i++ {
  1264. if iter.nextEntries[i] == nil {
  1265. continue
  1266. }
  1267. isPriority := iter.compartmentQueues[i].isPriority
  1268. if selectedPriority && !isPriority {
  1269. // Ignore older of non-priority entries when there are priority
  1270. // candidates.
  1271. break
  1272. }
  1273. if selectedCandidate == nil {
  1274. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1275. selectedIndex = i
  1276. selectedPriority = isPriority
  1277. } else {
  1278. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1279. deadline, deadlineOk := candidate.ctx.Deadline()
  1280. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1281. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1282. selectedCandidate = candidate
  1283. selectedIndex = i
  1284. selectedPriority = isPriority
  1285. }
  1286. }
  1287. }
  1288. // Advance the selected queue to the next element. This must be done
  1289. // before any dequeue call, since container/list.remove clears
  1290. // list.Element.next.
  1291. if selectedIndex != -1 {
  1292. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1293. }
  1294. return selectedCandidate, selectedPriority
  1295. }