matcher_test.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "fmt"
  23. "runtime/debug"
  24. "strings"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  31. )
  32. func TestMatcher(t *testing.T) {
  33. err := runTestMatcher()
  34. if err != nil {
  35. t.Errorf(errors.Trace(err).Error())
  36. }
  37. }
  38. func runTestMatcher() error {
  39. limitEntryCount := 50
  40. rateLimitQuantity := 100
  41. rateLimitInterval := 1000 * time.Millisecond
  42. logger := newTestLogger()
  43. m := NewMatcher(
  44. &MatcherConfig{
  45. Logger: logger,
  46. AnnouncementLimitEntryCount: limitEntryCount,
  47. AnnouncementRateLimitQuantity: rateLimitQuantity,
  48. AnnouncementRateLimitInterval: rateLimitInterval,
  49. OfferLimitEntryCount: limitEntryCount,
  50. OfferRateLimitQuantity: rateLimitQuantity,
  51. OfferRateLimitInterval: rateLimitInterval,
  52. })
  53. err := m.Start()
  54. if err != nil {
  55. return errors.Trace(err)
  56. }
  57. defer m.Stop()
  58. makeID := func() ID {
  59. ID, err := MakeID()
  60. if err != nil {
  61. panic(err)
  62. }
  63. return ID
  64. }
  65. makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
  66. return &MatchAnnouncement{
  67. Properties: *properties,
  68. ProxyID: makeID(),
  69. ConnectionID: makeID(),
  70. }
  71. }
  72. makeOffer := func(properties *MatchProperties, useMediaStreams bool) *MatchOffer {
  73. return &MatchOffer{
  74. Properties: *properties,
  75. UseMediaStreams: useMediaStreams,
  76. }
  77. }
  78. checkMatchMetrics := func(metrics *MatchMetrics) error {
  79. if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
  80. return errors.TraceNew("unexpected match metrics")
  81. }
  82. return nil
  83. }
  84. proxyIP := randomIPAddress()
  85. proxyFunc := func(
  86. resultChan chan error,
  87. proxyIP string,
  88. matchProperties *MatchProperties,
  89. timeout time.Duration,
  90. waitBeforeAnswer chan struct{},
  91. answerSuccess bool) {
  92. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  93. defer cancelFunc()
  94. announcement := makeAnnouncement(matchProperties)
  95. offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
  96. if err != nil {
  97. resultChan <- errors.Trace(err)
  98. return
  99. }
  100. err = checkMatchMetrics(matchMetrics)
  101. if err != nil {
  102. resultChan <- errors.Trace(err)
  103. return
  104. }
  105. _, ok := negotiateProtocolVersion(
  106. matchProperties.ProtocolVersion,
  107. offer.Properties.ProtocolVersion,
  108. offer.UseMediaStreams)
  109. if !ok {
  110. resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
  111. return
  112. }
  113. if waitBeforeAnswer != nil {
  114. <-waitBeforeAnswer
  115. }
  116. if answerSuccess {
  117. err = m.Answer(
  118. &MatchAnswer{
  119. ProxyID: announcement.ProxyID,
  120. ConnectionID: announcement.ConnectionID,
  121. })
  122. } else {
  123. m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
  124. }
  125. resultChan <- errors.Trace(err)
  126. }
  127. clientIP := randomIPAddress()
  128. baseClientFunc := func(
  129. resultChan chan error,
  130. clientIP string,
  131. matchProperties *MatchProperties,
  132. useMediaStreams bool,
  133. timeout time.Duration) {
  134. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  135. defer cancelFunc()
  136. offer := makeOffer(matchProperties, useMediaStreams)
  137. _, matchAnnouncement, matchMetrics, err := m.Offer(ctx, clientIP, offer)
  138. if err != nil {
  139. resultChan <- errors.Trace(err)
  140. return
  141. }
  142. err = checkMatchMetrics(matchMetrics)
  143. if err != nil {
  144. resultChan <- errors.Trace(err)
  145. return
  146. }
  147. _, ok := negotiateProtocolVersion(
  148. matchAnnouncement.Properties.ProtocolVersion,
  149. offer.Properties.ProtocolVersion,
  150. offer.UseMediaStreams)
  151. if !ok {
  152. resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
  153. return
  154. }
  155. resultChan <- nil
  156. }
  157. clientFunc := func(resultChan chan error, clientIP string,
  158. matchProperties *MatchProperties, timeout time.Duration) {
  159. baseClientFunc(resultChan, clientIP, matchProperties, false, timeout)
  160. }
  161. clientUsingMediaStreamsFunc := func(resultChan chan error, clientIP string,
  162. matchProperties *MatchProperties, timeout time.Duration) {
  163. baseClientFunc(resultChan, clientIP, matchProperties, true, timeout)
  164. }
  165. // Test: announce timeout
  166. proxyResultChan := make(chan error)
  167. matchProperties := &MatchProperties{
  168. ProtocolVersion: LatestProtocolVersion,
  169. CommonCompartmentIDs: []ID{makeID()},
  170. }
  171. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  172. err = <-proxyResultChan
  173. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  174. return errors.Tracef("unexpected result: %v", err)
  175. }
  176. if m.announcementQueue.getLen() != 0 {
  177. return errors.TraceNew("unexpected queue size")
  178. }
  179. // Test: limit announce entries by IP
  180. time.Sleep(rateLimitInterval)
  181. maxEntries := limitEntryCount
  182. maxEntriesProxyResultChan := make(chan error, maxEntries)
  183. // fill the queue with max entries for one IP; the first one will timeout sooner
  184. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  185. for i := 0; i < maxEntries-1; i++ {
  186. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
  187. }
  188. // await goroutines filling queue
  189. for {
  190. time.Sleep(10 * time.Microsecond)
  191. m.announcementQueueMutex.Lock()
  192. queueLen := m.announcementQueue.getLen()
  193. m.announcementQueueMutex.Unlock()
  194. if queueLen == maxEntries {
  195. break
  196. }
  197. }
  198. // the next enqueue should fail with "max entries"
  199. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  200. err = <-proxyResultChan
  201. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  202. return errors.Tracef("unexpected result: %v", err)
  203. }
  204. // wait for first entry to timeout
  205. err = <-maxEntriesProxyResultChan
  206. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  207. return errors.Tracef("unexpected result: %v", err)
  208. }
  209. // now another enqueue succeeds as expected
  210. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  211. err = <-proxyResultChan
  212. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  213. return errors.Tracef("unexpected result: %v", err)
  214. }
  215. // drain remaining entries
  216. for i := 0; i < maxEntries-1; i++ {
  217. err = <-maxEntriesProxyResultChan
  218. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  219. return errors.Tracef("unexpected result: %v", err)
  220. }
  221. }
  222. // Test: offer timeout
  223. clientResultChan := make(chan error)
  224. go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  225. err = <-clientResultChan
  226. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  227. return errors.Tracef("unexpected result: %v", err)
  228. }
  229. if m.offerQueue.Len() != 0 {
  230. return errors.TraceNew("unexpected queue size")
  231. }
  232. // Test: limit offer entries by IP
  233. time.Sleep(rateLimitInterval)
  234. maxEntries = limitEntryCount
  235. maxEntriesClientResultChan := make(chan error, maxEntries)
  236. // fill the queue with max entries for one IP; the first one will timeout sooner
  237. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  238. for i := 0; i < maxEntries-1; i++ {
  239. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
  240. }
  241. // await goroutines filling queue
  242. for {
  243. time.Sleep(10 * time.Microsecond)
  244. m.offerQueueMutex.Lock()
  245. queueLen := m.offerQueue.Len()
  246. m.offerQueueMutex.Unlock()
  247. if queueLen == maxEntries {
  248. break
  249. }
  250. }
  251. // enqueue should fail with "max entries"
  252. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  253. err = <-clientResultChan
  254. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  255. return errors.Tracef("unexpected result: %v", err)
  256. }
  257. // wait for first entry to timeout
  258. err = <-maxEntriesClientResultChan
  259. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  260. return errors.Tracef("unexpected result: %v", err)
  261. }
  262. // now another enqueue succeeds as expected
  263. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  264. err = <-clientResultChan
  265. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  266. return errors.Tracef("unexpected result: %v", err)
  267. }
  268. // drain remaining entries
  269. for i := 0; i < maxEntries-1; i++ {
  270. err = <-maxEntriesClientResultChan
  271. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  272. return errors.Tracef("unexpected result: %v", err)
  273. }
  274. }
  275. // Test: announcement rate limit
  276. m.SetLimits(
  277. 0, rateLimitQuantity, rateLimitInterval, []ID{},
  278. 0, rateLimitQuantity, rateLimitInterval)
  279. time.Sleep(rateLimitInterval)
  280. maxEntries = rateLimitQuantity
  281. maxEntriesProxyResultChan = make(chan error, maxEntries)
  282. waitGroup := new(sync.WaitGroup)
  283. for i := 0; i < maxEntries; i++ {
  284. waitGroup.Add(1)
  285. go func() {
  286. defer waitGroup.Done()
  287. proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  288. }()
  289. }
  290. // Use a wait group to ensure all maxEntries have hit the rate limiter
  291. // without sleeping before the next attempt, as any sleep can increase
  292. // the rate limiter token count.
  293. waitGroup.Wait()
  294. // the next enqueue should fail with "rate exceeded"
  295. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  296. err = <-proxyResultChan
  297. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  298. return errors.Tracef("unexpected result: %v", err)
  299. }
  300. // Test: offer rate limit
  301. maxEntries = rateLimitQuantity
  302. maxEntriesClientResultChan = make(chan error, maxEntries)
  303. waitGroup = new(sync.WaitGroup)
  304. for i := 0; i < rateLimitQuantity; i++ {
  305. waitGroup.Add(1)
  306. go func() {
  307. defer waitGroup.Done()
  308. clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  309. }()
  310. }
  311. waitGroup.Wait()
  312. // enqueue should fail with "rate exceeded"
  313. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  314. err = <-clientResultChan
  315. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  316. return errors.Tracef("unexpected result: %v", err)
  317. }
  318. time.Sleep(rateLimitInterval)
  319. m.SetLimits(
  320. limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
  321. limitEntryCount, rateLimitQuantity, rateLimitInterval)
  322. // Test: basic match
  323. commonCompartmentIDs := []ID{makeID()}
  324. geoIPData1 := &MatchProperties{
  325. ProtocolVersion: LatestProtocolVersion,
  326. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  327. CommonCompartmentIDs: commonCompartmentIDs,
  328. }
  329. geoIPData2 := &MatchProperties{
  330. ProtocolVersion: LatestProtocolVersion,
  331. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  332. CommonCompartmentIDs: commonCompartmentIDs,
  333. }
  334. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  335. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  336. err = <-proxyResultChan
  337. if err != nil {
  338. return errors.Trace(err)
  339. }
  340. err = <-clientResultChan
  341. if err != nil {
  342. return errors.Trace(err)
  343. }
  344. // Test: answer error
  345. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
  346. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  347. err = <-proxyResultChan
  348. if err != nil {
  349. return errors.Trace(err)
  350. }
  351. err = <-clientResultChan
  352. if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
  353. return errors.Tracef("unexpected result: %v", err)
  354. }
  355. // Test: client is gone
  356. waitBeforeAnswer := make(chan struct{})
  357. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
  358. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  359. err = <-clientResultChan
  360. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  361. return errors.Tracef("unexpected result: %v", err)
  362. }
  363. close(waitBeforeAnswer)
  364. err = <-proxyResultChan
  365. if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
  366. return errors.Tracef("unexpected result: %v", err)
  367. }
  368. // Test: no compartment match
  369. compartment1 := &MatchProperties{
  370. ProtocolVersion: LatestProtocolVersion,
  371. GeoIPData: geoIPData1.GeoIPData,
  372. CommonCompartmentIDs: []ID{makeID()},
  373. }
  374. compartment2 := &MatchProperties{
  375. ProtocolVersion: LatestProtocolVersion,
  376. GeoIPData: geoIPData2.GeoIPData,
  377. PersonalCompartmentIDs: []ID{makeID()},
  378. }
  379. compartment3 := &MatchProperties{
  380. ProtocolVersion: LatestProtocolVersion,
  381. GeoIPData: geoIPData2.GeoIPData,
  382. CommonCompartmentIDs: []ID{makeID()},
  383. }
  384. compartment4 := &MatchProperties{
  385. ProtocolVersion: LatestProtocolVersion,
  386. GeoIPData: geoIPData2.GeoIPData,
  387. PersonalCompartmentIDs: []ID{makeID()},
  388. }
  389. proxy1ResultChan := make(chan error)
  390. proxy2ResultChan := make(chan error)
  391. client1ResultChan := make(chan error)
  392. client2ResultChan := make(chan error)
  393. go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  394. go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  395. go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
  396. go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
  397. err = <-proxy1ResultChan
  398. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  399. return errors.Tracef("unexpected result: %v", err)
  400. }
  401. err = <-proxy2ResultChan
  402. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  403. return errors.Tracef("unexpected result: %v", err)
  404. }
  405. err = <-client1ResultChan
  406. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  407. return errors.Tracef("unexpected result: %v", err)
  408. }
  409. err = <-client2ResultChan
  410. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  411. return errors.Tracef("unexpected result: %v", err)
  412. }
  413. // Test: common compartment match
  414. compartment1And3 := &MatchProperties{
  415. ProtocolVersion: LatestProtocolVersion,
  416. GeoIPData: geoIPData2.GeoIPData,
  417. CommonCompartmentIDs: []ID{
  418. compartment1.CommonCompartmentIDs[0],
  419. compartment3.CommonCompartmentIDs[0]},
  420. }
  421. go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  422. go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
  423. err = <-proxyResultChan
  424. if err != nil {
  425. return errors.Trace(err)
  426. }
  427. err = <-clientResultChan
  428. if err != nil {
  429. return errors.Trace(err)
  430. }
  431. // Test: personal compartment match
  432. compartment2And4 := &MatchProperties{
  433. ProtocolVersion: LatestProtocolVersion,
  434. GeoIPData: geoIPData2.GeoIPData,
  435. PersonalCompartmentIDs: []ID{
  436. compartment2.PersonalCompartmentIDs[0],
  437. compartment4.PersonalCompartmentIDs[0]},
  438. }
  439. go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  440. go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
  441. err = <-proxyResultChan
  442. if err != nil {
  443. return errors.Trace(err)
  444. }
  445. err = <-clientResultChan
  446. if err != nil {
  447. return errors.Trace(err)
  448. }
  449. // Test: no same-ASN match
  450. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  451. go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
  452. err = <-proxyResultChan
  453. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  454. return errors.Tracef("unexpected result: %v", err)
  455. }
  456. err = <-clientResultChan
  457. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  458. return errors.Tracef("unexpected result: %v", err)
  459. }
  460. // Test: downgrade-compatible protocol version match
  461. protocolVersion1 := &MatchProperties{
  462. ProtocolVersion: ProtocolVersion1,
  463. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  464. CommonCompartmentIDs: commonCompartmentIDs,
  465. }
  466. protocolVersion2 := &MatchProperties{
  467. ProtocolVersion: ProtocolVersion2,
  468. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  469. CommonCompartmentIDs: commonCompartmentIDs,
  470. }
  471. go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
  472. go clientFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
  473. err = <-proxyResultChan
  474. if err != nil {
  475. return errors.Trace(err)
  476. }
  477. err = <-clientResultChan
  478. if err != nil {
  479. return errors.Trace(err)
  480. }
  481. // Test: no incompatible protocol version match
  482. go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
  483. go clientUsingMediaStreamsFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
  484. err = <-proxyResultChan
  485. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  486. return errors.Tracef("unexpected result: %v", err)
  487. }
  488. err = <-clientResultChan
  489. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  490. return errors.Tracef("unexpected result: %v", err)
  491. }
  492. // Test: downgrade-compatible protocol version match
  493. // Test: proxy preferred NAT match
  494. client1Properties := &MatchProperties{
  495. ProtocolVersion: LatestProtocolVersion,
  496. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  497. NATType: NATTypeFullCone,
  498. CommonCompartmentIDs: commonCompartmentIDs,
  499. }
  500. client2Properties := &MatchProperties{
  501. ProtocolVersion: LatestProtocolVersion,
  502. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  503. NATType: NATTypeSymmetric,
  504. CommonCompartmentIDs: commonCompartmentIDs,
  505. }
  506. proxy1Properties := &MatchProperties{
  507. ProtocolVersion: LatestProtocolVersion,
  508. GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
  509. NATType: NATTypeNone,
  510. CommonCompartmentIDs: commonCompartmentIDs,
  511. }
  512. proxy2Properties := &MatchProperties{
  513. ProtocolVersion: LatestProtocolVersion,
  514. GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
  515. NATType: NATTypeSymmetric,
  516. CommonCompartmentIDs: commonCompartmentIDs,
  517. }
  518. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  519. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  520. time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
  521. go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
  522. err = <-proxy1ResultChan
  523. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  524. return errors.Tracef("unexpected result: %v", err)
  525. }
  526. // proxy2 should match since it's the preferred NAT match
  527. err = <-proxy2ResultChan
  528. if err != nil {
  529. return errors.Trace(err)
  530. }
  531. err = <-clientResultChan
  532. if err != nil {
  533. return errors.Trace(err)
  534. }
  535. // Test: client preferred NAT match
  536. // Limitation: the current Matcher.matchAllOffers logic matches the first
  537. // enqueued client against the best proxy match, regardless of whether
  538. // there is another client in the queue that's a better match for that
  539. // proxy. As a result, this test only passes when the preferred matching
  540. // client is enqueued first, and the test is currently of limited utility.
  541. go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
  542. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  543. go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
  544. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  545. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
  546. err = <-proxy1ResultChan
  547. if err != nil {
  548. return errors.Trace(err)
  549. }
  550. err = <-client1ResultChan
  551. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  552. return errors.Tracef("unexpected result: %v", err)
  553. }
  554. // client2 should match since it's the preferred NAT match
  555. err = <-client2ResultChan
  556. if err != nil {
  557. return errors.Trace(err)
  558. }
  559. // Test: priority supercedes preferred NAT match
  560. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  561. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  562. proxy2Properties.IsPriority = true
  563. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  564. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  565. go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
  566. err = <-proxy1ResultChan
  567. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  568. return errors.Tracef("unexpected result: %v", err)
  569. }
  570. // proxy2 should match since it's the priority, but not preferred NAT match
  571. err = <-proxy2ResultChan
  572. if err != nil {
  573. return errors.Trace(err)
  574. }
  575. err = <-clientResultChan
  576. if err != nil {
  577. return errors.Trace(err)
  578. }
  579. // Test: many matches
  580. // Reduce test log noise for this phase of the test
  581. logger.SetLogLevelDebug(false)
  582. matchCount := 10000
  583. proxyCount := matchCount
  584. clientCount := matchCount
  585. // Buffered so no goroutine will block reporting result
  586. proxyResultChan = make(chan error, matchCount)
  587. clientResultChan = make(chan error, matchCount)
  588. for proxyCount > 0 || clientCount > 0 {
  589. // Don't simply alternate enqueuing a proxy and a client
  590. if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
  591. go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
  592. proxyCount -= 1
  593. } else if clientCount > 0 {
  594. go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
  595. clientCount -= 1
  596. }
  597. }
  598. for i := 0; i < matchCount; i++ {
  599. err = <-proxyResultChan
  600. if err != nil {
  601. return errors.Trace(err)
  602. }
  603. err = <-clientResultChan
  604. if err != nil {
  605. return errors.Trace(err)
  606. }
  607. }
  608. return nil
  609. }
  610. func randomIPAddress() string {
  611. return fmt.Sprintf("%d.%d.%d.%d",
  612. prng.Range(0, 255),
  613. prng.Range(0, 255),
  614. prng.Range(0, 255),
  615. prng.Range(0, 255))
  616. }
  617. func TestMatcherMultiQueue(t *testing.T) {
  618. err := runTestMatcherMultiQueue()
  619. if err != nil {
  620. t.Errorf(errors.Trace(err).Error())
  621. }
  622. }
  623. func runTestMatcherMultiQueue() error {
  624. // Test: invalid compartment IDs
  625. q := newAnnouncementMultiQueue()
  626. err := q.enqueue(
  627. &announcementEntry{
  628. announcement: &MatchAnnouncement{
  629. Properties: MatchProperties{}}})
  630. if err == nil {
  631. return errors.TraceNew("unexpected success")
  632. }
  633. compartmentID, _ := MakeID()
  634. err = q.enqueue(
  635. &announcementEntry{
  636. announcement: &MatchAnnouncement{
  637. Properties: MatchProperties{
  638. CommonCompartmentIDs: []ID{compartmentID},
  639. PersonalCompartmentIDs: []ID{compartmentID},
  640. }}})
  641. if err == nil {
  642. return errors.TraceNew("unexpected success")
  643. }
  644. // Test: enqueue multiple candidates
  645. var otherCommonCompartmentIDs []ID
  646. var otherPersonalCompartmentIDs []ID
  647. numOtherCompartmentIDs := 10
  648. for i := 0; i < numOtherCompartmentIDs; i++ {
  649. commonCompartmentID, _ := MakeID()
  650. otherCommonCompartmentIDs = append(
  651. otherCommonCompartmentIDs, commonCompartmentID)
  652. personalCompartmentID, _ := MakeID()
  653. otherPersonalCompartmentIDs = append(
  654. otherPersonalCompartmentIDs, personalCompartmentID)
  655. }
  656. numOtherEntries := 10000
  657. for i := 0; i < numOtherEntries; i++ {
  658. ctx, cancel := context.WithDeadline(
  659. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  660. defer cancel()
  661. err := q.enqueue(
  662. &announcementEntry{
  663. ctx: ctx,
  664. announcement: &MatchAnnouncement{
  665. Properties: MatchProperties{
  666. CommonCompartmentIDs: []ID{
  667. otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
  668. NATType: NATTypeSymmetric,
  669. }}})
  670. if err != nil {
  671. return errors.Trace(err)
  672. }
  673. err = q.enqueue(
  674. &announcementEntry{
  675. ctx: ctx,
  676. announcement: &MatchAnnouncement{
  677. Properties: MatchProperties{
  678. PersonalCompartmentIDs: []ID{
  679. otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
  680. NATType: NATTypeSymmetric,
  681. }}})
  682. if err != nil {
  683. return errors.Trace(err)
  684. }
  685. }
  686. var matchingCommonCompartmentIDs []ID
  687. numMatchingCompartmentIDs := 2
  688. numMatchingEntries := 2
  689. var expectedMatches []*announcementEntry
  690. for i := 0; i < numMatchingCompartmentIDs; i++ {
  691. for j := 0; j < numMatchingEntries; j++ {
  692. commonCompartmentID, _ := MakeID()
  693. matchingCommonCompartmentIDs = append(
  694. matchingCommonCompartmentIDs, commonCompartmentID)
  695. ctx, cancel := context.WithDeadline(
  696. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  697. defer cancel()
  698. a := &announcementEntry{
  699. ctx: ctx,
  700. announcement: &MatchAnnouncement{
  701. Properties: MatchProperties{
  702. CommonCompartmentIDs: matchingCommonCompartmentIDs[i : i+1],
  703. NATType: NATTypeNone,
  704. }}}
  705. expectedMatches = append(expectedMatches, a)
  706. err := q.enqueue(a)
  707. if err != nil {
  708. return errors.Trace(err)
  709. }
  710. }
  711. }
  712. // Test: inspect queue state
  713. if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs*numMatchingEntries {
  714. return errors.TraceNew("unexpected total entries count")
  715. }
  716. if len(q.commonCompartmentQueues) !=
  717. numOtherCompartmentIDs+numMatchingCompartmentIDs {
  718. return errors.TraceNew("unexpected compartment queue count")
  719. }
  720. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  721. return errors.TraceNew("unexpected compartment queue count")
  722. }
  723. // Test: find expected matches
  724. iter := q.startMatching(true, matchingCommonCompartmentIDs)
  725. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  726. return errors.TraceNew("unexpected iterator state")
  727. }
  728. unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
  729. if unlimited != numMatchingCompartmentIDs*numMatchingEntries ||
  730. partiallyLimited != 0 ||
  731. strictlyLimited != 0 {
  732. return errors.TraceNew("unexpected NAT counts")
  733. }
  734. match, _ := iter.getNext()
  735. if match == nil {
  736. return errors.TraceNew("unexpected missing match")
  737. }
  738. if match != expectedMatches[0] {
  739. return errors.TraceNew("unexpected match")
  740. }
  741. if !match.queueReference.dequeue() {
  742. return errors.TraceNew("unexpected already dequeued")
  743. }
  744. if match.queueReference.dequeue() {
  745. return errors.TraceNew("unexpected not already dequeued")
  746. }
  747. iter = q.startMatching(true, matchingCommonCompartmentIDs)
  748. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  749. return errors.TraceNew("unexpected iterator state")
  750. }
  751. unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
  752. if unlimited != numMatchingEntries*numMatchingCompartmentIDs-1 ||
  753. partiallyLimited != 0 ||
  754. strictlyLimited != 0 {
  755. return errors.TraceNew("unexpected NAT counts")
  756. }
  757. match, _ = iter.getNext()
  758. if match == nil {
  759. return errors.TraceNew("unexpected missing match")
  760. }
  761. if match != expectedMatches[1] {
  762. return errors.TraceNew("unexpected match")
  763. }
  764. if !match.queueReference.dequeue() {
  765. return errors.TraceNew("unexpected already dequeued")
  766. }
  767. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  768. return errors.TraceNew("unexpected iterator state")
  769. }
  770. // Test: getNext after dequeue
  771. match, _ = iter.getNext()
  772. if match == nil {
  773. return errors.TraceNew("unexpected missing match")
  774. }
  775. if match != expectedMatches[2] {
  776. return errors.TraceNew("unexpected match")
  777. }
  778. if !match.queueReference.dequeue() {
  779. return errors.TraceNew("unexpected already dequeued")
  780. }
  781. match, _ = iter.getNext()
  782. if match == nil {
  783. return errors.TraceNew("unexpected missing match")
  784. }
  785. if match != expectedMatches[3] {
  786. return errors.TraceNew("unexpected match")
  787. }
  788. if !match.queueReference.dequeue() {
  789. return errors.TraceNew("unexpected already dequeued")
  790. }
  791. // Test: reinspect queue state after dequeues
  792. if q.getLen() != numOtherEntries*2 {
  793. return errors.TraceNew("unexpected total entries count")
  794. }
  795. if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
  796. return errors.TraceNew("unexpected compartment queue count")
  797. }
  798. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  799. return errors.TraceNew("unexpected compartment queue count")
  800. }
  801. // Test: priority
  802. q = newAnnouncementMultiQueue()
  803. var commonCompartmentIDs []ID
  804. numCompartmentIDs := 10
  805. for i := 0; i < numCompartmentIDs; i++ {
  806. commonCompartmentID, _ := MakeID()
  807. commonCompartmentIDs = append(
  808. commonCompartmentIDs, commonCompartmentID)
  809. }
  810. priorityProxyID, _ := MakeID()
  811. nonPriorityProxyID, _ := MakeID()
  812. ctx, cancel := context.WithDeadline(
  813. context.Background(), time.Now().Add(10*time.Minute))
  814. defer cancel()
  815. numEntries := 10000
  816. for i := 0; i < numEntries; i++ {
  817. // Enqueue every other announcement as a priority
  818. isPriority := i%2 == 0
  819. proxyID := priorityProxyID
  820. if !isPriority {
  821. proxyID = nonPriorityProxyID
  822. }
  823. err := q.enqueue(
  824. &announcementEntry{
  825. ctx: ctx,
  826. announcement: &MatchAnnouncement{
  827. ProxyID: proxyID,
  828. Properties: MatchProperties{
  829. IsPriority: isPriority,
  830. CommonCompartmentIDs: []ID{
  831. commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
  832. NATType: NATTypeUnknown,
  833. }}})
  834. if err != nil {
  835. return errors.Trace(err)
  836. }
  837. }
  838. iter = q.startMatching(true, commonCompartmentIDs)
  839. for i := 0; i < numEntries; i++ {
  840. match, isPriority := iter.getNext()
  841. if match == nil {
  842. return errors.TraceNew("unexpected missing match")
  843. }
  844. // First half, and only first half, of matches is priority
  845. expectPriority := i < numEntries/2
  846. if isPriority != expectPriority {
  847. return errors.TraceNew("unexpected isPriority")
  848. }
  849. expectedProxyID := priorityProxyID
  850. if !expectPriority {
  851. expectedProxyID = nonPriorityProxyID
  852. }
  853. if match.announcement.ProxyID != expectedProxyID {
  854. return errors.TraceNew("unexpected ProxyID")
  855. }
  856. if !match.queueReference.dequeue() {
  857. return errors.TraceNew("unexpected already dequeued")
  858. }
  859. }
  860. match, _ = iter.getNext()
  861. if match != nil {
  862. return errors.TraceNew("unexpected match")
  863. }
  864. return nil
  865. }
  866. // Benchmark numbers for the previous announcement queue implementation, with
  867. // increasingly slow performance when enqueuing and then finding a new,
  868. // distinct personal compartment ID proxy.
  869. //
  870. // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
  871. // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
  872. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
  873. // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
  874. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
  875. // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
  876. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
  877. // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
  878. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
  879. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
  880. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
  881. // PASS
  882. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
  883. //
  884. // Benchmark numbers for the current implementation, the announcementMultiQueue,
  885. // with constant time performance for the same scenario:
  886. //
  887. // BenchmarkMatcherQueue
  888. // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
  889. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
  890. // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
  891. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
  892. // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
  893. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
  894. // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
  895. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
  896. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
  897. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
  898. // PASS
  899. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
  900. func BenchmarkMatcherQueue(b *testing.B) {
  901. SetAllowCommonASNMatching(true)
  902. defer SetAllowCommonASNMatching(false)
  903. for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
  904. debug.FreeOSMemory()
  905. var m *Matcher
  906. commonCompartmentID, _ := MakeID()
  907. b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
  908. for i := 0; i < b.N; i++ {
  909. // Matcher.Start is not called to start the matchWorker;
  910. // instead, matchOffer is invoked directly.
  911. m = NewMatcher(
  912. &MatcherConfig{
  913. Logger: newTestLogger(),
  914. })
  915. for j := 0; j < size; j++ {
  916. var commonCompartmentIDs, personalCompartmentIDs []ID
  917. if prng.FlipCoin() {
  918. personalCompartmentID, _ := MakeID()
  919. personalCompartmentIDs = []ID{personalCompartmentID}
  920. } else {
  921. commonCompartmentIDs = []ID{commonCompartmentID}
  922. }
  923. announcementEntry := &announcementEntry{
  924. ctx: context.Background(),
  925. limitIP: "127.0.0.1",
  926. announcement: &MatchAnnouncement{
  927. Properties: MatchProperties{
  928. CommonCompartmentIDs: commonCompartmentIDs,
  929. PersonalCompartmentIDs: personalCompartmentIDs,
  930. GeoIPData: common.GeoIPData{},
  931. NetworkType: NetworkTypeWiFi,
  932. NATType: NATTypePortRestrictedCone,
  933. PortMappingTypes: []PortMappingType{},
  934. },
  935. ProxyID: ID{},
  936. },
  937. offerChan: make(chan *MatchOffer, 1),
  938. }
  939. err := m.addAnnouncementEntry(announcementEntry)
  940. if err != nil {
  941. b.Fatalf(errors.Trace(err).Error())
  942. }
  943. }
  944. }
  945. })
  946. b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
  947. queueSize := m.announcementQueue.getLen()
  948. if queueSize != size {
  949. b.Fatalf(errors.Tracef("unexpected queue size: %d", queueSize).Error())
  950. }
  951. for i := 0; i < b.N; i++ {
  952. personalCompartmentID, _ := MakeID()
  953. announcementEntry :=
  954. &announcementEntry{
  955. ctx: context.Background(),
  956. limitIP: "127.0.0.1",
  957. announcement: &MatchAnnouncement{
  958. Properties: MatchProperties{
  959. ProtocolVersion: LatestProtocolVersion,
  960. PersonalCompartmentIDs: []ID{personalCompartmentID},
  961. GeoIPData: common.GeoIPData{},
  962. NetworkType: NetworkTypeWiFi,
  963. NATType: NATTypePortRestrictedCone,
  964. PortMappingTypes: []PortMappingType{},
  965. },
  966. ProxyID: ID{},
  967. },
  968. offerChan: make(chan *MatchOffer, 1),
  969. }
  970. offerEntry := &offerEntry{
  971. ctx: context.Background(),
  972. limitIP: "127.0.0.1",
  973. offer: &MatchOffer{
  974. Properties: MatchProperties{
  975. ProtocolVersion: LatestProtocolVersion,
  976. PersonalCompartmentIDs: []ID{personalCompartmentID},
  977. GeoIPData: common.GeoIPData{},
  978. NetworkType: NetworkTypeWiFi,
  979. NATType: NATTypePortRestrictedCone,
  980. PortMappingTypes: []PortMappingType{},
  981. },
  982. },
  983. answerChan: make(chan *answerInfo, 1),
  984. }
  985. err := m.addAnnouncementEntry(announcementEntry)
  986. if err != nil {
  987. b.Fatalf(errors.Trace(err).Error())
  988. }
  989. match, _ := m.matchOffer(offerEntry)
  990. if match == nil {
  991. b.Fatalf(errors.TraceNew("unexpected no match").Error())
  992. }
  993. m.removeAnnouncementEntry(false, match)
  994. }
  995. })
  996. }
  997. }