matcher.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. "golang.org/x/time/rate"
  32. )
  // Matcher queue, cache, and probe limits. TTLs should be aligned with STUN
// hole punch lifetimes.
//
// NOTE(review): the queue size caps and probe limits are not referenced in
// this section of the file; their descriptions below follow their names —
// confirm against their uses.

// Caps on the announcement and offer queue sizes.
const (
	matcherAnnouncementQueueMaxSize = 5000000
	matcherOfferQueueMaxSize        = 5000000
)

// Default TTL and maximum entry count of the pendingAnswers cache created by
// NewMatcher. matchAllOffers uses the default TTL for a pending answer when
// the matched offer's ctx has no deadline.
const (
	matcherPendingAnswersTTL     = 30 * time.Second
	matcherPendingAnswersMaxSize = 100000
)

// Limits on how deep the matcher probes the announcement queue (presumably
// for preferred-NAT and general matches, respectively).
const (
	matcherMaxPreferredNATProbe = 100
	matcherMaxProbe             = 1000
)

// Reap frequency, in seconds, and maximum entry count of the announcement
// and offer rate limiter caches created by NewMatcher.
const (
	matcherRateLimiterReapHistoryFrequencySeconds = 300
	matcherRateLimiterMaxCacheEntries             = 1000000
)
  // Matcher matches proxy announcements with client offers. Matcher also
// coordinates pending proxy answers and routes answers to the awaiting
// client offer handler.
//
// Matching prioritizes selecting the oldest announcements and client offers,
// as they are closest to timing out.
//
// The client and proxy must supply matching personal or common compartment
// IDs. Common compartments are managed by Psiphon and can be obtained via a
// tactics parameter or via an OSL embedding. Each proxy announcement or
// client offer may specify only one compartment ID type, either common or
// personal.
//
// Matching prefers to pair proxies and clients in a way that maximizes total
// possible matches. For a client or proxy with less-limited NAT traversal, a
// pairing with more-limited NAT traversal is preferred; and vice versa.
// Candidates with unknown NAT types and mobile network types are assumed to
// have the most limited NAT traversal capability.
//
// Preferred matchings take priority over announcement age.
//
// The client and proxy will not match if they are in the same country and
// ASN, as it's assumed that doesn't provide any blocking circumvention
// benefit. Disallowing proxies in certain blocked countries is handled at a
// higher level; any such proxies should not be enqueued for matching.
//
// Locking: announcement queue state is guarded by announcementQueueMutex and
// offer queue state by offerQueueMutex. matchAllOffers acquires
// announcementQueueMutex before offerQueueMutex; code that needs both should
// follow the same order to avoid deadlock.
type Matcher struct {
	// config is the configuration passed to NewMatcher.
	config *MatcherConfig

	// runMutex serializes Start and Stop and guards runContext and
	// stopRunning, which Start sets and Stop resets to nil. waitGroup tracks
	// the matchWorker goroutine so that Stop can wait for it to exit.
	runMutex    sync.Mutex
	runContext  context.Context
	stopRunning context.CancelFunc
	waitGroup   *sync.WaitGroup

	// The announcement queue is implicitly sorted by announcement age. The
	// count fields are used to skip searching deeper into the queue for
	// preferred matches.
	//
	// announcementQueueEntryCountByIP and announcementQueueRateLimiters are
	// presumably per-IP entry counts and per-IP rate limiters keyed by the
	// entry limitIP; they are maintained outside this section — confirm.
	// The limit and nonlimited-ID fields are written by SetLimits while
	// holding announcementQueueMutex.
	//
	// TODO: replace queue and counts with an indexed, in-memory database?
	announcementQueueMutex          sync.Mutex
	announcementQueue               *announcementMultiQueue
	announcementQueueEntryCountByIP map[string]int
	announcementQueueRateLimiters   *lrucache.Cache
	announcementLimitEntryCount     int
	announcementRateLimitQuantity   int
	announcementRateLimitInterval   time.Duration
	announcementNonlimitedProxyIDs  map[ID]struct{}

	// The offer queue is also implicitly sorted by offer age. Both an offer
	// and announcement queue are required since either announcements or
	// offers can arrive while there are no available pairings.
	//
	// As with the announcement fields, the count map and rate limiter cache
	// are presumably keyed by the entry limitIP (confirm), and the limit
	// fields are written by SetLimits while holding offerQueueMutex.
	offerQueueMutex          sync.Mutex
	offerQueue               *list.List
	offerQueueEntryCountByIP map[string]int
	offerQueueRateLimiters   *lrucache.Cache
	offerLimitEntryCount     int
	offerRateLimitQuantity   int
	offerRateLimitInterval   time.Duration

	// matchSignal has capacity 1; each receive makes matchWorker run a full
	// matchAllOffers pass. Its senders are outside this section of the file.
	matchSignal chan struct{}

	// pendingAnswers maps pendingAnswerKey(proxyID, connectionID) to a
	// *pendingAnswer. matchAllOffers adds an entry for each match; Answer
	// and AnswerError remove it.
	pendingAnswers *lrucache.Cache
}
  // MatcherConfig specifies the configuration for a matcher.
type MatcherConfig struct {
	// Logger is used to log events.
	Logger common.Logger

	// Announcement queue limits. NewMatcher applies these via SetLimits;
	// SetLimits may be called later to replace them. The proxy IDs in
	// AnnouncementNonlimitedProxyIDs are presumably exempt from these limits
	// (enforcement is outside this section — confirm).
	AnnouncementLimitEntryCount    int
	AnnouncementRateLimitQuantity  int
	AnnouncementRateLimitInterval  time.Duration
	AnnouncementNonlimitedProxyIDs []ID

	// Offer queue limits. NewMatcher applies these via SetLimits; SetLimits
	// may be called later to replace them.
	OfferLimitEntryCount   int
	OfferRateLimitQuantity int
	OfferRateLimitInterval time.Duration

	// Proxy quality state.
	ProxyQualityState *ProxyQualityState

	// Broker process load limit state callback. See Broker.Config.
	IsLoadLimiting func() bool
}
  // MatchProperties specifies the compartment, GeoIP, and network topology
// matching properties of clients and proxies.
type MatchProperties struct {
	// IsPriority and ProtocolVersion are not read in this section of the
	// file; their matching semantics are defined elsewhere — confirm.
	IsPriority      bool
	ProtocolVersion int32

	// Compartment IDs. Only one type may be set: Announce requires exactly
	// one ID in total, and Offer requires at least one.
	CommonCompartmentIDs   []ID
	PersonalCompartmentIDs []ID

	// GeoIPData is the peer's GeoIP data; per the Matcher documentation, a
	// client and proxy in the same country and ASN are not matched.
	GeoIPData common.GeoIPData

	// NetworkType, NATType, and PortMappingTypes are combined by
	// EffectiveNATType into the NAT type used for NAT preference matching.
	NetworkType      NetworkType
	NATType          NATType
	PortMappingTypes PortMappingTypes
}
  130. // EffectiveNATType combines the set of network properties into an effective
  131. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  132. // traversal is assumed. When NAT type is unknown and the network type is
  133. // mobile, CGNAT with limited NAT traversal is assumed.
  134. func (p *MatchProperties) EffectiveNATType() NATType {
  135. if p.PortMappingTypes.Available() {
  136. return NATTypePortMapping
  137. }
  138. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  139. // publicly reachable IPv6 ICE host candidate? If so, change the
  140. // effective NAT type? Depends on whether the matched peer can use IPv6.
  141. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  142. return NATTypeMobileNetwork
  143. }
  144. return p.NATType
  145. }
  146. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  147. // matching given the types of pairing candidates available.
  148. func (p *MatchProperties) ExistsPreferredNATMatch(
  149. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  150. return p.EffectiveNATType().ExistsPreferredMatch(
  151. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  152. }
  153. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  154. // NAT matching.
  155. func (p *MatchProperties) IsPreferredNATMatch(
  156. peerMatchProperties *MatchProperties) bool {
  157. return p.EffectiveNATType().IsPreferredMatch(
  158. peerMatchProperties.EffectiveNATType())
  159. }
  // MatchAnnouncement is a proxy announcement to be queued for matching.
type MatchAnnouncement struct {
	// Properties drive matching; see Announce for compartment ID rules.
	Properties MatchProperties

	// ProxyID and ConnectionID identify the pending answer for this
	// announcement once matched: the proxy's Answer/AnswerError call must
	// supply the same pair.
	ProxyID ID

	// ProxyMetrics is not read in this section of the file.
	ProxyMetrics *ProxyMetrics

	ConnectionID ID
}
  // MatchOffer is a client offer to be queued for matching.
//
// Only Properties is used for matching in this section of the file. The
// matched *MatchOffer is delivered unchanged to the proxy's Announce call,
// which returns it to the broker.
type MatchOffer struct {
	Properties                  MatchProperties
	ClientOfferSDP              WebRTCSessionDescription
	ClientRootObfuscationSecret ObfuscationSecret
	DoDTLSRandomization         bool
	UseMediaStreams             bool
	TrafficShapingParameters    *TrafficShapingParameters
	NetworkProtocol             NetworkProtocol
	DestinationAddress          string
	DestinationServerID         string
}
  // MatchAnswer is a proxy answer, the proxy's follow up to a matched
// announcement, to be routed to the awaiting client offer.
//
// ProxyID and ConnectionID select the pending answer in Answer. Offer also
// checks that ConnectionID equals the matched announcement's ConnectionID.
type MatchAnswer struct {
	ProxyIP        string
	ProxyID        ID
	ConnectionID   ID
	ProxyAnswerSDP WebRTCSessionDescription
}
  // MatchMetrics records statistics about the match queue state at the time a
// match is made. matchAllOffers stores the same *MatchMetrics in both the
// matched offer entry and the matched announcement entry.
type MatchMetrics struct {
	// OfferMatchIndex is the 0-based position of the matched offer among the
	// offers visited in the current matching pass. Expired offers removed
	// earlier in the same pass are still counted.
	OfferMatchIndex int

	// OfferQueueSize is the offer queue length when the match is recorded,
	// before the matched offer is removed.
	OfferQueueSize int

	// AnnouncementMatchIndex is the index returned by matchOffer; with the
	// announcement multi-queue, it is how many announcement entries were
	// inspected before matching.
	AnnouncementMatchIndex int

	// AnnouncementQueueSize is the announcement queue length when the match
	// is recorded, before the matched announcement is removed.
	AnnouncementQueueSize int
}
  195. // GetMetrics converts MatchMetrics to loggable fields.
  196. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  197. if metrics == nil {
  198. return nil
  199. }
  200. return common.LogFields{
  201. "offer_match_index": metrics.OfferMatchIndex,
  202. "offer_queue_size": metrics.OfferQueueSize,
  203. "announcement_match_index": metrics.AnnouncementMatchIndex,
  204. "announcement_queue_size": metrics.AnnouncementQueueSize,
  205. }
  206. }
  // announcementEntry is an announcement queue entry, an announcement with its
// associated lifetime context and signaling channel.
type announcementEntry struct {
	// ctx is the ctx passed to Announce; when it is done, Announce stops
	// waiting and removes the entry.
	ctx context.Context

	// limitIP is getRateLimitIP(proxyIP), as computed by Announce.
	limitIP string

	// announcement is the caller's announcement, which must not be mutated
	// after Announce is called.
	announcement *MatchAnnouncement

	// offerChan has capacity 1 and receives the matched client offer from
	// matchAllOffers.
	offerChan chan *MatchOffer

	// matchMetrics holds a *MatchMetrics once matched; read it via
	// getMatchMetrics.
	matchMetrics atomic.Value

	// queueReference is initialized by addAnnouncementEntry, and used to
	// efficiently dequeue the entry.
	queueReference announcementQueueReference
}
  219. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  220. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  221. return matchMetrics
  222. }
  // offerEntry is an offer queue entry, an offer with its associated lifetime
// context and signaling channel.
type offerEntry struct {
	// ctx is the ctx passed to Offer. matchAllOffers drops the entry once
	// ctx is done, and uses its deadline, if any, as the TTL of the pending
	// answer created on a match.
	ctx context.Context

	// limitIP is getRateLimitIP(clientIP), as computed by Offer.
	limitIP string

	// offer is the caller's offer, which must not be mutated after Offer is
	// called.
	offer *MatchOffer

	// answerChan has capacity 1. After a match, it is referenced by the
	// pending answer; Answer sends the proxy's answer on it, and AnswerError
	// closes it, which delivers nil.
	answerChan chan *answerInfo

	// matchMetrics holds a *MatchMetrics once matched; read it via
	// getMatchMetrics.
	matchMetrics atomic.Value

	// queueReference is initialized by addOfferEntry, and used to efficiently
	// dequeue the entry.
	queueReference *list.Element
}
  235. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  236. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  237. return matchMetrics
  238. }
  // answerInfo is an answer and its associated announcement. Answer sends one
// on the matched offer's answerChan, and Offer returns both parts to its
// caller.
type answerInfo struct {
	announcement *MatchAnnouncement
	answer       *MatchAnswer
}
  // pendingAnswer represents an answer that is expected to arrive from a
// proxy. matchAllOffers stores one in Matcher.pendingAnswers for each match,
// keyed by the announcement's proxy ID and connection ID. answerChan is the
// matched offer's answerChan, through which Answer/AnswerError reach the
// waiting Offer call.
type pendingAnswer struct {
	announcement *MatchAnnouncement
	answerChan   chan *answerInfo
}
  250. // NewMatcher creates a new Matcher.
  251. func NewMatcher(config *MatcherConfig) *Matcher {
  252. m := &Matcher{
  253. config: config,
  254. waitGroup: new(sync.WaitGroup),
  255. announcementQueue: newAnnouncementMultiQueue(),
  256. announcementQueueEntryCountByIP: make(map[string]int),
  257. announcementQueueRateLimiters: lrucache.NewWithLRU(
  258. 0,
  259. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  260. matcherRateLimiterMaxCacheEntries),
  261. offerQueue: list.New(),
  262. offerQueueEntryCountByIP: make(map[string]int),
  263. offerQueueRateLimiters: lrucache.NewWithLRU(
  264. 0,
  265. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  266. matcherRateLimiterMaxCacheEntries),
  267. matchSignal: make(chan struct{}, 1),
  268. // matcherPendingAnswersTTL is not configurable; it supplies a default
  269. // that is expected to be ignored when each entry's TTL is set to the
  270. // Offer ctx timeout.
  271. pendingAnswers: lrucache.NewWithLRU(
  272. matcherPendingAnswersTTL,
  273. 1*time.Minute,
  274. matcherPendingAnswersMaxSize),
  275. }
  276. m.SetLimits(
  277. config.AnnouncementLimitEntryCount,
  278. config.AnnouncementRateLimitQuantity,
  279. config.AnnouncementRateLimitInterval,
  280. config.AnnouncementNonlimitedProxyIDs,
  281. config.OfferLimitEntryCount,
  282. config.OfferRateLimitQuantity,
  283. config.OfferRateLimitInterval)
  284. return m
  285. }
  286. // SetLimits sets new queue limits, replacing the previous configuration.
  287. // Existing, cached rate limiters retain their existing rate limit state. New
  288. // entries will use the new quantity/interval configuration. In addition,
  289. // currently enqueued items may exceed any new, lower maximum entry count
  290. // until naturally dequeued.
  291. func (m *Matcher) SetLimits(
  292. announcementLimitEntryCount int,
  293. announcementRateLimitQuantity int,
  294. announcementRateLimitInterval time.Duration,
  295. announcementNonlimitedProxyIDs []ID,
  296. offerLimitEntryCount int,
  297. offerRateLimitQuantity int,
  298. offerRateLimitInterval time.Duration) {
  299. nonlimitedProxyIDs := make(map[ID]struct{})
  300. for _, proxyID := range announcementNonlimitedProxyIDs {
  301. nonlimitedProxyIDs[proxyID] = struct{}{}
  302. }
  303. m.announcementQueueMutex.Lock()
  304. m.announcementLimitEntryCount = announcementLimitEntryCount
  305. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  306. m.announcementRateLimitInterval = announcementRateLimitInterval
  307. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  308. m.announcementQueueMutex.Unlock()
  309. m.offerQueueMutex.Lock()
  310. m.offerLimitEntryCount = offerLimitEntryCount
  311. m.offerRateLimitQuantity = offerRateLimitQuantity
  312. m.offerRateLimitInterval = offerRateLimitInterval
  313. m.offerQueueMutex.Unlock()
  314. }
  315. // Start starts running the Matcher. The Matcher runs a goroutine which
  316. // matches announcements and offers.
  317. func (m *Matcher) Start() error {
  318. m.runMutex.Lock()
  319. defer m.runMutex.Unlock()
  320. if m.runContext != nil {
  321. return errors.TraceNew("already running")
  322. }
  323. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  324. m.waitGroup.Add(1)
  325. go func() {
  326. defer m.waitGroup.Done()
  327. m.matchWorker(m.runContext)
  328. }()
  329. return nil
  330. }
  331. // Stop stops running the Matcher and its worker goroutine.
  332. //
  333. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  334. // can get enqueued during and after a Stop call. Stop is intended more for a
  335. // full broker shutdown, where this won't be a concern.
  336. func (m *Matcher) Stop() {
  337. m.runMutex.Lock()
  338. defer m.runMutex.Unlock()
  339. m.stopRunning()
  340. m.waitGroup.Wait()
  341. m.runContext, m.stopRunning = nil, nil
  342. }
  343. // Announce enqueues the proxy announcement and blocks until it is matched
  344. // with a returned offer or ctx is done. The caller must not mutate the
  345. // announcement or its properties after calling Announce.
  346. //
  347. // Announce assumes that the ctx.Deadline for each call is monotonically
  348. // increasing and that the deadline can be used as part of selecting the next
  349. // nearest-to-expire announcement.
  350. //
  351. // The offer is sent to the proxy by the broker, and then the proxy sends its
  352. // answer back to the broker, which calls Answer with that value.
  353. //
  354. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  355. // match is made, even if there is a later error.
  356. func (m *Matcher) Announce(
  357. ctx context.Context,
  358. proxyIP string,
  359. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  360. // An announcement must specify exactly one compartment ID, of one type,
  361. // common or personal. The limit of one is currently a limitation of the
  362. // multi-queue implementation; see comment in
  363. // announcementMultiQueue.enqueue.
  364. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  365. if len(compartmentIDs) == 0 {
  366. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  367. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  368. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  369. }
  370. if len(compartmentIDs) != 1 {
  371. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  372. }
  373. isAnnouncement := true
  374. err := m.applyLoadLimit(isAnnouncement)
  375. if err != nil {
  376. return nil, nil, errors.Trace(err)
  377. }
  378. announcementEntry := &announcementEntry{
  379. ctx: ctx,
  380. limitIP: getRateLimitIP(proxyIP),
  381. announcement: proxyAnnouncement,
  382. offerChan: make(chan *MatchOffer, 1),
  383. }
  384. err = m.addAnnouncementEntry(announcementEntry)
  385. if err != nil {
  386. return nil, nil, errors.Trace(err)
  387. }
  388. // Await client offer.
  389. var clientOffer *MatchOffer
  390. select {
  391. case <-ctx.Done():
  392. m.removeAnnouncementEntry(true, announcementEntry)
  393. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  394. case clientOffer = <-announcementEntry.offerChan:
  395. }
  396. return clientOffer, announcementEntry.getMatchMetrics(), nil
  397. }
  398. // Offer enqueues the client offer and blocks until it is matched with a
  399. // returned announcement or ctx is done. The caller must not mutate the offer
  400. // or its properties after calling Announce.
  401. //
  402. // The answer is returned to the client by the broker, and the WebRTC
  403. // connection is dialed. The original announcement is also returned, so its
  404. // match properties can be logged.
  405. //
  406. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  407. // match is made, even if there is a later error.
  408. func (m *Matcher) Offer(
  409. ctx context.Context,
  410. clientIP string,
  411. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  412. // An offer must specify at least one compartment ID, and may only specify
  413. // one type, common or personal, of compartment IDs.
  414. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  415. if len(compartmentIDs) == 0 {
  416. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  417. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  418. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  419. }
  420. if len(compartmentIDs) < 1 {
  421. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  422. }
  423. isAnnouncement := false
  424. err := m.applyLoadLimit(isAnnouncement)
  425. if err != nil {
  426. return nil, nil, nil, errors.Trace(err)
  427. }
  428. offerEntry := &offerEntry{
  429. ctx: ctx,
  430. limitIP: getRateLimitIP(clientIP),
  431. offer: clientOffer,
  432. answerChan: make(chan *answerInfo, 1),
  433. }
  434. err = m.addOfferEntry(offerEntry)
  435. if err != nil {
  436. return nil, nil, nil, errors.Trace(err)
  437. }
  438. // Await proxy answer.
  439. var proxyAnswerInfo *answerInfo
  440. select {
  441. case <-ctx.Done():
  442. m.removeOfferEntry(true, offerEntry)
  443. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  444. // the Offer ctx, the client request, timeout, so it will eventually
  445. // get removed. But a client may abort its request earlier than the
  446. // timeout.
  447. return nil, nil,
  448. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  449. case proxyAnswerInfo = <-offerEntry.answerChan:
  450. }
  451. if proxyAnswerInfo == nil {
  452. // nil will be delivered to the channel when either the proxy
  453. // announcement request concurrently timed out, or the answer
  454. // indicated a proxy error, or the answer did not arrive in time.
  455. return nil, nil,
  456. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  457. }
  458. // This is a sanity check and not expected to fail.
  459. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  460. proxyAnswerInfo.announcement.ConnectionID) {
  461. return nil, nil,
  462. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  463. }
  464. return proxyAnswerInfo.answer,
  465. proxyAnswerInfo.announcement,
  466. offerEntry.getMatchMetrics(),
  467. nil
  468. }
  469. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  470. // announcement identified by the specified proxy ID and connection ID and
  471. // returns whether the announcement has personal compartment IDs, indicating
  472. // personal pairing mode.
  473. //
  474. // If no pending answer is found, an error is returned.
  475. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  476. proxyID ID, connectionID ID) (bool, error) {
  477. key := m.pendingAnswerKey(proxyID, connectionID)
  478. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  479. if !ok {
  480. // The input IDs don't correspond to a pending answer, or the client
  481. // is no longer awaiting the response.
  482. return false, errors.TraceNew("no pending answer")
  483. }
  484. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  485. hasPersonalCompartmentIDs := len(
  486. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  487. return hasPersonalCompartmentIDs, nil
  488. }
  489. // Answer delivers an answer from the proxy for a previously matched offer.
  490. // The ProxyID and ConnectionID must correspond to the original announcement.
  491. // The caller must not mutate the answer after calling Answer. Answer does
  492. // not block.
  493. //
  494. // The answer is returned to the awaiting Offer call and sent to the matched
  495. // client.
  496. func (m *Matcher) Answer(
  497. proxyAnswer *MatchAnswer) error {
  498. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  499. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  500. if !ok {
  501. // The input IDs don't correspond to a pending answer, or the client
  502. // is no longer awaiting the response.
  503. return errors.TraceNew("no pending answer")
  504. }
  505. m.pendingAnswers.Delete(key)
  506. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  507. pendingAnswer.answerChan <- &answerInfo{
  508. announcement: pendingAnswer.announcement,
  509. answer: proxyAnswer,
  510. }
  511. return nil
  512. }
  513. // AnswerError delivers a failed answer indication from the proxy to an
  514. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  515. // original announcement.
  516. //
  517. // The failure indication is returned to the awaiting Offer call and sent to
  518. // the matched client.
  519. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  520. key := m.pendingAnswerKey(proxyID, connectionID)
  521. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  522. if !ok {
  523. // The client is no longer awaiting the response.
  524. return
  525. }
  526. m.pendingAnswers.Delete(key)
  527. // Closing the channel delivers nil, a failed indicator, to any receiver.
  528. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  529. }
  530. // matchWorker is the matching worker goroutine. It idles until signaled that
  531. // a queue item has been added, and then runs a full matching pass.
  532. func (m *Matcher) matchWorker(ctx context.Context) {
  533. for {
  534. select {
  535. case <-m.matchSignal:
  536. m.matchAllOffers()
  537. case <-ctx.Done():
  538. return
  539. }
  540. }
  541. }
  542. // matchAllOffers iterates over the queues, making all possible matches.
  543. func (m *Matcher) matchAllOffers() {
  544. m.announcementQueueMutex.Lock()
  545. defer m.announcementQueueMutex.Unlock()
  546. m.offerQueueMutex.Lock()
  547. defer m.offerQueueMutex.Unlock()
  548. // Take each offer in turn, and select an announcement match. There is an
  549. // implicit preference for older client offers, sooner to timeout, at the
  550. // front of the queue.
  551. // TODO: consider matching one offer, then releasing the locks to allow
  552. // more announcements to be enqueued, then continuing to match.
  553. nextOffer := m.offerQueue.Front()
  554. offerIndex := -1
  555. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  556. offerIndex += 1
  557. // nextOffer.Next must be invoked before any removeOfferEntry since
  558. // container/list.remove clears list.Element.next.
  559. offer := nextOffer
  560. nextOffer = nextOffer.Next()
  561. offerEntry := offer.Value.(*offerEntry)
  562. // Skip and remove this offer if its deadline has already passed.
  563. // There is no signal to the awaiting Offer function, as it will exit
  564. // based on the same ctx.
  565. if offerEntry.ctx.Err() != nil {
  566. m.removeOfferEntry(false, offerEntry)
  567. continue
  568. }
  569. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  570. if announcementEntry == nil {
  571. continue
  572. }
  573. // Record match metrics.
  574. // The index metrics predate the announcement multi-queue; now, with
  575. // the multi-queue, announcement_index is how many announce entries
  576. // were inspected before matching.
  577. matchMetrics := &MatchMetrics{
  578. OfferMatchIndex: offerIndex,
  579. OfferQueueSize: m.offerQueue.Len(),
  580. AnnouncementMatchIndex: announcementMatchIndex,
  581. AnnouncementQueueSize: m.announcementQueue.getLen(),
  582. }
  583. offerEntry.matchMetrics.Store(matchMetrics)
  584. announcementEntry.matchMetrics.Store(matchMetrics)
  585. // Remove the matched announcement from the queue. Send the offer to
  586. // the announcement entry's offerChan, which will deliver it to the
  587. // blocked Announce call. Add a pending answers entry to await the
  588. // proxy's follow up Answer call. The TTL for the pending answer
  589. // entry is set to the matched Offer call's ctx, as the answer is
  590. // only useful as long as the client is still waiting.
  591. m.removeAnnouncementEntry(false, announcementEntry)
  592. expiry := lrucache.DefaultExpiration
  593. deadline, ok := offerEntry.ctx.Deadline()
  594. if ok {
  595. expiry = time.Until(deadline)
  596. }
  597. key := m.pendingAnswerKey(
  598. announcementEntry.announcement.ProxyID,
  599. announcementEntry.announcement.ConnectionID)
  600. m.pendingAnswers.Set(
  601. key,
  602. &pendingAnswer{
  603. announcement: announcementEntry.announcement,
  604. answerChan: offerEntry.answerChan,
  605. },
  606. expiry)
  607. announcementEntry.offerChan <- offerEntry.offer
  608. // Remove the matched offer from the queue and match the next offer,
  609. // now first in the queue.
  610. m.removeOfferEntry(false, offerEntry)
  611. }
  612. }
// matchOffer selects an enqueued announcement to pair with offerEntry. It
// returns the selected announcement entry and its candidate index -- the
// number of candidates inspected before it, including skipped and expired
// ones -- or nil and -1 when there is no match.
//
// Candidates are visited in the order provided by announcementMatchIterator:
// priority candidates first, then compartment queue FIFO order, oldest head
// first. A candidate is skipped when its ctx is already done (it is also
// removed from the queue), when negotiateProtocolVersion fails, or, for
// common compartment matching when GetAllowCommonASNMatching returns false,
// when the proxy and client share both country and ASN. The first
// acceptable candidate is selected unless a later one, within the probe
// limits, is a preferred NAT match (or, for priority candidates, has proxy
// ASN quality).
func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {

	// Assumes the caller has the queue mutexes locked.

	// Check each candidate announcement in turn, and select a match. There is
	// an implicit preference for older proxy announcements, sooner to
	// timeout, at the front of the enqueued announcements.
	// announcementMultiQueue.startMatching skips to the first matching
	// compartment ID(s).
	//
	// Limitation: since this logic matches each enqueued client in turn, it will
	// only make the optimal NAT match for the oldest enqueued client vs. all
	// proxies, and not do optimal N x M matching for all clients and all proxies.
	//
	// Future matching enhancements could include more sophisticated GeoIP
	// rules, such as a configuration encoding knowledge of an ASN's NAT
	// type, or preferred client/proxy country/ASN matches.

	offerProperties := &offerEntry.offer.Properties

	// Assumes the caller checks that offer specifies either personal
	// compartment IDs or common compartment IDs, but not both. Personal
	// compartment IDs, when present, take precedence here.

	isCommonCompartments := false
	compartmentIDs := offerProperties.PersonalCompartmentIDs
	if len(compartmentIDs) == 0 {
		isCommonCompartments = true
		compartmentIDs = offerProperties.CommonCompartmentIDs
	}
	if len(compartmentIDs) == 0 {
		// An offer with no compartment IDs can never match.
		return nil, -1
	}

	matchIterator := m.announcementQueue.startMatching(
		isCommonCompartments, compartmentIDs)

	// Use the NAT traversal type counters to check if there's any preferred
	// NAT match for this offer in the announcement queue. When there is, we
	// will search beyond the first announcement.

	unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
		matchIterator.getNATCounts()

	existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
		unlimitedNATCount > 0,
		partiallyLimitedNATCount > 0,
		strictlyLimitedNATCount > 0)

	// TODO: add an ExistsCompatibleProtocolVersionMatch check?
	//
	// Currently, searching for protocol version support that doesn't exist
	// may be mitigated by limiting, through tactics, client protocol options
	// selection; using the proxy protocol version in PrioritizeProxy; and,
	// ultimately, increasing MinimumProxyProtocolVersion.

	var bestMatch *announcementEntry
	bestMatchIndex := -1
	bestMatchIsPriority := false
	bestMatchNAT := false

	// matcherMaxProbe limits the linear search through the announcement queue
	// to find a match. Currently, the queue implementation provides
	// constant-time lookup for matching compartment IDs. Other matching
	// aspects may require iterating over the queue items, including the
	// strict same-country and ASN constraint and protocol version
	// compatibility constraint. Best NAT match is not a strict constraint
	// and uses a shorter search limit, matcherMaxPreferredNATProbe.
	//
	// NOTE(review): candidateIndex starts at -1 and the loop condition is
	// evaluated before the increment, so up to matcherMaxProbe+2 candidates
	// may be fetched. Confirm whether this bound is intended.

	candidateIndex := -1
	for candidateIndex <= matcherMaxProbe {

		announcementEntry, isPriority := matchIterator.getNext()
		if announcementEntry == nil {
			// No more candidates in the matching compartment queues.
			break
		}

		if !isPriority && bestMatchIsPriority {

			// There is a priority match, but it wasn't bestMatchNAT and we
			// continued to iterate. Now that isPriority is false, we're past the
			// end of the priority items, so stop looking for any best NAT match
			// and return the previous priority match. When there are zero
			// priority items to begin with, this case should not be hit.

			break
		}

		candidateIndex += 1

		// Skip and remove this announcement if its deadline has already
		// passed. There is no signal to the awaiting Announce function, as
		// it will exit based on the same ctx.

		if announcementEntry.ctx.Err() != nil {
			m.removeAnnouncementEntry(false, announcementEntry)
			continue
		}

		announcementProperties := &announcementEntry.announcement.Properties

		// Don't match unless the proxy announcement, client offer, and the
		// client's selected protocol options are compatible. UseMediaStreams
		// requires at least ProtocolVersion2.

		_, ok := negotiateProtocolVersion(
			announcementProperties.ProtocolVersion,
			offerProperties.ProtocolVersion,
			offerEntry.offer.UseMediaStreams)
		if !ok {
			continue
		}

		// Disallow matching the same country and ASN, except for personal
		// compartment ID matches, or when GetAllowCommonASNMatching permits it.
		//
		// For common matching, hopping through the same ISP is assumed to
		// have no circumvention benefit. For personal matching, the user may
		// wish to hop through their own or their friend's proxy regardless.

		if isCommonCompartments &&
			!GetAllowCommonASNMatching() &&
			(offerProperties.GeoIPData.Country ==
				announcementProperties.GeoIPData.Country &&
				offerProperties.GeoIPData.ASN ==
					announcementProperties.GeoIPData.ASN) {
			continue
		}

		// Check if this is a preferred NAT match. Ultimately, a match may be
		// made with potentially incompatible NATs, but the client/proxy
		// reported NAT types may be incorrect or unknown; the client will
		// often skip NAT discovery.

		matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)

		// Use proxy ASN quality as an alternative to preferred NAT matches.
		// This is consulted only for priority candidates.
		//
		// The NAT matching logic depends on RFC5780 NAT discovery test
		// results, which may not be entirely accurate, and may not be
		// available in the first place, especially if skipped for clients,
		// which is the default.
		//
		// Proxy ASN quality leverages the quality data, provided by servers,
		// indicating that the particular proxy recently relayed a successful
		// tunnel for some client in the given ASN. When this quality data is
		// present, NAT compatibility is assumed, with the caveat that the
		// client device and immediate router may not be the same.
		//
		// Limitations:
		// - existsPreferredNATMatch doesn't reflect existence of matching
		//   proxy ASN quality, so the NAT match probe can end prematurely.
		// - IsPreferredNATMatch currently takes precedence over proxy ASN
		//   quality.

		if !matchNAT && isPriority {
			matchNAT = m.config.ProxyQualityState.HasQuality(
				announcementEntry.announcement.ProxyID,
				announcementEntry.announcement.Properties.GeoIPData.ASN,
				offerProperties.GeoIPData.ASN)
		}

		// At this point, the candidate is a match. Determine if this is a new
		// best match, either if there was no previous match, or this is a
		// better NAT match.

		if bestMatch == nil || (!bestMatchNAT && matchNAT) {
			bestMatch = announcementEntry
			bestMatchIndex = candidateIndex
			bestMatchIsPriority = isPriority
			bestMatchNAT = matchNAT
		}

		// Stop as soon as we have the best possible match, when no preferred
		// NAT match exists in the matching queues, or when we have probed
		// matcherMaxPreferredNATProbe candidates past the current best match.

		if bestMatch != nil && (bestMatchNAT ||
			!existsPreferredNATMatch ||
			candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
			break
		}
	}

	return bestMatch, bestMatchIndex
}
  763. // applyLoadLimit checks if the broker process is in the load limiting state
  764. // and, in order to reduce load, determines if new proxy announces or client
  765. // offers should be rejected immediately instead of enqueued.
  766. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  767. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  768. return nil
  769. }
  770. // Acquire the queue locks only when in the load limit state, and in the
  771. // same order as matchAllOffers.
  772. m.announcementQueueMutex.Lock()
  773. defer m.announcementQueueMutex.Unlock()
  774. m.offerQueueMutex.Lock()
  775. defer m.offerQueueMutex.Unlock()
  776. announcementLen := m.announcementQueue.getLen()
  777. offerLen := m.offerQueue.Len()
  778. // When the load limit had been reached, and assuming the broker process
  779. // is running only an in-proxy broker, it's likely, in practise, that
  780. // only one of the two queues has hundreds of thousands of entries while
  781. // the other has few, and there are no matches clearing the queue.
  782. //
  783. // Instead of simply rejecting all enqueue requests, allow the request
  784. // type, announce or offer, that is in shorter supply as these are likely
  785. // to match and draw down the larger queue. This attempts to make
  786. // productive use of enqueued items, and also attempts to avoid simply
  787. // emptying both queues -- as will happen in any case due to timeouts --
  788. // and then have the same larger queue refill again after the load limit
  789. // state exits.
  790. //
  791. // This approach assumes some degree of slack in available system memory
  792. // and CPU in the load limiting state, similar to how the tunnel server
  793. // continues to operate existing tunnels in the same state.
  794. //
  795. // The heuristic below of allowing when less than half the size of the
  796. // larger queue puts a cap on the amount the shorter queue can continue
  797. // to grow in the load limiting state, in the worst case.
  798. //
  799. // Limitation: in some scenarios that are expected to be rare, it can
  800. // happen that allowed requests don't result in a match and memory
  801. // consumption continues to grow, leading to a broker process OOM kill.
  802. var allow bool
  803. if isAnnouncement {
  804. allow = announcementLen < offerLen/2
  805. } else {
  806. allow = offerLen < announcementLen/2
  807. }
  808. if allow {
  809. return nil
  810. }
  811. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  812. // MatcherLimitError results in a Response.Limited error response, which
  813. // causes a proxy to back off and a client to abort its dial; but in
  814. // neither case is the broker client reset. The error returned here will
  815. // result in a fast 404 response to the proxy or client, which will
  816. // instead trigger a broker client reset, and a chance of moving to a
  817. // different broker that is not overloaded.
  818. //
  819. // Limitation: the 404 response won't be distinguishable, in client or
  820. // proxy diagnostics, from other error conditions.
  821. //
  822. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  823. // use use log a distinct error and also ensure that it doesn't reselect
  824. // the same broker again in the broker client reset random selection.
  825. return errors.TraceNew("load limited")
  826. }
  827. // MatcherLimitError is the error type returned by Announce or Offer when the
  828. // caller has exceeded configured queue entry or rate limits.
  829. type MatcherLimitError struct {
  830. err error
  831. }
  832. func NewMatcherLimitError(err error) *MatcherLimitError {
  833. return &MatcherLimitError{err: err}
  834. }
  835. func (e MatcherLimitError) Error() string {
  836. return e.err.Error()
  837. }
  838. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  839. // address -- rate limits and queue entry limits.
  840. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  841. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  842. var entryCountByIP map[string]int
  843. var queueRateLimiters *lrucache.Cache
  844. var limitEntryCount int
  845. var quantity int
  846. var interval time.Duration
  847. if isAnnouncement {
  848. // Skip limit checks for non-limited proxies.
  849. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  850. return nil
  851. }
  852. entryCountByIP = m.announcementQueueEntryCountByIP
  853. queueRateLimiters = m.announcementQueueRateLimiters
  854. limitEntryCount = m.announcementLimitEntryCount
  855. quantity = m.announcementRateLimitQuantity
  856. interval = m.announcementRateLimitInterval
  857. } else {
  858. entryCountByIP = m.offerQueueEntryCountByIP
  859. queueRateLimiters = m.offerQueueRateLimiters
  860. limitEntryCount = m.offerLimitEntryCount
  861. quantity = m.offerRateLimitQuantity
  862. interval = m.offerRateLimitInterval
  863. }
  864. // The rate limit is checked first, before the max count check, to ensure
  865. // that the rate limit state is updated regardless of the max count check
  866. // outcome.
  867. if quantity > 0 && interval > 0 {
  868. var rateLimiter *rate.Limiter
  869. entry, ok := queueRateLimiters.Get(limitIP)
  870. if ok {
  871. rateLimiter = entry.(*rate.Limiter)
  872. } else {
  873. limit := float64(quantity) / interval.Seconds()
  874. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  875. queueRateLimiters.Set(
  876. limitIP, rateLimiter, interval)
  877. }
  878. if !rateLimiter.Allow() {
  879. return errors.Trace(
  880. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  881. }
  882. }
  883. if limitEntryCount > 0 {
  884. // Limitation: non-limited proxy ID entries are counted in
  885. // entryCountByIP. If both a limited and non-limited proxy ingress
  886. // from the same limitIP, then the non-limited entries will count
  887. // against the limited proxy's limitEntryCount.
  888. entryCount, ok := entryCountByIP[limitIP]
  889. if ok && entryCount >= limitEntryCount {
  890. return errors.Trace(
  891. NewMatcherLimitError(std_errors.New("max entries for IP")))
  892. }
  893. }
  894. return nil
  895. }
  896. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  897. m.announcementQueueMutex.Lock()
  898. defer m.announcementQueueMutex.Unlock()
  899. // Ensure the queue doesn't grow larger than the max size.
  900. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  901. return errors.TraceNew("queue full")
  902. }
  903. // Ensure no single peer IP can enqueue a large number of entries or
  904. // rapidly enqueue beyond the configured rate.
  905. isAnnouncement := true
  906. err := m.applyIPLimits(
  907. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  908. if err != nil {
  909. return errors.Trace(err)
  910. }
  911. // announcementEntry.queueReference should be uninitialized.
  912. // announcementMultiQueue.enqueue sets queueReference to be used for
  913. // efficient dequeuing.
  914. if announcementEntry.queueReference.entry != nil {
  915. return errors.TraceNew("unexpected queue reference")
  916. }
  917. err = m.announcementQueue.enqueue(announcementEntry)
  918. if err != nil {
  919. return errors.Trace(err)
  920. }
  921. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  922. select {
  923. case m.matchSignal <- struct{}{}:
  924. default:
  925. }
  926. return nil
  927. }
  928. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  929. // In the aborting case, the queue isn't already locked. Otherwise, assume
  930. // it is locked.
  931. if aborting {
  932. m.announcementQueueMutex.Lock()
  933. defer m.announcementQueueMutex.Unlock()
  934. }
  935. found := announcementEntry.queueReference.dequeue()
  936. if found {
  937. // Adjust entry counts by peer IP, used to enforce
  938. // matcherAnnouncementQueueMaxEntriesPerIP.
  939. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  940. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  941. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  942. }
  943. }
  944. if aborting && !found {
  945. // The Announce call is aborting and taking its entry back out of the
  946. // queue. If the entry is not found in the queue, then a concurrent
  947. // Offer has matched the announcement. So check for the pending
  948. // answer corresponding to the announcement and remove it and deliver
  949. // a failure signal to the waiting Offer, so the client doesn't wait
  950. // longer than necessary.
  951. key := m.pendingAnswerKey(
  952. announcementEntry.announcement.ProxyID,
  953. announcementEntry.announcement.ConnectionID)
  954. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  955. if ok {
  956. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  957. m.pendingAnswers.Delete(key)
  958. }
  959. }
  960. }
  961. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  962. m.offerQueueMutex.Lock()
  963. defer m.offerQueueMutex.Unlock()
  964. // Ensure the queue doesn't grow larger than the max size.
  965. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  966. return errors.TraceNew("queue full")
  967. }
  968. // Ensure no single peer IP can enqueue a large number of entries or
  969. // rapidly enqueue beyond the configured rate.
  970. isAnnouncement := false
  971. err := m.applyIPLimits(
  972. isAnnouncement, offerEntry.limitIP, ID{})
  973. if err != nil {
  974. return errors.Trace(err)
  975. }
  976. // offerEntry.queueReference should be uninitialized and is set here to be
  977. // used for efficient dequeuing.
  978. if offerEntry.queueReference != nil {
  979. return errors.TraceNew("unexpected queue reference")
  980. }
  981. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  982. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  983. select {
  984. case m.matchSignal <- struct{}{}:
  985. default:
  986. }
  987. return nil
  988. }
  989. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  990. // In the aborting case, the queue isn't already locked. Otherise, assume
  991. // it is locked.
  992. if aborting {
  993. m.offerQueueMutex.Lock()
  994. defer m.offerQueueMutex.Unlock()
  995. }
  996. if offerEntry.queueReference == nil {
  997. return
  998. }
  999. m.offerQueue.Remove(offerEntry.queueReference)
  1000. offerEntry.queueReference = nil
  1001. // Adjust entry counts by peer IP, used to enforce
  1002. // matcherOfferQueueMaxEntriesPerIP.
  1003. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  1004. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  1005. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  1006. }
  1007. }
  1008. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  1009. // The pending answer lookup key is used to associate announcements and
  1010. // subsequent answers. While the client learns the ConnectionID, only the
  1011. // proxy knows the ProxyID component, so only the correct proxy can match
  1012. // an answer to an announcement. The ConnectionID component is necessary
  1013. // as a proxy may have multiple, concurrent pending answers.
  1014. return string(proxyID[:]) + string(connectionID[:])
  1015. }
  1016. func getRateLimitIP(strIP string) string {
  1017. IP := net.ParseIP(strIP)
  1018. if IP == nil || IP.To4() != nil {
  1019. return strIP
  1020. }
  1021. // With IPv6, individual users or sites are users commonly allocated a /64
  1022. // or /56, so rate limit by /56.
  1023. return IP.Mask(net.CIDRMask(56, 128)).String()
  1024. }
  1025. // announcementMultiQueue is a set of announcement queues, one per common or
  1026. // personal compartment ID, providing efficient iteration over announcements
  1027. // matching a specified list of compartment IDs. announcementMultiQueue and
  1028. // its underlying data structures are not safe for concurrent access.
  1029. type announcementMultiQueue struct {
  1030. priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
  1031. commonCompartmentQueues map[ID]*announcementCompartmentQueue
  1032. personalCompartmentQueues map[ID]*announcementCompartmentQueue
  1033. totalEntries int
  1034. }
  1035. // announcementCompartmentQueue is a single compartment queue within an
  1036. // announcementMultiQueue. The queue is implemented using a doubly-linked
  1037. // list, which provides efficient insert and mid-queue dequeue operations.
  1038. // The announcementCompartmentQueue also records NAT type stats for enqueued
  1039. // announcements, which are used, when matching, to determine when better NAT
  1040. // matches may be possible.
  1041. type announcementCompartmentQueue struct {
  1042. isCommonCompartment bool
  1043. isPriority bool
  1044. compartmentID ID
  1045. entries *list.List
  1046. unlimitedNATCount int
  1047. partiallyLimitedNATCount int
  1048. strictlyLimitedNATCount int
  1049. }
  1050. // announcementMatchIterator represents the state of an iteration over a
  1051. // subset of announcementMultiQueue compartment queues. Concurrent
  1052. // announcementMatchIterators are not supported.
  1053. type announcementMatchIterator struct {
  1054. multiQueue *announcementMultiQueue
  1055. compartmentQueues []*announcementCompartmentQueue
  1056. compartmentIDs []ID
  1057. nextEntries []*list.Element
  1058. }
  1059. // announcementQueueReference represents the queue position for a given
  1060. // announcement entry, and provides an efficient dequeue operation.
  1061. type announcementQueueReference struct {
  1062. multiQueue *announcementMultiQueue
  1063. compartmentQueue *announcementCompartmentQueue
  1064. entry *list.Element
  1065. }
  1066. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1067. return &announcementMultiQueue{
  1068. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1069. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1070. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1071. }
  1072. }
  1073. func (q *announcementMultiQueue) getLen() int {
  1074. return q.totalEntries
  1075. }
  1076. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1077. // Assumes announcementEntry not already enueued.
  1078. // Limitation: only one compartment ID, either common or personal, is
  1079. // supported per announcement entry. In the common compartment case, the
  1080. // broker currently assigns only one common compartment ID per proxy
  1081. // announcement. In the personal compartment case, there is currently no
  1082. // use case for allowing a proxy to announce under multiple personal
  1083. // compartment IDs.
  1084. //
  1085. // To overcome this limitation, the dequeue operation would need to be
  1086. // able to remove an announcement entry from multiple
  1087. // announcementCompartmentQueues.
  1088. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1089. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1090. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1091. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1092. }
  1093. isPriority := announcementEntry.announcement.Properties.IsPriority
  1094. isCommonCompartment := true
  1095. var compartmentID ID
  1096. var compartmentQueues map[ID]*announcementCompartmentQueue
  1097. if len(commonCompartmentIDs) > 0 {
  1098. compartmentID = commonCompartmentIDs[0]
  1099. compartmentQueues = q.commonCompartmentQueues
  1100. if isPriority {
  1101. compartmentQueues = q.priorityCommonCompartmentQueues
  1102. }
  1103. } else {
  1104. isCommonCompartment = false
  1105. compartmentID = personalCompartmentIDs[0]
  1106. compartmentQueues = q.personalCompartmentQueues
  1107. if isPriority {
  1108. return errors.TraceNew("priority not supported for personal compartments")
  1109. }
  1110. }
  1111. compartmentQueue, ok := compartmentQueues[compartmentID]
  1112. if !ok {
  1113. compartmentQueue = &announcementCompartmentQueue{
  1114. isCommonCompartment: isCommonCompartment,
  1115. isPriority: isPriority,
  1116. compartmentID: compartmentID,
  1117. entries: list.New(),
  1118. }
  1119. compartmentQueues[compartmentID] = compartmentQueue
  1120. }
  1121. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1122. // Update the NAT type counts which are used to determine if a better NAT
  1123. // match may be made by inspecting more announcement queue entries.
  1124. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1125. case NATTraversalUnlimited:
  1126. compartmentQueue.unlimitedNATCount += 1
  1127. case NATTraversalPartiallyLimited:
  1128. compartmentQueue.partiallyLimitedNATCount += 1
  1129. case NATTraversalStrictlyLimited:
  1130. compartmentQueue.strictlyLimitedNATCount += 1
  1131. }
  1132. q.totalEntries += 1
  1133. announcementEntry.queueReference = announcementQueueReference{
  1134. multiQueue: q,
  1135. compartmentQueue: compartmentQueue,
  1136. entry: entry,
  1137. }
  1138. return nil
  1139. }
  1140. // announcementQueueReference returns false if the item is already dequeued.
  1141. func (r *announcementQueueReference) dequeue() bool {
  1142. if r.entry == nil {
  1143. // Already dequeued.
  1144. return false
  1145. }
  1146. announcementEntry := r.entry.Value.(*announcementEntry)
  1147. // Reverse the NAT type counts.
  1148. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1149. case NATTraversalUnlimited:
  1150. r.compartmentQueue.unlimitedNATCount -= 1
  1151. case NATTraversalPartiallyLimited:
  1152. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1153. case NATTraversalStrictlyLimited:
  1154. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1155. }
  1156. r.compartmentQueue.entries.Remove(r.entry)
  1157. if r.compartmentQueue.entries.Len() == 0 {
  1158. // Remove empty compartment queue.
  1159. queues := r.multiQueue.personalCompartmentQueues
  1160. if r.compartmentQueue.isCommonCompartment {
  1161. if r.compartmentQueue.isPriority {
  1162. queues = r.multiQueue.priorityCommonCompartmentQueues
  1163. } else {
  1164. queues = r.multiQueue.commonCompartmentQueues
  1165. }
  1166. }
  1167. delete(queues, r.compartmentQueue.compartmentID)
  1168. }
  1169. r.multiQueue.totalEntries -= 1
  1170. // Mark as dequeued.
  1171. r.entry = nil
  1172. return true
  1173. }
  1174. func (q *announcementMultiQueue) startMatching(
  1175. isCommonCompartments bool,
  1176. compartmentIDs []ID) *announcementMatchIterator {
  1177. iter := &announcementMatchIterator{
  1178. multiQueue: q,
  1179. }
  1180. // Find the matching compartment queues and initialize iteration over
  1181. // those queues. Building the set of matching queues is a linear time
  1182. // operation, bounded by the length of compartmentIDs (no more than
  1183. // maxCompartmentIDs, as enforced in
  1184. // ClientOfferRequest.ValidateAndGetLogFields).
  1185. // Priority queues, when in use, must all be added to the beginning of
  1186. // iter.compartmentQueues in order to ensure that the iteration logic in
  1187. // getNext visits all priority items first.
  1188. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1189. if isCommonCompartments {
  1190. compartmentQueuesList = append(
  1191. compartmentQueuesList,
  1192. q.priorityCommonCompartmentQueues,
  1193. q.commonCompartmentQueues)
  1194. } else {
  1195. compartmentQueuesList = append(
  1196. compartmentQueuesList,
  1197. q.personalCompartmentQueues)
  1198. }
  1199. for _, compartmentQueues := range compartmentQueuesList {
  1200. for _, ID := range compartmentIDs {
  1201. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1202. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1203. iter.compartmentIDs = append(iter.compartmentIDs, ID)
  1204. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1205. }
  1206. }
  1207. }
  1208. return iter
  1209. }
  1210. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1211. // Return the count of NAT types across all matchable compartment queues.
  1212. //
  1213. // A potential future enhancement would be to provide per-queue NAT counts
  1214. // or NAT type indexing in order to quickly find preferred NAT matches.
  1215. unlimitedNATCount := 0
  1216. partiallyLimitedNATCount := 0
  1217. strictlyLimitedNATCount := 0
  1218. for _, compartmentQueue := range iter.compartmentQueues {
  1219. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1220. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1221. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1222. }
  1223. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1224. }
  1225. // announcementMatchIterator returns the next announcement entry candidate in
  1226. // compartment queue FIFO order, selecting the queue with the oldest head
  1227. // item.
  1228. //
  1229. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1230. // candidate is selected. dequeue may be called on any getNext return value
  1231. // without disrupting the iteration state; however,
  1232. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1233. // are not supported during iteration. Iteration and dequeue should all be
  1234. // performed with a lock over the entire announcementMultiQueue, and with
  1235. // only one concurrent announcementMatchIterator.
  1236. //
  1237. // getNext returns a nil *announcementEntry when there are no more items.
  1238. // getNext also returns an isPriority flag, indicating the announcement is a
  1239. // priority candidate. All priority candidates are guaranteed to be returned
  1240. // before any non-priority candidates.
  1241. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1242. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1243. // order. Also assumes that any priority queues are all at the front of
  1244. // iter.compartmentQueues.
  1245. // Select the oldest item, by deadline, from all the candidate queue head
  1246. // items. This operation is linear in the number of matching compartment
  1247. // ID queues, which is currently bounded by the length of matching
  1248. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1249. // ClientOfferRequest.ValidateAndGetLogFields).
  1250. //
  1251. // When there are priority candidates, they are selected first, regardless
  1252. // of the deadlines of non-priority candidates. Multiple priority
  1253. // candidates are processed in FIFO deadline order.
  1254. //
  1255. // A potential future enhancement is to add more iterator state to track
  1256. // which queue has the next oldest time to select on the following
  1257. // getNext call. Another potential enhancement is to remove fully
  1258. // consumed queues from compartmentQueues/compartmentIDs/nextEntries.
  1259. var selectedCandidate *announcementEntry
  1260. selectedIndex := -1
  1261. selectedPriority := false
  1262. for i := 0; i < len(iter.compartmentQueues); i++ {
  1263. if iter.nextEntries[i] == nil {
  1264. continue
  1265. }
  1266. isPriority := iter.compartmentQueues[i].isPriority
  1267. if selectedPriority && !isPriority {
  1268. // Ignore older of non-priority entries when there are priority
  1269. // candidates.
  1270. break
  1271. }
  1272. if selectedCandidate == nil {
  1273. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1274. selectedIndex = i
  1275. selectedPriority = isPriority
  1276. } else {
  1277. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1278. deadline, deadlineOk := candidate.ctx.Deadline()
  1279. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1280. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1281. selectedCandidate = candidate
  1282. selectedIndex = i
  1283. selectedPriority = isPriority
  1284. }
  1285. }
  1286. }
  1287. // Advance the selected queue to the next element. This must be done
  1288. // before any dequeue call, since container/list.remove clears
  1289. // list.Element.next.
  1290. if selectedIndex != -1 {
  1291. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1292. }
  1293. return selectedCandidate, selectedPriority
  1294. }