matcher.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. std_errors "errors"
  23. "net"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  29. lrucache "github.com/cognusion/go-cache-lru"
  30. "github.com/gammazero/deque"
  31. "golang.org/x/time/rate"
  32. )
  // Maximum sizes for the announcement and offer queues.
  const (
  	matcherAnnouncementQueueMaxSize = 5000000
  	matcherOfferQueueMaxSize        = 5000000
  )

  // Pending answers cache: the default entry TTL and the maximum number of
  // entries. The TTL should be aligned with STUN hole punch lifetimes.
  const (
  	matcherPendingAnswersTTL     = 30 * time.Second
  	matcherPendingAnswersMaxSize = 100000
  )

  // Per-IP rate limiter caches: the reap interval, in seconds, and the maximum
  // number of cached rate limiters. Both are passed to lrucache.NewWithLRU in
  // NewMatcher.
  const (
  	matcherRateLimiterReapHistoryFrequencySeconds = 300
  	matcherRateLimiterMaxCacheEntries             = 1000000
  )
  42. // Matcher matches proxy announcements with client offers. Matcher also
  43. // coordinates pending proxy answers and routes answers to the awaiting
  44. // client offer handler.
  45. //
  46. // Matching prioritizes selecting the oldest announcements and client offers,
  47. // as they are closest to timing out.
  48. //
  49. // The client and proxy must supply matching personal or common compartment
  50. // IDs. Personal compartment matching is preferred. Common compartments are
  51. // managed by Psiphon and can be obtained via a tactics parameter or via an
  52. // OSL embedding.
  53. //
  54. // A client may opt form personal-only matching by not supplying any common
  55. // compartment IDs.
  56. //
  57. // Matching prefers to pair proxies and clients in a way that maximizes total
  58. // possible matches. For a client or proxy with less-limited NAT traversal, a
  59. // pairing with more-limited NAT traversal is preferred; and vice versa.
  60. // Candidates with unknown NAT types and mobile network types are assumed to
  61. // have the most limited NAT traversal capability.
  62. //
  63. // Preferred matchings take priority over announcement age.
  64. //
  65. // The client and proxy will not match if they are in the same country and
  66. // ASN, as it's assumed that doesn't provide any blocking circumvention
  67. // benefit. Disallowing proxies in certain blocked countries is handled at a
  68. // higher level; any such proxies should not be enqueued for matching.
  69. type Matcher struct {
  70. config *MatcherConfig
  71. runMutex sync.Mutex
  72. runContext context.Context
  73. stopRunning context.CancelFunc
  74. waitGroup *sync.WaitGroup
  75. // The announcement queue is implicitly sorted by announcement age. The
  76. // count fields are used to skip searching deeper into the queue for
  77. // preferred matches.
  78. // TODO: replace queue and counts with an indexed, in-memory database?
  79. announcementQueueMutex sync.Mutex
  80. announcementQueue *deque.Deque[*announcementEntry]
  81. announcementQueueEntryCountByIP map[string]int
  82. announcementQueueRateLimiters *lrucache.Cache
  83. announcementLimitEntryCount int
  84. announcementRateLimitQuantity int
  85. announcementRateLimitInterval time.Duration
  86. announcementNonlimitedProxyIDs map[ID]struct{}
  87. announcementsPersonalCompartmentalizedCount int
  88. announcementsUnlimitedNATCount int
  89. announcementsPartiallyLimitedNATCount int
  90. announcementsStrictlyLimitedNATCount int
  91. // The offer queue is also implicitly sorted by offer age. Both an offer
  92. // and announcement queue are required since either announcements or
  93. // offers can arrive while there are no available pairings.
  94. offerQueueMutex sync.Mutex
  95. offerQueue *deque.Deque[*offerEntry]
  96. offerQueueEntryCountByIP map[string]int
  97. offerQueueRateLimiters *lrucache.Cache
  98. offerLimitEntryCount int
  99. offerRateLimitQuantity int
  100. offerRateLimitInterval time.Duration
  101. matchSignal chan struct{}
  102. pendingAnswers *lrucache.Cache
  103. }
  104. // MatchProperties specifies the compartment, GeoIP, and network topology
  105. // matching roperties of clients and proxies.
  106. type MatchProperties struct {
  107. CommonCompartmentIDs []ID
  108. PersonalCompartmentIDs []ID
  109. GeoIPData common.GeoIPData
  110. NetworkType NetworkType
  111. NATType NATType
  112. PortMappingTypes PortMappingTypes
  113. }
  114. // EffectiveNATType combines the set of network properties into an effective
  115. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  116. // traversal is assumed. When NAT type is unknown and the network type is
  117. // mobile, CGNAT with limited NAT traversal is assumed.
  118. func (p *MatchProperties) EffectiveNATType() NATType {
  119. if p.PortMappingTypes.Available() {
  120. return NATTypePortMapping
  121. }
  122. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  123. // publicly reachable IPv6 ICE host candidate? If so, change the
  124. // effective NAT type? Depends on whether the matched peer can use IPv6.
  125. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  126. return NATTypeMobileNetwork
  127. }
  128. return p.NATType
  129. }
  130. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  131. // matching given the types of pairing candidates available.
  132. func (p *MatchProperties) ExistsPreferredNATMatch(
  133. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  134. return p.EffectiveNATType().ExistsPreferredMatch(
  135. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  136. }
  137. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  138. // NAT matching.
  139. func (p *MatchProperties) IsPreferredNATMatch(
  140. peerMatchProperties *MatchProperties) bool {
  141. return p.EffectiveNATType().IsPreferredMatch(
  142. peerMatchProperties.EffectiveNATType())
  143. }
  144. // IsPersonalCompartmentalized indicates whether the candidate has personal
  145. // compartment IDs.
  146. func (p *MatchProperties) IsPersonalCompartmentalized() bool {
  147. return len(p.PersonalCompartmentIDs) > 0
  148. }
  149. // MatchAnnouncement is a proxy announcement to be queued for matching.
  150. type MatchAnnouncement struct {
  151. Properties MatchProperties
  152. ProxyID ID
  153. ConnectionID ID
  154. ProxyProtocolVersion int32
  155. }
  156. // MatchOffer is a client offer to be queued for matching.
  157. type MatchOffer struct {
  158. Properties MatchProperties
  159. ClientProxyProtocolVersion int32
  160. ClientOfferSDP WebRTCSessionDescription
  161. ClientRootObfuscationSecret ObfuscationSecret
  162. DoDTLSRandomization bool
  163. TrafficShapingParameters *DataChannelTrafficShapingParameters
  164. NetworkProtocol NetworkProtocol
  165. DestinationAddress string
  166. DestinationServerID string
  167. }
  168. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  169. // announcement, to be routed to the awaiting client offer.
  170. type MatchAnswer struct {
  171. ProxyIP string
  172. ProxyID ID
  173. ConnectionID ID
  174. SelectedProxyProtocolVersion int32
  175. ProxyAnswerSDP WebRTCSessionDescription
  176. }
  // MatchMetrics records statistics about the match queue state at the time a
  // match is made: the queue positions of the matched offer and announcement,
  // and the lengths of both queues when the match was made.
  type MatchMetrics struct {
  	OfferMatchIndex, OfferQueueSize               int
  	AnnouncementMatchIndex, AnnouncementQueueSize int
  }
  185. // GetMetrics converts MatchMetrics to loggable fields.
  186. func (metrics *MatchMetrics) GetMetrics() common.LogFields {
  187. if metrics == nil {
  188. return nil
  189. }
  190. return common.LogFields{
  191. "offer_match_index": metrics.OfferMatchIndex,
  192. "offer_queue_size": metrics.OfferQueueSize,
  193. "announcement_match_index": metrics.AnnouncementMatchIndex,
  194. "announcement_queue_size": metrics.AnnouncementQueueSize,
  195. }
  196. }
  197. // announcementEntry is an announcement queue entry, an announcement with its
  198. // associated lifetime context and signaling channel.
  199. type announcementEntry struct {
  200. ctx context.Context
  201. limitIP string
  202. announcement *MatchAnnouncement
  203. offerChan chan *MatchOffer
  204. matchMetrics atomic.Value
  205. }
  206. func (announcementEntry *announcementEntry) getMatchMetrics() *MatchMetrics {
  207. matchMetrics, _ := announcementEntry.matchMetrics.Load().(*MatchMetrics)
  208. return matchMetrics
  209. }
  210. // offerEntry is an offer queue entry, an offer with its associated lifetime
  211. // context and signaling channel.
  212. type offerEntry struct {
  213. ctx context.Context
  214. limitIP string
  215. offer *MatchOffer
  216. answerChan chan *answerInfo
  217. matchMetrics atomic.Value
  218. }
  219. func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
  220. matchMetrics, _ := offerEntry.matchMetrics.Load().(*MatchMetrics)
  221. return matchMetrics
  222. }
  223. // answerInfo is an answer and its associated announcement.
  224. type answerInfo struct {
  225. announcement *MatchAnnouncement
  226. answer *MatchAnswer
  227. }
  228. // pendingAnswer represents an answer that is expected to arrive from a
  229. // proxy.
  230. type pendingAnswer struct {
  231. announcement *MatchAnnouncement
  232. answerChan chan *answerInfo
  233. }
  234. // MatcherConfig specifies the configuration for a matcher.
  235. type MatcherConfig struct {
  236. // Logger is used to log events.
  237. Logger common.Logger
  238. // Accouncement queue limits.
  239. AnnouncementLimitEntryCount int
  240. AnnouncementRateLimitQuantity int
  241. AnnouncementRateLimitInterval time.Duration
  242. AnnouncementNonlimitedProxyIDs []ID
  243. // Offer queue limits.
  244. OfferLimitEntryCount int
  245. OfferRateLimitQuantity int
  246. OfferRateLimitInterval time.Duration
  247. }
  248. // NewMatcher creates a new Matcher.
  249. func NewMatcher(config *MatcherConfig) *Matcher {
  250. m := &Matcher{
  251. config: config,
  252. waitGroup: new(sync.WaitGroup),
  253. announcementQueue: deque.New[*announcementEntry](),
  254. announcementQueueEntryCountByIP: make(map[string]int),
  255. announcementQueueRateLimiters: lrucache.NewWithLRU(
  256. 0,
  257. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  258. matcherRateLimiterMaxCacheEntries),
  259. offerQueue: deque.New[*offerEntry](),
  260. offerQueueEntryCountByIP: make(map[string]int),
  261. offerQueueRateLimiters: lrucache.NewWithLRU(
  262. 0,
  263. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  264. matcherRateLimiterMaxCacheEntries),
  265. matchSignal: make(chan struct{}, 1),
  266. // matcherPendingAnswersTTL is not configurable; it supplies a default
  267. // that is expected to be ignored when each entry's TTL is set to the
  268. // Offer ctx timeout.
  269. pendingAnswers: lrucache.NewWithLRU(
  270. matcherPendingAnswersTTL,
  271. 1*time.Minute,
  272. matcherPendingAnswersMaxSize),
  273. }
  274. m.SetLimits(
  275. config.AnnouncementLimitEntryCount,
  276. config.AnnouncementRateLimitQuantity,
  277. config.AnnouncementRateLimitInterval,
  278. config.AnnouncementNonlimitedProxyIDs,
  279. config.OfferLimitEntryCount,
  280. config.OfferRateLimitQuantity,
  281. config.OfferRateLimitInterval)
  282. return m
  283. }
  284. // SetLimits sets new queue limits, replacing the previous configuration.
  285. // Existing, cached rate limiters retain their existing rate limit state. New
  286. // entries will use the new quantity/interval configuration. In addition,
  287. // currently enqueued items may exceed any new, lower maximum entry count
  288. // until naturally dequeued.
  289. func (m *Matcher) SetLimits(
  290. announcementLimitEntryCount int,
  291. announcementRateLimitQuantity int,
  292. announcementRateLimitInterval time.Duration,
  293. announcementNonlimitedProxyIDs []ID,
  294. offerLimitEntryCount int,
  295. offerRateLimitQuantity int,
  296. offerRateLimitInterval time.Duration) {
  297. nonlimitedProxyIDs := make(map[ID]struct{})
  298. for _, proxyID := range announcementNonlimitedProxyIDs {
  299. nonlimitedProxyIDs[proxyID] = struct{}{}
  300. }
  301. m.announcementQueueMutex.Lock()
  302. m.announcementLimitEntryCount = announcementLimitEntryCount
  303. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  304. m.announcementRateLimitInterval = announcementRateLimitInterval
  305. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  306. m.announcementQueueMutex.Unlock()
  307. m.offerQueueMutex.Lock()
  308. m.offerLimitEntryCount = offerLimitEntryCount
  309. m.offerRateLimitQuantity = offerRateLimitQuantity
  310. m.offerRateLimitInterval = offerRateLimitInterval
  311. m.offerQueueMutex.Unlock()
  312. }
  313. // Start starts running the Matcher. The Matcher runs a goroutine which
  314. // matches announcements and offers.
  315. func (m *Matcher) Start() error {
  316. m.runMutex.Lock()
  317. defer m.runMutex.Unlock()
  318. if m.runContext != nil {
  319. return errors.TraceNew("already running")
  320. }
  321. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  322. m.waitGroup.Add(1)
  323. go func() {
  324. defer m.waitGroup.Done()
  325. m.matchWorker(m.runContext)
  326. }()
  327. return nil
  328. }
  329. // Stop stops running the Matcher and its worker goroutine.
  330. //
  331. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  332. // can get enqueued during and after a Stop call. Stop is intended more for a
  333. // full broker shutdown, where this won't be a concern.
  334. func (m *Matcher) Stop() {
  335. m.runMutex.Lock()
  336. defer m.runMutex.Unlock()
  337. m.stopRunning()
  338. m.waitGroup.Wait()
  339. m.runContext, m.stopRunning = nil, nil
  340. }
  341. // Announce enqueues the proxy announcement and blocks until it is matched
  342. // with a returned offer or ctx is done. The caller must not mutate the
  343. // announcement or its properties after calling Announce.
  344. //
  345. // The offer is sent to the proxy by the broker, and then the proxy sends its
  346. // answer back to the broker, which calls Answer with that value.
  347. //
  348. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  349. // match is made, even if there is a later error.
  350. func (m *Matcher) Announce(
  351. ctx context.Context,
  352. proxyIP string,
  353. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
  354. announcementEntry := &announcementEntry{
  355. ctx: ctx,
  356. limitIP: getRateLimitIP(proxyIP),
  357. announcement: proxyAnnouncement,
  358. offerChan: make(chan *MatchOffer, 1),
  359. }
  360. err := m.addAnnouncementEntry(announcementEntry)
  361. if err != nil {
  362. return nil, nil, errors.Trace(err)
  363. }
  364. // Await client offer.
  365. var clientOffer *MatchOffer
  366. select {
  367. case <-ctx.Done():
  368. m.removeAnnouncementEntry(announcementEntry)
  369. return nil, announcementEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  370. case clientOffer = <-announcementEntry.offerChan:
  371. }
  372. return clientOffer, announcementEntry.getMatchMetrics(), nil
  373. }
  374. // Offer enqueues the client offer and blocks until it is matched with a
  375. // returned announcement or ctx is done. The caller must not mutate the offer
  376. // or its properties after calling Announce.
  377. //
  378. // The answer is returned to the client by the broker, and the WebRTC
  379. // connection is dialed. The original announcement is also returned, so its
  380. // match properties can be logged.
  381. //
  382. // The returned MatchMetrics is nil unless a match is made; and non-nil if a
  383. // match is made, even if there is a later error.
  384. func (m *Matcher) Offer(
  385. ctx context.Context,
  386. clientIP string,
  387. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, *MatchMetrics, error) {
  388. offerEntry := &offerEntry{
  389. ctx: ctx,
  390. limitIP: getRateLimitIP(clientIP),
  391. offer: clientOffer,
  392. answerChan: make(chan *answerInfo, 1),
  393. }
  394. err := m.addOfferEntry(offerEntry)
  395. if err != nil {
  396. return nil, nil, nil, errors.Trace(err)
  397. }
  398. // Await proxy answer.
  399. var proxyAnswerInfo *answerInfo
  400. select {
  401. case <-ctx.Done():
  402. m.removeOfferEntry(offerEntry)
  403. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  404. // the Offer ctx, the client request, timeout, so it will eventually
  405. // get removed. But a client may abort its request earlier than the
  406. // timeout.
  407. return nil, nil,
  408. offerEntry.getMatchMetrics(), errors.Trace(ctx.Err())
  409. case proxyAnswerInfo = <-offerEntry.answerChan:
  410. }
  411. if proxyAnswerInfo == nil {
  412. // nil will be delivered to the channel when either the proxy
  413. // announcement request concurrently timed out, or the answer
  414. // indicated a proxy error, or the answer did not arrive in time.
  415. return nil, nil,
  416. offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
  417. }
  418. // This is a sanity check and not expected to fail.
  419. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  420. proxyAnswerInfo.announcement.ConnectionID) {
  421. return nil, nil,
  422. offerEntry.getMatchMetrics(), errors.TraceNew("unexpected connection ID")
  423. }
  424. return proxyAnswerInfo.answer,
  425. proxyAnswerInfo.announcement,
  426. offerEntry.getMatchMetrics(),
  427. nil
  428. }
  429. // Answer delivers an answer from the proxy for a previously matched offer.
  430. // The ProxyID and ConnectionID must correspond to the original announcement.
  431. // The caller must not mutate the answer after calling Answer. Answer does
  432. // not block.
  433. //
  434. // The answer is returned to the awaiting Offer call and sent to the matched
  435. // client.
  436. func (m *Matcher) Answer(
  437. proxyAnswer *MatchAnswer) error {
  438. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  439. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  440. if !ok {
  441. // The client is no longer awaiting the response.
  442. return errors.TraceNew("no client")
  443. }
  444. m.pendingAnswers.Delete(key)
  445. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  446. pendingAnswer.answerChan <- &answerInfo{
  447. announcement: pendingAnswer.announcement,
  448. answer: proxyAnswer,
  449. }
  450. return nil
  451. }
  452. // AnswerError delivers a failed answer indication from the proxy to an
  453. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  454. // original announcement.
  455. //
  456. // The failure indication is returned to the awaiting Offer call and sent to
  457. // the matched client.
  458. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  459. key := m.pendingAnswerKey(proxyID, connectionID)
  460. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  461. if !ok {
  462. // The client is no longer awaiting the response.
  463. return
  464. }
  465. m.pendingAnswers.Delete(key)
  466. // Closing the channel delivers nil, a failed indicator, to any receiver.
  467. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  468. }
  469. // matchWorker is the matching worker goroutine. It idles until signaled that
  470. // a queue item has been added, and then runs a full matching pass.
  471. func (m *Matcher) matchWorker(ctx context.Context) {
  472. for {
  473. select {
  474. case <-m.matchSignal:
  475. m.matchAllOffers()
  476. case <-ctx.Done():
  477. return
  478. }
  479. }
  480. }
  481. // matchAllOffers iterates over the queues, making all possible matches.
  482. func (m *Matcher) matchAllOffers() {
  483. m.announcementQueueMutex.Lock()
  484. defer m.announcementQueueMutex.Unlock()
  485. m.offerQueueMutex.Lock()
  486. defer m.offerQueueMutex.Unlock()
  487. // Take each offer in turn, and select an announcement match. There is an
  488. // implicit preference for older client offers, sooner to timeout, at the
  489. // front of the queue.
  490. // TODO: consider matching one offer, then releasing the locks to allow
  491. // more announcements to be enqueued, then continuing to match.
  492. i := 0
  493. end := m.offerQueue.Len()
  494. for i < end && m.announcementQueue.Len() > 0 {
  495. offerEntry := m.offerQueue.At(i)
  496. // Skip and remove this offer if its deadline has already passed.
  497. // There is no signal to the awaiting Offer function, as it will exit
  498. // based on the same ctx.
  499. if offerEntry.ctx.Err() != nil {
  500. m.removeOfferEntryByIndex(i)
  501. end -= 1
  502. continue
  503. }
  504. j, ok := m.matchOffer(offerEntry)
  505. if !ok {
  506. // No match, so leave this offer in place in the queue and move to
  507. // the next.
  508. i++
  509. continue
  510. }
  511. // Get the matched announcement entry.
  512. announcementEntry := m.announcementQueue.At(j)
  513. // Record match metrics.
  514. matchMetrics := &MatchMetrics{
  515. OfferMatchIndex: i,
  516. OfferQueueSize: m.offerQueue.Len(),
  517. AnnouncementMatchIndex: j,
  518. AnnouncementQueueSize: m.announcementQueue.Len(),
  519. }
  520. offerEntry.matchMetrics.Store(matchMetrics)
  521. announcementEntry.matchMetrics.Store(matchMetrics)
  522. // Remove the matched announcement from the queue. Send the offer to
  523. // the announcement entry's offerChan, which will deliver it to the
  524. // blocked Announce call. Add a pending answers entry to await the
  525. // proxy's follow up Answer call. The TTL for the pending answer
  526. // entry is set to the matched Offer call's ctx, as the answer is
  527. // only useful as long as the client is still waiting.
  528. expiry := lrucache.DefaultExpiration
  529. deadline, ok := offerEntry.ctx.Deadline()
  530. if ok {
  531. expiry = time.Until(deadline)
  532. }
  533. key := m.pendingAnswerKey(
  534. announcementEntry.announcement.ProxyID,
  535. announcementEntry.announcement.ConnectionID)
  536. m.pendingAnswers.Set(
  537. key,
  538. &pendingAnswer{
  539. announcement: announcementEntry.announcement,
  540. answerChan: offerEntry.answerChan,
  541. },
  542. expiry)
  543. announcementEntry.offerChan <- offerEntry.offer
  544. m.removeAnnouncementEntryByIndex(j)
  545. // Remove the matched offer from the queue and match the next offer,
  546. // now first in the queue.
  547. m.removeOfferEntryByIndex(i)
  548. end -= 1
  549. }
  550. }
  551. func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
  552. // Assumes the caller has the queue mutexed locked.
  553. // Check each announcement in turn, and select a match. There is an
  554. // implicit preference for older proxy announcements, sooner to timeout,
  555. // at the front of the queue.
  556. //
  557. // Limitation: since this logic matches each enqueued client in turn, it will
  558. // only make the optimal NAT match for the oldest enqueued client vs. all
  559. // proxies, and not do optimal N x M matching for all clients and all proxies.
  560. //
  561. // Future matching enhancements could include more sophisticated GeoIP
  562. // rules, such as a configuration encoding knowledge of an ASN's NAT
  563. // type, or preferred client/proxy country/ASN matches.
  564. // TODO: match supported protocol versions. Currently, all announces and
  565. // offers must specify ProxyProtocolVersion1, so there's no protocol
  566. // version match logic.
  567. offerProperties := &offerEntry.offer.Properties
  568. // Use the NAT traversal type counters to check if there's any preferred
  569. // NAT match for this offer in the announcement queue. When there is, we
  570. // will search beyond the first announcement.
  571. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  572. m.announcementsUnlimitedNATCount > 0,
  573. m.announcementsPartiallyLimitedNATCount > 0,
  574. m.announcementsStrictlyLimitedNATCount > 0)
  575. bestMatch := -1
  576. bestMatchNAT := false
  577. bestMatchCompartment := false
  578. end := m.announcementQueue.Len()
  579. // TODO: add queue indexing to facilitate skipping ahead to a matching
  580. // personal compartment ID, if any, when personal-only matching is
  581. // required. Personal matching may often require near-full queue scans
  582. // when looking for a match. Common compartment matching may also benefit
  583. // from indexing, although with a handful of common compartment IDs more
  584. // or less uniformly distributed, frequent long scans are not expected in
  585. // practise.
  586. for i := 0; i < end; i++ {
  587. announcementEntry := m.announcementQueue.At(i)
  588. // Skip and remove this announcement if its deadline has already
  589. // passed. There is no signal to the awaiting Announce function, as
  590. // it will exit based on the same ctx.
  591. if announcementEntry.ctx.Err() != nil {
  592. m.removeAnnouncementEntryByIndex(i)
  593. end -= 1
  594. continue
  595. }
  596. announcementProperties := &announcementEntry.announcement.Properties
  597. // There must be a compartment match. If there is a personal
  598. // compartment match, this match will be preferred.
  599. matchCommonCompartment := HaveCommonIDs(
  600. announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
  601. matchPersonalCompartment := HaveCommonIDs(
  602. announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
  603. if !matchCommonCompartment && !matchPersonalCompartment {
  604. continue
  605. }
  606. // Disallow matching the same country and ASN, except for personal
  607. // compartment ID matches.
  608. //
  609. // For common matching, hopping through the same ISP is assumed to
  610. // have no circumvention benefit. For personal matching, the user may
  611. // wish to hop their their own or their friend's proxy regardless.
  612. if !matchPersonalCompartment &&
  613. !GetAllowCommonASNMatching() &&
  614. (offerProperties.GeoIPData.Country ==
  615. announcementProperties.GeoIPData.Country &&
  616. offerProperties.GeoIPData.ASN ==
  617. announcementProperties.GeoIPData.ASN) {
  618. continue
  619. }
  620. // Check if this is a preferred NAT match. Ultimately, a match may be
  621. // made with potentially incompatible NATs, but the client/proxy
  622. // reported NAT types may be incorrect or unknown; the client will
  623. // often skip NAT discovery.
  624. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  625. // At this point, the candidate is a match. Determine if this is a new
  626. // best match.
  627. if bestMatch == -1 {
  628. // This is a match, and there was no previous match, so it becomes
  629. // the provisional best match.
  630. bestMatch = i
  631. bestMatchNAT = matchNAT
  632. bestMatchCompartment = matchPersonalCompartment
  633. } else if !bestMatchNAT && matchNAT {
  634. // If there was a previous best match which was not a preferred
  635. // NAT match, this becomes the new best match. The preferred NAT
  636. // match is prioritized over personal compartment matching.
  637. bestMatch = i
  638. bestMatchNAT = true
  639. bestMatchCompartment = matchPersonalCompartment
  640. } else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
  641. // If there was a previous best match which was not a personal
  642. // compartment match, and as long as this match doesn't undo a
  643. // better NAT match, this becomes the new best match.
  644. bestMatch = i
  645. bestMatchNAT = matchNAT
  646. bestMatchCompartment = true
  647. }
  648. // Stop as soon as we have the best possible match.
  649. if (bestMatchNAT || !existsPreferredNATMatch) &&
  650. (matchPersonalCompartment ||
  651. m.announcementsPersonalCompartmentalizedCount == 0 ||
  652. len(offerProperties.PersonalCompartmentIDs) == 0) {
  653. break
  654. }
  655. }
  656. return bestMatch, bestMatch != -1
  657. }
  658. // MatcherLimitError is the error type returned by Announce or Offer when the
  659. // caller has exceeded configured queue entry or rate limits.
  660. type MatcherLimitError struct {
  661. err error
  662. }
  663. func NewMatcherLimitError(err error) *MatcherLimitError {
  664. return &MatcherLimitError{err: err}
  665. }
  666. func (e MatcherLimitError) Error() string {
  667. return e.err.Error()
  668. }
  669. func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  670. // Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
  671. var entryCountByIP map[string]int
  672. var queueRateLimiters *lrucache.Cache
  673. var limitEntryCount int
  674. var quantity int
  675. var interval time.Duration
  676. if isAnnouncement {
  677. // Skip limit checks for non-limited proxies.
  678. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  679. return nil
  680. }
  681. entryCountByIP = m.announcementQueueEntryCountByIP
  682. queueRateLimiters = m.announcementQueueRateLimiters
  683. limitEntryCount = m.announcementLimitEntryCount
  684. quantity = m.announcementRateLimitQuantity
  685. interval = m.announcementRateLimitInterval
  686. } else {
  687. entryCountByIP = m.offerQueueEntryCountByIP
  688. queueRateLimiters = m.offerQueueRateLimiters
  689. limitEntryCount = m.offerLimitEntryCount
  690. quantity = m.offerRateLimitQuantity
  691. interval = m.offerRateLimitInterval
  692. }
  693. // The rate limit is checked first, before the max count check, to ensure
  694. // that the rate limit state is updated regardless of the max count check
  695. // outcome.
  696. if quantity > 0 && interval > 0 {
  697. var rateLimiter *rate.Limiter
  698. entry, ok := queueRateLimiters.Get(limitIP)
  699. if ok {
  700. rateLimiter = entry.(*rate.Limiter)
  701. } else {
  702. limit := float64(quantity) / interval.Seconds()
  703. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  704. queueRateLimiters.Set(
  705. limitIP, rateLimiter, interval)
  706. }
  707. if !rateLimiter.Allow() {
  708. return errors.Trace(
  709. NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
  710. }
  711. }
  712. if limitEntryCount > 0 {
  713. // Limitation: non-limited proxy ID entries are counted in
  714. // entryCountByIP. If both a limited and non-limited proxy ingress
  715. // from the same limitIP, then the non-limited entries will count
  716. // against the limited proxy's limitEntryCount.
  717. entryCount, ok := entryCountByIP[limitIP]
  718. if ok && entryCount >= limitEntryCount {
  719. return errors.Trace(
  720. NewMatcherLimitError(std_errors.New("max entries for IP")))
  721. }
  722. }
  723. return nil
  724. }
  725. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  726. m.announcementQueueMutex.Lock()
  727. defer m.announcementQueueMutex.Unlock()
  728. // Ensure the queue doesn't grow larger than the max size.
  729. if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
  730. return errors.TraceNew("queue full")
  731. }
  732. // Ensure no single peer IP can enqueue a large number of entries or
  733. // rapidly enqueue beyond the configured rate.
  734. isAnnouncement := true
  735. err := m.applyLimits(
  736. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  737. if err != nil {
  738. return errors.Trace(err)
  739. }
  740. m.announcementQueue.PushBack(announcementEntry)
  741. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  742. m.adjustAnnouncementCounts(announcementEntry, 1)
  743. select {
  744. case m.matchSignal <- struct{}{}:
  745. default:
  746. }
  747. return nil
  748. }
  749. func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
  750. m.announcementQueueMutex.Lock()
  751. defer m.announcementQueueMutex.Unlock()
  752. found := false
  753. for i := 0; i < m.announcementQueue.Len(); i++ {
  754. if m.announcementQueue.At(i) == announcementEntry {
  755. m.removeAnnouncementEntryByIndex(i)
  756. found = true
  757. break
  758. }
  759. }
  760. if !found {
  761. // The Announce call is aborting and taking its entry back out of the
  762. // queue. If the entry is not found in the queue, then a concurrent
  763. // Offer has matched the announcement. So check for the pending
  764. // answer corresponding to the announcement and remove it and deliver
  765. // a failure signal to the waiting Offer, so the client doesn't wait
  766. // longer than necessary.
  767. key := m.pendingAnswerKey(
  768. announcementEntry.announcement.ProxyID,
  769. announcementEntry.announcement.ConnectionID)
  770. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  771. if ok {
  772. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  773. m.pendingAnswers.Delete(key)
  774. }
  775. }
  776. }
  777. func (m *Matcher) removeAnnouncementEntryByIndex(i int) {
  778. // Assumes s.announcementQueueMutex lock is held.
  779. announcementEntry := m.announcementQueue.At(i)
  780. // This should be only direct call to Remove, as following adjustments
  781. // must always be made when removing.
  782. m.announcementQueue.Remove(i)
  783. // Adjust entry counts by peer IP, used to enforce
  784. // matcherAnnouncementQueueMaxEntriesPerIP.
  785. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  786. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  787. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  788. }
  789. m.adjustAnnouncementCounts(announcementEntry, -1)
  790. }
  791. func (m *Matcher) adjustAnnouncementCounts(
  792. announcementEntry *announcementEntry, delta int) {
  793. // Assumes s.announcementQueueMutex lock is held.
  794. if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
  795. m.announcementsPersonalCompartmentalizedCount += delta
  796. }
  797. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  798. case NATTraversalUnlimited:
  799. m.announcementsUnlimitedNATCount += delta
  800. case NATTraversalPartiallyLimited:
  801. m.announcementsPartiallyLimitedNATCount += delta
  802. case NATTraversalStrictlyLimited:
  803. m.announcementsStrictlyLimitedNATCount += delta
  804. }
  805. }
  806. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  807. m.offerQueueMutex.Lock()
  808. defer m.offerQueueMutex.Unlock()
  809. // Ensure the queue doesn't grow larger than the max size.
  810. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  811. return errors.TraceNew("queue full")
  812. }
  813. // Ensure no single peer IP can enqueue a large number of entries or
  814. // rapidly enqueue beyond the configured rate.
  815. isAnnouncement := false
  816. err := m.applyLimits(
  817. isAnnouncement, offerEntry.limitIP, ID{})
  818. if err != nil {
  819. return errors.Trace(err)
  820. }
  821. m.offerQueue.PushBack(offerEntry)
  822. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  823. select {
  824. case m.matchSignal <- struct{}{}:
  825. default:
  826. }
  827. return nil
  828. }
  829. func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
  830. m.offerQueueMutex.Lock()
  831. defer m.offerQueueMutex.Unlock()
  832. for i := 0; i < m.offerQueue.Len(); i++ {
  833. if m.offerQueue.At(i) == offerEntry {
  834. m.removeOfferEntryByIndex(i)
  835. break
  836. }
  837. }
  838. }
  839. func (m *Matcher) removeOfferEntryByIndex(i int) {
  840. // Assumes s.offerQueueMutex lock is held.
  841. offerEntry := m.offerQueue.At(i)
  842. // This should be only direct call to Remove, as following adjustments
  843. // must always be made when removing.
  844. m.offerQueue.Remove(i)
  845. // Adjust entry counts by peer IP, used to enforce
  846. // matcherOfferQueueMaxEntriesPerIP.
  847. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  848. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  849. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  850. }
  851. }
  852. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  853. // The pending answer lookup key is used to associate announcements and
  854. // subsequent answers. While the client learns the ConnectionID, only the
  855. // proxy knows the ProxyID component, so only the correct proxy can match
  856. // an answer to an announcement. The ConnectionID component is necessary
  857. // as a proxy may have multiple, concurrent pending answers.
  858. return string(proxyID[:]) + string(connectionID[:])
  859. }
  860. func getRateLimitIP(strIP string) string {
  861. IP := net.ParseIP(strIP)
  862. if IP == nil || IP.To4() != nil {
  863. return strIP
  864. }
  865. // With IPv6, individual users or sites are users commonly allocated a /64
  866. // or /56, so rate limit by /56.
  867. return IP.Mask(net.CIDRMask(56, 128)).String()
  868. }