matcher.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "sync"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  26. lrucache "github.com/cognusion/go-cache-lru"
  27. "github.com/gammazero/deque"
  28. "github.com/pion/webrtc/v3"
  29. )
// TTLs should be aligned with STUN hole punch lifetimes.
const (
	// Queue size limits. addAnnouncementEntry and addOfferEntry refuse to
	// enqueue once the respective queue has reached its limit.
	matcherAnnouncementQueueMaxSize = 100000
	matcherOfferQueueMaxSize        = 100000

	// Default TTL and maximum size passed to the pending answers cache in
	// NewMatcher.
	matcherPendingAnswersTTL     = 30 * time.Second
	matcherPendingAnswersMaxSize = 100000
)
// Matcher matches proxy announcements with client offers. Matcher also
// coordinates pending proxy answers and routes answers to the awaiting
// client offer handler.
//
// Matching prioritizes selecting the oldest announcements and client offers,
// as they are closest to timing out.
//
// The client and proxy must supply matching personal or common compartment
// IDs. Personal compartment matching is preferred. Common compartments are
// managed by Psiphon and can be obtained via a tactics parameter or via an
// OSL embedding.
//
// Matching prefers to pair proxies and clients in a way that maximizes total
// possible matches. For a client or proxy with less-limited NAT traversal, a
// pairing with more-limited NAT traversal is preferred; and vice versa.
// Candidates with unknown NAT types and mobile network types are assumed to
// have the most limited NAT traversal capability.
//
// Preferred matchings take priority over announcement age.
//
// The client and proxy will not match if they are in the same country and
// ASN, as it's assumed that doesn't provide any blocking circumvention
// benefit. Disallowing proxies in certain blocked countries is handled at a
// higher level; any such proxies should not be enqueued for matching.
type Matcher struct {
	config *MatcherConfig

	// runMutex is held by Start and Stop while they set or clear runContext
	// and stopRunning. waitGroup tracks the match worker goroutine.
	runMutex    sync.Mutex
	runContext  context.Context
	stopRunning context.CancelFunc
	waitGroup   *sync.WaitGroup

	// The announcement queue is implicitly sorted by announcement age. The
	// count fields are used to skip searching deeper into the queue for
	// preferred matches.
	// TODO: replace queue and counts with an indexed, in-memory database?
	announcementQueueMutex                      sync.Mutex
	announcementQueue                           *deque.Deque[*announcementEntry]
	announcementsPersonalCompartmentalizedCount int
	announcementsUnlimitedNATCount              int
	announcementsPartiallyLimitedNATCount       int
	announcementsStrictlyLimitedNATCount        int

	// The offer queue is also implicitly sorted by offer age. Both an offer
	// and announcement queue are required since either announcements or
	// offers can arrive while there are no available pairings.
	offerQueueMutex sync.Mutex
	offerQueue      *deque.Deque[*offerEntry]

	// matchSignal wakes the match worker after an entry is enqueued. It is
	// created with a buffer of one and is written with non-blocking sends.
	matchSignal chan struct{}

	// pendingAnswers holds *pendingAnswer values, keyed by pendingAnswerKey,
	// for matched announcements whose proxy answer has not yet arrived.
	pendingAnswers *lrucache.Cache
}
// MatchProperties specifies the compartment, GeoIP, and network topology
// matching properties of clients and proxies.
type MatchProperties struct {
	// Compartment IDs. A client and proxy are only matched when they have
	// at least one common or personal compartment ID in common.
	CommonCompartmentIDs   []ID
	PersonalCompartmentIDs []ID

	// GeoIPData supplies the country and ASN compared during matching.
	GeoIPData common.GeoIPData

	// NetworkType, NATType, and PortMappingTypes are combined by
	// EffectiveNATType.
	NetworkType      NetworkType
	NATType          NATType
	PortMappingTypes PortMappingTypes
}
  95. // EffectiveNATType combines the set of network properties into an effective
  96. // NAT type. When a port mapping is offered, a NAT type with unlimiter NAT
  97. // traversal is assumed. When NAT type is unknown and the network type is
  98. // mobile, CGNAT with limited NAT traversal is assumed.
  99. func (p *MatchProperties) EffectiveNATType() NATType {
  100. if p.PortMappingTypes.Available() {
  101. return NATTypePortMapping
  102. }
  103. // TODO: can a peer have limited NAT travseral for IPv4 and also have a
  104. // publicly reachable IPv6 ICE host candidate? If so, change the
  105. // effective NAT type? Depends on whether the matched peer can use IPv6.
  106. if p.NATType == NATTypeUnknown && p.NetworkType == NetworkTypeMobile {
  107. return NATTypeMobileNetwork
  108. }
  109. return p.NATType
  110. }
  111. // ExistsPreferredNATMatch indicates whether there exists a preferred NAT
  112. // matching given the types of pairing candidates available.
  113. func (p *MatchProperties) ExistsPreferredNATMatch(
  114. unlimitedNAT, partiallyLimitedNAT, limitedNAT bool) bool {
  115. return p.EffectiveNATType().ExistsPreferredMatch(
  116. unlimitedNAT, partiallyLimitedNAT, limitedNAT)
  117. }
  118. // IsPreferredNATMatch indicates whether the peer candidate is a preferred
  119. // NAT matching.
  120. func (p *MatchProperties) IsPreferredNATMatch(
  121. peerMatchProperties *MatchProperties) bool {
  122. return p.EffectiveNATType().IsPreferredMatch(
  123. peerMatchProperties.EffectiveNATType())
  124. }
  125. // IsPersonalCompartmentalized indicates whether the candidate has personal
  126. // compartment IDs.
  127. func (p *MatchProperties) IsPersonalCompartmentalized() bool {
  128. return len(p.PersonalCompartmentIDs) > 0
  129. }
// MatchAnnouncement is a proxy announcement to be queued for matching.
type MatchAnnouncement struct {
	// Properties are the proxy's matching properties.
	Properties MatchProperties

	// ProxyID and ConnectionID together form the pending answer key used to
	// associate a later answer with this announcement.
	ProxyID      ID
	ConnectionID ID

	ProxyProtocolVersion int32
}
// MatchOffer is a client offer to be queued for matching. When matched, the
// offer is delivered to the blocked Announce call for the selected proxy.
type MatchOffer struct {
	// Properties are the client's matching properties.
	Properties MatchProperties

	ClientProxyProtocolVersion  int32
	ClientOfferSDP              webrtc.SessionDescription
	ClientRootObfuscationSecret ObfuscationSecret
	NetworkProtocol             NetworkProtocol
	DestinationAddress          string
	DestinationServerID         string
}
// MatchAnswer is a proxy answer, the proxy's follow up to a matched
// announcement, to be routed to the awaiting client offer.
type MatchAnswer struct {
	ProxyIP string

	// ProxyID and ConnectionID are used to look up the pending answer entry
	// created when the announcement was matched. Offer additionally checks
	// that ConnectionID equals the matched announcement's ConnectionID.
	ProxyID      ID
	ConnectionID ID

	SelectedProxyProtocolVersion int32
	ProxyAnswerSDP               webrtc.SessionDescription
}
// announcementEntry is an announcement queue entry, an announcement with its
// associated lifetime context and signaling channel.
type announcementEntry struct {
	// ctx is the Announce caller's context; an entry whose ctx is done is
	// treated as expired during matching.
	ctx context.Context

	announcement *MatchAnnouncement

	// offerChan delivers the matched offer to the blocked Announce call. It
	// is created with a buffer of one.
	offerChan chan *MatchOffer
}
// offerEntry is an offer queue entry, an offer with its associated lifetime
// context and signaling channel.
type offerEntry struct {
	// ctx is the Offer caller's context; an entry whose ctx is done is
	// treated as expired during matching.
	ctx context.Context

	offer *MatchOffer

	// answerChan delivers the proxy's answer to the blocked Offer call. It
	// is created with a buffer of one, and is closed, delivering nil, to
	// signal a failed answer.
	answerChan chan *answerInfo
}
// answerInfo is an answer and its associated announcement. It is the value
// delivered over an offerEntry's answerChan.
type answerInfo struct {
	announcement *MatchAnnouncement
	answer       *MatchAnswer
}
// pendingAnswer represents an answer that is expected to arrive from a
// proxy. It records the matched announcement together with the matched
// offer's answerChan, so the answer can be routed to the awaiting Offer
// call.
type pendingAnswer struct {
	announcement *MatchAnnouncement
	answerChan   chan *answerInfo
}
// MatcherConfig specifies the configuration for a matcher.
type MatcherConfig struct {

	// Logger is used to log events. The matcher calls it without a nil
	// check when logging match metrics.
	Logger common.Logger
}
  186. // NewMatcher creates a new Matcher.
  187. func NewMatcher(config *MatcherConfig) *Matcher {
  188. return &Matcher{
  189. config: config,
  190. waitGroup: new(sync.WaitGroup),
  191. announcementQueue: deque.New[*announcementEntry](),
  192. offerQueue: deque.New[*offerEntry](),
  193. matchSignal: make(chan struct{}, 1),
  194. // matcherPendingAnswersTTL is not configurable; it supplies a default
  195. // that is expected to be ignored when each entry's TTL is set to the
  196. // Offer ctx timeout.
  197. pendingAnswers: lrucache.NewWithLRU(
  198. matcherPendingAnswersTTL,
  199. 1*time.Minute,
  200. matcherPendingAnswersMaxSize),
  201. }
  202. }
  203. // Start starts running the Matcher. The Matcher runs a goroutine which
  204. // matches announcements and offers.
  205. func (m *Matcher) Start() error {
  206. m.runMutex.Lock()
  207. defer m.runMutex.Unlock()
  208. if m.runContext != nil {
  209. return errors.TraceNew("already running")
  210. }
  211. m.runContext, m.stopRunning = context.WithCancel(context.Background())
  212. m.waitGroup.Add(1)
  213. go func() {
  214. defer m.waitGroup.Done()
  215. m.matchWorker(m.runContext)
  216. }()
  217. return nil
  218. }
  219. // Stop stops running the Matcher and its worker goroutine.
  220. //
  221. // Limitation: Stop is not synchronized with Announce/Offer/Answer, so items
  222. // can get enqueued during and after a Stop call. Stop is intended more for a
  223. // full broker shutdown, where this won't be a concern.
  224. func (m *Matcher) Stop() {
  225. m.runMutex.Lock()
  226. defer m.runMutex.Unlock()
  227. m.stopRunning()
  228. m.waitGroup.Wait()
  229. m.runContext, m.stopRunning = nil, nil
  230. }
  231. // Announce enqueues the proxy announcement and blocks until it is matched
  232. // with a returned offer or ctx is done. The caller must not mutate the
  233. // announcement or its properties after calling Announce.
  234. //
  235. // The offer is sent to the proxy by the broker, and then the proxy sends its
  236. // answer back to the broker, which calls Answer with that value.
  237. func (m *Matcher) Announce(
  238. ctx context.Context,
  239. proxyAnnouncement *MatchAnnouncement) (*MatchOffer, error) {
  240. announcementEntry := &announcementEntry{
  241. ctx: ctx,
  242. announcement: proxyAnnouncement,
  243. offerChan: make(chan *MatchOffer, 1),
  244. }
  245. m.addAnnouncementEntry(announcementEntry)
  246. // Await client offer.
  247. var clientOffer *MatchOffer
  248. select {
  249. case <-ctx.Done():
  250. m.removeAnnouncementEntry(announcementEntry)
  251. return nil, errors.Trace(ctx.Err())
  252. case clientOffer = <-announcementEntry.offerChan:
  253. }
  254. return clientOffer, nil
  255. }
  256. // Offer enqueues the client offer and blocks until it is matched with a
  257. // returned announcement or ctx is done. The caller must not mutate the offer
  258. // or its properties after calling Announce.
  259. //
  260. // The answer is returned to the client by the broker, and the WebRTC
  261. // connection is dialed. The original announcement is also returned, so its
  262. // match properties can be logged.
  263. func (m *Matcher) Offer(
  264. ctx context.Context,
  265. clientOffer *MatchOffer) (*MatchAnswer, *MatchAnnouncement, error) {
  266. offerEntry := &offerEntry{
  267. ctx: ctx,
  268. offer: clientOffer,
  269. answerChan: make(chan *answerInfo, 1),
  270. }
  271. m.addOfferEntry(offerEntry)
  272. // Await proxy answer.
  273. var proxyAnswerInfo *answerInfo
  274. select {
  275. case <-ctx.Done():
  276. m.removeOfferEntry(offerEntry)
  277. // TODO: also remove any pendingAnswers entry? The entry TTL is set to
  278. // the Offer ctx, the client request, timeout, so it will eventually
  279. // get removed. But a client may abort its request earlier than the
  280. // timeout.
  281. return nil, nil, errors.Trace(ctx.Err())
  282. case proxyAnswerInfo = <-offerEntry.answerChan:
  283. }
  284. if proxyAnswerInfo == nil {
  285. // nil will be delivered to the channel when either the proxy
  286. // announcment request concurrently timed out, or the answer
  287. // indicated a proxy error, or the answer did not arrive in time.
  288. return nil, nil, errors.TraceNew("no answer")
  289. }
  290. // This is a sanity check and not expected to fail.
  291. if !proxyAnswerInfo.answer.ConnectionID.Equal(
  292. proxyAnswerInfo.announcement.ConnectionID) {
  293. return nil, nil, errors.TraceNew("unexpected connection ID")
  294. }
  295. return proxyAnswerInfo.answer, proxyAnswerInfo.announcement, nil
  296. }
  297. // Answer delivers an answer from the proxy for a previously matched offer.
  298. // The ProxyID and ConnectionID must correspond to the original announcement.
  299. // The caller must not mutate the answer after calling Answer. Answer does
  300. // not block.
  301. //
  302. // The answer is returned to the awaiting Offer call and sent to the matched
  303. // client.
  304. func (m *Matcher) Answer(
  305. proxyAnswer *MatchAnswer) error {
  306. key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
  307. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  308. if !ok {
  309. // The client is no longer awaiting the response.
  310. return errors.TraceNew("no client")
  311. }
  312. m.pendingAnswers.Delete(key)
  313. pendingAnswer := pendingAnswerValue.(*pendingAnswer)
  314. pendingAnswer.answerChan <- &answerInfo{
  315. announcement: pendingAnswer.announcement,
  316. answer: proxyAnswer,
  317. }
  318. return nil
  319. }
  320. // AnswerError delivers a failed answer indication from the proxy to an
  321. // awaiting offer. The ProxyID and ConnectionID must correspond to the
  322. // original announcement.
  323. //
  324. // The failure indication is returned to the awaiting Offer call and sent to
  325. // the matched client.
  326. func (m *Matcher) AnswerError(proxyID ID, connectionID ID) {
  327. key := m.pendingAnswerKey(proxyID, connectionID)
  328. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  329. if !ok {
  330. // The client is no longer awaiting the response.
  331. return
  332. }
  333. m.pendingAnswers.Delete(key)
  334. // Closing the channel delivers nil, a failed indicator, to any receiver.
  335. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  336. }
  337. // matchWorker is the matching worker goroutine. It idles until signaled that
  338. // a queue item has been added, and then runs a full matching pass.
  339. func (m *Matcher) matchWorker(ctx context.Context) {
  340. for {
  341. select {
  342. case <-m.matchSignal:
  343. case <-ctx.Done():
  344. return
  345. }
  346. m.matchAllOffers()
  347. }
  348. }
  349. // matchAllOffers iterates over the queues, making all possible matches.
  350. func (m *Matcher) matchAllOffers() {
  351. m.announcementQueueMutex.Lock()
  352. defer m.announcementQueueMutex.Unlock()
  353. m.offerQueueMutex.Lock()
  354. defer m.offerQueueMutex.Unlock()
  355. // Take each offer in turn, and select an announcement match. There is an
  356. // implicit preference for older client offers, sooner to timeout, at the
  357. // front of the queue.
  358. // TODO: consider matching one offer, then releasing the locks to allow
  359. // more announcements to be enqueued, then continuing to match.
  360. i := 0
  361. end := m.offerQueue.Len()
  362. for i < end && m.announcementQueue.Len() > 0 {
  363. offerEntry := m.offerQueue.At(i)
  364. // Skip and remove this offer if its deadline has already passed.
  365. // There is no signal to the awaiting Offer function, as it will exit
  366. // based on the same ctx.
  367. if offerEntry.ctx.Err() != nil {
  368. m.offerQueue.Remove(i)
  369. end -= 1
  370. continue
  371. }
  372. j, ok := m.matchOffer(offerEntry)
  373. if !ok {
  374. // No match, so leave this offer in place in the queue and move to
  375. // the next.
  376. i++
  377. continue
  378. }
  379. if m.config.Logger.IsLogLevelDebug() {
  380. m.config.Logger.WithTraceFields(common.LogFields{
  381. "match_index": j,
  382. "offer_queue_size": m.offerQueue.Len(),
  383. "announcement_queue_size": m.announcementQueue.Len(),
  384. }).Debug("match metrics")
  385. }
  386. // Remove the matched announcement from the queue. Send the offer to
  387. // the announcment entry's offerChan, which will deliver it to the
  388. // blocked Announce call. Add a pending answers entry to await the
  389. // proxy's follow up Answer call. The TTL for the pending answer
  390. // entry is set to the matched Offer call's ctx, as the answer is
  391. // only useful as long as the client is still waiting.
  392. announcementEntry := m.announcementQueue.At(j)
  393. expiry := lrucache.DefaultExpiration
  394. deadline, ok := offerEntry.ctx.Deadline()
  395. if ok {
  396. expiry = time.Until(deadline)
  397. }
  398. key := m.pendingAnswerKey(
  399. announcementEntry.announcement.ProxyID,
  400. announcementEntry.announcement.ConnectionID)
  401. m.pendingAnswers.Set(
  402. key,
  403. &pendingAnswer{
  404. announcement: announcementEntry.announcement,
  405. answerChan: offerEntry.answerChan,
  406. },
  407. expiry)
  408. announcementEntry.offerChan <- offerEntry.offer
  409. m.announcementQueue.Remove(j)
  410. m.adjustAnnouncementCounts(announcementEntry, -1)
  411. // Remove the matched offer from the queue and match the next offer,
  412. // now first in the queue.
  413. m.offerQueue.Remove(i)
  414. end -= 1
  415. }
  416. }
  417. func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
  418. // Assumes the caller has the queue mutexed locked.
  419. // Check each announcement in turn, and select a match. There is an
  420. // implicit preference for older proxy announcments, sooner to timeout, at the
  421. // front of the queue.
  422. // Future matching enhancements could include more sophisticated GeoIP
  423. // rules, such as a configuration encoding knowledge of an ASN's NAT
  424. // type, or preferred client/proxy country/ASN matches.
  425. offerProperties := &offerEntry.offer.Properties
  426. // Use the NAT traversal type counters to check if there's any preferred
  427. // NAT match for this offer in the announcement queue. When there is, we
  428. // will search beyond the first announcement.
  429. existsPreferredNATMatch := offerProperties.ExistsPreferredNATMatch(
  430. m.announcementsUnlimitedNATCount > 0,
  431. m.announcementsPartiallyLimitedNATCount > 0,
  432. m.announcementsStrictlyLimitedNATCount > 0)
  433. bestMatch := -1
  434. bestMatchNAT := false
  435. bestMatchCompartment := false
  436. end := m.announcementQueue.Len()
  437. for i := 0; i < end; i++ {
  438. announcementEntry := m.announcementQueue.At(i)
  439. // Skip and remove this announcement if its deadline has already
  440. // passed. There is no signal to the awaiting Announce function, as
  441. // it will exit based on the same ctx.
  442. if announcementEntry.ctx.Err() != nil {
  443. m.announcementQueue.Remove(i)
  444. end -= 1
  445. continue
  446. }
  447. announcementProperties := &announcementEntry.announcement.Properties
  448. // Disallow matching the same country and ASN
  449. if offerProperties.GeoIPData.Country ==
  450. announcementProperties.GeoIPData.Country &&
  451. offerProperties.GeoIPData.ASN ==
  452. announcementProperties.GeoIPData.ASN {
  453. continue
  454. }
  455. // There must be a compartment match. If there is a personal
  456. // compartment match, this match will be preferred.
  457. matchCommonCompartment := HaveCommonIDs(
  458. announcementProperties.CommonCompartmentIDs, offerProperties.CommonCompartmentIDs)
  459. matchPersonalCompartment := HaveCommonIDs(
  460. announcementProperties.PersonalCompartmentIDs, offerProperties.PersonalCompartmentIDs)
  461. if !matchCommonCompartment && !matchPersonalCompartment {
  462. continue
  463. }
  464. // Check if this is a preferred NAT match. Ultimately, a match may be
  465. // made with potentially incompatible NATs, but the client/proxy
  466. // reported NAT types may be incorrect or unknown; the client will
  467. // oftern skip NAT discovery.
  468. matchNAT := offerProperties.IsPreferredNATMatch(announcementProperties)
  469. // At this point, the candidate is a match. Determine if this is a new
  470. // best match.
  471. if bestMatch == -1 {
  472. // This is a match, and there was no previous match, so it becomes
  473. // the provisional best match.
  474. bestMatch = i
  475. bestMatchNAT = matchNAT
  476. bestMatchCompartment = matchPersonalCompartment
  477. } else if !bestMatchNAT && matchNAT {
  478. // If there was a previous best match which was not a preferred
  479. // NAT match, this becomes the new best match. The preferred NAT
  480. // match is prioritized over personal compartment matching.
  481. bestMatch = i
  482. bestMatchNAT = true
  483. bestMatchCompartment = matchPersonalCompartment
  484. } else if !bestMatchCompartment && matchPersonalCompartment && (!bestMatchNAT || matchNAT) {
  485. // If there was a previous best match which was not a personal
  486. // compartment match, and as long as this match doesn't undo a
  487. // better NAT match, this becomes the new best match.
  488. bestMatch = i
  489. bestMatchNAT = matchNAT
  490. bestMatchCompartment = true
  491. }
  492. // Stop as soon as we have the best possible match.
  493. if (bestMatchNAT || !existsPreferredNATMatch) &&
  494. (matchPersonalCompartment || m.announcementsPersonalCompartmentalizedCount == 0) {
  495. break
  496. }
  497. }
  498. return bestMatch, bestMatch != -1
  499. }
  500. func (m *Matcher) addAnnouncementEntry(announcementEntry *announcementEntry) bool {
  501. m.announcementQueueMutex.Lock()
  502. defer m.announcementQueueMutex.Unlock()
  503. if m.announcementQueue.Len() >= matcherAnnouncementQueueMaxSize {
  504. return false
  505. }
  506. m.announcementQueue.PushBack(announcementEntry)
  507. m.adjustAnnouncementCounts(announcementEntry, 1)
  508. select {
  509. case m.matchSignal <- struct{}{}:
  510. default:
  511. }
  512. return true
  513. }
  514. func (m *Matcher) removeAnnouncementEntry(announcementEntry *announcementEntry) {
  515. m.announcementQueueMutex.Lock()
  516. defer m.announcementQueueMutex.Unlock()
  517. found := false
  518. for i := 0; i < m.announcementQueue.Len(); i++ {
  519. if m.announcementQueue.At(i) == announcementEntry {
  520. m.announcementQueue.Remove(i)
  521. m.adjustAnnouncementCounts(announcementEntry, -1)
  522. found = true
  523. break
  524. }
  525. }
  526. if !found {
  527. // The Announce call is aborting and taking its entry back out of the
  528. // queue. If the entry is not found in the queue, then a concurrent
  529. // Offer has matched the announcement. So check for the pending
  530. // answer corresponding to the announcement and remove it and deliver
  531. // a failure signal to the waiting Offer, so the client doesn't wait
  532. // longer than necessary.
  533. key := m.pendingAnswerKey(
  534. announcementEntry.announcement.ProxyID,
  535. announcementEntry.announcement.ConnectionID)
  536. pendingAnswerValue, ok := m.pendingAnswers.Get(key)
  537. if ok {
  538. close(pendingAnswerValue.(*pendingAnswer).answerChan)
  539. m.pendingAnswers.Delete(key)
  540. }
  541. }
  542. }
  543. func (m *Matcher) adjustAnnouncementCounts(
  544. announcementEntry *announcementEntry, delta int) {
  545. // Assumes s.announcementQueueMutex lock is held.
  546. if announcementEntry.announcement.Properties.IsPersonalCompartmentalized() {
  547. m.announcementsPersonalCompartmentalizedCount += delta
  548. }
  549. switch announcementEntry.announcement.Properties.EffectiveNATType().Traversal() {
  550. case NATTraversalUnlimited:
  551. m.announcementsUnlimitedNATCount += delta
  552. case NATTraversalPartiallyLimited:
  553. m.announcementsPartiallyLimitedNATCount += delta
  554. case NATTraversalStrictlyLimited:
  555. m.announcementsStrictlyLimitedNATCount += delta
  556. }
  557. }
  558. func (m *Matcher) addOfferEntry(offerEntry *offerEntry) bool {
  559. m.offerQueueMutex.Lock()
  560. defer m.offerQueueMutex.Unlock()
  561. if m.offerQueue.Len() >= matcherOfferQueueMaxSize {
  562. return false
  563. }
  564. m.offerQueue.PushBack(offerEntry)
  565. select {
  566. case m.matchSignal <- struct{}{}:
  567. default:
  568. }
  569. return true
  570. }
  571. func (m *Matcher) removeOfferEntry(offerEntry *offerEntry) {
  572. m.offerQueueMutex.Lock()
  573. defer m.offerQueueMutex.Unlock()
  574. for i := 0; i < m.offerQueue.Len(); i++ {
  575. if m.offerQueue.At(i) == offerEntry {
  576. m.offerQueue.Remove(i)
  577. break
  578. }
  579. }
  580. }
  581. func (m *Matcher) pendingAnswerKey(proxyID ID, connectionID ID) string {
  582. // The pending answer lookup key is used to associate announcements and
  583. // subsequent answers. While the client learns the ConnectionID, only the
  584. // proxy knows the ProxyID component, so only the correct proxy can match
  585. // an answer to an announcement. The ConnectionID component is necessary
  586. // as a proxy may have multiple, concurrent pending answers.
  587. return string(proxyID[:]) + string(connectionID[:])
  588. }