matcher.go 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "container/list"
  22. "context"
  23. std_errors "errors"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. lrucache "github.com/cognusion/go-cache-lru"
  31. )
  // Matcher sizing and timing constants. TTLs should be aligned with STUN
// hole punch lifetimes.
const (
	// Maximum announcement queue size; enforcement is presumably in the
	// queue helpers outside this view.
	matcherAnnouncementQueueMaxSize = 5000000

	// Maximum offer queue size; enforcement is presumably in the queue
	// helpers outside this view.
	matcherOfferQueueMaxSize = 5000000

	// Default TTL for the pendingAnswers cache created in NewMatcher.
	matcherPendingAnswersTTL = 30 * time.Second

	// Maximum entry count for the pendingAnswers cache created in NewMatcher.
	matcherPendingAnswersMaxSize = 5000000

	// Announcement probe limits used during matching (consumers are outside
	// this view).
	matcherMaxPreferredNATProbe = 100
	matcherMaxProbe             = 1000
)
  41. // Matcher matches proxy announcements with client offers. Matcher also
  42. // coordinates pending proxy answers and routes answers to the awaiting
  43. // client offer handler.
  44. //
  45. // Matching prioritizes selecting the oldest announcements and client offers,
  46. // as they are closest to timing out.
  47. //
  48. // The client and proxy must supply matching personal or common compartment
  49. // IDs. Common compartments are managed by Psiphon and can be obtained via a
  50. // tactics parameter or via an OSL embedding. Each proxy announcement or
  51. // client offer may specify only one compartment ID type, either common or
  52. // personal.
  53. //
  54. // Matching prefers to pair proxies and clients in a way that maximizes total
  55. // possible matches. For a client or proxy with less-limited NAT traversal, a
  56. // pairing with more-limited NAT traversal is preferred; and vice versa.
  57. // Candidates with unknown NAT types and mobile network types are assumed to
  58. // have the most limited NAT traversal capability.
  59. //
  60. // Preferred matchings take priority over announcement age.
  61. //
  62. // The client and proxy will not match if they are in the same country and
  63. // ASN, as it's assumed that doesn't provide any blocking circumvention
  64. // benefit. Disallowing proxies in certain blocked countries is handled at a
  65. // higher level; any such proxies should not be enqueued for matching.
  66. type Matcher struct {
  67. config *MatcherConfig
  68. runMutex sync.Mutex
  69. runContext context.Context
  70. stopRunning context.CancelFunc
  71. waitGroup *sync.WaitGroup
  72. // The announcement queue is implicitly sorted by announcement age. The
  73. // count fields are used to skip searching deeper into the queue for
  74. // preferred matches.
  75. // TODO: replace queue and counts with an indexed, in-memory database?
  76. announcementQueueMutex sync.Mutex
  77. announcementQueue *announcementMultiQueue
  78. announcementQueueEntryCountByIP map[string]int
  79. announcementQueueRateLimiters *lrucache.Cache
  80. announcementLimitEntryCount int
  81. announcementRateLimitQuantity int
  82. announcementRateLimitInterval time.Duration
  83. announcementNonlimitedProxyIDs map[ID]struct{}
  84. // The offer queue is also implicitly sorted by offer age. Both an offer
  85. // and announcement queue are required since either announcements or
  86. // offers can arrive while there are no available pairings.
  87. offerQueueMutex sync.Mutex
  88. offerQueue *list.List
  89. offerQueueEntryCountByIP map[string]int
  90. offerQueueRateLimiters *lrucache.Cache
  91. offerLimitEntryCount int
  92. offerRateLimitQuantity int
  93. offerRateLimitInterval time.Duration
  94. offerMinimumDeadline time.Duration
  95. matchSignal chan struct{}
  96. pendingAnswers *lrucache.Cache
  97. }
  98. // MatcherConfig specifies the configuration for a matcher.
  99. type MatcherConfig struct {
  100. // Logger is used to log events.
  101. Logger common.Logger
  102. // Announcement queue limits.
  103. AnnouncementLimitEntryCount int
  104. AnnouncementRateLimitQuantity int
  105. AnnouncementRateLimitInterval time.Duration
  106. AnnouncementNonlimitedProxyIDs []ID
  107. // Offer queue limits.
  108. OfferLimitEntryCount int
  109. OfferRateLimitQuantity int
  110. OfferRateLimitInterval time.Duration
  111. OfferMinimumDeadline time.Duration
  112. // Proxy quality state.
  113. ProxyQualityState *ProxyQualityState
  114. // Broker process load limit state callback. See BrokerConfig.
  115. IsLoadLimiting func() bool
  116. // Proxy/client allow match callback. See BrokerConfig.
  117. AllowMatch func(common.GeoIPData, common.GeoIPData) bool
  118. }
  119. // MatchProperties specifies the compartment, GeoIP, and network topology
  120. // matching properties of clients and proxies.
  121. type MatchProperties struct {
  122. IsPriority bool
  123. ProtocolVersion int32
  124. CommonCompartmentIDs []ID
  125. PersonalCompartmentIDs []ID
  126. GeoIPData common.GeoIPData
  127. NetworkType NetworkType
  128. NATType NATType
  129. PortMappingTypes PortMappingTypes
  130. }
  131. // EffectiveNATType combines the set of network properties into an effective
  132. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  133. // traversal is assumed. When NAT type is unknown and the network type is
  134. // mobile, CGNAT with limited NAT traversal is assumed.
  135. func (p *MatchProperties) EffectiveNATType() NATType {
  136. if p.PortMappingTypes.Available() {
  137. return NATTypePortMapping
  138. }
  139. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  140. // publicly reachable IPv6 ICE host candidate? If so, change the
  141. // effective NAT type? Depends on whether the matched peer can use IPv6.
  142. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  143. return NATTypeMobileNetwork
  144. }
  145. return p.NATType
  146. }
  147. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  148. // matching given the types of pairing candidates available.
  149. func (p *MatchProperties) ExistsPreferredNATMatch(
  150. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  151. return p.EffectiveNATType().ExistsPreferredMatch(
  152. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  153. }
  154. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  155. // NAT matching.
  156. func (p *MatchProperties) IsPreferredNATMatch(
  157. peerMatchProperties *MatchProperties) bool {
  158. return p.EffectiveNATType().IsPreferredMatch(
  159. peerMatchProperties.EffectiveNATType())
  160. }
  161. // MatchAnnouncement is a proxy announcement to be queued for matching.
  162. type MatchAnnouncement struct {
  163. Properties MatchProperties
  164. ProxyID ID
  165. ProxyMetrics *ProxyMetrics
  166. ConnectionID ID
  167. }
  168. // MatchOffer is a client offer to be queued for matching.
  169. type MatchOffer struct {
  170. Properties MatchProperties
  171. ClientOfferSDP WebRTCSessionDescription
  172. ClientRootObfuscationSecret ObfuscationSecret
  173. DoDTLSRandomization bool
  174. UseMediaStreams bool
  175. TrafficShapingParameters *TrafficShapingParameters
  176. NetworkProtocol NetworkProtocol
  177. DestinationAddress string
  178. DestinationServerID string
  179. }
  180. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  181. // announcement, to be routed to the awaiting client offer.
  182. type MatchAnswer struct {
  183. ProxyIP string
  184. ProxyID ID
  185. ConnectionID ID
  186. ProxyAnswerSDP WebRTCSessionDescription
  187. }
  // MatchMetrics records statistics about the match queue state at the time a
// match is made.
type MatchMetrics struct {
	// OfferDeadline is the time remaining until the offer's ctx deadline.
	OfferDeadline time.Duration

	// OfferMatchIndex is the position of the matched offer in the offer
	// queue pass; OfferQueueSize is the offer queue length at match time.
	OfferMatchIndex int
	OfferQueueSize  int

	// AnnouncementMatchIndex is how many announcement entries were inspected
	// before matching; AnnouncementQueueSize is the announcement queue length
	// at match time.
	AnnouncementMatchIndex int
	AnnouncementQueueSize  int

	// PendingAnswersSize is the pending answers cache item count.
	PendingAnswersSize int

	// MatchDuration is the time from the start of the matching pass,
	// including lock acquisition, until this match was made.
	MatchDuration time.Duration
}
  199. // GetMetrics converts MatchMetrics to loggable fields.
  200. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  201. if metrics == nil {
  202. return nil
  203. }
  204. // match_duration, the time between matchAllOffers starting and a match
  205. // being made, is reported in microseconds. The common millisecond
  206. // duration resolution is expected to be too coarse.
  207. return common.LogFields{
  208. "offer_deadline": int64(metrics.OfferDeadline / time.Millisecond),
  209. "offer_match_index": metrics.OfferMatchIndex,
  210. "offer_queue_size": metrics.OfferQueueSize,
  211. "announcement_match_index": metrics.AnnouncementMatchIndex,
  212. "announcement_queue_size": metrics.AnnouncementQueueSize,
  213. "pending_answers_size": metrics.PendingAnswersSize,
  214. "match_duration": int64(metrics.MatchDuration / time.Microsecond),
  215. }
  216. }
  217. // announcementEntry is an announcement queue entry, an announcement with its
  218. // associated lifetime context and signaling channel.
  219. type announcementEntry struct {
  220. ctx context.Context
  221. limitIP string
  222. announcement *MatchAnnouncement
  223. offerChan chan *MatchOffer
  224. matchMetrics atomic.Value
  225. // queueReference is initialized by addAnnouncementEntry, and used to
  226. // efficiently dequeue the entry.
  227. queueReference announcementQueueReference
  228. }
  229. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  230. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  231. return matchMetrics
  232. }
  233. // offerEntry is an offer queue entry, an offer with its associated lifetime
  234. // context and signaling channel.
  235. type offerEntry struct {
  236. ctx context.Context
  237. limitIP string
  238. offer *MatchOffer
  239. answerChan chan *answerInfo
  240. matchMetrics atomic.Value
  241. // queueReference is initialized by addOfferEntry, and used to efficiently
  242. // dequeue the entry.
  243. queueReference *list.Element
  244. }
  245. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  246. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  247. return matchMetrics
  248. }
  249. // answerInfo is an answer and its associated announcement.
  250. type answerInfo struct {
  251. announcement *MatchAnnouncement
  252. answer *MatchAnswer
  253. // offerDropped is sent to Offer's answer channel when the offer has been
  254. // dropped by the matcher due to age. This allows Offer to return
  255. // immediately on drop and the request handler to log this outcome.
  256. offerDropped bool
  257. }
  258. // pendingAnswer represents an answer that is expected to arrive from a
  259. // proxy.
  260. type pendingAnswer struct {
  261. announcement *MatchAnnouncement
  262. answerChan chan *answerInfo
  263. }
  264. // NewMatcher creates a new Matcher.
  265. func NewMatcher(config *MatcherConfig) *Matcher {
  266. m := &Matcher{
  267. config: config,
  268. waitGroup: new(sync.WaitGroup),
  269. announcementQueue: newAnnouncementMultiQueue(),
  270. announcementQueueEntryCountByIP: make(map[string]int),
  271. announcementQueueRateLimiters: lrucache.NewWithLRU(
  272. 0,
  273. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  274. brokerRateLimiterMaxCacheEntries),
  275. offerQueue: list.New(),
  276. offerQueueEntryCountByIP: make(map[string]int),
  277. offerQueueRateLimiters: lrucache.NewWithLRU(
  278. 0,
  279. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  280. brokerRateLimiterMaxCacheEntries),
  281. matchSignal: make(chan struct{}, 1),
  282. // matcherPendingAnswersTTL is not configurable; it supplies a default
  283. // that is expected to be ignored when each entry's TTL is set to the
  284. // Offer ctx timeout.
  285. pendingAnswers: lrucache.NewWithLRU(
  286. matcherPendingAnswersTTL,
  287. 1*time.Minute,
  288. matcherPendingAnswersMaxSize),
  289. }
  290. m.SetLimits(
  291. config.AnnouncementLimitEntryCount,
  292. config.AnnouncementRateLimitQuantity,
  293. config.AnnouncementRateLimitInterval,
  294. config.AnnouncementNonlimitedProxyIDs,
  295. config.OfferLimitEntryCount,
  296. config.OfferRateLimitQuantity,
  297. config.OfferRateLimitInterval,
  298. config.OfferMinimumDeadline)
  299. return m
  300. }
  301. // SetLimits sets new queue limits, replacing the previous configuration.
  302. // Existing, cached rate limiters retain their existing rate limit state. New
  303. // entries will use the new quantity/interval configuration. In addition,
  304. // currently enqueued items may exceed any new, lower maximum entry count
  305. // until naturally dequeued.
  306. func (m *Matcher) SetLimits(
  307. announcementLimitEntryCount int,
  308. announcementRateLimitQuantity int,
  309. announcementRateLimitInterval time.Duration,
  310. announcementNonlimitedProxyIDs []ID,
  311. offerLimitEntryCount int,
  312. offerRateLimitQuantity int,
  313. offerRateLimitInterval time.Duration,
  314. offerMinimumDeadline time.Duration) {
  315. nonlimitedProxyIDs := make(map[ID]struct{})
  316. for _, proxyID := range announcementNonlimitedProxyIDs {
  317. nonlimitedProxyIDs[proxyID] = struct{}{}
  318. }
  319. m.announcementQueueMutex.Lock()
  320. m.announcementLimitEntryCount = announcementLimitEntryCount
  321. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  322. m.announcementRateLimitInterval = announcementRateLimitInterval
  323. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  324. m.announcementQueueMutex.Unlock()
  325. m.offerQueueMutex.Lock()
  326. m.offerLimitEntryCount = offerLimitEntryCount
  327. m.offerRateLimitQuantity = offerRateLimitQuantity
  328. m.offerRateLimitInterval = offerRateLimitInterval
  329. m.offerMinimumDeadline = offerMinimumDeadline
  330. m.offerQueueMutex.Unlock()
  331. }
  332. // Start starts running the Matcher. The Matcher runs a goroutine which
  333. // matches announcements and offers.
  334. func (m *Matcher) Start() error {
  335. m.runMutex.Lock()
  336. defer m.runMutex.Unlock()
  337. if m.runContext != nil {
  338. return errors.TraceNew("already running")
  339. }
  340. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  341. m.waitGroup.Add(1)
  342. go func() {
  343. defer m.waitGroup.Done()
  344. m.matchWorker(m.runContext)
  345. }()
  346. return nil
  347. }
  348. // Stop stops running the Matcher and its worker goroutine.
  349. //
  350. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  351. // can get enqueued during and after a Stop call. Stop is intended more for a
  352. // full broker shutdown, where this won't be a concern.
  353. func (m *Matcher) Stop() {
  354. m.runMutex.Lock()
  355. defer m.runMutex.Unlock()
  356. m.stopRunning()
  357. m.waitGroup.Wait()
  358. m.runContext, m.stopRunning = nil, nil
  359. }
  360. // Announce enqueues the proxy announcement and blocks until it is matched
  361. // with a returned offer or ctx is done. The caller must not mutate the
  362. // announcement or its properties after calling Announce.
  363. //
  364. // Announce assumes that the ctx.Deadline for each call is monotonically
  365. // increasing and that the deadline can be used as part of selecting the next
  366. // nearest-to-expire announcement.
  367. //
  368. // The offer is sent to the proxy by the broker, and then the proxy sends its
  369. // answer back to the broker, which calls Answer with that value.
  370. //
  371. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  372. // match is made, even if there is a later error.
  373. func (m *Matcher) Announce(
  374. ctx context.Context,
  375. proxyIP string,
  376. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  377. // An announcement must specify exactly one compartment ID, of one type,
  378. // common or personal. The limit of one is currently a limitation of the
  379. // multi-queue implementation; see comment in
  380. // announcementMultiQueue.enqueue.
  381. compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
  382. if len(compartmentIDs) == 0 {
  383. compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
  384. } else if len(proxyAnnouncement.Properties.PersonalCompartmentIDs) > 0 {
  385. return nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  386. }
  387. if len(compartmentIDs) != 1 {
  388. return nil, nil, errors.TraceNew("unexpected compartment ID count")
  389. }
  390. isAnnouncement := true
  391. err := m.applyLoadLimit(isAnnouncement)
  392. if err != nil {
  393. return nil, nil, errors.Trace(err)
  394. }
  395. announcementEntry := &announcementEntry{
  396. ctx: ctx,
  397. limitIP: getRateLimitIP(proxyIP),
  398. announcement: proxyAnnouncement,
  399. offerChan: make(chan *MatchOffer, 1),
  400. }
  401. err = m.addAnnouncementEntry(announcementEntry)
  402. if err != nil {
  403. return nil, nil, errors.Trace(err)
  404. }
  405. // Await client offer.
  406. var clientOffer *MatchOffer
  407. select {
  408. case <-ctx.Done():
  409. m.removeAnnouncementEntry(true, announcementEntry)
  410. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  411. case clientOffer = <-announcementEntry.offerChan:
  412. }
  413. return clientOffer, announcementEntry.getMatchMetrics(), nil
  414. }
  415. var errOfferDropped = std_errors.New("offer dropped")
  416. // Offer enqueues the client offer and blocks until it is matched with a
  417. // returned announcement or ctx is done. The caller must not mutate the offer
  418. // or its properties after calling Announce.
  419. //
  420. // The answer is returned to the client by the broker, and the WebRTC
  421. // connection is dialed. The original announcement is also returned, so its
  422. // match properties can be logged.
  423. //
  424. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  425. // match is made, even if there is a later error.
  426. func (m *Matcher) Offer(
  427. ctx context.Context,
  428. clientIP string,
  429. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  430. // An offer must specify at least one compartment ID, and may only specify
  431. // one type, common or personal, of compartment IDs.
  432. compartmentIDs := clientOffer.Properties.CommonCompartmentIDs
  433. if len(compartmentIDs) == 0 {
  434. compartmentIDs = clientOffer.Properties.PersonalCompartmentIDs
  435. } else if len(clientOffer.Properties.PersonalCompartmentIDs) > 0 {
  436. return nil, nil, nil, errors.TraceNew("unexpected multiple compartment ID types")
  437. }
  438. if len(compartmentIDs) < 1 {
  439. return nil, nil, nil, errors.TraceNew("unexpected missing compartment IDs")
  440. }
  441. isAnnouncement := false
  442. err := m.applyLoadLimit(isAnnouncement)
  443. if err != nil {
  444. return nil, nil, nil, errors.Trace(err)
  445. }
  446. offerEntry := &offerEntry{
  447. ctx: ctx,
  448. limitIP: getRateLimitIP(clientIP),
  449. offer: clientOffer,
  450. answerChan: make(chan *answerInfo, 1),
  451. }
  452. err = m.addOfferEntry(offerEntry)
  453. if err != nil {
  454. return nil, nil, nil, errors.Trace(err)
  455. }
  456. // Await proxy answer.
  457. var proxyAnswerInfo *answerInfo
  458. select {
  459. case <-ctx.Done():
  460. m.removeOfferEntry(true, offerEntry)
  461. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  462. // the Offer ctx, the client request, timeout, so it will eventually
  463. // get removed. But a client may abort its request earlier than the
  464. // timeout.
  465. return nil, nil,
  466. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  467. case proxyAnswerInfo = <-offerEntry.answerChan:
  468. }
  469. if proxyAnswerInfo == nil {
  470. // nil will be delivered to the channel when either the proxy
  471. // announcement request concurrently timed out, or the answer
  472. // indicated a proxy error, or the answer did not arrive in time.
  473. return nil, nil,
  474. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  475. }
  476. if proxyAnswerInfo.offerDropped {
  477. return nil, nil,
  478. offerEntry.getMatchMetrics(), errOfferDropped
  479. }
  480. // This is a sanity check and not expected to fail.
  481. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  482. proxyAnswerInfo.announcement.ConnectionID) {
  483. return nil, nil,
  484. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  485. }
  486. return proxyAnswerInfo.answer,
  487. proxyAnswerInfo.announcement,
  488. offerEntry.getMatchMetrics(),
  489. nil
  490. }
  491. var errNoPendingAnswer = std_errors.New("no pending answer")
  492. // AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
  493. // announcement identified by the specified proxy ID and connection ID and
  494. // returns whether the announcement has personal compartment IDs, indicating
  495. // personal pairing mode.
  496. //
  497. // If no pending answer is found, an error is returned.
  498. func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
  499. proxyID ID, connectionID ID) (bool, error) {
  500. key := m.pendingAnswerKey(proxyID, connectionID)
  501. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  502. if !ok {
  503. // The input IDs don't correspond to a pending answer, or the client
  504. // is no longer awaiting the response.
  505. return false, errors.Trace(errNoPendingAnswer)
  506. }
  507. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  508. hasPersonalCompartmentIDs := len(
  509. pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
  510. return hasPersonalCompartmentIDs, nil
  511. }
  512. // Answer delivers an answer from the proxy for a previously matched offer.
  513. // The ProxyID and ConnectionID must correspond to the original announcement.
  514. // The caller must not mutate the answer after calling Answer. Answer does
  515. // not block.
  516. //
  517. // The answer is returned to the awaiting Offer call and sent to the matched
  518. // client.
  519. func (m *Matcher) Answer(
  520. proxyAnswer *MatchAnswer) error {
  521. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  522. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  523. if !ok {
  524. // The input IDs don't correspond to a pending answer, or the client
  525. // is no longer awaiting the response.
  526. return errors.Trace(errNoPendingAnswer)
  527. }
  528. m.pendingAnswers.Delete(key)
  529. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  530. pendingAnswer.answerChan <- &answerInfo{
  531. announcement: pendingAnswer.announcement,
  532. answer: proxyAnswer,
  533. }
  534. return nil
  535. }
  536. // AnswerError delivers a failed answer indication from the proxy to an
  537. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  538. // original announcement.
  539. //
  540. // The failure indication is returned to the awaiting Offer call and sent to
  541. // the matched client.
  542. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  543. key := m.pendingAnswerKey(proxyID, connectionID)
  544. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  545. if !ok {
  546. // The client is no longer awaiting the response.
  547. return
  548. }
  549. m.pendingAnswers.Delete(key)
  550. // Closing the channel delivers nil, a failed indicator, to any receiver.
  551. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  552. }
  553. // matchWorker is the matching worker goroutine. It idles until signaled that
  554. // a queue item has been added, and then runs a full matching pass.
  555. func (m *Matcher) matchWorker(ctx context.Context) {
  556. for {
  557. select {
  558. case <-m.matchSignal:
  559. m.matchAllOffers()
  560. case <-ctx.Done():
  561. return
  562. }
  563. }
  564. }
  565. // matchAllOffers iterates over the queues, making all possible matches.
  566. func (m *Matcher) matchAllOffers() {
  567. // Include lock acquisition time in MatchDuration metric.
  568. startTime := time.Now()
  569. m.announcementQueueMutex.Lock()
  570. defer m.announcementQueueMutex.Unlock()
  571. m.offerQueueMutex.Lock()
  572. defer m.offerQueueMutex.Unlock()
  573. // Take each offer in turn, and select an announcement match. There is an
  574. // implicit preference for older client offers, sooner to timeout, at the
  575. // front of the queue.
  576. // TODO: consider matching one offer, then releasing the locks to allow
  577. // more announcements to be enqueued, then continuing to match.
  578. nextOffer := m.offerQueue.Front()
  579. offerIndex := -1
  580. for nextOffer != nil && m.announcementQueue.getLen() > 0 {
  581. offerIndex += 1
  582. // nextOffer.Next must be invoked before any removeOfferEntry since
  583. // container/list.remove clears list.Element.next.
  584. offer := nextOffer
  585. nextOffer = nextOffer.Next()
  586. offerEntry := offer.Value.(*offerEntry)
  587. // Skip and remove this offer if its deadline has already passed or
  588. // the context is canceled. There is no signal to the awaiting Offer
  589. // function, as it will exit based on the same ctx.
  590. if offerEntry.ctx.Err() != nil {
  591. m.removeOfferEntry(false, offerEntry)
  592. continue
  593. }
  594. offerDeadline, _ := offerEntry.ctx.Deadline()
  595. untilOfferDeadline := time.Until(offerDeadline)
  596. // Drop this offer if it no longer has a sufficient remaining deadline
  597. // for the proxy answer phase. This case signals Offer's answerChan
  598. // so it can return immediately.
  599. if m.offerMinimumDeadline > 0 &&
  600. untilOfferDeadline < m.offerMinimumDeadline {
  601. m.removeOfferEntry(false, offerEntry)
  602. offerEntry.answerChan <- &answerInfo{offerDropped: true}
  603. continue
  604. }
  605. announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
  606. if announcementEntry == nil {
  607. continue
  608. }
  609. // Record match metrics.
  610. // The index metrics predate the announcement multi-queue; now, with
  611. // the multi-queue, announcement_index is how many announce entries
  612. // were inspected before matching.
  613. //
  614. // MatchDuration is intended to capture a sample of matchAllOffer
  615. // processing time without resorting to new log events.
  616. // Limitation: MatchDuration does not fully represent matchAllOffer
  617. // elapsed time since it doesn't include passes with no matches or
  618. // time spent failing to match after the last match in the pass.
  619. matchMetrics := &MatchMetrics{
  620. OfferDeadline: untilOfferDeadline,
  621. OfferMatchIndex: offerIndex,
  622. OfferQueueSize: m.offerQueue.Len(),
  623. AnnouncementMatchIndex: announcementMatchIndex,
  624. AnnouncementQueueSize: m.announcementQueue.getLen(),
  625. PendingAnswersSize: m.pendingAnswers.ItemCount(),
  626. MatchDuration: time.Since(startTime),
  627. }
  628. offerEntry.matchMetrics.Store(matchMetrics)
  629. announcementEntry.matchMetrics.Store(matchMetrics)
  630. // Remove the matched announcement from the queue. Send the offer to
  631. // the announcement entry's offerChan, which will deliver it to the
  632. // blocked Announce call. Add a pending answers entry to await the
  633. // proxy's follow up Answer call. The TTL for the pending answer
  634. // entry is set to the matched Offer call's ctx, as the answer is
  635. // only useful as long as the client is still waiting.
  636. m.removeAnnouncementEntry(false, announcementEntry)
  637. expiry := lrucache.DefaultExpiration
  638. deadline, ok := offerEntry.ctx.Deadline()
  639. if ok {
  640. expiry = time.Until(deadline)
  641. }
  642. key := m.pendingAnswerKey(
  643. announcementEntry.announcement.ProxyID,
  644. announcementEntry.announcement.ConnectionID)
  645. m.pendingAnswers.Set(
  646. key,
  647. &pendingAnswer{
  648. announcement: announcementEntry.announcement,
  649. answerChan: offerEntry.answerChan,
  650. },
  651. expiry)
  652. announcementEntry.offerChan <- offerEntry.offer
  653. // Remove the matched offer from the queue and match the next offer,
  654. // now first in the queue.
  655. m.removeOfferEntry(false, offerEntry)
  656. }
  657. }
  658. func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
  659. // Assumes the caller has the queue mutexes locked.
  660. // Check each candidate announcement in turn, and select a match. There is
  661. // an implicit preference for older proxy announcements, sooner to
  662. // timeout, at the front of the enqueued announcements.
  663. // announcementMultiQueue.startMatching skips to the first matching
  664. // compartment ID(s).
  665. //
  666. // Limitation: since this logic matches each enqueued client in turn, it will
  667. // only make the optimal NAT match for the oldest enqueued client vs. all
  668. // proxies, and not do optimal N x M matching for all clients and all proxies.
  669. //
  670. // Future matching enhancements could include more sophisticated GeoIP
  671. // rules, such as a configuration encoding knowledge of an ASN's NAT
  672. // type, or preferred client/proxy country/ASN matches.
  673. offerProperties := &offerEntry.offer.Properties
  674. // Assumes the caller checks that offer specifies either personal
  675. // compartment IDs or common compartment IDs, but not both.
  676. isCommonCompartments := false
  677. compartmentIDs := offerProperties.PersonalCompartmentIDs
  678. if len(compartmentIDs) == 0 {
  679. isCommonCompartments = true
  680. compartmentIDs = offerProperties.CommonCompartmentIDs
  681. }
  682. if len(compartmentIDs) == 0 {
  683. return nil, -1
  684. }
  685. matchIterator := m.announcementQueue.startMatching(
  686. isCommonCompartments, compartmentIDs)
  687. // Use the NAT traversal type counters to check if there's any preferred
  688. // NAT match for this offer in the announcement queue. When there is, we
  689. // will search beyond the first announcement.
  690. unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount :=
  691. matchIterator.getNATCounts()
  692. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  693. unlimitedNATCount > 0,
  694. partiallyLimitedNATCount > 0,
  695. strictlyLimitedNATCount > 0)
  696. // TODO: add an ExistsCompatibleProtocolVersionMatch check?
  697. //
  698. // Currently, searching for protocol version support that doesn't exist
  699. // may be mitigated by limiting, through tactics, client protocol options
  700. // selection; using the proxy protocol version in PrioritizeProxy; and,
  701. // ultimately, increasing MinimumProxyProtocolVersion.
  702. var bestMatch *announcementEntry
  703. bestMatchIndex := -1
  704. bestMatchIsPriority := false
  705. bestMatchNAT := false
  706. // matcherMaxProbe limits the linear search through the announcement queue
  707. // to find a match. Currently, the queue implementation provides
  708. // constant-time lookup for matching compartment IDs. Other matching
  709. // aspects may require iterating over the queue items, including the
  710. // strict same-country and ASN constraint and protocol version
  711. // compatibility constraint. Best NAT match is not a strict constraint
  712. // and uses a shorter search limit, matcherMaxPreferredNATProbe.
  713. candidateIndex := -1
  714. for candidateIndex <= matcherMaxProbe {
  715. announcementEntry, isPriority := matchIterator.getNext()
  716. if announcementEntry == nil {
  717. break
  718. }
  719. if !isPriority && bestMatchIsPriority {
  720. // There is a priority match, but it wasn't bestMatchNAT and we
  721. // continued to iterate. Now that isPriority is false, we're past the
  722. // end of the priority items, so stop looking for any best NAT match
  723. // and return the previous priority match. When there are zero
  724. // priority items to begin with, this case should not be hit.
  725. break
  726. }
  727. candidateIndex += 1
  728. // Skip and remove this announcement if its deadline has already
  729. // passed. There is no signal to the awaiting Announce function, as
  730. // it will exit based on the same ctx.
  731. if announcementEntry.ctx.Err() != nil {
  732. m.removeAnnouncementEntry(false, announcementEntry)
  733. continue
  734. }
  735. announcementProperties := &announcementEntry.announcement.Properties
  736. // Don't match unless the proxy announcement, client offer, and the
  737. // client's selected protocol options are compatible. UseMediaStreams
  738. // requires at least ProtocolVersion2.
  739. _, ok := negotiateProtocolVersion(
  740. announcementProperties.ProtocolVersion,
  741. offerProperties.ProtocolVersion,
  742. offerEntry.offer.UseMediaStreams)
  743. if !ok {
  744. continue
  745. }
  746. // Disallow matching the same country and ASN, or GeoIP combinations
  747. // prohibited by the AllowMatch callback, except for personal
  748. // compartment ID matches.
  749. //
  750. // For common matching, hopping through the same ISP is assumed to
  751. // have no circumvention benefit. For personal matching, the user may
  752. // wish to hop their their own or their friend's proxy regardless.
  753. if isCommonCompartments {
  754. if !GetAllowCommonASNMatching() &&
  755. (offerProperties.GeoIPData.Country ==
  756. announcementProperties.GeoIPData.Country &&
  757. offerProperties.GeoIPData.ASN ==
  758. announcementProperties.GeoIPData.ASN) {
  759. continue
  760. }
  761. if !m.config.AllowMatch(
  762. announcementProperties.GeoIPData,
  763. offerProperties.GeoIPData) {
  764. continue
  765. }
  766. }
  767. // Currently, there is no check or preference that the offer and
  768. // announce have at least one of hasIPv4 or hasIPv6 in common, as
  769. // clients are allowed to offer with no candidates and IPv6
  770. // transition mechanisms such as 4in6/6in4/464XLAT/Teredo/etc. may be
  771. // available.
  772. // Check if this is a preferred NAT match. Ultimately, a match may be
  773. // made with potentially incompatible NATs, but the client/proxy
  774. // reported NAT types may be incorrect or unknown; the client will
  775. // often skip NAT discovery.
  776. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  777. // Use proxy ASN quality as an alternative to preferred NAT matches.
  778. //
  779. // The NAT matching logic depends on RFC5780 NAT discovery test
  780. // results, which may not be entirely accurate, and may not be
  781. // available in the first place, especially if skipped for clients,
  782. // which is the default.
  783. //
  784. // Proxy ASN quality leverages the quality data, provided by servers,
  785. // indicating that the particular proxy recently relayed a successful
  786. // tunnel for some client in the given ASN. When this quality data is
  787. // present, NAT compatibility is assumed, with the caveat that the
  788. // client device and immediate router may not be the same.
  789. //
  790. // Limitations:
  791. // - existsPreferredNATMatch doesn't reflect existence of matching
  792. // proxy ASN quality, so the NAT match probe can end prematurely.
  793. // - IsPreferredNATMatch currently takes precedence over proxy ASN
  794. // quality.
  795. if !matchNAT && isPriority {
  796. matchNAT = m.config.ProxyQualityState.HasQuality(
  797. announcementEntry.announcement.ProxyID,
  798. announcementEntry.announcement.Properties.GeoIPData.ASN,
  799. offerProperties.GeoIPData.ASN)
  800. }
  801. // At this point, the candidate is a match. Determine if this is a new
  802. // best match, either if there was no previous match, or this is a
  803. // better NAT match.
  804. if bestMatch == nil || (!bestMatchNAT && matchNAT) {
  805. bestMatch = announcementEntry
  806. bestMatchIndex = candidateIndex
  807. bestMatchIsPriority = isPriority
  808. bestMatchNAT = matchNAT
  809. }
  810. // Stop as soon as we have the best possible match, or have reached
  811. // the probe limit for preferred NAT matches.
  812. if bestMatch != nil && (bestMatchNAT ||
  813. !existsPreferredNATMatch ||
  814. candidateIndex-bestMatchIndex >= matcherMaxPreferredNATProbe) {
  815. break
  816. }
  817. }
  818. return bestMatch, bestMatchIndex
  819. }
  820. // applyLoadLimit checks if the broker process is in the load limiting state
  821. // and, in order to reduce load, determines if new proxy announces or client
  822. // offers should be rejected immediately instead of enqueued.
  823. func (m *Matcher) applyLoadLimit(isAnnouncement bool) error {
  824. if m.config.IsLoadLimiting == nil || !m.config.IsLoadLimiting() {
  825. return nil
  826. }
  827. // Acquire the queue locks only when in the load limit state, and in the
  828. // same order as matchAllOffers.
  829. m.announcementQueueMutex.Lock()
  830. defer m.announcementQueueMutex.Unlock()
  831. m.offerQueueMutex.Lock()
  832. defer m.offerQueueMutex.Unlock()
  833. announcementLen := m.announcementQueue.getLen()
  834. offerLen := m.offerQueue.Len()
  835. // When the load limit had been reached, and assuming the broker process
  836. // is running only an in-proxy broker, it's likely, in practise, that
  837. // only one of the two queues has hundreds of thousands of entries while
  838. // the other has few, and there are no matches clearing the queue.
  839. //
  840. // Instead of simply rejecting all enqueue requests, allow the request
  841. // type, announce or offer, that is in shorter supply as these are likely
  842. // to match and draw down the larger queue. This attempts to make
  843. // productive use of enqueued items, and also attempts to avoid simply
  844. // emptying both queues -- as will happen in any case due to timeouts --
  845. // and then have the same larger queue refill again after the load limit
  846. // state exits.
  847. //
  848. // This approach assumes some degree of slack in available system memory
  849. // and CPU in the load limiting state, similar to how the tunnel server
  850. // continues to operate existing tunnels in the same state.
  851. //
  852. // The heuristic below of allowing when less than half the size of the
  853. // larger queue puts a cap on the amount the shorter queue can continue
  854. // to grow in the load limiting state, in the worst case.
  855. //
  856. // Limitation: in some scenarios that are expected to be rare, it can
  857. // happen that allowed requests don't result in a match and memory
  858. // consumption continues to grow, leading to a broker process OOM kill.
  859. var allow bool
  860. if isAnnouncement {
  861. allow = announcementLen < offerLen/2
  862. } else {
  863. allow = offerLen < announcementLen/2
  864. }
  865. if allow {
  866. return nil
  867. }
  868. // Do not return a MatcherLimitError, as is done in applyIPLimits. A
  869. // MatcherLimitError results in a Response.Limited error response, which
  870. // causes a proxy to back off and a client to abort its dial; but in
  871. // neither case is the broker client reset. The error returned here will
  872. // result in a fast 404 response to the proxy or client, which will
  873. // instead trigger a broker client reset, and a chance of moving to a
  874. // different broker that is not overloaded.
  875. //
  876. // Limitation: the 404 response won't be distinguishable, in client or
  877. // proxy diagnostics, from other error conditions.
  878. //
  879. // TODO: add a new Response.LoadLimited flag which the proxy/client can
  880. // use use log a distinct error and also ensure that it doesn't reselect
  881. // the same broker again in the broker client reset random selection.
  882. return errors.TraceNew("load limited")
  883. }
  // MatcherLimitError is the error type returned by Announce or Offer when the
// caller has exceeded configured queue entry or rate limits.
type MatcherLimitError struct {
	err error
}

// NewMatcherLimitError wraps err, the specific limit violation, in a
// MatcherLimitError.
func NewMatcherLimitError(err error) *MatcherLimitError {
	return &MatcherLimitError{err: err}
}

// Error returns the wrapped error's message. A MatcherLimitError with no
// wrapped error, such as the zero value, reports a generic message instead
// of panicking.
func (e MatcherLimitError) Error() string {
	if e.err == nil {
		return "matcher limit exceeded"
	}
	return e.err.Error()
}

// Unwrap returns the wrapped error, so that errors.Is and errors.As can
// inspect the underlying limit violation through a MatcherLimitError.
func (e MatcherLimitError) Unwrap() error {
	return e.err
}
  895. // applyIPLimits checks per-proxy or per-client -- as determined by peer IP
  896. // address -- rate limits and queue entry limits.
  897. func (m *Matcher) applyIPLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  898. // Assumes m.announcementQueueMutex or m.offerQueueMutex is locked.
  899. var entryCountByIP map[string]int
  900. var queueRateLimiters *lrucache.Cache
  901. var limitEntryCount int
  902. var quantity int
  903. var interval time.Duration
  904. if isAnnouncement {
  905. // Skip limit checks for non-limited proxies.
  906. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  907. return nil
  908. }
  909. entryCountByIP = m.announcementQueueEntryCountByIP
  910. queueRateLimiters = m.announcementQueueRateLimiters
  911. limitEntryCount = m.announcementLimitEntryCount
  912. quantity = m.announcementRateLimitQuantity
  913. interval = m.announcementRateLimitInterval
  914. } else {
  915. entryCountByIP = m.offerQueueEntryCountByIP
  916. queueRateLimiters = m.offerQueueRateLimiters
  917. limitEntryCount = m.offerLimitEntryCount
  918. quantity = m.offerRateLimitQuantity
  919. interval = m.offerRateLimitInterval
  920. }
  921. // The rate limit is checked first, before the max count check, to ensure
  922. // that the rate limit state is updated regardless of the max count check
  923. // outcome.
  924. err := brokerRateLimit(
  925. queueRateLimiters,
  926. limitIP,
  927. quantity,
  928. interval)
  929. if err != nil {
  930. return errors.Trace(NewMatcherLimitError(err))
  931. }
  932. if limitEntryCount > 0 {
  933. // Limitation: non-limited proxy ID entries are counted in
  934. // entryCountByIP. If both a limited and non-limited proxy ingress
  935. // from the same limitIP, then the non-limited entries will count
  936. // against the limited proxy's limitEntryCount.
  937. entryCount, ok := entryCountByIP[limitIP]
  938. if ok && entryCount >= limitEntryCount {
  939. return errors.Trace(
  940. NewMatcherLimitError(std_errors.New("max entries for IP")))
  941. }
  942. }
  943. return nil
  944. }
  945. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  946. m.announcementQueueMutex.Lock()
  947. defer m.announcementQueueMutex.Unlock()
  948. // Ensure the queue doesn't grow larger than the max size.
  949. if m.announcementQueue.getLen() >= matcherAnnouncementQueueMaxSize {
  950. return errors.TraceNew("queue full")
  951. }
  952. // Ensure no single peer IP can enqueue a large number of entries or
  953. // rapidly enqueue beyond the configured rate.
  954. isAnnouncement := true
  955. err := m.applyIPLimits(
  956. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  957. if err != nil {
  958. return errors.Trace(err)
  959. }
  960. // announcementEntry.queueReference should be uninitialized.
  961. // announcementMultiQueue.enqueue sets queueReference to be used for
  962. // efficient dequeuing.
  963. if announcementEntry.queueReference.entry != nil {
  964. return errors.TraceNew("unexpected queue reference")
  965. }
  966. err = m.announcementQueue.enqueue(announcementEntry)
  967. if err != nil {
  968. return errors.Trace(err)
  969. }
  970. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  971. select {
  972. case m.matchSignal <- struct{}{}:
  973. default:
  974. }
  975. return nil
  976. }
  977. func (m *Matcher) removeAnnouncementEntry(aborting bool, announcementEntry *announcementEntry) {
  978. // In the aborting case, the queue isn't already locked. Otherwise, assume
  979. // it is locked.
  980. if aborting {
  981. m.announcementQueueMutex.Lock()
  982. defer m.announcementQueueMutex.Unlock()
  983. }
  984. found := announcementEntry.queueReference.dequeue()
  985. if found {
  986. // Adjust entry counts by peer IP, used to enforce
  987. // matcherAnnouncementQueueMaxEntriesPerIP.
  988. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  989. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  990. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  991. }
  992. }
  993. if aborting && !found {
  994. // The Announce call is aborting and taking its entry back out of the
  995. // queue. If the entry is not found in the queue, then a concurrent
  996. // Offer has matched the announcement. So check for the pending
  997. // answer corresponding to the announcement and remove it and deliver
  998. // a failure signal to the waiting Offer, so the client doesn't wait
  999. // longer than necessary.
  1000. key := m.pendingAnswerKey(
  1001. announcementEntry.announcement.ProxyID,
  1002. announcementEntry.announcement.ConnectionID)
  1003. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  1004. if ok {
  1005. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  1006. m.pendingAnswers.Delete(key)
  1007. }
  1008. }
  1009. }
  1010. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  1011. m.offerQueueMutex.Lock()
  1012. defer m.offerQueueMutex.Unlock()
  1013. // Ensure the queue doesn't grow larger than the max size.
  1014. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  1015. return errors.TraceNew("queue full")
  1016. }
  1017. // Ensure no single peer IP can enqueue a large number of entries or
  1018. // rapidly enqueue beyond the configured rate.
  1019. isAnnouncement := false
  1020. err := m.applyIPLimits(
  1021. isAnnouncement, offerEntry.limitIP, ID{})
  1022. if err != nil {
  1023. return errors.Trace(err)
  1024. }
  1025. // offerEntry.queueReference should be uninitialized and is set here to be
  1026. // used for efficient dequeuing.
  1027. if offerEntry.queueReference != nil {
  1028. return errors.TraceNew("unexpected queue reference")
  1029. }
  1030. offerEntry.queueReference = m.offerQueue.PushBack(offerEntry)
  1031. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  1032. select {
  1033. case m.matchSignal <- struct{}{}:
  1034. default:
  1035. }
  1036. return nil
  1037. }
  1038. func (m *Matcher) removeOfferEntry(aborting bool, offerEntry *offerEntry) {
  1039. // In the aborting case, the queue isn't already locked. Otherise, assume
  1040. // it is locked.
  1041. if aborting {
  1042. m.offerQueueMutex.Lock()
  1043. defer m.offerQueueMutex.Unlock()
  1044. }
  1045. if offerEntry.queueReference == nil {
  1046. return
  1047. }
  1048. m.offerQueue.Remove(offerEntry.queueReference)
  1049. offerEntry.queueReference = nil
  1050. // Adjust entry counts by peer IP, used to enforce
  1051. // matcherOfferQueueMaxEntriesPerIP.
  1052. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  1053. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  1054. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  1055. }
  1056. }
  1057. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  1058. // The pending answer lookup key is used to associate announcements and
  1059. // subsequent answers. While the client learns the ConnectionID, only the
  1060. // proxy knows the ProxyID component, so only the correct proxy can match
  1061. // an answer to an announcement. The ConnectionID component is necessary
  1062. // as a proxy may have multiple, concurrent pending answers.
  1063. return string(proxyID[:]) + string(connectionID[:])
  1064. }
  // getRateLimitIP returns the key used to rate limit a peer address. IPv4
// addresses, and strings that don't parse as an IP, are returned unchanged.
// IPv6 addresses are reduced to their /56 network prefix, since individual
// users or sites are commonly allocated a /64 or /56.
func getRateLimitIP(strIP string) string {
	const ipv6RateLimitPrefixBits = 56

	parsed := net.ParseIP(strIP)
	switch {
	case parsed == nil:
		return strIP
	case parsed.To4() != nil:
		return strIP
	}

	prefixMask := net.CIDRMask(ipv6RateLimitPrefixBits, 8*net.IPv6len)
	return parsed.Mask(prefixMask).String()
}
  1074. // announcementMultiQueue is a set of announcement queues, one per common or
  1075. // personal compartment ID, providing efficient iteration over announcements
  1076. // matching a specified list of compartment IDs. announcementMultiQueue and
  1077. // its underlying data structures are not safe for concurrent access.
  1078. type announcementMultiQueue struct {
  1079. priorityCommonCompartmentQueues map[ID]*announcementCompartmentQueue
  1080. commonCompartmentQueues map[ID]*announcementCompartmentQueue
  1081. personalCompartmentQueues map[ID]*announcementCompartmentQueue
  1082. totalEntries int
  1083. iterator *announcementMatchIterator
  1084. }
  1085. // announcementCompartmentQueue is a single compartment queue within an
  1086. // announcementMultiQueue. The queue is implemented using a doubly-linked
  1087. // list, which provides efficient insert and mid-queue dequeue operations.
  1088. // The announcementCompartmentQueue also records NAT type stats for enqueued
  1089. // announcements, which are used, when matching, to determine when better NAT
  1090. // matches may be possible.
  1091. type announcementCompartmentQueue struct {
  1092. isCommonCompartment bool
  1093. isPriority bool
  1094. compartmentID ID
  1095. entries *list.List
  1096. unlimitedNATCount int
  1097. partiallyLimitedNATCount int
  1098. strictlyLimitedNATCount int
  1099. }
  1100. // announcementMatchIterator represents the state of an iteration over a
  1101. // subset of announcementMultiQueue compartment queues. Concurrent
  1102. // announcementMatchIterators are not supported.
  1103. type announcementMatchIterator struct {
  1104. multiQueue *announcementMultiQueue
  1105. compartmentQueues []*announcementCompartmentQueue
  1106. nextEntries []*list.Element
  1107. }
  1108. // announcementQueueReference represents the queue position for a given
  1109. // announcement entry, and provides an efficient dequeue operation.
  1110. type announcementQueueReference struct {
  1111. multiQueue *announcementMultiQueue
  1112. compartmentQueue *announcementCompartmentQueue
  1113. entry *list.Element
  1114. }
  1115. func newAnnouncementMultiQueue() *announcementMultiQueue {
  1116. q := &announcementMultiQueue{
  1117. priorityCommonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1118. commonCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1119. personalCompartmentQueues: make(map[ID]*announcementCompartmentQueue),
  1120. }
  1121. q.iterator = &announcementMatchIterator{
  1122. multiQueue: q,
  1123. }
  1124. return q
  1125. }
  1126. func (q *announcementMultiQueue) getLen() int {
  1127. return q.totalEntries
  1128. }
  1129. func (q *announcementMultiQueue) enqueue(announcementEntry *announcementEntry) error {
  1130. // Assumes announcementEntry not already enueued.
  1131. // Limitation: only one compartment ID, either common or personal, is
  1132. // supported per announcement entry. In the common compartment case, the
  1133. // broker currently assigns only one common compartment ID per proxy
  1134. // announcement. In the personal compartment case, there is currently no
  1135. // use case for allowing a proxy to announce under multiple personal
  1136. // compartment IDs.
  1137. //
  1138. // To overcome this limitation, the dequeue operation would need to be
  1139. // able to remove an announcement entry from multiple
  1140. // announcementCompartmentQueues.
  1141. commonCompartmentIDs := announcementEntry.announcement.Properties.CommonCompartmentIDs
  1142. personalCompartmentIDs := announcementEntry.announcement.Properties.PersonalCompartmentIDs
  1143. if len(commonCompartmentIDs)+len(personalCompartmentIDs) != 1 {
  1144. return errors.TraceNew("announcement must specify exactly one compartment ID")
  1145. }
  1146. isPriority := announcementEntry.announcement.Properties.IsPriority
  1147. isCommonCompartment := true
  1148. var compartmentID ID
  1149. var compartmentQueues map[ID]*announcementCompartmentQueue
  1150. if len(commonCompartmentIDs) > 0 {
  1151. compartmentID = commonCompartmentIDs[0]
  1152. compartmentQueues = q.commonCompartmentQueues
  1153. if isPriority {
  1154. compartmentQueues = q.priorityCommonCompartmentQueues
  1155. }
  1156. } else {
  1157. isCommonCompartment = false
  1158. compartmentID = personalCompartmentIDs[0]
  1159. compartmentQueues = q.personalCompartmentQueues
  1160. if isPriority {
  1161. return errors.TraceNew("priority not supported for personal compartments")
  1162. }
  1163. }
  1164. compartmentQueue, ok := compartmentQueues[compartmentID]
  1165. if !ok {
  1166. compartmentQueue = &announcementCompartmentQueue{
  1167. isCommonCompartment: isCommonCompartment,
  1168. isPriority: isPriority,
  1169. compartmentID: compartmentID,
  1170. entries: list.New(),
  1171. }
  1172. compartmentQueues[compartmentID] = compartmentQueue
  1173. }
  1174. entry := compartmentQueue.entries.PushBack(announcementEntry)
  1175. // Update the NAT type counts which are used to determine if a better NAT
  1176. // match may be made by inspecting more announcement queue entries.
  1177. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1178. case NATTraversalUnlimited:
  1179. compartmentQueue.unlimitedNATCount += 1
  1180. case NATTraversalPartiallyLimited:
  1181. compartmentQueue.partiallyLimitedNATCount += 1
  1182. case NATTraversalStrictlyLimited:
  1183. compartmentQueue.strictlyLimitedNATCount += 1
  1184. }
  1185. q.totalEntries += 1
  1186. announcementEntry.queueReference = announcementQueueReference{
  1187. multiQueue: q,
  1188. compartmentQueue: compartmentQueue,
  1189. entry: entry,
  1190. }
  1191. return nil
  1192. }
  1193. // announcementQueueReference returns false if the item is already dequeued.
  1194. func (r *announcementQueueReference) dequeue() bool {
  1195. if r.entry == nil {
  1196. // Already dequeued.
  1197. return false
  1198. }
  1199. announcementEntry := r.entry.Value.(*announcementEntry)
  1200. // Reverse the NAT type counts.
  1201. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  1202. case NATTraversalUnlimited:
  1203. r.compartmentQueue.unlimitedNATCount -= 1
  1204. case NATTraversalPartiallyLimited:
  1205. r.compartmentQueue.partiallyLimitedNATCount -= 1
  1206. case NATTraversalStrictlyLimited:
  1207. r.compartmentQueue.strictlyLimitedNATCount -= 1
  1208. }
  1209. r.compartmentQueue.entries.Remove(r.entry)
  1210. if r.compartmentQueue.entries.Len() == 0 {
  1211. // Remove empty compartment queue.
  1212. queues := r.multiQueue.personalCompartmentQueues
  1213. if r.compartmentQueue.isCommonCompartment {
  1214. if r.compartmentQueue.isPriority {
  1215. queues = r.multiQueue.priorityCommonCompartmentQueues
  1216. } else {
  1217. queues = r.multiQueue.commonCompartmentQueues
  1218. }
  1219. }
  1220. delete(queues, r.compartmentQueue.compartmentID)
  1221. }
  1222. r.multiQueue.totalEntries -= 1
  1223. // Mark as dequeued.
  1224. r.entry = nil
  1225. return true
  1226. }
  1227. // startMatching returns a newly initialized announcementMatchIterator for
  1228. // queues corresponding to the specified compartment IDs.
  1229. //
  1230. // In order to reduce allocation and garbage collection churn from
  1231. // matchAllOffers/matchOffer repeatedly calling startMatching, a single
  1232. // announcementMatchIterator instance is retained and reused by all
  1233. // startMatching calls. It is not safe to concurrently call startMatching or
  1234. // concurrently use the returned announcementMatchIterator or retain the
  1235. // returned announcementMatchIterator between startMatching calls.
  1236. func (q *announcementMultiQueue) startMatching(
  1237. isCommonCompartments bool,
  1238. compartmentIDs []ID) *announcementMatchIterator {
  1239. // Reset the reused announcementMatchIterator fields, including clearing
  1240. // any references, while retaining the allocated slice capacity.
  1241. iter := q.iterator
  1242. clear(iter.compartmentQueues)
  1243. iter.compartmentQueues = iter.compartmentQueues[:0]
  1244. clear(iter.nextEntries)
  1245. iter.nextEntries = iter.nextEntries[:0]
  1246. // Find the matching compartment queues and initialize iteration over
  1247. // those queues. Building the set of matching queues is a linear time
  1248. // operation, bounded by the length of compartmentIDs (no more than
  1249. // maxCompartmentIDs, as enforced in
  1250. // ClientOfferRequest.ValidateAndGetLogFields).
  1251. // Priority queues, when in use, must all be added to the beginning of
  1252. // iter.compartmentQueues in order to ensure that the iteration logic in
  1253. // getNext visits all priority items first.
  1254. var compartmentQueuesList []map[ID]*announcementCompartmentQueue
  1255. if isCommonCompartments {
  1256. compartmentQueuesList = append(
  1257. compartmentQueuesList,
  1258. q.priorityCommonCompartmentQueues,
  1259. q.commonCompartmentQueues)
  1260. } else {
  1261. compartmentQueuesList = append(
  1262. compartmentQueuesList,
  1263. q.personalCompartmentQueues)
  1264. }
  1265. for _, compartmentQueues := range compartmentQueuesList {
  1266. for _, ID := range compartmentIDs {
  1267. if compartmentQueue, ok := compartmentQueues[ID]; ok {
  1268. iter.compartmentQueues = append(iter.compartmentQueues, compartmentQueue)
  1269. iter.nextEntries = append(iter.nextEntries, compartmentQueue.entries.Front())
  1270. }
  1271. }
  1272. }
  1273. return iter
  1274. }
  1275. func (iter *announcementMatchIterator) getNATCounts() (int, int, int) {
  1276. // Return the count of NAT types across all matchable compartment queues.
  1277. //
  1278. // A potential future enhancement would be to provide per-queue NAT counts
  1279. // or NAT type indexing in order to quickly find preferred NAT matches.
  1280. unlimitedNATCount := 0
  1281. partiallyLimitedNATCount := 0
  1282. strictlyLimitedNATCount := 0
  1283. for _, compartmentQueue := range iter.compartmentQueues {
  1284. unlimitedNATCount += compartmentQueue.unlimitedNATCount
  1285. partiallyLimitedNATCount += compartmentQueue.partiallyLimitedNATCount
  1286. strictlyLimitedNATCount += compartmentQueue.strictlyLimitedNATCount
  1287. }
  1288. return unlimitedNATCount, partiallyLimitedNATCount, strictlyLimitedNATCount
  1289. }
  1290. // announcementMatchIterator returns the next announcement entry candidate in
  1291. // compartment queue FIFO order, selecting the queue with the oldest head
  1292. // item.
  1293. //
  1294. // The caller should invoke announcementEntry.queueReference.dequeue when the
  1295. // candidate is selected. dequeue may be called on any getNext return value
  1296. // without disrupting the iteration state; however,
  1297. // announcementEntry.queueReference.dequeue calls for arbitrary queue entries
  1298. // are not supported during iteration. Iteration and dequeue should all be
  1299. // performed with a lock over the entire announcementMultiQueue, and with
  1300. // only one concurrent announcementMatchIterator.
  1301. //
  1302. // getNext returns a nil *announcementEntry when there are no more items.
  1303. // getNext also returns an isPriority flag, indicating the announcement is a
  1304. // priority candidate. All priority candidates are guaranteed to be returned
  1305. // before any non-priority candidates.
  1306. func (iter *announcementMatchIterator) getNext() (*announcementEntry, bool) {
  1307. // Assumes announcements are enqueued in announcementEntry.ctx.Deadline
  1308. // order. Also assumes that any priority queues are all at the front of
  1309. // iter.compartmentQueues.
  1310. // Select the oldest item, by deadline, from all the candidate queue head
  1311. // items. This operation is linear in the number of matching compartment
  1312. // ID queues, which is currently bounded by the length of matching
  1313. // compartment IDs (no more than maxCompartmentIDs, as enforced in
  1314. // ClientOfferRequest.ValidateAndGetLogFields).
  1315. //
  1316. // When there are priority candidates, they are selected first, regardless
  1317. // of the deadlines of non-priority candidates. Multiple priority
  1318. // candidates are processed in FIFO deadline order.
  1319. //
  1320. // A potential future enhancement is to add more iterator state to track
  1321. // which queue has the next oldest time to select on the following
  1322. // getNext call. Another potential enhancement is to remove fully
  1323. // consumed queues from compartmentQueues/nextEntries.
  1324. var selectedCandidate *announcementEntry
  1325. selectedIndex := -1
  1326. selectedPriority := false
  1327. for i := 0; i < len(iter.compartmentQueues); i++ {
  1328. if iter.nextEntries[i] == nil {
  1329. continue
  1330. }
  1331. isPriority := iter.compartmentQueues[i].isPriority
  1332. if selectedPriority && !isPriority {
  1333. // Ignore older of non-priority entries when there are priority
  1334. // candidates.
  1335. break
  1336. }
  1337. if selectedCandidate == nil {
  1338. selectedCandidate = iter.nextEntries[i].Value.(*announcementEntry)
  1339. selectedIndex = i
  1340. selectedPriority = isPriority
  1341. } else {
  1342. candidate := iter.nextEntries[i].Value.(*announcementEntry)
  1343. deadline, deadlineOk := candidate.ctx.Deadline()
  1344. selectedDeadline, selectedDeadlineOk := selectedCandidate.ctx.Deadline()
  1345. if deadlineOk && selectedDeadlineOk && deadline.Before(selectedDeadline) {
  1346. selectedCandidate = candidate
  1347. selectedIndex = i
  1348. selectedPriority = isPriority
  1349. }
  1350. }
  1351. }
  1352. // Advance the selected queue to the next element. This must be done
  1353. // before any dequeue call, since container/list.remove clears
  1354. // list.Element.next.
  1355. if selectedIndex != -1 {
  1356. iter.nextEntries[selectedIndex] = iter.nextEntries[selectedIndex].Next()
  1357. }
  1358. return selectedCandidate, selectedPriority
  1359. }