matcher.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "net"
  23. "sync"
  24. "time"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  27. lrucache "github.com/cognusion/go-cache-lru"
  28. "github.com/gammazero/deque"
  29. "github.com/juju/ratelimit"
  30. "github.com/pion/webrtc/v3"
  31. )
  32. // TTLs should be aligned with STUN hole punch lifetimes.
  33. const (
  34. matcherAnnouncementQueueMaxSize = 5000000
  35. matcherOfferQueueMaxSize = 5000000
  36. matcherPendingAnswersTTL = 30 * time.Second
  37. matcherPendingAnswersMaxSize = 100000
  38. matcherRateLimiterReapHistoryFrequencySeconds = 300
  39. matcherRateLimiterMaxCacheEntries = 1000000
  40. )
  41. // Matcher matches proxy announcements with client offers. Matcher also
  42. // coordinates pending proxy answers and routes answers to the awaiting
  43. // client offer handler.
  44. //
  45. // Matching prioritizes selecting the oldest announcments and client offers,
  46. // as they are closest to timing out.
  47. //
  48. // The client and proxy must supply matching personal or common compartment
  49. // IDs. Personal compartment matching is preferred. Common compartments are
  50. // managed by Psiphon and can be obtained via a tactics parameter or via an
  51. // OSL embedding.
  52. //
  53. // A client may opt form personal-only matching by not supplying any common
  54. // compartment IDs.
  55. //
  56. // Matching prefers to pair proxies and clients in a way that maximizes total
  57. // possible matches. For a client or proxy with less-limited NAT traversal, a
  58. // pairing with more-limited NAT traversal is preferred; and vice versa.
  59. // Candidates with unknown NAT types and mobile network types are assumed to
  60. // have the most limited NAT traversal capability.
  61. //
  62. // Preferred matchings take priority over announcment age.
  63. //
  64. // The client and proxy will not match if they are in the same country and
  65. // ASN, as it's assumed that doesn't provide any blocking circumvention
  66. // benefit. Disallowing proxies in certain blocked countries is handled at a
  67. // higher level; any such proxies should not be enqueued for matching.
  68. type Matcher struct {
  69. config *MatcherConfig
  70. runMutex sync.Mutex
  71. runContext context.Context
  72. stopRunning context.CancelFunc
  73. waitGroup *sync.WaitGroup
  74. // The announcement queue is implicitly sorted by announcement age. The
  75. // count fields are used to skip searching deeper into the queue for
  76. // preferred matches.
  77. // TODO: replace queue and counts with an indexed, in-memory database?
  78. announcementQueueMutex sync.Mutex
  79. announcementQueue *deque.Deque[*announcementEntry]
  80. announcementQueueEntryCountByIP map[string]int
  81. announcementQueueRateLimiters *lrucache.Cache
  82. announcementLimitEntryCount int
  83. announcementRateLimitQuantity int
  84. announcementRateLimitInterval time.Duration
  85. announcementNonlimitedProxyIDs map[ID]struct{}
  86. announcementsPersonalCompartmentalizedCount int
  87. announcementsUnlimitedNATCount int
  88. announcementsPartiallyLimitedNATCount int
  89. announcementsStrictlyLimitedNATCount int
  90. // The offer queue is also implicitly sorted by offer age. Both an offer
  91. // and announcement queue are required since either announcements or
  92. // offers can arrive while there are no available pairings.
  93. offerQueueMutex sync.Mutex
  94. offerQueue *deque.Deque[*offerEntry]
  95. offerQueueEntryCountByIP map[string]int
  96. offerQueueRateLimiters *lrucache.Cache
  97. offerLimitEntryCount int
  98. offerRateLimitQuantity int
  99. offerRateLimitInterval time.Duration
  100. matchSignal chan struct{}
  101. pendingAnswers *lrucache.Cache
  102. }
  103. // MatchProperties specifies the compartment, GeoIP, and network topology
  104. // matching roperties of clients and proxies.
  105. type MatchProperties struct {
  106. CommonCompartmentIDs []ID
  107. PersonalCompartmentIDs []ID
  108. GeoIPData common.GeoIPData
  109. NetworkType NetworkType
  110. NATType NATType
  111. PortMappingTypes PortMappingTypes
  112. }
  113. // EffectiveNATType combines the set of network properties into an effective
  114. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  115. // traversal is assumed. When NAT type is unknown and the network type is
  116. // mobile, CGNAT with limited NAT traversal is assumed.
  117. func (p *MatchProperties) EffectiveNATType() NATType {
  118. if p.PortMappingTypes.Available() {
  119. return NATTypePortMapping
  120. }
  121. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  122. // publicly reachable IPv6 ICE host candidate? If so, change the
  123. // effective NAT type? Depends on whether the matched peer can use IPv6.
  124. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  125. return NATTypeMobileNetwork
  126. }
  127. return p.NATType
  128. }
  129. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  130. // matching given the types of pairing candidates available.
  131. func (p *MatchProperties) ExistsPreferredNATMatch(
  132. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  133. return p.EffectiveNATType().ExistsPreferredMatch(
  134. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  135. }
  136. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  137. // NAT matching.
  138. func (p *MatchProperties) IsPreferredNATMatch(
  139. peerMatchProperties *MatchProperties) bool {
  140. return p.EffectiveNATType().IsPreferredMatch(
  141. peerMatchProperties.EffectiveNATType())
  142. }
  143. // IsPersonalCompartmentalized indicates whether the candidate has personal
  144. // compartment IDs.
  145. func (p *MatchProperties) IsPersonalCompartmentalized() bool {
  146. return len(p.PersonalCompartmentIDs) > 0
  147. }
  148. // MatchAnnouncement is a proxy announcement to be queued for matching.
  149. type MatchAnnouncement struct {
  150. Properties MatchProperties
  151. ProxyID ID
  152. ConnectionID ID
  153. ProxyProtocolVersion int32
  154. }
  155. // MatchOffer is a client offer to be queued for matching.
  156. type MatchOffer struct {
  157. Properties MatchProperties
  158. ClientProxyProtocolVersion int32
  159. ClientOfferSDP webrtc.SessionDescription
  160. ClientRootObfuscationSecret ObfuscationSecret
  161. DoDTLSRandomization bool
  162. TrafficShapingParameters *DataChannelTrafficShapingParameters
  163. NetworkProtocol NetworkProtocol
  164. DestinationAddress string
  165. DestinationServerID string
  166. }
  167. // MatchAnswer is a proxy answer, the proxy's follow up to a matched
  168. // announcement, to be routed to the awaiting client offer.
  169. type MatchAnswer struct {
  170. ProxyIP string
  171. ProxyID ID
  172. ConnectionID ID
  173. SelectedProxyProtocolVersion int32
  174. ProxyAnswerSDP webrtc.SessionDescription
  175. }
// announcementEntry is an announcement queue entry, an announcement with its
// associated lifetime context and signaling channel.
type announcementEntry struct {
	// ctx is the context of the Announce call that enqueued this entry; an
	// entry whose ctx is done is removed from the queue during matching.
	ctx context.Context
	// limitIP is the key used for per-IP entry counting and rate limiting.
	limitIP      string
	announcement *MatchAnnouncement
	// offerChan delivers the matched offer to the blocked Announce call.
	offerChan chan *MatchOffer
}
// offerEntry is an offer queue entry, an offer with its associated lifetime
// context and signaling channel.
type offerEntry struct {
	// ctx is the context of the Offer call that enqueued this entry; an
	// entry whose ctx is done is removed from the queue during matching.
	ctx context.Context
	// limitIP is the key used for per-IP entry counting and rate limiting.
	limitIP string
	offer   *MatchOffer
	// answerChan delivers the proxy's answer to the blocked Offer call. A
	// closed channel, which yields nil, signals that no answer will arrive.
	answerChan chan *answerInfo
}
// answerInfo is an answer and its associated announcement. It is the value
// delivered on an offer entry's answerChan.
type answerInfo struct {
	announcement *MatchAnnouncement
	answer       *MatchAnswer
}
// pendingAnswer represents an answer that is expected to arrive from a
// proxy. It is stored in the pending answers cache when a match is made.
type pendingAnswer struct {
	// announcement is the matched proxy announcement.
	announcement *MatchAnnouncement
	// answerChan is the matched offer entry's answer channel.
	answerChan chan *answerInfo
}
// MatcherConfig specifies the configuration for a matcher.
type MatcherConfig struct {

	// Logger is used to log events.
	Logger common.Logger

	// Announcement queue limits. A per-IP entry count limit is applied only
	// when AnnouncementLimitEntryCount is > 0, and a per-IP rate limit only
	// when both the quantity and interval are > 0. Proxies listed in
	// AnnouncementNonlimitedProxyIDs skip both checks.
	AnnouncementLimitEntryCount    int
	AnnouncementRateLimitQuantity  int
	AnnouncementRateLimitInterval  time.Duration
	AnnouncementNonlimitedProxyIDs []ID

	// Offer queue limits, with the same > 0 semantics as the announcement
	// limits.
	OfferLimitEntryCount   int
	OfferRateLimitQuantity int
	OfferRateLimitInterval time.Duration
}
  217. // NewMatcher creates a new Matcher.
  218. func NewMatcher(config *MatcherConfig) *Matcher {
  219. m := &Matcher{
  220. config: config,
  221. waitGroup: new(sync.WaitGroup),
  222. announcementQueue: deque.New[*announcementEntry](),
  223. announcementQueueEntryCountByIP: make(map[string]int),
  224. announcementQueueRateLimiters: lrucache.NewWithLRU(
  225. 0,
  226. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  227. matcherRateLimiterMaxCacheEntries),
  228. offerQueue: deque.New[*offerEntry](),
  229. offerQueueEntryCountByIP: make(map[string]int),
  230. offerQueueRateLimiters: lrucache.NewWithLRU(
  231. 0,
  232. time.Duration(matcherRateLimiterReapHistoryFrequencySeconds)*time.Second,
  233. matcherRateLimiterMaxCacheEntries),
  234. matchSignal: make(chan struct{}, 1),
  235. // matcherPendingAnswersTTL is not configurable; it supplies a default
  236. // that is expected to be ignored when each entry's TTL is set to the
  237. // Offer ctx timeout.
  238. pendingAnswers: lrucache.NewWithLRU(
  239. matcherPendingAnswersTTL,
  240. 1*time.Minute,
  241. matcherPendingAnswersMaxSize),
  242. }
  243. m.SetLimits(
  244. config.AnnouncementLimitEntryCount,
  245. config.AnnouncementRateLimitQuantity,
  246. config.AnnouncementRateLimitInterval,
  247. config.AnnouncementNonlimitedProxyIDs,
  248. config.OfferLimitEntryCount,
  249. config.OfferRateLimitQuantity,
  250. config.OfferRateLimitInterval)
  251. return m
  252. }
  253. // SetLimits sets new queue limits, replacing the previous configuration.
  254. // Existing, cached rate limiters retain their existing rate limit state. New
  255. // entries will use the new quantity/interval configuration. In addition,
  256. // currently enqueued items may exceed any new, lower maximum entry count
  257. // until naturally dequeued.
  258. func (m *Matcher) SetLimits(
  259. announcementLimitEntryCount int,
  260. announcementRateLimitQuantity int,
  261. announcementRateLimitInterval time.Duration,
  262. announcementNonlimitedProxyIDs []ID,
  263. offerLimitEntryCount int,
  264. offerRateLimitQuantity int,
  265. offerRateLimitInterval time.Duration) {
  266. nonlimitedProxyIDs := make(map[ID]struct{})
  267. for _, proxyID := range announcementNonlimitedProxyIDs {
  268. nonlimitedProxyIDs[proxyID] = struct{}{}
  269. }
  270. m.announcementQueueMutex.Lock()
  271. m.announcementLimitEntryCount = announcementLimitEntryCount
  272. m.announcementRateLimitQuantity = announcementRateLimitQuantity
  273. m.announcementRateLimitInterval = announcementRateLimitInterval
  274. m.announcementNonlimitedProxyIDs = nonlimitedProxyIDs
  275. m.announcementQueueMutex.Unlock()
  276. m.offerQueueMutex.Lock()
  277. m.offerLimitEntryCount = offerLimitEntryCount
  278. m.offerRateLimitQuantity = offerRateLimitQuantity
  279. m.offerRateLimitInterval = offerRateLimitInterval
  280. m.offerQueueMutex.Unlock()
  281. }
  282. // Start starts running the Matcher. The Matcher runs a goroutine which
  283. // matches announcements and offers.
  284. func (m *Matcher) Start() error {
  285. m.runMutex.Lock()
  286. defer m.runMutex.Unlock()
  287. if m.runContext != nil {
  288. return errors.TraceNew("already running")
  289. }
  290. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  291. m.waitGroup.Add(1)
  292. go func() {
  293. defer m.waitGroup.Done()
  294. m.matchWorker(m.runContext)
  295. }()
  296. return nil
  297. }
  298. // Stop stops running the Matcher and its worker goroutine.
  299. //
  300. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  301. // can get enqueued during and after a Stop call. Stop is intended more for a
  302. // full broker shutdown, where this won't be a concern.
  303. func (m *Matcher) Stop() {
  304. m.runMutex.Lock()
  305. defer m.runMutex.Unlock()
  306. m.stopRunning()
  307. m.waitGroup.Wait()
  308. m.runContext, m.stopRunning = nil, nil
  309. }
  310. // Announce enqueues the proxy announcement and blocks until it is matched
  311. // with a returned offer or ctx is done. The caller must not mutate the
  312. // announcement or its properties after calling Announce.
  313. //
  314. // The offer is sent to the proxy by the broker, and then the proxy sends its
  315. // answer back to the broker, which calls Answer with that value.
  316. func (m *Matcher) Announce(
  317. ctx context.Context,
  318. proxyIP string,
  319. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, error) {
  320. announcementEntry := &announcementEntry{
  321. ctx: ctx,
  322. limitIP: getRateLimitIP(proxyIP),
  323. announcement: proxyAnnouncement,
  324. offerChan: make(chan *MatchOffer, 1),
  325. }
  326. err := m.addAnnouncementEntry(announcementEntry)
  327. if err != nil {
  328. return nil, errors.Trace(err)
  329. }
  330. // Await client offer.
  331. var clientOffer *MatchOffer
  332. select {
  333. case <-ctx.Done():
  334. m.removeAnnouncementEntry(announcementEntry)
  335. return nil, errors.Trace(ctx.Err())
  336. case clientOffer = <-announcementEntry.offerChan:
  337. }
  338. return clientOffer, nil
  339. }
  340. // Offer enqueues the client offer and blocks until it is matched with a
  341. // returned announcement or ctx is done. The caller must not mutate the offer
  342. // or its properties after calling Announce.
  343. //
  344. // The answer is returned to the client by the broker, and the WebRTC
  345. // connection is dialed. The original announcement is also returned, so its
  346. // match properties can be logged.
  347. func (m *Matcher) Offer(
  348. ctx context.Context,
  349. clientIP string,
  350. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, error) {
  351. offerEntry := &offerEntry{
  352. ctx: ctx,
  353. limitIP: getRateLimitIP(clientIP),
  354. offer: clientOffer,
  355. answerChan: make(chan *answerInfo, 1),
  356. }
  357. err := m.addOfferEntry(offerEntry)
  358. if err != nil {
  359. return nil, nil, errors.Trace(err)
  360. }
  361. // Await proxy answer.
  362. var proxyAnswerInfo *answerInfo
  363. select {
  364. case <-ctx.Done():
  365. m.removeOfferEntry(offerEntry)
  366. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  367. // the Offer ctx, the client request, timeout, so it will eventually
  368. // get removed. But a client may abort its request earlier than the
  369. // timeout.
  370. return nil, nil, errors.Trace(ctx.Err())
  371. case proxyAnswerInfo = <-offerEntry.answerChan:
  372. }
  373. if proxyAnswerInfo == nil {
  374. // nil will be delivered to the channel when either the proxy
  375. // announcment request concurrently timed out, or the answer
  376. // indicated a proxy error, or the answer did not arrive in time.
  377. return nil, nil, errors.TraceNew("no answer")
  378. }
  379. // This is a sanity check and not expected to fail.
  380. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  381. proxyAnswerInfo.announcement.ConnectionID) {
  382. return nil, nil, errors.TraceNew("unexpected connection ID")
  383. }
  384. return proxyAnswerInfo.answer, proxyAnswerInfo.announcement, nil
  385. }
  386. // Answer delivers an answer from the proxy for a previously matched offer.
  387. // The ProxyID and ConnectionID must correspond to the original announcement.
  388. // The caller must not mutate the answer after calling Answer. Answer does
  389. // not block.
  390. //
  391. // The answer is returned to the awaiting Offer call and sent to the matched
  392. // client.
  393. func (m *Matcher) Answer(
  394. proxyAnswer *MatchAnswer) error {
  395. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  396. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  397. if !ok {
  398. // The client is no longer awaiting the response.
  399. return errors.TraceNew("no client")
  400. }
  401. m.pendingAnswers.Delete(key)
  402. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  403. pendingAnswer.answerChan <- &answerInfo{
  404. announcement: pendingAnswer.announcement,
  405. answer: proxyAnswer,
  406. }
  407. return nil
  408. }
  409. // AnswerError delivers a failed answer indication from the proxy to an
  410. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  411. // original announcement.
  412. //
  413. // The failure indication is returned to the awaiting Offer call and sent to
  414. // the matched client.
  415. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  416. key := m.pendingAnswerKey(proxyID, connectionID)
  417. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  418. if !ok {
  419. // The client is no longer awaiting the response.
  420. return
  421. }
  422. m.pendingAnswers.Delete(key)
  423. // Closing the channel delivers nil, a failed indicator, to any receiver.
  424. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  425. }
  426. // matchWorker is the matching worker goroutine. It idles until signaled that
  427. // a queue item has been added, and then runs a full matching pass.
  428. func (m *Matcher) matchWorker(ctx context.Context) {
  429. for {
  430. select {
  431. case <-m.matchSignal:
  432. m.matchAllOffers()
  433. case <-ctx.Done():
  434. return
  435. }
  436. }
  437. }
  438. // matchAllOffers iterates over the queues, making all possible matches.
  439. func (m *Matcher) matchAllOffers() {
  440. m.announcementQueueMutex.Lock()
  441. defer m.announcementQueueMutex.Unlock()
  442. m.offerQueueMutex.Lock()
  443. defer m.offerQueueMutex.Unlock()
  444. // Take each offer in turn, and select an announcement match. There is an
  445. // implicit preference for older client offers, sooner to timeout, at the
  446. // front of the queue.
  447. // TODO: consider matching one offer, then releasing the locks to allow
  448. // more announcements to be enqueued, then continuing to match.
  449. i := 0
  450. end := m.offerQueue.Len()
  451. for i < end && m.announcementQueue.Len() > 0 {
  452. offerEntry := m.offerQueue.At(i)
  453. // Skip and remove this offer if its deadline has already passed.
  454. // There is no signal to the awaiting Offer function, as it will exit
  455. // based on the same ctx.
  456. if offerEntry.ctx.Err() != nil {
  457. m.removeOfferEntryByIndex(i)
  458. end -= 1
  459. continue
  460. }
  461. j, ok := m.matchOffer(offerEntry)
  462. if !ok {
  463. // No match, so leave this offer in place in the queue and move to
  464. // the next.
  465. i++
  466. continue
  467. }
  468. if m.config.Logger.IsLogLevelDebug() {
  469. m.config.Logger.WithTraceFields(common.LogFields{
  470. "match_index": j,
  471. "offer_queue_size": m.offerQueue.Len(),
  472. "announcement_queue_size": m.announcementQueue.Len(),
  473. }).Debug("match metrics")
  474. }
  475. // Remove the matched announcement from the queue. Send the offer to
  476. // the announcment entry's offerChan, which will deliver it to the
  477. // blocked Announce call. Add a pending answers entry to await the
  478. // proxy's follow up Answer call. The TTL for the pending answer
  479. // entry is set to the matched Offer call's ctx, as the answer is
  480. // only useful as long as the client is still waiting.
  481. announcementEntry := m.announcementQueue.At(j)
  482. expiry := lrucache.DefaultExpiration
  483. deadline, ok := offerEntry.ctx.Deadline()
  484. if ok {
  485. expiry = time.Until(deadline)
  486. }
  487. key := m.pendingAnswerKey(
  488. announcementEntry.announcement.ProxyID,
  489. announcementEntry.announcement.ConnectionID)
  490. m.pendingAnswers.Set(
  491. key,
  492. &pendingAnswer{
  493. announcement: announcementEntry.announcement,
  494. answerChan: offerEntry.answerChan,
  495. },
  496. expiry)
  497. announcementEntry.offerChan <- offerEntry.offer
  498. m.removeAnnouncementEntryByIndex(j)
  499. // Remove the matched offer from the queue and match the next offer,
  500. // now first in the queue.
  501. m.removeOfferEntryByIndex(i)
  502. end -= 1
  503. }
  504. }
  505. func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
  506. // Assumes the caller has the queue mutexed locked.
  507. // Check each announcement in turn, and select a match. There is an
  508. // implicit preference for older proxy announcments, sooner to timeout, at the
  509. // front of the queue.
  510. //
  511. // Limitation: since this logic matches each enqueued client in turn, it will
  512. // only make the optimal NAT match for the oldest enqueued client vs. all
  513. // proxies, and not do optimal N x M matching for all clients and all proxies.
  514. //
  515. // Future matching enhancements could include more sophisticated GeoIP
  516. // rules, such as a configuration encoding knowledge of an ASN's NAT
  517. // type, or preferred client/proxy country/ASN matches.
  518. offerProperties := &offerEntry.offer.Properties
  519. // Use the NAT traversal type counters to check if there's any preferred
  520. // NAT match for this offer in the announcement queue. When there is, we
  521. // will search beyond the first announcement.
  522. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  523. m.announcementsUnlimitedNATCount > 0,
  524. m.announcementsPartiallyLimitedNATCount > 0,
  525. m.announcementsStrictlyLimitedNATCount > 0)
  526. bestMatch := -1
  527. bestMatchNAT := false
  528. bestMatchCompartment := false
  529. end := m.announcementQueue.Len()
  530. for i := 0; i < end; i++ {
  531. announcementEntry := m.announcementQueue.At(i)
  532. // Skip and remove this announcement if its deadline has already
  533. // passed. There is no signal to the awaiting Announce function, as
  534. // it will exit based on the same ctx.
  535. if announcementEntry.ctx.Err() != nil {
  536. m.removeAnnouncementEntryByIndex(i)
  537. end -= 1
  538. continue
  539. }
  540. announcementProperties := &announcementEntry.announcement.Properties
  541. // There must be a compartment match. If there is a personal
  542. // compartment match, this match will be preferred.
  543. matchCommonCompartment := HaveCommonIDs(
  544. announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
  545. matchPersonalCompartment := HaveCommonIDs(
  546. announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
  547. if !matchCommonCompartment && !matchPersonalCompartment {
  548. continue
  549. }
  550. // Disallow matching the same country and ASN, except for personal
  551. // compartment ID matches.
  552. //
  553. // For common matching, hopping through the same ISP is assumed to
  554. // have no circumvention benefit. For personal matching, the user may
  555. // wish to hop their their own or their friend's proxy regardless.
  556. if !matchPersonalCompartment &&
  557. !GetAllowCommonASNMatching() &&
  558. (offerProperties.GeoIPData.Country ==
  559. announcementProperties.GeoIPData.Country &&
  560. offerProperties.GeoIPData.ASN ==
  561. announcementProperties.GeoIPData.ASN) {
  562. continue
  563. }
  564. // Check if this is a preferred NAT match. Ultimately, a match may be
  565. // made with potentially incompatible NATs, but the client/proxy
  566. // reported NAT types may be incorrect or unknown; the client will
  567. // often skip NAT discovery.
  568. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  569. // At this point, the candidate is a match. Determine if this is a new
  570. // best match.
  571. if bestMatch == -1 {
  572. // This is a match, and there was no previous match, so it becomes
  573. // the provisional best match.
  574. bestMatch = i
  575. bestMatchNAT = matchNAT
  576. bestMatchCompartment = matchPersonalCompartment
  577. } else if !bestMatchNAT && matchNAT {
  578. // If there was a previous best match which was not a preferred
  579. // NAT match, this becomes the new best match. The preferred NAT
  580. // match is prioritized over personal compartment matching.
  581. bestMatch = i
  582. bestMatchNAT = true
  583. bestMatchCompartment = matchPersonalCompartment
  584. } else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
  585. // If there was a previous best match which was not a personal
  586. // compartment match, and as long as this match doesn't undo a
  587. // better NAT match, this becomes the new best match.
  588. bestMatch = i
  589. bestMatchNAT = matchNAT
  590. bestMatchCompartment = true
  591. }
  592. // Stop as soon as we have the best possible match.
  593. if (bestMatchNAT || !existsPreferredNATMatch) &&
  594. (matchPersonalCompartment || m.announcementsPersonalCompartmentalizedCount == 0) {
  595. break
  596. }
  597. }
  598. return bestMatch, bestMatch != -1
  599. }
  600. func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
  601. // Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
  602. var entryCountByIP map[string]int
  603. var queueRateLimiters *lrucache.Cache
  604. var limitEntryCount int
  605. var quantity int
  606. var interval time.Duration
  607. if isAnnouncement {
  608. // Skip limit checks for non-limited proxies.
  609. if _, ok := m.announcementNonlimitedProxyIDs[proxyID]; ok {
  610. return nil
  611. }
  612. entryCountByIP = m.announcementQueueEntryCountByIP
  613. queueRateLimiters = m.announcementQueueRateLimiters
  614. limitEntryCount = m.announcementLimitEntryCount
  615. quantity = m.announcementRateLimitQuantity
  616. interval = m.announcementRateLimitInterval
  617. } else {
  618. entryCountByIP = m.offerQueueEntryCountByIP
  619. queueRateLimiters = m.offerQueueRateLimiters
  620. limitEntryCount = m.offerLimitEntryCount
  621. quantity = m.offerRateLimitQuantity
  622. interval = m.offerRateLimitInterval
  623. }
  624. // The rate limit is checked first, before the max count check, to ensure
  625. // that the rate limit state is updated regardless of the max count check
  626. // outcome.
  627. if quantity > 0 && interval > 0 {
  628. var rateLimiter *ratelimit.Bucket
  629. entry, ok := queueRateLimiters.Get(limitIP)
  630. if ok {
  631. rateLimiter = entry.(*ratelimit.Bucket)
  632. } else {
  633. rateLimiter = ratelimit.NewBucketWithQuantum(
  634. interval, int64(quantity), int64(quantity))
  635. queueRateLimiters.Set(
  636. limitIP, rateLimiter, interval)
  637. }
  638. if rateLimiter.TakeAvailable(1) < 1 {
  639. return errors.TraceNew("rate exceeded for IP")
  640. }
  641. }
  642. if limitEntryCount > 0 {
  643. entryCount, ok := entryCountByIP[limitIP]
  644. if ok && entryCount >= limitEntryCount {
  645. return errors.TraceNew("max entries for IP")
  646. }
  647. }
  648. return nil
  649. }
  650. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) error {
  651. m.announcementQueueMutex.Lock()
  652. defer m.announcementQueueMutex.Unlock()
  653. // Ensure the queue doesn't grow larger than the max size.
  654. if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
  655. return errors.TraceNew("queue full")
  656. }
  657. // Ensure no single peer IP can enqueue a large number of entries or
  658. // rapidly enqueue beyond the configured rate.
  659. isAnnouncement := true
  660. err := m.applyLimits(
  661. isAnnouncement, announcementEntry.limitIP, announcementEntry.announcement.ProxyID)
  662. if err != nil {
  663. return errors.Trace(err)
  664. }
  665. m.announcementQueue.PushBack(announcementEntry)
  666. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] += 1
  667. m.adjustAnnouncementCounts(announcementEntry, 1)
  668. select {
  669. case m.matchSignal <- struct{}{}:
  670. default:
  671. }
  672. return nil
  673. }
  674. func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
  675. m.announcementQueueMutex.Lock()
  676. defer m.announcementQueueMutex.Unlock()
  677. found := false
  678. for i := 0; i < m.announcementQueue.Len(); i++ {
  679. if m.announcementQueue.At(i) == announcementEntry {
  680. m.removeAnnouncementEntryByIndex(i)
  681. found = true
  682. break
  683. }
  684. }
  685. if !found {
  686. // The Announce call is aborting and taking its entry back out of the
  687. // queue. If the entry is not found in the queue, then a concurrent
  688. // Offer has matched the announcement. So check for the pending
  689. // answer corresponding to the announcement and remove it and deliver
  690. // a failure signal to the waiting Offer, so the client doesn't wait
  691. // longer than necessary.
  692. key := m.pendingAnswerKey(
  693. announcementEntry.announcement.ProxyID,
  694. announcementEntry.announcement.ConnectionID)
  695. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  696. if ok {
  697. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  698. m.pendingAnswers.Delete(key)
  699. }
  700. }
  701. }
  702. func (m *Matcher) removeAnnouncementEntryByIndex(i int) {
  703. // Assumes s.announcementQueueMutex lock is held.
  704. announcementEntry := m.announcementQueue.At(i)
  705. // This should be only direct call to Remove, as following adjustments
  706. // must always be made when removing.
  707. m.announcementQueue.Remove(i)
  708. // Adjust entry counts by peer IP, used to enforce
  709. // matcherAnnouncementQueueMaxEntriesPerIP.
  710. m.announcementQueueEntryCountByIP[announcementEntry.limitIP] -= 1
  711. if m.announcementQueueEntryCountByIP[announcementEntry.limitIP] == 0 {
  712. delete(m.announcementQueueEntryCountByIP, announcementEntry.limitIP)
  713. }
  714. m.adjustAnnouncementCounts(announcementEntry, -1)
  715. }
  716. func (m *Matcher) adjustAnnouncementCounts(
  717. announcementEntry *announcementEntry, delta int) {
  718. // Assumes s.announcementQueueMutex lock is held.
  719. if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
  720. m.announcementsPersonalCompartmentalizedCount += delta
  721. }
  722. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  723. case NATTraversalUnlimited:
  724. m.announcementsUnlimitedNATCount += delta
  725. case NATTraversalPartiallyLimited:
  726. m.announcementsPartiallyLimitedNATCount += delta
  727. case NATTraversalStrictlyLimited:
  728. m.announcementsStrictlyLimitedNATCount += delta
  729. }
  730. }
  731. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) error {
  732. m.offerQueueMutex.Lock()
  733. defer m.offerQueueMutex.Unlock()
  734. // Ensure the queue doesn't grow larger than the max size.
  735. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  736. return errors.TraceNew("queue full")
  737. }
  738. // Ensure no single peer IP can enqueue a large number of entries or
  739. // rapidly enqueue beyond the configured rate.
  740. isAnnouncement := false
  741. err := m.applyLimits(
  742. isAnnouncement, offerEntry.limitIP, ID{})
  743. if err != nil {
  744. return errors.Trace(err)
  745. }
  746. m.offerQueue.PushBack(offerEntry)
  747. m.offerQueueEntryCountByIP[offerEntry.limitIP] += 1
  748. select {
  749. case m.matchSignal <- struct{}{}:
  750. default:
  751. }
  752. return nil
  753. }
  754. func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
  755. m.offerQueueMutex.Lock()
  756. defer m.offerQueueMutex.Unlock()
  757. for i := 0; i < m.offerQueue.Len(); i++ {
  758. if m.offerQueue.At(i) == offerEntry {
  759. m.removeOfferEntryByIndex(i)
  760. break
  761. }
  762. }
  763. }
  764. func (m *Matcher) removeOfferEntryByIndex(i int) {
  765. // Assumes s.offerQueueMutex lock is held.
  766. offerEntry := m.offerQueue.At(i)
  767. // This should be only direct call to Remove, as following adjustments
  768. // must always be made when removing.
  769. m.offerQueue.Remove(i)
  770. // Adjust entry counts by peer IP, used to enforce
  771. // matcherOfferQueueMaxEntriesPerIP.
  772. m.offerQueueEntryCountByIP[offerEntry.limitIP] -= 1
  773. if m.offerQueueEntryCountByIP[offerEntry.limitIP] == 0 {
  774. delete(m.offerQueueEntryCountByIP, offerEntry.limitIP)
  775. }
  776. }
  777. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  778. // The pending answer lookup key is used to associate announcements and
  779. // subsequent answers. While the client learns the ConnectionID, only the
  780. // proxy knows the ProxyID component, so only the correct proxy can match
  781. // an answer to an announcement. The ConnectionID component is necessary
  782. // as a proxy may have multiple, concurrent pending answers.
  783. return string(proxyID[:]) + string(connectionID[:])
  784. }
  785. func getRateLimitIP(strIP string) string {
  786. IP := net.ParseIP(strIP)
  787. if IP == nil || IP.To4() != nil {
  788. return strIP
  789. }
  790. // With IPv6, individual users or sites are users commonly allocated a /64
  791. // or /56, so rate limit by /56.
  792. return IP.Mask(net.CIDRMask(56, 128)).String()
  793. }