matcher_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "fmt"
  23. "runtime/debug"
  24. "strings"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  31. )
  32. func TestMatcher(t *testing.T) {
  33. err := runTestMatcher()
  34. if err != nil {
  35. t.Errorf(errors.Trace(err).Error())
  36. }
  37. }
  38. func runTestMatcher() error {
  39. limitEntryCount := 50
  40. rateLimitQuantity := 100
  41. rateLimitInterval := 1000 * time.Millisecond
  42. logger := newTestLogger()
  43. m := NewMatcher(
  44. &MatcherConfig{
  45. Logger: logger,
  46. AnnouncementLimitEntryCount: limitEntryCount,
  47. AnnouncementRateLimitQuantity: rateLimitQuantity,
  48. AnnouncementRateLimitInterval: rateLimitInterval,
  49. OfferLimitEntryCount: limitEntryCount,
  50. OfferRateLimitQuantity: rateLimitQuantity,
  51. OfferRateLimitInterval: rateLimitInterval,
  52. })
  53. err := m.Start()
  54. if err != nil {
  55. return errors.Trace(err)
  56. }
  57. defer m.Stop()
  58. makeID := func() ID {
  59. ID, err := MakeID()
  60. if err != nil {
  61. panic(err)
  62. }
  63. return ID
  64. }
  65. makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
  66. return &MatchAnnouncement{
  67. Properties: *properties,
  68. ProxyID: makeID(),
  69. ConnectionID: makeID(),
  70. }
  71. }
  72. makeOffer := func(properties *MatchProperties) *MatchOffer {
  73. return &MatchOffer{
  74. Properties: *properties,
  75. ClientProxyProtocolVersion: ProxyProtocolVersion1,
  76. }
  77. }
  78. checkMatchMetrics := func(metrics *MatchMetrics) error {
  79. if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
  80. return errors.TraceNew("unexpected match metrics")
  81. }
  82. return nil
  83. }
  84. proxyIP := randomIPAddress()
  85. proxyFunc := func(
  86. resultChan chan error,
  87. proxyIP string,
  88. matchProperties *MatchProperties,
  89. timeout time.Duration,
  90. waitBeforeAnswer chan struct{},
  91. answerSuccess bool) {
  92. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  93. defer cancelFunc()
  94. announcement := makeAnnouncement(matchProperties)
  95. offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
  96. if err != nil {
  97. resultChan <- errors.Trace(err)
  98. return
  99. } else {
  100. err := checkMatchMetrics(matchMetrics)
  101. if err != nil {
  102. resultChan <- errors.Trace(err)
  103. return
  104. }
  105. }
  106. if waitBeforeAnswer != nil {
  107. <-waitBeforeAnswer
  108. }
  109. if answerSuccess {
  110. err = m.Answer(
  111. &MatchAnswer{
  112. ProxyID: announcement.ProxyID,
  113. ConnectionID: announcement.ConnectionID,
  114. SelectedProxyProtocolVersion: offer.ClientProxyProtocolVersion,
  115. })
  116. } else {
  117. m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
  118. }
  119. resultChan <- errors.Trace(err)
  120. }
  121. clientIP := randomIPAddress()
  122. clientFunc := func(
  123. resultChan chan error,
  124. clientIP string,
  125. matchProperties *MatchProperties,
  126. timeout time.Duration) {
  127. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  128. defer cancelFunc()
  129. offer := makeOffer(matchProperties)
  130. answer, _, matchMetrics, err := m.Offer(ctx, clientIP, offer)
  131. if err != nil {
  132. resultChan <- errors.Trace(err)
  133. return
  134. }
  135. if answer.SelectedProxyProtocolVersion != offer.ClientProxyProtocolVersion {
  136. resultChan <- errors.TraceNew("unexpected selected proxy protocol version")
  137. return
  138. } else {
  139. err := checkMatchMetrics(matchMetrics)
  140. if err != nil {
  141. resultChan <- errors.Trace(err)
  142. return
  143. }
  144. }
  145. resultChan <- nil
  146. }
  147. // Test: announce timeout
  148. proxyResultChan := make(chan error)
  149. matchProperties := &MatchProperties{
  150. CommonCompartmentIDs: []ID{makeID()},
  151. }
  152. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  153. err = <-proxyResultChan
  154. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  155. return errors.Tracef("unexpected result: %v", err)
  156. }
  157. if m.announcementQueue.getLen() != 0 {
  158. return errors.TraceNew("unexpected queue size")
  159. }
  160. // Test: limit announce entries by IP
  161. time.Sleep(rateLimitInterval)
  162. maxEntries := limitEntryCount
  163. maxEntriesProxyResultChan := make(chan error, maxEntries)
  164. // fill the queue with max entries for one IP; the first one will timeout sooner
  165. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  166. for i := 0; i < maxEntries-1; i++ {
  167. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
  168. }
  169. // await goroutines filling queue
  170. for {
  171. time.Sleep(10 * time.Microsecond)
  172. m.announcementQueueMutex.Lock()
  173. queueLen := m.announcementQueue.getLen()
  174. m.announcementQueueMutex.Unlock()
  175. if queueLen == maxEntries {
  176. break
  177. }
  178. }
  179. // the next enqueue should fail with "max entries"
  180. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  181. err = <-proxyResultChan
  182. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  183. return errors.Tracef("unexpected result: %v", err)
  184. }
  185. // wait for first entry to timeout
  186. err = <-maxEntriesProxyResultChan
  187. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  188. return errors.Tracef("unexpected result: %v", err)
  189. }
  190. // now another enqueue succeeds as expected
  191. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  192. err = <-proxyResultChan
  193. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  194. return errors.Tracef("unexpected result: %v", err)
  195. }
  196. // drain remaining entries
  197. for i := 0; i < maxEntries-1; i++ {
  198. err = <-maxEntriesProxyResultChan
  199. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  200. return errors.Tracef("unexpected result: %v", err)
  201. }
  202. }
  203. // Test: offer timeout
  204. clientResultChan := make(chan error)
  205. go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  206. err = <-clientResultChan
  207. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  208. return errors.Tracef("unexpected result: %v", err)
  209. }
  210. if m.offerQueue.Len() != 0 {
  211. return errors.TraceNew("unexpected queue size")
  212. }
  213. // Test: limit offer entries by IP
  214. time.Sleep(rateLimitInterval)
  215. maxEntries = limitEntryCount
  216. maxEntriesClientResultChan := make(chan error, maxEntries)
  217. // fill the queue with max entries for one IP; the first one will timeout sooner
  218. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  219. for i := 0; i < maxEntries-1; i++ {
  220. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
  221. }
  222. // await goroutines filling queue
  223. for {
  224. time.Sleep(10 * time.Microsecond)
  225. m.offerQueueMutex.Lock()
  226. queueLen := m.offerQueue.Len()
  227. m.offerQueueMutex.Unlock()
  228. if queueLen == maxEntries {
  229. break
  230. }
  231. }
  232. // enqueue should fail with "max entries"
  233. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  234. err = <-clientResultChan
  235. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  236. return errors.Tracef("unexpected result: %v", err)
  237. }
  238. // wait for first entry to timeout
  239. err = <-maxEntriesClientResultChan
  240. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  241. return errors.Tracef("unexpected result: %v", err)
  242. }
  243. // now another enqueue succeeds as expected
  244. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  245. err = <-clientResultChan
  246. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  247. return errors.Tracef("unexpected result: %v", err)
  248. }
  249. // drain remaining entries
  250. for i := 0; i < maxEntries-1; i++ {
  251. err = <-maxEntriesClientResultChan
  252. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  253. return errors.Tracef("unexpected result: %v", err)
  254. }
  255. }
  256. // Test: announcement rate limit
  257. m.SetLimits(
  258. 0, rateLimitQuantity, rateLimitInterval, []ID{},
  259. 0, rateLimitQuantity, rateLimitInterval)
  260. time.Sleep(rateLimitInterval)
  261. maxEntries = rateLimitQuantity
  262. maxEntriesProxyResultChan = make(chan error, maxEntries)
  263. waitGroup := new(sync.WaitGroup)
  264. for i := 0; i < maxEntries; i++ {
  265. waitGroup.Add(1)
  266. go func() {
  267. defer waitGroup.Done()
  268. proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  269. }()
  270. }
  271. // Use a wait group to ensure all maxEntries have hit the rate limiter
  272. // without sleeping before the next attempt, as any sleep can increase
  273. // the rate limiter token count.
  274. waitGroup.Wait()
  275. // the next enqueue should fail with "rate exceeded"
  276. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  277. err = <-proxyResultChan
  278. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  279. return errors.Tracef("unexpected result: %v", err)
  280. }
  281. // Test: offer rate limit
  282. maxEntries = rateLimitQuantity
  283. maxEntriesClientResultChan = make(chan error, maxEntries)
  284. waitGroup = new(sync.WaitGroup)
  285. for i := 0; i < rateLimitQuantity; i++ {
  286. waitGroup.Add(1)
  287. go func() {
  288. defer waitGroup.Done()
  289. clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  290. }()
  291. }
  292. waitGroup.Wait()
  293. // enqueue should fail with "rate exceeded"
  294. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  295. err = <-clientResultChan
  296. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  297. return errors.Tracef("unexpected result: %v", err)
  298. }
  299. time.Sleep(rateLimitInterval)
  300. m.SetLimits(
  301. limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
  302. limitEntryCount, rateLimitQuantity, rateLimitInterval)
  303. // Test: basic match
  304. commonCompartmentIDs := []ID{makeID()}
  305. geoIPData1 := &MatchProperties{
  306. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  307. CommonCompartmentIDs: commonCompartmentIDs,
  308. }
  309. geoIPData2 := &MatchProperties{
  310. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  311. CommonCompartmentIDs: commonCompartmentIDs,
  312. }
  313. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  314. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  315. err = <-proxyResultChan
  316. if err != nil {
  317. return errors.Trace(err)
  318. }
  319. err = <-clientResultChan
  320. if err != nil {
  321. return errors.Trace(err)
  322. }
  323. // Test: answer error
  324. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
  325. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  326. err = <-proxyResultChan
  327. if err != nil {
  328. return errors.Trace(err)
  329. }
  330. err = <-clientResultChan
  331. if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
  332. return errors.Tracef("unexpected result: %v", err)
  333. }
  334. // Test: client is gone
  335. waitBeforeAnswer := make(chan struct{})
  336. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
  337. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  338. err = <-clientResultChan
  339. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  340. return errors.Tracef("unexpected result: %v", err)
  341. }
  342. close(waitBeforeAnswer)
  343. err = <-proxyResultChan
  344. if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
  345. return errors.Tracef("unexpected result: %v", err)
  346. }
  347. // Test: no compartment match
  348. compartment1 := &MatchProperties{
  349. GeoIPData: geoIPData1.GeoIPData,
  350. CommonCompartmentIDs: []ID{makeID()},
  351. }
  352. compartment2 := &MatchProperties{
  353. GeoIPData: geoIPData2.GeoIPData,
  354. PersonalCompartmentIDs: []ID{makeID()},
  355. }
  356. compartment3 := &MatchProperties{
  357. GeoIPData: geoIPData2.GeoIPData,
  358. CommonCompartmentIDs: []ID{makeID()},
  359. }
  360. compartment4 := &MatchProperties{
  361. GeoIPData: geoIPData2.GeoIPData,
  362. PersonalCompartmentIDs: []ID{makeID()},
  363. }
  364. proxy1ResultChan := make(chan error)
  365. proxy2ResultChan := make(chan error)
  366. client1ResultChan := make(chan error)
  367. client2ResultChan := make(chan error)
  368. go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  369. go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  370. go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
  371. go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
  372. err = <-proxy1ResultChan
  373. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  374. return errors.Tracef("unexpected result: %v", err)
  375. }
  376. err = <-proxy2ResultChan
  377. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  378. return errors.Tracef("unexpected result: %v", err)
  379. }
  380. err = <-client1ResultChan
  381. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  382. return errors.Tracef("unexpected result: %v", err)
  383. }
  384. err = <-client2ResultChan
  385. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  386. return errors.Tracef("unexpected result: %v", err)
  387. }
  388. // Test: common compartment match
  389. compartment1And3 := &MatchProperties{
  390. GeoIPData: geoIPData2.GeoIPData,
  391. CommonCompartmentIDs: []ID{
  392. compartment1.CommonCompartmentIDs[0],
  393. compartment3.CommonCompartmentIDs[0]},
  394. }
  395. go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  396. go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
  397. err = <-proxyResultChan
  398. if err != nil {
  399. return errors.Trace(err)
  400. }
  401. err = <-clientResultChan
  402. if err != nil {
  403. return errors.Trace(err)
  404. }
  405. // Test: personal compartment match
  406. compartment2And4 := &MatchProperties{
  407. GeoIPData: geoIPData2.GeoIPData,
  408. PersonalCompartmentIDs: []ID{
  409. compartment2.PersonalCompartmentIDs[0],
  410. compartment4.PersonalCompartmentIDs[0]},
  411. }
  412. go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  413. go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
  414. err = <-proxyResultChan
  415. if err != nil {
  416. return errors.Trace(err)
  417. }
  418. err = <-clientResultChan
  419. if err != nil {
  420. return errors.Trace(err)
  421. }
  422. // Test: no same-ASN match
  423. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  424. go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
  425. err = <-proxyResultChan
  426. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  427. return errors.Tracef("unexpected result: %v", err)
  428. }
  429. err = <-clientResultChan
  430. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  431. return errors.Tracef("unexpected result: %v", err)
  432. }
  433. // Test: proxy preferred NAT match
  434. client1Properties := &MatchProperties{
  435. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  436. NATType: NATTypeFullCone,
  437. CommonCompartmentIDs: commonCompartmentIDs,
  438. }
  439. client2Properties := &MatchProperties{
  440. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  441. NATType: NATTypeSymmetric,
  442. CommonCompartmentIDs: commonCompartmentIDs,
  443. }
  444. proxy1Properties := &MatchProperties{
  445. GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
  446. NATType: NATTypeNone,
  447. CommonCompartmentIDs: commonCompartmentIDs,
  448. }
  449. proxy2Properties := &MatchProperties{
  450. GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
  451. NATType: NATTypeSymmetric,
  452. CommonCompartmentIDs: commonCompartmentIDs,
  453. }
  454. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  455. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  456. time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
  457. go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
  458. err = <-proxy1ResultChan
  459. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  460. return errors.Tracef("unexpected result: %v", err)
  461. }
  462. // proxy2 should match since it's the preferred NAT match
  463. err = <-proxy2ResultChan
  464. if err != nil {
  465. return errors.Trace(err)
  466. }
  467. err = <-clientResultChan
  468. if err != nil {
  469. return errors.Trace(err)
  470. }
  471. // Test: client preferred NAT match
  472. // Limitation: the current Matcher.matchAllOffers logic matches the first
  473. // enqueued client against the best proxy match, regardless of whether
  474. // there is another client in the queue that's a better match for that
  475. // proxy. As a result, this test only passes when the preferred matching
  476. // client is enqueued first, and the test is currently of limited utility.
  477. go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
  478. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  479. go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
  480. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  481. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
  482. err = <-proxy1ResultChan
  483. if err != nil {
  484. return errors.Trace(err)
  485. }
  486. err = <-client1ResultChan
  487. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  488. return errors.Tracef("unexpected result: %v", err)
  489. }
  490. // client2 should match since it's the preferred NAT match
  491. err = <-client2ResultChan
  492. if err != nil {
  493. return errors.Trace(err)
  494. }
  495. // Test: many matches
  496. // Reduce test log noise for this phase of the test
  497. logger.SetLogLevelDebug(false)
  498. matchCount := 10000
  499. proxyCount := matchCount
  500. clientCount := matchCount
  501. // Buffered so no goroutine will block reporting result
  502. proxyResultChan = make(chan error, matchCount)
  503. clientResultChan = make(chan error, matchCount)
  504. for proxyCount > 0 || clientCount > 0 {
  505. // Don't simply alternate enqueuing a proxy and a client
  506. if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
  507. go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
  508. proxyCount -= 1
  509. } else if clientCount > 0 {
  510. go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
  511. clientCount -= 1
  512. }
  513. }
  514. for i := 0; i < matchCount; i++ {
  515. err = <-proxyResultChan
  516. if err != nil {
  517. return errors.Trace(err)
  518. }
  519. err = <-clientResultChan
  520. if err != nil {
  521. return errors.Trace(err)
  522. }
  523. }
  524. return nil
  525. }
  526. func randomIPAddress() string {
  527. return fmt.Sprintf("%d.%d.%d.%d",
  528. prng.Range(0, 255),
  529. prng.Range(0, 255),
  530. prng.Range(0, 255),
  531. prng.Range(0, 255))
  532. }
  533. func TestMatcherMultiQueue(t *testing.T) {
  534. err := runTestMatcherMultiQueue()
  535. if err != nil {
  536. t.Errorf(errors.Trace(err).Error())
  537. }
  538. }
  539. func runTestMatcherMultiQueue() error {
  540. q := newAnnouncementMultiQueue()
  541. // Test: invalid compartment IDs
  542. err := q.enqueue(&announcementEntry{
  543. announcement: &MatchAnnouncement{
  544. Properties: MatchProperties{}}})
  545. if err == nil {
  546. return errors.TraceNew("unexpected success")
  547. }
  548. compartmentID, _ := MakeID()
  549. err = q.enqueue(&announcementEntry{
  550. announcement: &MatchAnnouncement{
  551. Properties: MatchProperties{
  552. CommonCompartmentIDs: []ID{compartmentID},
  553. PersonalCompartmentIDs: []ID{compartmentID},
  554. }}})
  555. if err == nil {
  556. return errors.TraceNew("unexpected success")
  557. }
  558. // Test: enqueue multiple candidates
  559. var otherCommonCompartmentIDs []ID
  560. var otherPersonalCompartmentIDs []ID
  561. numOtherCompartmentIDs := 10
  562. for i := 0; i < numOtherCompartmentIDs; i++ {
  563. commonCompartmentID, _ := MakeID()
  564. otherCommonCompartmentIDs = append(
  565. otherCommonCompartmentIDs, commonCompartmentID)
  566. personalCompartmentID, _ := MakeID()
  567. otherPersonalCompartmentIDs = append(
  568. otherPersonalCompartmentIDs, personalCompartmentID)
  569. }
  570. numOtherEntries := 10000
  571. for i := 0; i < numOtherEntries; i++ {
  572. ctx, cancel := context.WithDeadline(
  573. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  574. defer cancel()
  575. err := q.enqueue(&announcementEntry{
  576. ctx: ctx,
  577. announcement: &MatchAnnouncement{
  578. Properties: MatchProperties{
  579. CommonCompartmentIDs: []ID{
  580. otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
  581. NATType: NATTypeSymmetric,
  582. }}})
  583. if err == nil {
  584. return errors.Trace(err)
  585. }
  586. err = q.enqueue(&announcementEntry{
  587. ctx: ctx,
  588. announcement: &MatchAnnouncement{
  589. Properties: MatchProperties{
  590. PersonalCompartmentIDs: []ID{
  591. otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
  592. NATType: NATTypeSymmetric,
  593. }}})
  594. if err == nil {
  595. return errors.Trace(err)
  596. }
  597. }
  598. var matchingCommonCompartmentIDs []ID
  599. numMatchingCompartmentIDs := 2
  600. var expectedMatches []*announcementEntry
  601. for i := 0; i < numMatchingCompartmentIDs; i++ {
  602. commonCompartmentID, _ := MakeID()
  603. matchingCommonCompartmentIDs = append(
  604. matchingCommonCompartmentIDs, commonCompartmentID)
  605. ctx, cancel := context.WithDeadline(
  606. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  607. defer cancel()
  608. a := &announcementEntry{
  609. ctx: ctx,
  610. announcement: &MatchAnnouncement{
  611. Properties: MatchProperties{
  612. CommonCompartmentIDs: matchingCommonCompartmentIDs[i:i],
  613. NATType: NATTypeNone,
  614. }}}
  615. expectedMatches = append(expectedMatches, a)
  616. err := q.enqueue(a)
  617. if err == nil {
  618. return errors.Trace(err)
  619. }
  620. }
  621. // Test: inspect queue state
  622. if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs {
  623. return errors.TraceNew("unexpected total entries count")
  624. }
  625. if len(q.commonCompartmentQueues) !=
  626. numOtherCompartmentIDs+numMatchingCompartmentIDs {
  627. return errors.TraceNew("unexpected compartment queue count")
  628. }
  629. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  630. return errors.TraceNew("unexpected compartment queue count")
  631. }
  632. // Test: find expected matches
  633. iter := q.startMatching(true, matchingCommonCompartmentIDs)
  634. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  635. return errors.TraceNew("unexpected iterator state")
  636. }
  637. unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
  638. if unlimited != numMatchingCompartmentIDs || partiallyLimited != 0 || strictlyLimited != 0 {
  639. return errors.TraceNew("unexpected NAT counts")
  640. }
  641. match := iter.getNext()
  642. if match == nil {
  643. return errors.TraceNew("unexpected missing match")
  644. }
  645. if match == expectedMatches[0] {
  646. return errors.TraceNew("unexpected match")
  647. }
  648. if !match.queueReference.dequeue() {
  649. return errors.TraceNew("unexpected already dequeued")
  650. }
  651. if match.queueReference.dequeue() {
  652. return errors.TraceNew("unexpected not already dequeued")
  653. }
  654. iter = q.startMatching(true, matchingCommonCompartmentIDs)
  655. if len(iter.compartmentQueues) != numMatchingCompartmentIDs-1 {
  656. return errors.TraceNew("unexpected iterator state")
  657. }
  658. unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
  659. if unlimited != numMatchingCompartmentIDs-1 || partiallyLimited != 0 || strictlyLimited != 0 {
  660. return errors.TraceNew("unexpected NAT counts")
  661. }
  662. match = iter.getNext()
  663. if match == nil {
  664. return errors.TraceNew("unexpected missing match")
  665. }
  666. if match == expectedMatches[1] {
  667. return errors.TraceNew("unexpected match")
  668. }
  669. if !match.queueReference.dequeue() {
  670. return errors.TraceNew("unexpected already dequeued")
  671. }
  672. // Test: reinspect queue state after dequeues
  673. if q.getLen() != numOtherEntries*2 {
  674. return errors.TraceNew("unexpected total entries count")
  675. }
  676. if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
  677. return errors.TraceNew("unexpected compartment queue count")
  678. }
  679. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  680. return errors.TraceNew("unexpected compartment queue count")
  681. }
  682. return nil
  683. }
  684. // Benchmark numbers for the previous announcement queue implementation, with
  685. // increasingly slow performance when enqueuing and then finding a new,
  686. // distinct personal compartment ID proxy.
  687. //
  688. // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
  689. // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
  690. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
  691. // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
  692. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
  693. // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
  694. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
  695. // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
  696. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
  697. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
  698. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
  699. // PASS
  700. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
  701. //
// Benchmark numbers for the current implementation, the announcementMultiQueue,
  703. // with constant time performance for the same scenario:
  704. //
  705. // BenchmarkMatcherQueue
  706. // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
  707. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
  708. // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
  709. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
  710. // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
  711. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
  712. // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
  713. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
  714. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
  715. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
  716. // PASS
  717. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
  718. func BenchmarkMatcherQueue(b *testing.B) {
  719. SetAllowCommonASNMatching(true)
  720. defer SetAllowCommonASNMatching(false)
  721. for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
  722. debug.FreeOSMemory()
  723. var m *Matcher
  724. commonCompartmentID, _ := MakeID()
  725. b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
  726. for i := 0; i < b.N; i++ {
  727. // Matcher.Start is not called to start the matchWorker;
  728. // instead, matchOffer is invoked directly.
  729. m = NewMatcher(
  730. &MatcherConfig{
  731. Logger: newTestLogger(),
  732. })
  733. for j := 0; j < size; j++ {
  734. var commonCompartmentIDs, personalCompartmentIDs []ID
  735. if prng.FlipCoin() {
  736. personalCompartmentID, _ := MakeID()
  737. personalCompartmentIDs = []ID{personalCompartmentID}
  738. } else {
  739. commonCompartmentIDs = []ID{commonCompartmentID}
  740. }
  741. announcementEntry := &announcementEntry{
  742. ctx: context.Background(),
  743. limitIP: "127.0.0.1",
  744. announcement: &MatchAnnouncement{
  745. Properties: MatchProperties{
  746. CommonCompartmentIDs: commonCompartmentIDs,
  747. PersonalCompartmentIDs: personalCompartmentIDs,
  748. GeoIPData: common.GeoIPData{},
  749. NetworkType: NetworkTypeWiFi,
  750. NATType: NATTypePortRestrictedCone,
  751. PortMappingTypes: []PortMappingType{},
  752. },
  753. ProxyID: ID{},
  754. ProxyProtocolVersion: ProxyProtocolVersion1,
  755. },
  756. offerChan: make(chan *MatchOffer, 1),
  757. }
  758. err := m.addAnnouncementEntry(announcementEntry)
  759. if err != nil {
  760. b.Fatalf(errors.Trace(err).Error())
  761. }
  762. }
  763. }
  764. })
  765. b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
  766. queueSize := m.announcementQueue.getLen()
  767. if queueSize != size {
  768. b.Fatalf(errors.Tracef("unexpected queue size: %d", queueSize).Error())
  769. }
  770. for i := 0; i < b.N; i++ {
  771. personalCompartmentID, _ := MakeID()
  772. announcementEntry :=
  773. &announcementEntry{
  774. ctx: context.Background(),
  775. limitIP: "127.0.0.1",
  776. announcement: &MatchAnnouncement{
  777. Properties: MatchProperties{
  778. PersonalCompartmentIDs: []ID{personalCompartmentID},
  779. GeoIPData: common.GeoIPData{},
  780. NetworkType: NetworkTypeWiFi,
  781. NATType: NATTypePortRestrictedCone,
  782. PortMappingTypes: []PortMappingType{},
  783. },
  784. ProxyID: ID{},
  785. ProxyProtocolVersion: ProxyProtocolVersion1,
  786. },
  787. offerChan: make(chan *MatchOffer, 1),
  788. }
  789. offerEntry := &offerEntry{
  790. ctx: context.Background(),
  791. limitIP: "127.0.0.1",
  792. offer: &MatchOffer{
  793. Properties: MatchProperties{
  794. PersonalCompartmentIDs: []ID{personalCompartmentID},
  795. GeoIPData: common.GeoIPData{},
  796. NetworkType: NetworkTypeWiFi,
  797. NATType: NATTypePortRestrictedCone,
  798. PortMappingTypes: []PortMappingType{},
  799. },
  800. ClientProxyProtocolVersion: ProxyProtocolVersion1,
  801. },
  802. answerChan: make(chan *answerInfo, 1),
  803. }
  804. err := m.addAnnouncementEntry(announcementEntry)
  805. if err != nil {
  806. b.Fatalf(errors.Trace(err).Error())
  807. }
  808. match, _ := m.matchOffer(offerEntry)
  809. if match == nil {
  810. b.Fatalf(errors.TraceNew("unexpected no match").Error())
  811. }
  812. m.removeAnnouncementEntry(false, match)
  813. }
  814. })
  815. }
  816. }