matcher.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. "golang.org/x/time/rate"
  32. )
  33. // TTLs should be aligned with STUN hole punch lifetimes.
  34. const (
  35. matcherAnnouncementQueueMaxSize = 5000000
  36. matcherOfferQueueMaxSize = 5000000
  37. matcherPendingAnswersTTL = 30 * time.Second
  38. matcherPendingAnswersMaxSize = 100000
  39. matcherMaxPreferredNATProbe = 100
  40. matcherMaxProbe = 1000
  41. matcherRateLimiterReapHistoryFrequencySeconds = 300
  42. matcherRateLimiterMaxCacheEntries = 1000000
  43. )
  44. // Matcher matches proxy announcements with client offers. Matcher also
  45. // coordinates pending proxy answers and routes answers to the awaiting
  46. // client offer handler.
  47. //
  48. // Matching prioritizes selecting the oldest announcements and client offers,
  49. // as they are closest to timing out.
  50. //
  51. // The client and proxy must supply matching personal or common compartment
  52. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  53. // tactics parameter or via an OSL embedding. Each proxy announcement or
  54. // client offer may specify only one compartment ID type, either common or
  55. // personal.
  56. //
  57. // Matching prefers to pair proxies and clients in a way that maximizes total
  58. // possible matches. For a client or proxy with less-limited NAT traversal, a
  59. // pairing with more-limited NAT traversal is preferred; and vice versa.
  60. // Candidates with unknown NAT types and mobile network types are assumed to
  61. // have the most limited NAT traversal capability.
  62. //
  63. // Preferred matchings take priority over announcement age.
  64. //
  65. // The client and proxy will not match if they are in the same country and
  66. // ASN, as it's assumed that doesn't provide any blocking circumvention
  67. // benefit. Disallowing proxies in certain blocked countries is handled at a
  68. // higher level; any such proxies should not be enqueued for matching.
  69. type Matcher struct {
  70. config *MatcherConfig
  71. runMutex sync.Mutex
  72. runContext context.Context
  73. stopRunning context.CancelFunc
  74. waitGroup *sync.WaitGroup
  75. // The announcement queue is implicitly sorted by announcement age. The
  76. // count fields are used to skip searching deeper into the queue for
  77. // preferred matches.
  78. // TODO: replace queue and counts with an indexed, in-memory database?
  79. announcementQueueMutex sync.Mutex
  80. announcementQueue *announcementMultiQueue
  81. announcementQueueEntryCountByIP map[string]int
  82. announcementQueueRateLimiters *lrucache.Cache
  83. announcementLimitEntryCount int
  84. announcementRateLimitQuantity int
  85. announcementRateLimitInterval time.Duration
  86. announcementNonlimitedProxyIDs map[ID]struct{}
  87. // The offer queue is also implicitly sorted by offer age. Both an offer
  88. // and announcement queue are required since either announcements or
  89. // offers can arrive while there are no available pairings.
  90. offerQueueMutex sync.Mutex
  91. offerQueue *list.List
  92. offerQueueEntryCountByIP map[string]int
  93. offerQueueRateLimiters *lrucache.Cache
  94. offerLimitEntryCount int
  95. offerRateLimitQuantity int
  96. offerRateLimitInterval time.Duration
  97. matchSignal chan struct{}
  98. pendingAnswers *lrucache.Cache
  99. }
  100. // MatcherConfig specifies the configuration for a matcher.
  101. type MatcherConfig struct {
  102. // Logger is used to log events.
  103. Logger common.Logger
  104. // Announcement queue limits.
  105. AnnouncementLimitEntryCount int
  106. AnnouncementRateLimitQuantity int
  107. AnnouncementRateLimitInterval time.Duration
  108. AnnouncementNonlimitedProxyIDs []ID
  109. // Offer queue limits.
  110. OfferLimitEntryCount int
  111. OfferRateLimitQuantity int
  112. OfferRateLimitInterval time.Duration
  113. // Proxy quality state.
  114. ProxyQualityState *ProxyQualityState
  115. // Broker process load limit state callback. See BrokerConfig.
  116. IsLoadLimiting func() bool
  117. // Proxy/client allow match callback. See BrokerConfig.
  118. AllowMatch func(common.GeoIPData, common.GeoIPData) bool
  119. }
  120. // MatchProperties specifies the compartment, GeoIP, and network topology
  121. // matching properties of clients and proxies.
  122. type MatchProperties struct {
  123. IsPriority bool
  124. ProtocolVersion int32
  125. CommonCompartmentIDs []ID
  126. PersonalCompartmentIDs []ID
  127. GeoIPData common.GeoIPData
  128. NetworkType NetworkType
  129. NATType NATType
  130. PortMappingTypes PortMappingTypes
  131. }
  132. // EffectiveNATType combines the set of network properties into an effective
  133. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  134. // traversal is assumed. When NAT type is unknown and the network type is
  135. // mobile, CGNAT with limited NAT traversal is assumed.
  136. func (p *MatchProperties) EffectiveNATType() NATType {
  137. if p.PortMappingTypes.Available() {
  138. return NATTypePortMapping
  139. }
  140. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  141. // publicly reachable IPv6 ICE host candidate? If so, change the
  142. // effective NAT type? Depends on whether the matched peer can use IPv6.
  143. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  144. return NATTypeMobileNetwork
  145. }
  146. return p.NATType
  147. }
  148. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  149. // matching given the types of pairing candidates available.
  150. func (p *MatchProperties) ExistsPreferredNATMatch(
  151. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  152. return p.EffectiveNATType().ExistsPreferredMatch(
  153. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  154. }
  155. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  156. // NAT matching.
  157. func (p *MatchProperties) IsPreferredNATMatch(
  158. peerMatchProperties *MatchProperties) bool {
  159. return p.EffectiveNATType().IsPreferredMatch(
  160. peerMatchProperties.EffectiveNATType())
  161. }
  162. // MatchAnnouncement is a proxy announcement to be queued for matching.
  163. type MatchAnnouncement struct {
  164. Properties MatchProperties
  165. ProxyID ID
  166. ProxyMetrics *ProxyMetrics
  167. ConnectionID ID
  168. }
  169. // MatchOffer is a client offer to be queued for matching.
  170. type MatchOffer struct {
  171. Properties MatchProperties
  172. ClientOfferSDP WebRTCSessionDescription
  173. ClientRootObfuscationSecret ObfuscationSecret
  174. DoDTLSRandomization bool
  175. UseMediaStreams bool
  176. TrafficShapingParameters *TrafficShapingParameters
  177. NetworkProtocol NetworkProtocol
  178. DestinationAddress string
  179. DestinationServerID string
  180. }
  181. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  182. // announcement, to be routed to the awaiting client offer.
  183. type MatchAnswer struct {
  184. ProxyIP string
  185. ProxyID ID
  186. ConnectionID ID
  187. ProxyAnswerSDP WebRTCSessionDescription
  188. }
  189. // MatchMetrics records statistics about the match queue state at the time a
  190. // match is made.
  191. type MatchMetrics struct {
  192. OfferMatchIndex int
  193. OfferQueueSize int
  194. AnnouncementMatchIndex int
  195. AnnouncementQueueSize int
  196. }
  197. // GetMetrics converts MatchMetrics to loggable fields.
  198. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  199. if metrics == nil {
  200. return nil
  201. }
  202. return common.LogFields{
  203. "offer_match_index": metrics.OfferMatchIndex,
  204. "offer_queue_size": metrics.OfferQueueSize,
  205. "announcement_match_index": metrics.AnnouncementMatchIndex,
  206. "announcement_queue_size": metrics.AnnouncementQueueSize,
  207. }
  208. }
  209. // announcementEntry is an announcement queue entry, an announcement with its
  210. // associated lifetime context and signaling channel.
  211. type announcementEntry struct {
  212. ctx context.Context
  213. limitIP string
  214. announcement *MatchAnnouncement
  215. offerChan chan *MatchOffer
  216. matchMetrics atomic.Value
  217. // queueReference is initialized by addAnnouncementEntry, and used to
  218. // efficiently dequeue the entry.
  219. queueReference announcementQueueReference
  220. }
  221. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  222. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  223. return matchMetrics
  224. }
  225. // offerEntry is an offer queue entry, an offer with its associated lifetime
  226. // context and signaling channel.
  227. type offerEntry struct {
  228. ctx context.Context
  229. limitIP string
  230. offer *MatchOffer
  231. answerChan chan *answerInfo
  232. matchMetrics atomic.Value
  233. // queueReference is initialized by addOfferEntry, and used to efficiently
  234. // dequeue the entry.
  235. queueReference *list.Element
  236. }
  237. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  238. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  239. return matchMetrics
  240. }
  241. // answerInfo is an answer and its associated announcement.
  242. type answerInfo struct {
  243. announcement *MatchAnnouncement
  244. answer *MatchAnswer
  245. }
  246. // pendingAnswer represents an answer that is expected to arrive from a
  247. // proxy.
  248. type pendingAnswer struct {
  249. announcement *MatchAnnouncement
  250. answerChan chan *answerInfo
  251. }
  252. // NewMatcher creates a new Matcher.
  253. func NewMatcher(config *MatcherConfig) *Matcher {
  254. m := &Matcher{
  255. config: config,
  256. waitGroup: new(sync.WaitGroup),
  257. announcementQueue: newAnnouncementMultiQueue(),
  258. announcementQueueEntryCountByIP: make(map[string]int),
  259. announcementQueueRateLimiters: lrucache.NewWithLRU(
  260. 0,
  261. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  262. matcherRateLimiterMaxCacheEntries),
  263. offerQueue: list.New(),
  264. offerQueueEntryCountByIP: make(map[string]int),
  265. offerQueueRateLimiters: lrucache.NewWithLRU(
  266. 0,
  267. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  268. matcherRateLimiterMaxCacheEntries),
  269. matchSignal: make(chan struct{}, 1),
  270. // matcherPendingAnswersTTL is not configurable; it supplies a default
  271. // that is expected to be ignored when each entry's TTL is set to the
  272. // Offer ctx timeout.
  273. pendingAnswers: lrucache.NewWithLRU(
  274. matcherPendingAnswersTTL,
  275. 1*time.Minute,
  276. matcherPendingAnswersMaxSize),
  277. }
  278. m.SetLimits(
  279. config.AnnouncementLimitEntryCount,
  280. config.AnnouncementRateLimitQuantity,
  281. config.AnnouncementRateLimitInterval,
  282. config.AnnouncementNonlimitedProxyIDs,
  283. config.OfferLimitEntryCount,
  284. config.OfferRateLimitQuantity,
  285. config.OfferRateLimitInterval)
  286. return m
  287. }
  288. // SetLimits sets new queue limits, replacing the previous configuration.
  289. // Existing, cached rate limiters retain their existing rate limit state. New
  290. // entries will use the new quantity/interval configuration. In addition,
  291. // currently enqueued items may exceed any new, lower maximum entry count
  292. // until naturally dequeued.
  293. func (m *Matcher) SetLimits(
  294. announcementLimitEntryCount int,
  295. announcementRateLimitQuantity int,
  296. announcementRateLimitInterval time.Duration,
  297. announcementNonlimitedProxyIDs []ID,
  298. offerLimitEntryCount int,
  299. offerRateLimitQuantity int,
  300. offerRateLimitInterval time.Duration) {
  301. nonlimitedProxyIDs := make(map[ID]struct{})
  302. for _, proxyID := range announcementNonlimitedProxyIDs {
  303. nonlimitedProxyIDs[proxyID] = struct{}{}
  304. }
  305. m.announcementQueueMutex.Lock()
  306. m.announcementLimitEntryCount = announcementLimitEntryCount
  307. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  308. m.announcementRateLimitInterval = announcementRateLimitInterval
  309. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  310. m.announcementQueueMutex.Unlock()
  311. m.offerQueueMutex.Lock()
  312. m.offerLimitEntryCount = offerLimitEntryCount
  313. m.offerRateLimitQuantity = offerRateLimitQuantity
  314. m.offerRateLimitInterval = offerRateLimitInterval
  315. m.offerQueueMutex.Unlock()
  316. }
  317. // Start starts running the Matcher. The Matcher runs a goroutine which
  318. // matches announcements and offers.
  319. func (m *Matcher) Start() error {
  320. m.runMutex.Lock()
  321. defer m.runMutex.Unlock()
  322. if m.runContext != nil {
  323. return errors.TraceNew("already running")
  324. }
  325. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  326. m.waitGroup.Add(1)
  327. go func() {
  328. defer m.waitGroup.Done()
  329. m.matchWorker(m.runContext)
  330. }()
  331. return nil
  332. }
  333. // Stop stops running the Matcher and its worker goroutine.
  334. //
  335. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  336. // can get enqueued during and after a Stop call. Stop is intended more for a
  337. // full broker shutdown, where this won't be a concern.
  338. func (m *Matcher) Stop() {
  339. m.runMutex.Lock()
  340. defer m.runMutex.Unlock()
  341. m.stopRunning()
  342. m.waitGroup.Wait()
  343. m.runContext, m.stopRunning = nil, nil
  344. }
  345. // Announce enqueues the proxy announcement and blocks until it is matched
  346. // with a returned offer or ctx is done. The caller must not mutate the
  347. // announcement or its properties after calling Announce.
  348. //
  349. // Announce assumes that the ctx.Deadline for each call is monotonically
  350. // increasing and that the deadline can be used as part of selecting the next
  351. // nearest-to-expire announcement.
  352. //
  353. // The offer is sent to the proxy by the broker, and then the proxy sends its
  354. // answer back to the broker, which calls Answer with that value.
  355. //
  356. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  357. // match is made, even if there is a later error.
  358. func (m *Matcher) Announce(
  359. ctx context.Context,
  360. proxyIP string,
  361. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  362. // An announcement must specify exactly one compartment ID, of one type,
  363. // common or personal. The limit of one is currently a limitation of the
  364. // multi-queue implementation; see comment in
  365. // announcementMultiQueue.enqueue.
  366. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  367. if len(compartmentIDs) == 0 {
  368. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  369. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  370. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  371. }
  372. if len(compartmentIDs) != 1 {
  373. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  374. }
  375. isAnnouncement := true
  376. err := m.applyLoadLimit(isAnnouncement)
  377. if err != nil {
  378. return nil, nil, errors.Trace(err)
  379. }
  380. announcementEntry := &announcementEntry{
  381. ctx: ctx,
  382. limitIP: getRateLimitIP(proxyIP),
  383. announcement: proxyAnnouncement,
  384. offerChan: make(chan *MatchOffer, 1),
  385. }
  386. err = m.addAnnouncementEntry(announcementEntry)
  387. if err != nil {
  388. return nil, nil, errors.Trace(err)
  389. }
  390. // Await client offer.
  391. var clientOffer *MatchOffer
  392. select {
  393. case <-ctx.Done():
  394. m.removeAnnouncementEntry(true, announcementEntry)
  395. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  396. case clientOffer = <-announcementEntry.offerChan:
  397. }
  398. return clientOffer, announcementEntry.getMatchMetrics(), nil
  399. }
  400. // Offer enqueues the client offer and blocks until it is matched with a
  401. // returned announcement or ctx is done. The caller must not mutate the offer
  402. // or its properties after calling Announce.
  403. //
  404. // The answer is returned to the client by the broker, and the WebRTC
  405. // connection is dialed. The original announcement is also returned, so its
  406. // match properties can be logged.
  407. //
  408. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  409. // match is made, even if there is a later error.
  410. func (m *Matcher) Offer(
  411. ctx context.Context,
  412. clientIP string,
  413. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  414. // An offer must specify at least one compartment ID, and may only specify
  415. // one type, common or personal, of compartment IDs.
  416. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  417. if len(compartmentIDs) == 0 {
  418. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  419. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  420. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  421. }
  422. if len(compartmentIDs) < 1 {
  423. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  424. }
  425. isAnnouncement := false
  426. err := m.applyLoadLimit(isAnnouncement)
  427. if err != nil {
  428. return nil, nil, nil, errors.Trace(err)
  429. }
  430. offerEntry := &offerEntry{
  431. ctx: ctx,
  432. limitIP: getRateLimitIP(clientIP),
  433. offer: clientOffer,
  434. answerChan: make(chan *answerInfo, 1),
  435. }
  436. err = m.addOfferEntry(offerEntry)
  437. if err != nil {
  438. return nil, nil, nil, errors.Trace(err)
  439. }
  440. // Await proxy answer.
  441. var proxyAnswerInfo *answerInfo
  442. select {
  443. case <-ctx.Done():
  444. m.removeOfferEntry(true, offerEntry)
  445. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  446. // the Offer ctx, the client request, timeout, so it will eventually
  447. // get removed. But a client may abort its request earlier than the
  448. // timeout.
  449. return nil, nil,
  450. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  451. case proxyAnswerInfo = <-offerEntry.answerChan:
  452. }
  453. if proxyAnswerInfo == nil {
  454. // nil will be delivered to the channel when either the proxy
  455. // announcement request concurrently timed out, or the answer
  456. // indicated a proxy error, or the answer did not arrive in time.
  457. return nil, nil,
  458. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  459. }
  460. // This is a sanity check and not expected to fail.
  461. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  462. proxyAnswerInfo.announcement.ConnectionID) {
  463. return nil, nil,
  464. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  465. }
  466. return proxyAnswerInfo.answer,
  467. proxyAnswerInfo.announcement,
  468. offerEntry.getMatchMetrics(),
  469. nil
  470. }
  471. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  472. // announcement identified by the specified proxy ID and connection ID and
  473. // returns whether the announcement has personal compartment IDs, indicating
  474. // personal pairing mode.
  475. //
  476. // If no pending answer is found, an error is returned.
  477. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  478. proxyID ID, connectionID ID) (bool, error) {
  479. key := m.pendingAnswerKey(proxyID, connectionID)
  480. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  481. if !ok {
  482. // The input IDs don't correspond to a pending answer, or the client
  483. // is no longer awaiting the response.
  484. return false, errors.TraceNew("no pending answer")
  485. }
  486. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  487. hasPersonalCompartmentIDs := len(
  488. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  489. return hasPersonalCompartmentIDs, nil
  490. }
  491. // Answer delivers an answer from the proxy for a previously matched offer.
  492. // The ProxyID and ConnectionID must correspond to the original announcement.
  493. // The caller must not mutate the answer after calling Answer. Answer does
  494. // not block.
  495. //
  496. // The answer is returned to the awaiting Offer call and sent to the matched
  497. // client.
  498. func (m *Matcher) Answer(
  499. proxyAnswer *MatchAnswer) error {
  500. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  501. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  502. if !ok {
  503. // The input IDs don't correspond to a pending answer, or the client
  504. // is no longer awaiting the response.
  505. return errors.TraceNew("no pending answer")
  506. }
  507. m.pendingAnswers.Delete(key)
  508. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  509. pendingAnswer.answerChan <- &answerInfo{
  510. announcement: pendingAnswer.announcement,
  511. answer: proxyAnswer,
  512. }
  513. return nil
  514. }
  515. // AnswerError delivers a failed answer indication from the proxy to an
  516. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  517. // original announcement.
  518. //
  519. // The failure indication is returned to the awaiting Offer call and sent to
  520. // the matched client.
  521. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  522. key := m.pendingAnswerKey(proxyID, connectionID)
  523. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  524. if !ok {
  525. // The client is no longer awaiting the response.
  526. return
  527. }
  528. m.pendingAnswers.Delete(key)
  529. // Closing the channel delivers nil, a failed indicator, to any receiver.
  530. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  531. }
  532. // matchWorker is the matching worker goroutine. It idles until signaled that
  533. // a queue item has been added, and then runs a full matching pass.
  534. func (m *Matcher) matchWorker(ctx context.Context) {
  535. for {
  536. select {
  537. case <-m.matchSignal:
  538. m.matchAllOffers()
  539. case <-ctx.Done():
  540. return
  541. }
  542. }
  543. }
  544. // matchAllOffers iterates over the queues, making all possible matches.
  545. func (m *Matcher) matchAllOffers() {
  546. m.announcementQueueMutex.Lock()
  547. defer m.announcementQueueMutex.Unlock()
  548. m.offerQueueMutex.Lock()
  549. defer m.offerQueueMutex.Unlock()
  550. // Take each offer in turn, and select an announcement match. There is an
  551. // implicit preference for older client offers, sooner to timeout, at the
  552. // front of the queue.
  553. // TODO: consider matching one offer, then releasing the locks to allow
  554. // more announcements to be enqueued, then continuing to match.
  555. nextOffer := m.offerQueue.Front()
  556. offerIndex := -1
  557. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  558. offerIndex += 1
  559. // nextOffer.Next must be invoked before any removeOfferEntry since
  560. // container/list.remove clears list.Element.next.
  561. offer := nextOffer
  562. nextOffer = nextOffer.Next()
  563. offerEntry := offer.Value.(*offerEntry)
  564. // Skip and remove this offer if its deadline has already passed.
  565. // There is no signal to the awaiting Offer function, as it will exit
  566. // based on the same ctx.
  567. if offerEntry.ctx.Err() != nil {
  568. m.removeOfferEntry(false, offerEntry)
  569. continue
  570. }
  571. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  572. if announcementEntry == nil {
  573. continue
  574. }
  575. // Record match metrics.
  576. // The index metrics predate the announcement multi-queue; now, with
  577. // the multi-queue, announcement_index is how many announce entries
  578. // were inspected before matching.
  579. matchMetrics := &MatchMetrics{
  580. OfferMatchIndex: offerIndex,
  581. OfferQueueSize: m.offerQueue.Len(),
  582. AnnouncementMatchIndex: announcementMatchIndex,
  583. AnnouncementQueueSize: m.announcementQueue.getLen(),
  584. }
  585. offerEntry.matchMetrics.Store(matchMetrics)
  586. announcementEntry.matchMetrics.Store(matchMetrics)
  587. // Remove the matched announcement from the queue. Send the offer to
  588. // the announcement entry's offerChan, which will deliver it to the
  589. // blocked Announce call. Add a pending answers entry to await the
  590. // proxy's follow up Answer call. The TTL for the pending answer
  591. // entry is set to the matched Offer call's ctx, as the answer is
  592. // only useful as long as the client is still waiting.
  593. m.removeAnnouncementEntry(false, announcementEntry)
  594. expiry := lrucache.DefaultExpiration
  595. deadline, ok := offerEntry.ctx.Deadline()
  596. if ok {
  597. expiry = time.Until(deadline)
  598. }
  599. key := m.pendingAnswerKey(
  600. announcementEntry.announcement.ProxyID,
  601. announcementEntry.announcement.ConnectionID)
  602. m.pendingAnswers.Set(
  603. key,
  604. &pendingAnswer{
  605. announcement: announcementEntry.announcement,
  606. answerChan: offerEntry.answerChan,
  607. },
  608. expiry)
  609. announcementEntry.offerChan <- offerEntry.offer
  610. // Remove the matched offer from the queue and match the next offer,
  611. // now first in the queue.
  612. m.removeOfferEntry(false, offerEntry)
  613. }
  614. }
// matchOffer selects the best enqueued announcement for the given offer.
//
// It returns the selected announcement entry together with the index of that
// entry among the candidates inspected for this offer, or (nil, -1) when the
// offer specifies no compartment IDs or no candidate qualifies. The selected
// announcement is NOT dequeued here; that is left to the caller. Announcements
// whose ctx has already expired are removed from the queue as they are
// encountered.
func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {

	// Assumes the caller has the queue mutexes locked.

	// Check each candidate announcement in turn, and select a match. There is
	// an implicit preference for older proxy announcements, sooner to
	// timeout, at the front of the enqueued announcements.
	// announcementMultiQueue.startMatching skips to the first matching
	// compartment ID(s).
	//
	// Limitation: since this logic matches each enqueued client in turn, it will
	// only make the optimal NAT match for the oldest enqueued client vs. all
	// proxies, and not do optimal N x M matching for all clients and all proxies.
	//
	// Future matching enhancements could include more sophisticated GeoIP
	// rules, such as a configuration encoding knowledge of an ASN's NAT
	// type, or preferred client/proxy country/ASN matches.

	offerProperties := &offerEntry.offer.Properties

	// Assumes the caller checks that offer specifies either personal
	// compartment IDs or common compartment IDs, but not both.
	//
	// Personal compartment IDs take precedence: common compartment IDs are
	// only consulted when no personal compartment IDs are present.

	isCommonCompartments := false
	compartmentIDs := offerProperties.PersonalCompartmentIDs
	if len(compartmentIDs) == 0 {
		isCommonCompartments = true
		compartmentIDs = offerProperties.CommonCompartmentIDs
	}
	if len(compartmentIDs) == 0 {
		// Nothing to match against.
		return nil, -1
	}

	matchIterator := m.announcementQueue.startMatching(
		isCommonCompartments, compartmentIDs)

	// Use the NAT traversal type counters to check if there's any preferred
	// NAT match for this offer in the announcement queue. When there is, we
	// will search beyond the first announcement.

	unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
		matchIterator.getNATCounts()
	existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
		unlimitedNATCount > 0,
		partiallyLimitedNATCount > 0,
		strictlyLimitedNATCount > 0)

	// TODO: add an ExistsCompatibleProtocolVersionMatch check?
	//
	// Currently, searching for protocol version support that doesn't exist
	// may be mitigated by limiting, through tactics, client protocol options
	// selection; using the proxy protocol version in PrioritizeProxy; and,
	// ultimately, increasing MinimumProxyProtocolVersion.

	var bestMatch *announcementEntry
	bestMatchIndex := -1
	bestMatchIsPriority := false
	bestMatchNAT := false

	// matcherMaxProbe limits the linear search through the announcement queue
	// to find a match. Currently, the queue implementation provides
	// constant-time lookup for matching compartment IDs. Other matching
	// aspects may require iterating over the queue items, including the
	// strict same-country and ASN constraint and protocol version
	// compatibility constraint. Best NAT match is not a strict constraint
	// and uses a shorter search limit, matcherMaxPreferredNATProbe.
	//
	// NOTE(review): candidateIndex starts at -1 and is incremented inside the
	// loop after the bound check, so the loop body can run for candidate
	// indexes 0 through matcherMaxProbe+1 inclusive -- confirm this is the
	// intended probe budget.

	candidateIndex := -1
	for candidateIndex <= matcherMaxProbe {

		announcementEntry, isPriority := matchIterator.getNext()
		if announcementEntry == nil {
			// No more candidates in the matching compartment queues.
			break
		}

		if !isPriority && bestMatchIsPriority {

			// There is a priority match, but it wasn't bestMatchNAT and we
			// continued to iterate. Now that isPriority is false, we're past the
			// end of the priority items, so stop looking for any best NAT match
			// and return the previous priority match. When there are zero
			// priority items to begin with, this case should not be hit.

			break
		}

		// Expired and incompatible candidates, skipped below, still count
		// towards the probe limits.
		candidateIndex += 1

		// Skip and remove this announcement if its deadline has already
		// passed. There is no signal to the awaiting Announce function, as
		// it will exit based on the same ctx.

		if announcementEntry.ctx.Err() != nil {
			m.removeAnnouncementEntry(false, announcementEntry)
			continue
		}

		announcementProperties := &announcementEntry.announcement.Properties

		// Don't match unless the proxy announcement, client offer, and the
		// client's selected protocol options are compatible. UseMediaStreams
		// requires at least ProtocolVersion2.

		_, ok := negotiateProtocolVersion(
			announcementProperties.ProtocolVersion,
			offerProperties.ProtocolVersion,
			offerEntry.offer.UseMediaStreams)
		if !ok {
			continue
		}

		// Disallow matching the same country and ASN, or GeoIP combinations
		// prohibited by the AllowMatch callback, except for personal
		// compartment ID matches.
		//
		// For common matching, hopping through the same ISP is assumed to
		// have no circumvention benefit. For personal matching, the user may
		// wish to hop through their own or their friend's proxy regardless.

		if isCommonCompartments {
			if !GetAllowCommonASNMatching() &&
				(offerProperties.GeoIPData.Country ==
					announcementProperties.GeoIPData.Country &&
					offerProperties.GeoIPData.ASN ==
						announcementProperties.GeoIPData.ASN) {
				continue
			}
			if !m.config.AllowMatch(
				announcementProperties.GeoIPData,
				offerProperties.GeoIPData) {
				continue
			}
		}

		// Check if this is a preferred NAT match. Ultimately, a match may be
		// made with potentially incompatible NATs, but the client/proxy
		// reported NAT types may be incorrect or unknown; the client will
		// often skip NAT discovery.

		matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)

		// Use proxy ASN quality as an alternative to preferred NAT matches.
		//
		// The NAT matching logic depends on RFC5780 NAT discovery test
		// results, which may not be entirely accurate, and may not be
		// available in the first place, especially if skipped for clients,
		// which is the default.
		//
		// Proxy ASN quality leverages the quality data, provided by servers,
		// indicating that the particular proxy recently relayed a successful
		// tunnel for some client in the given ASN. When this quality data is
		// present, NAT compatibility is assumed, with the caveat that the
		// client device and immediate router may not be the same.
		//
		// Limitations:
		// - existsPreferredNATMatch doesn't reflect existence of matching
		//   proxy ASN quality, so the NAT match probe can end prematurely.
		// - IsPreferredNATMatch currently takes precedence over proxy ASN
		//   quality.

		if !matchNAT && isPriority {
			matchNAT = m.config.ProxyQualityState.HasQuality(
				announcementEntry.announcement.ProxyID,
				announcementEntry.announcement.Properties.GeoIPData.ASN,
				offerProperties.GeoIPData.ASN)
		}

		// At this point, the candidate is a match. Determine if this is a new
		// best match, either if there was no previous match, or this is a
		// better NAT match.

		if bestMatch == nil || (!bestMatchNAT && matchNAT) {

			bestMatch = announcementEntry
			bestMatchIndex = candidateIndex
			bestMatchIsPriority = isPriority
			bestMatchNAT = matchNAT

		}

		// Stop as soon as we have the best possible match, or have reached
		// the probe limit for preferred NAT matches.

		if bestMatch != nil && (bestMatchNAT ||
			!existsPreferredNATMatch ||
			candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {

			break
		}
	}

	return bestMatch, bestMatchIndex
}
  772. // applyLoadLimit checks if the broker process is in the load limiting state
  773. // and, in order to reduce load, determines if new proxy announces or client
  774. // offers should be rejected immediately instead of enqueued.
  775. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  776. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  777. return nil
  778. }
  779. // Acquire the queue locks only when in the load limit state, and in the
  780. // same order as matchAllOffers.
  781. m.announcementQueueMutex.Lock()
  782. defer m.announcementQueueMutex.Unlock()
  783. m.offerQueueMutex.Lock()
  784. defer m.offerQueueMutex.Unlock()
  785. announcementLen := m.announcementQueue.getLen()
  786. offerLen := m.offerQueue.Len()
  787. // When the load limit had been reached, and assuming the broker process
  788. // is running only an in-proxy broker, it's likely, in practise, that
  789. // only one of the two queues has hundreds of thousands of entries while
  790. // the other has few, and there are no matches clearing the queue.
  791. //
  792. // Instead of simply rejecting all enqueue requests, allow the request
  793. // type, announce or offer, that is in shorter supply as these are likely
  794. // to match and draw down the larger queue. This attempts to make
  795. // productive use of enqueued items, and also attempts to avoid simply
  796. // emptying both queues -- as will happen in any case due to timeouts --
  797. // and then have the same larger queue refill again after the load limit
  798. // state exits.
  799. //
  800. // This approach assumes some degree of slack in available system memory
  801. // and CPU in the load limiting state, similar to how the tunnel server
  802. // continues to operate existing tunnels in the same state.
  803. //
  804. // The heuristic below of allowing when less than half the size of the
  805. // larger queue puts a cap on the amount the shorter queue can continue
  806. // to grow in the load limiting state, in the worst case.
  807. //
  808. // Limitation: in some scenarios that are expected to be rare, it can
  809. // happen that allowed requests don't result in a match and memory
  810. // consumption continues to grow, leading to a broker process OOM kill.
  811. var allow bool
  812. if isAnnouncement {
  813. allow = announcementLen < offerLen/2
  814. } else {
  815. allow = offerLen < announcementLen/2
  816. }
  817. if allow {
  818. return nil
  819. }
  820. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  821. // MatcherLimitError results in a Response.Limited error response, which
  822. // causes a proxy to back off and a client to abort its dial; but in
  823. // neither case is the broker client reset. The error returned here will
  824. // result in a fast 404 response to the proxy or client, which will
  825. // instead trigger a broker client reset, and a chance of moving to a
  826. // different broker that is not overloaded.
  827. //
  828. // Limitation: the 404 response won't be distinguishable, in client or
  829. // proxy diagnostics, from other error conditions.
  830. //
  831. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  832. // use use log a distinct error and also ensure that it doesn't reselect
  833. // the same broker again in the broker client reset random selection.
  834. return errors.TraceNew("load limited")
  835. }
  836. // MatcherLimitError is the error type returned by Announce or Offer when the
  837. // caller has exceeded configured queue entry or rate limits.
  838. type MatcherLimitError struct {
  839. err error
  840. }
  841. func NewMatcherLimitError(err error) *MatcherLimitError {
  842. return &MatcherLimitError{err: err}
  843. }
  844. func (e MatcherLimitError) Error() string {
  845. return e.err.Error()
  846. }
  847. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  848. // address -- rate limits and queue entry limits.
  849. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  850. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  851. var entryCountByIP map[string]int
  852. var queueRateLimiters *lrucache.Cache
  853. var limitEntryCount int
  854. var quantity int
  855. var interval time.Duration
  856. if isAnnouncement {
  857. // Skip limit checks for non-limited proxies.
  858. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  859. return nil
  860. }
  861. entryCountByIP = m.announcementQueueEntryCountByIP
  862. queueRateLimiters = m.announcementQueueRateLimiters
  863. limitEntryCount = m.announcementLimitEntryCount
  864. quantity = m.announcementRateLimitQuantity
  865. interval = m.announcementRateLimitInterval
  866. } else {
  867. entryCountByIP = m.offerQueueEntryCountByIP
  868. queueRateLimiters = m.offerQueueRateLimiters
  869. limitEntryCount = m.offerLimitEntryCount
  870. quantity = m.offerRateLimitQuantity
  871. interval = m.offerRateLimitInterval
  872. }
  873. // The rate limit is checked first, before the max count check, to ensure
  874. // that the rate limit state is updated regardless of the max count check
  875. // outcome.
  876. if quantity > 0 && interval > 0 {
  877. var rateLimiter *rate.Limiter
  878. entry, ok := queueRateLimiters.Get(limitIP)
  879. if ok {
  880. rateLimiter = entry.(*rate.Limiter)
  881. } else {
  882. limit := float64(quantity) / interval.Seconds()
  883. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  884. queueRateLimiters.Set(
  885. limitIP, rateLimiter, interval)
  886. }
  887. if !rateLimiter.Allow() {
  888. return errors.Trace(
  889. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  890. }
  891. }
  892. if limitEntryCount > 0 {
  893. // Limitation: non-limited proxy ID entries are counted in
  894. // entryCountByIP. If both a limited and non-limited proxy ingress
  895. // from the same limitIP, then the non-limited entries will count
  896. // against the limited proxy's limitEntryCount.
  897. entryCount, ok := entryCountByIP[limitIP]
  898. if ok && entryCount >= limitEntryCount {
  899. return errors.Trace(
  900. NewMatcherLimitError(std_errors.New("max entries for IP")))
  901. }
  902. }
  903. return nil
  904. }
  905. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  906. m.announcementQueueMutex.Lock()
  907. defer m.announcementQueueMutex.Unlock()
  908. // Ensure the queue doesn't grow larger than the max size.
  909. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  910. return errors.TraceNew("queue full")
  911. }
  912. // Ensure no single peer IP can enqueue a large number of entries or
  913. // rapidly enqueue beyond the configured rate.
  914. isAnnouncement := true
  915. err := m.applyIPLimits(
  916. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  917. if err != nil {
  918. return errors.Trace(err)
  919. }
  920. // announcementEntry.queueReference should be uninitialized.
  921. // announcementMultiQueue.enqueue sets queueReference to be used for
  922. // efficient dequeuing.
  923. if announcementEntry.queueReference.entry != nil {
  924. return errors.TraceNew("unexpected queue reference")
  925. }
  926. err = m.announcementQueue.enqueue(announcementEntry)
  927. if err != nil {
  928. return errors.Trace(err)
  929. }
  930. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  931. select {
  932. case m.matchSignal <- struct{}{}:
  933. default:
  934. }
  935. return nil
  936. }
  937. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  938. // In the aborting case, the queue isn't already locked. Otherwise, assume
  939. // it is locked.
  940. if aborting {
  941. m.announcementQueueMutex.Lock()
  942. defer m.announcementQueueMutex.Unlock()
  943. }
  944. found := announcementEntry.queueReference.dequeue()
  945. if found {
  946. // Adjust entry counts by peer IP, used to enforce
  947. // matcherAnnouncementQueueMaxEntriesPerIP.
  948. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  949. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  950. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  951. }
  952. }
  953. if aborting && !found {
  954. // The Announce call is aborting and taking its entry back out of the
  955. // queue. If the entry is not found in the queue, then a concurrent
  956. // Offer has matched the announcement. So check for the pending
  957. // answer corresponding to the announcement and remove it and deliver
  958. // a failure signal to the waiting Offer, so the client doesn't wait
  959. // longer than necessary.
  960. key := m.pendingAnswerKey(
  961. announcementEntry.announcement.ProxyID,
  962. announcementEntry.announcement.ConnectionID)
  963. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  964. if ok {
  965. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  966. m.pendingAnswers.Delete(key)
  967. }
  968. }
  969. }
  970. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  971. m.offerQueueMutex.Lock()
  972. defer m.offerQueueMutex.Unlock()
  973. // Ensure the queue doesn't grow larger than the max size.
  974. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  975. return errors.TraceNew("queue full")
  976. }
  977. // Ensure no single peer IP can enqueue a large number of entries or
  978. // rapidly enqueue beyond the configured rate.
  979. isAnnouncement := false
  980. err := m.applyIPLimits(
  981. isAnnouncement, offerEntry.limitIP, ID{})
  982. if err != nil {
  983. return errors.Trace(err)
  984. }
  985. // offerEntry.queueReference should be uninitialized and is set here to be
  986. // used for efficient dequeuing.
  987. if offerEntry.queueReference != nil {
  988. return errors.TraceNew("unexpected queue reference")
  989. }
  990. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  991. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  992. select {
  993. case m.matchSignal <- struct{}{}:
  994. default:
  995. }
  996. return nil
  997. }
  998. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  999. // In the aborting case, the queue isn't already locked. Otherise, assume
  1000. // it is locked.
  1001. if aborting {
  1002. m.offerQueueMutex.Lock()
  1003. defer m.offerQueueMutex.Unlock()
  1004. }
  1005. if offerEntry.queueReference == nil {
  1006. return
  1007. }
  1008. m.offerQueue.Remove(offerEntry.queueReference)
  1009. offerEntry.queueReference = nil
  1010. // Adjust entry counts by peer IP, used to enforce
  1011. // matcherOfferQueueMaxEntriesPerIP.
  1012. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  1013. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  1014. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  1015. }
  1016. }
  1017. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  1018. // The pending answer lookup key is used to associate announcements and
  1019. // subsequent answers. While the client learns the ConnectionID, only the
  1020. // proxy knows the ProxyID component, so only the correct proxy can match
  1021. // an answer to an announcement. The ConnectionID component is necessary
  1022. // as a proxy may have multiple, concurrent pending answers.
  1023. return string(proxyID[:]) + string(connectionID[:])
  1024. }
  1025. func getRateLimitIP(strIP string) string {
  1026. IP := net.ParseIP(strIP)
  1027. if IP == nil || IP.To4() != nil {
  1028. return strIP
  1029. }
  1030. // With IPv6, individual users or sites are users commonly allocated a /64
  1031. // or /56, so rate limit by /56.
  1032. return IP.Mask(net.CIDRMask(56, 128)).String()
  1033. }
// announcementMultiQueue is a set of announcement queues, one per common or
// personal compartment ID, providing efficient iteration over announcements
// matching a specified list of compartment IDs. announcementMultiQueue and
// its underlying data structures are not safe for concurrent access.
type announcementMultiQueue struct {
	// priorityCommonCompartmentQueues holds, by compartment ID, the queues
	// for common compartment announcements flagged as priority.
	priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
	// commonCompartmentQueues holds, by compartment ID, the queues for
	// non-priority common compartment announcements.
	commonCompartmentQueues map[ID]*announcementCompartmentQueue
	// personalCompartmentQueues holds, by compartment ID, the queues for
	// personal compartment announcements.
	personalCompartmentQueues map[ID]*announcementCompartmentQueue
	// totalEntries is the number of announcements currently enqueued across
	// all compartment queues; it is maintained by enqueue and dequeue.
	totalEntries int
}
// announcementCompartmentQueue is a single compartment queue within an
// announcementMultiQueue. The queue is implemented using a doubly-linked
// list, which provides efficient insert and mid-queue dequeue operations.
// The announcementCompartmentQueue also records NAT type stats for enqueued
// announcements, which are used, when matching, to determine when better NAT
// matches may be possible.
type announcementCompartmentQueue struct {
	// isCommonCompartment and isPriority identify which of the
	// announcementMultiQueue maps this queue is stored in.
	isCommonCompartment bool
	isPriority          bool
	// compartmentID is this queue's key in that map.
	compartmentID ID
	// entries holds *announcementEntry values; new announcements are pushed
	// to the back.
	entries *list.List
	// Per-NAT-traversal-type counts of the announcements currently in
	// entries, incremented on enqueue and decremented on dequeue.
	unlimitedNATCount        int
	partiallyLimitedNATCount int
	strictlyLimitedNATCount  int
}
// announcementMatchIterator represents the state of an iteration over a
// subset of announcementMultiQueue compartment queues. Concurrent
// announcementMatchIterators are not supported.
type announcementMatchIterator struct {
	// multiQueue is the announcementMultiQueue being iterated.
	multiQueue *announcementMultiQueue
	// compartmentQueues, compartmentIDs, and nextEntries are parallel
	// slices populated together by startMatching: for each matching queue i,
	// compartmentIDs[i] is its compartment ID and nextEntries[i] is the next
	// list element to be considered by getNext, or nil once that queue has
	// been fully consumed.
	compartmentQueues []*announcementCompartmentQueue
	compartmentIDs    []ID
	nextEntries       []*list.Element
}
// announcementQueueReference represents the queue position for a given
// announcement entry, and provides an efficient dequeue operation.
type announcementQueueReference struct {
	// multiQueue and compartmentQueue identify where the entry is enqueued;
	// both are set by announcementMultiQueue.enqueue.
	multiQueue       *announcementMultiQueue
	compartmentQueue *announcementCompartmentQueue
	// entry is the entry's list element within compartmentQueue.entries. It
	// is nil when the entry has not been enqueued or has been dequeued.
	entry *list.Element
}
  1075. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1076. return &announcementMultiQueue{
  1077. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1078. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1079. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1080. }
  1081. }
  1082. func (q *announcementMultiQueue) getLen() int {
  1083. return q.totalEntries
  1084. }
  1085. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1086. // Assumes announcementEntry not already enueued.
  1087. // Limitation: only one compartment ID, either common or personal, is
  1088. // supported per announcement entry. In the common compartment case, the
  1089. // broker currently assigns only one common compartment ID per proxy
  1090. // announcement. In the personal compartment case, there is currently no
  1091. // use case for allowing a proxy to announce under multiple personal
  1092. // compartment IDs.
  1093. //
  1094. // To overcome this limitation, the dequeue operation would need to be
  1095. // able to remove an announcement entry from multiple
  1096. // announcementCompartmentQueues.
  1097. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1098. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1099. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1100. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1101. }
  1102. isPriority := announcementEntry.announcement.Properties.IsPriority
  1103. isCommonCompartment := true
  1104. var compartmentID ID
  1105. var compartmentQueues map[ID]*announcementCompartmentQueue
  1106. if len(commonCompartmentIDs) > 0 {
  1107. compartmentID = commonCompartmentIDs[0]
  1108. compartmentQueues = q.commonCompartmentQueues
  1109. if isPriority {
  1110. compartmentQueues = q.priorityCommonCompartmentQueues
  1111. }
  1112. } else {
  1113. isCommonCompartment = false
  1114. compartmentID = personalCompartmentIDs[0]
  1115. compartmentQueues = q.personalCompartmentQueues
  1116. if isPriority {
  1117. return errors.TraceNew("priority not supported for personal compartments")
  1118. }
  1119. }
  1120. compartmentQueue, ok := compartmentQueues[compartmentID]
  1121. if !ok {
  1122. compartmentQueue = &announcementCompartmentQueue{
  1123. isCommonCompartment: isCommonCompartment,
  1124. isPriority: isPriority,
  1125. compartmentID: compartmentID,
  1126. entries: list.New(),
  1127. }
  1128. compartmentQueues[compartmentID] = compartmentQueue
  1129. }
  1130. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1131. // Update the NAT type counts which are used to determine if a better NAT
  1132. // match may be made by inspecting more announcement queue entries.
  1133. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1134. case NATTraversalUnlimited:
  1135. compartmentQueue.unlimitedNATCount += 1
  1136. case NATTraversalPartiallyLimited:
  1137. compartmentQueue.partiallyLimitedNATCount += 1
  1138. case NATTraversalStrictlyLimited:
  1139. compartmentQueue.strictlyLimitedNATCount += 1
  1140. }
  1141. q.totalEntries += 1
  1142. announcementEntry.queueReference = announcementQueueReference{
  1143. multiQueue: q,
  1144. compartmentQueue: compartmentQueue,
  1145. entry: entry,
  1146. }
  1147. return nil
  1148. }
  1149. // announcementQueueReference returns false if the item is already dequeued.
  1150. func (r *announcementQueueReference) dequeue() bool {
  1151. if r.entry == nil {
  1152. // Already dequeued.
  1153. return false
  1154. }
  1155. announcementEntry := r.entry.Value.(*announcementEntry)
  1156. // Reverse the NAT type counts.
  1157. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1158. case NATTraversalUnlimited:
  1159. r.compartmentQueue.unlimitedNATCount -= 1
  1160. case NATTraversalPartiallyLimited:
  1161. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1162. case NATTraversalStrictlyLimited:
  1163. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1164. }
  1165. r.compartmentQueue.entries.Remove(r.entry)
  1166. if r.compartmentQueue.entries.Len() == 0 {
  1167. // Remove empty compartment queue.
  1168. queues := r.multiQueue.personalCompartmentQueues
  1169. if r.compartmentQueue.isCommonCompartment {
  1170. if r.compartmentQueue.isPriority {
  1171. queues = r.multiQueue.priorityCommonCompartmentQueues
  1172. } else {
  1173. queues = r.multiQueue.commonCompartmentQueues
  1174. }
  1175. }
  1176. delete(queues, r.compartmentQueue.compartmentID)
  1177. }
  1178. r.multiQueue.totalEntries -= 1
  1179. // Mark as dequeued.
  1180. r.entry = nil
  1181. return true
  1182. }
  1183. func (q *announcementMultiQueue) startMatching(
  1184. isCommonCompartments bool,
  1185. compartmentIDs []ID) *announcementMatchIterator {
  1186. iter := &announcementMatchIterator{
  1187. multiQueue: q,
  1188. }
  1189. // Find the matching compartment queues and initialize iteration over
  1190. // those queues. Building the set of matching queues is a linear time
  1191. // operation, bounded by the length of compartmentIDs (no more than
  1192. // maxCompartmentIDs, as enforced in
  1193. // ClientOfferRequest.ValidateAndGetLogFields).
  1194. // Priority queues, when in use, must all be added to the beginning of
  1195. // iter.compartmentQueues in order to ensure that the iteration logic in
  1196. // getNext visits all priority items first.
  1197. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1198. if isCommonCompartments {
  1199. compartmentQueuesList = append(
  1200. compartmentQueuesList,
  1201. q.priorityCommonCompartmentQueues,
  1202. q.commonCompartmentQueues)
  1203. } else {
  1204. compartmentQueuesList = append(
  1205. compartmentQueuesList,
  1206. q.personalCompartmentQueues)
  1207. }
  1208. for _, compartmentQueues := range compartmentQueuesList {
  1209. for _, ID := range compartmentIDs {
  1210. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1211. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1212. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1213. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1214. }
  1215. }
  1216. }
  1217. return iter
  1218. }
  1219. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1220. // Return the count of NAT types across all matchable compartment queues.
  1221. //
  1222. // A potential future enhancement would be to provide per-queue NAT counts
  1223. // or NAT type indexing in order to quickly find preferred NAT matches.
  1224. unlimitedNATCount := 0
  1225. partiallyLimitedNATCount := 0
  1226. strictlyLimitedNATCount := 0
  1227. for _, compartmentQueue := range iter.compartmentQueues {
  1228. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1229. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1230. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1231. }
  1232. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1233. }
  1234. // announcementMatchIterator returns the next announcement entry candidate in
  1235. // compartment queue FIFO order, selecting the queue with the oldest head
  1236. // item.
  1237. //
  1238. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1239. // candidate is selected. dequeue may be called on any getNext return value
  1240. // without disrupting the iteration state; however,
  1241. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1242. // are not supported during iteration. Iteration and dequeue should all be
  1243. // performed with a lock over the entire announcementMultiQueue, and with
  1244. // only one concurrent announcementMatchIterator.
  1245. //
  1246. // getNext returns a nil *announcementEntry when there are no more items.
  1247. // getNext also returns an isPriority flag, indicating the announcement is a
  1248. // priority candidate. All priority candidates are guaranteed to be returned
  1249. // before any non-priority candidates.
  1250. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1251. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1252. // order. Also assumes that any priority queues are all at the front of
  1253. // iter.compartmentQueues.
  1254. // Select the oldest item, by deadline, from all the candidate queue head
  1255. // items. This operation is linear in the number of matching compartment
  1256. // ID queues, which is currently bounded by the length of matching
  1257. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1258. // ClientOfferRequest.ValidateAndGetLogFields).
  1259. //
  1260. // When there are priority candidates, they are selected first, regardless
  1261. // of the deadlines of non-priority candidates. Multiple priority
  1262. // candidates are processed in FIFO deadline order.
  1263. //
  1264. // A potential future enhancement is to add more iterator state to track
  1265. // which queue has the next oldest time to select on the following
  1266. // getNext call. Another potential enhancement is to remove fully
  1267. // consumed queues from compartmentQueues/compartmentIDs/nextEntries.
  1268. var selectedCandidate *announcementEntry
  1269. selectedIndex := -1
  1270. selectedPriority := false
  1271. for i := 0; i < len(iter.compartmentQueues); i++ {
  1272. if iter.nextEntries[i] == nil {
  1273. continue
  1274. }
  1275. isPriority := iter.compartmentQueues[i].isPriority
  1276. if selectedPriority && !isPriority {
  1277. // Ignore older of non-priority entries when there are priority
  1278. // candidates.
  1279. break
  1280. }
  1281. if selectedCandidate == nil {
  1282. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1283. selectedIndex = i
  1284. selectedPriority = isPriority
  1285. } else {
  1286. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1287. deadline, deadlineOk := candidate.ctx.Deadline()
  1288. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1289. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1290. selectedCandidate = candidate
  1291. selectedIndex = i
  1292. selectedPriority = isPriority
  1293. }
  1294. }
  1295. }
  1296. // Advance the selected queue to the next element. This must be done
  1297. // before any dequeue call, since container/list.remove clears
  1298. // list.Element.next.
  1299. if selectedIndex != -1 {
  1300. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1301. }
  1302. return selectedCandidate, selectedPriority
  1303. }