matcher_test.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "fmt"
  23. "runtime/debug"
  24. "strings"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  31. )
  32. func TestMatcher(t *testing.T) {
  33. err := runTestMatcher()
  34. if err != nil {
  35. t.Errorf(errors.Trace(err).Error())
  36. }
  37. }
  38. func runTestMatcher() error {
  39. limitEntryCount := 50
  40. rateLimitQuantity := 100
  41. rateLimitInterval := 1000 * time.Millisecond
  42. logger := newTestLogger()
  43. m := NewMatcher(
  44. &MatcherConfig{
  45. Logger: logger,
  46. AnnouncementLimitEntryCount: limitEntryCount,
  47. AnnouncementRateLimitQuantity: rateLimitQuantity,
  48. AnnouncementRateLimitInterval: rateLimitInterval,
  49. OfferLimitEntryCount: limitEntryCount,
  50. OfferRateLimitQuantity: rateLimitQuantity,
  51. OfferRateLimitInterval: rateLimitInterval,
  52. })
  53. err := m.Start()
  54. if err != nil {
  55. return errors.Trace(err)
  56. }
  57. defer m.Stop()
  58. makeID := func() ID {
  59. ID, err := MakeID()
  60. if err != nil {
  61. panic(err)
  62. }
  63. return ID
  64. }
  65. makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
  66. return &MatchAnnouncement{
  67. Properties: *properties,
  68. ProxyID: makeID(),
  69. ConnectionID: makeID(),
  70. }
  71. }
  72. makeOffer := func(properties *MatchProperties) *MatchOffer {
  73. return &MatchOffer{
  74. Properties: *properties,
  75. ClientProxyProtocolVersion: ProxyProtocolVersion1,
  76. }
  77. }
  78. checkMatchMetrics := func(metrics *MatchMetrics) error {
  79. if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
  80. return errors.TraceNew("unexpected match metrics")
  81. }
  82. return nil
  83. }
  84. proxyIP := randomIPAddress()
  85. proxyFunc := func(
  86. resultChan chan error,
  87. proxyIP string,
  88. matchProperties *MatchProperties,
  89. timeout time.Duration,
  90. waitBeforeAnswer chan struct{},
  91. answerSuccess bool) {
  92. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  93. defer cancelFunc()
  94. announcement := makeAnnouncement(matchProperties)
  95. offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
  96. if err != nil {
  97. resultChan <- errors.Trace(err)
  98. return
  99. } else {
  100. err := checkMatchMetrics(matchMetrics)
  101. if err != nil {
  102. resultChan <- errors.Trace(err)
  103. return
  104. }
  105. }
  106. if waitBeforeAnswer != nil {
  107. <-waitBeforeAnswer
  108. }
  109. if answerSuccess {
  110. err = m.Answer(
  111. &MatchAnswer{
  112. ProxyID: announcement.ProxyID,
  113. ConnectionID: announcement.ConnectionID,
  114. SelectedProxyProtocolVersion: offer.ClientProxyProtocolVersion,
  115. })
  116. } else {
  117. m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
  118. }
  119. resultChan <- errors.Trace(err)
  120. }
  121. clientIP := randomIPAddress()
  122. clientFunc := func(
  123. resultChan chan error,
  124. clientIP string,
  125. matchProperties *MatchProperties,
  126. timeout time.Duration) {
  127. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  128. defer cancelFunc()
  129. offer := makeOffer(matchProperties)
  130. answer, _, matchMetrics, err := m.Offer(ctx, clientIP, offer)
  131. if err != nil {
  132. resultChan <- errors.Trace(err)
  133. return
  134. }
  135. if answer.SelectedProxyProtocolVersion != offer.ClientProxyProtocolVersion {
  136. resultChan <- errors.TraceNew("unexpected selected proxy protocol version")
  137. return
  138. } else {
  139. err := checkMatchMetrics(matchMetrics)
  140. if err != nil {
  141. resultChan <- errors.Trace(err)
  142. return
  143. }
  144. }
  145. resultChan <- nil
  146. }
  147. // Test: announce timeout
  148. proxyResultChan := make(chan error)
  149. matchProperties := &MatchProperties{
  150. CommonCompartmentIDs: []ID{makeID()},
  151. }
  152. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  153. err = <-proxyResultChan
  154. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  155. return errors.Tracef("unexpected result: %v", err)
  156. }
  157. if m.announcementQueue.getLen() != 0 {
  158. return errors.TraceNew("unexpected queue size")
  159. }
  160. // Test: limit announce entries by IP
  161. time.Sleep(rateLimitInterval)
  162. maxEntries := limitEntryCount
  163. maxEntriesProxyResultChan := make(chan error, maxEntries)
  164. // fill the queue with max entries for one IP; the first one will timeout sooner
  165. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  166. for i := 0; i < maxEntries-1; i++ {
  167. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
  168. }
  169. // await goroutines filling queue
  170. for {
  171. time.Sleep(10 * time.Microsecond)
  172. m.announcementQueueMutex.Lock()
  173. queueLen := m.announcementQueue.getLen()
  174. m.announcementQueueMutex.Unlock()
  175. if queueLen == maxEntries {
  176. break
  177. }
  178. }
  179. // the next enqueue should fail with "max entries"
  180. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  181. err = <-proxyResultChan
  182. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  183. return errors.Tracef("unexpected result: %v", err)
  184. }
  185. // wait for first entry to timeout
  186. err = <-maxEntriesProxyResultChan
  187. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  188. return errors.Tracef("unexpected result: %v", err)
  189. }
  190. // now another enqueue succeeds as expected
  191. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  192. err = <-proxyResultChan
  193. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  194. return errors.Tracef("unexpected result: %v", err)
  195. }
  196. // drain remaining entries
  197. for i := 0; i < maxEntries-1; i++ {
  198. err = <-maxEntriesProxyResultChan
  199. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  200. return errors.Tracef("unexpected result: %v", err)
  201. }
  202. }
  203. // Test: offer timeout
  204. clientResultChan := make(chan error)
  205. go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  206. err = <-clientResultChan
  207. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  208. return errors.Tracef("unexpected result: %v", err)
  209. }
  210. if m.offerQueue.Len() != 0 {
  211. return errors.TraceNew("unexpected queue size")
  212. }
  213. // Test: limit offer entries by IP
  214. time.Sleep(rateLimitInterval)
  215. maxEntries = limitEntryCount
  216. maxEntriesClientResultChan := make(chan error, maxEntries)
  217. // fill the queue with max entries for one IP; the first one will timeout sooner
  218. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  219. for i := 0; i < maxEntries-1; i++ {
  220. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
  221. }
  222. // await goroutines filling queue
  223. for {
  224. time.Sleep(10 * time.Microsecond)
  225. m.offerQueueMutex.Lock()
  226. queueLen := m.offerQueue.Len()
  227. m.offerQueueMutex.Unlock()
  228. if queueLen == maxEntries {
  229. break
  230. }
  231. }
  232. // enqueue should fail with "max entries"
  233. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  234. err = <-clientResultChan
  235. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  236. return errors.Tracef("unexpected result: %v", err)
  237. }
  238. // wait for first entry to timeout
  239. err = <-maxEntriesClientResultChan
  240. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  241. return errors.Tracef("unexpected result: %v", err)
  242. }
  243. // now another enqueue succeeds as expected
  244. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  245. err = <-clientResultChan
  246. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  247. return errors.Tracef("unexpected result: %v", err)
  248. }
  249. // drain remaining entries
  250. for i := 0; i < maxEntries-1; i++ {
  251. err = <-maxEntriesClientResultChan
  252. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  253. return errors.Tracef("unexpected result: %v", err)
  254. }
  255. }
  256. // Test: announcement rate limit
  257. m.SetLimits(
  258. 0, rateLimitQuantity, rateLimitInterval, []ID{},
  259. 0, rateLimitQuantity, rateLimitInterval)
  260. time.Sleep(rateLimitInterval)
  261. maxEntries = rateLimitQuantity
  262. maxEntriesProxyResultChan = make(chan error, maxEntries)
  263. waitGroup := new(sync.WaitGroup)
  264. for i := 0; i < maxEntries; i++ {
  265. waitGroup.Add(1)
  266. go func() {
  267. defer waitGroup.Done()
  268. proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  269. }()
  270. }
  271. // Use a wait group to ensure all maxEntries have hit the rate limiter
  272. // without sleeping before the next attempt, as any sleep can increase
  273. // the rate limiter token count.
  274. waitGroup.Wait()
  275. // the next enqueue should fail with "rate exceeded"
  276. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  277. err = <-proxyResultChan
  278. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  279. return errors.Tracef("unexpected result: %v", err)
  280. }
  281. // Test: offer rate limit
  282. maxEntries = rateLimitQuantity
  283. maxEntriesClientResultChan = make(chan error, maxEntries)
  284. waitGroup = new(sync.WaitGroup)
  285. for i := 0; i < rateLimitQuantity; i++ {
  286. waitGroup.Add(1)
  287. go func() {
  288. defer waitGroup.Done()
  289. clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  290. }()
  291. }
  292. waitGroup.Wait()
  293. // enqueue should fail with "rate exceeded"
  294. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  295. err = <-clientResultChan
  296. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  297. return errors.Tracef("unexpected result: %v", err)
  298. }
  299. time.Sleep(rateLimitInterval)
  300. m.SetLimits(
  301. limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
  302. limitEntryCount, rateLimitQuantity, rateLimitInterval)
  303. // Test: basic match
  304. commonCompartmentIDs := []ID{makeID()}
  305. geoIPData1 := &MatchProperties{
  306. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  307. CommonCompartmentIDs: commonCompartmentIDs,
  308. }
  309. geoIPData2 := &MatchProperties{
  310. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  311. CommonCompartmentIDs: commonCompartmentIDs,
  312. }
  313. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  314. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  315. err = <-proxyResultChan
  316. if err != nil {
  317. return errors.Trace(err)
  318. }
  319. err = <-clientResultChan
  320. if err != nil {
  321. return errors.Trace(err)
  322. }
  323. // Test: answer error
  324. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
  325. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  326. err = <-proxyResultChan
  327. if err != nil {
  328. return errors.Trace(err)
  329. }
  330. err = <-clientResultChan
  331. if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
  332. return errors.Tracef("unexpected result: %v", err)
  333. }
  334. // Test: client is gone
  335. waitBeforeAnswer := make(chan struct{})
  336. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
  337. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  338. err = <-clientResultChan
  339. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  340. return errors.Tracef("unexpected result: %v", err)
  341. }
  342. close(waitBeforeAnswer)
  343. err = <-proxyResultChan
  344. if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
  345. return errors.Tracef("unexpected result: %v", err)
  346. }
  347. // Test: no compartment match
  348. compartment1 := &MatchProperties{
  349. GeoIPData: geoIPData1.GeoIPData,
  350. CommonCompartmentIDs: []ID{makeID()},
  351. }
  352. compartment2 := &MatchProperties{
  353. GeoIPData: geoIPData2.GeoIPData,
  354. PersonalCompartmentIDs: []ID{makeID()},
  355. }
  356. compartment3 := &MatchProperties{
  357. GeoIPData: geoIPData2.GeoIPData,
  358. CommonCompartmentIDs: []ID{makeID()},
  359. }
  360. compartment4 := &MatchProperties{
  361. GeoIPData: geoIPData2.GeoIPData,
  362. PersonalCompartmentIDs: []ID{makeID()},
  363. }
  364. proxy1ResultChan := make(chan error)
  365. proxy2ResultChan := make(chan error)
  366. client1ResultChan := make(chan error)
  367. client2ResultChan := make(chan error)
  368. go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  369. go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  370. go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
  371. go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
  372. err = <-proxy1ResultChan
  373. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  374. return errors.Tracef("unexpected result: %v", err)
  375. }
  376. err = <-proxy2ResultChan
  377. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  378. return errors.Tracef("unexpected result: %v", err)
  379. }
  380. err = <-client1ResultChan
  381. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  382. return errors.Tracef("unexpected result: %v", err)
  383. }
  384. err = <-client2ResultChan
  385. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  386. return errors.Tracef("unexpected result: %v", err)
  387. }
  388. // Test: common compartment match
  389. compartment1And3 := &MatchProperties{
  390. GeoIPData: geoIPData2.GeoIPData,
  391. CommonCompartmentIDs: []ID{
  392. compartment1.CommonCompartmentIDs[0],
  393. compartment3.CommonCompartmentIDs[0]},
  394. }
  395. go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  396. go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
  397. err = <-proxyResultChan
  398. if err != nil {
  399. return errors.Trace(err)
  400. }
  401. err = <-clientResultChan
  402. if err != nil {
  403. return errors.Trace(err)
  404. }
  405. // Test: personal compartment match
  406. compartment2And4 := &MatchProperties{
  407. GeoIPData: geoIPData2.GeoIPData,
  408. PersonalCompartmentIDs: []ID{
  409. compartment2.PersonalCompartmentIDs[0],
  410. compartment4.PersonalCompartmentIDs[0]},
  411. }
  412. go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  413. go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
  414. err = <-proxyResultChan
  415. if err != nil {
  416. return errors.Trace(err)
  417. }
  418. err = <-clientResultChan
  419. if err != nil {
  420. return errors.Trace(err)
  421. }
  422. // Test: no same-ASN match
  423. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  424. go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
  425. err = <-proxyResultChan
  426. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  427. return errors.Tracef("unexpected result: %v", err)
  428. }
  429. err = <-clientResultChan
  430. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  431. return errors.Tracef("unexpected result: %v", err)
  432. }
  433. // Test: proxy preferred NAT match
  434. client1Properties := &MatchProperties{
  435. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  436. NATType: NATTypeFullCone,
  437. CommonCompartmentIDs: commonCompartmentIDs,
  438. }
  439. client2Properties := &MatchProperties{
  440. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  441. NATType: NATTypeSymmetric,
  442. CommonCompartmentIDs: commonCompartmentIDs,
  443. }
  444. proxy1Properties := &MatchProperties{
  445. GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
  446. NATType: NATTypeNone,
  447. CommonCompartmentIDs: commonCompartmentIDs,
  448. }
  449. proxy2Properties := &MatchProperties{
  450. GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
  451. NATType: NATTypeSymmetric,
  452. CommonCompartmentIDs: commonCompartmentIDs,
  453. }
  454. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  455. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  456. time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
  457. go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
  458. err = <-proxy1ResultChan
  459. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  460. return errors.Tracef("unexpected result: %v", err)
  461. }
  462. // proxy2 should match since it's the preferred NAT match
  463. err = <-proxy2ResultChan
  464. if err != nil {
  465. return errors.Trace(err)
  466. }
  467. err = <-clientResultChan
  468. if err != nil {
  469. return errors.Trace(err)
  470. }
  471. // Test: client preferred NAT match
  472. // Limitation: the current Matcher.matchAllOffers logic matches the first
  473. // enqueued client against the best proxy match, regardless of whether
  474. // there is another client in the queue that's a better match for that
  475. // proxy. As a result, this test only passes when the preferred matching
  476. // client is enqueued first, and the test is currently of limited utility.
  477. go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
  478. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  479. go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
  480. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  481. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
  482. err = <-proxy1ResultChan
  483. if err != nil {
  484. return errors.Trace(err)
  485. }
  486. err = <-client1ResultChan
  487. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  488. return errors.Tracef("unexpected result: %v", err)
  489. }
  490. // client2 should match since it's the preferred NAT match
  491. err = <-client2ResultChan
  492. if err != nil {
  493. return errors.Trace(err)
  494. }
  495. // Test: priority supercedes preferred NAT match
  496. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  497. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  498. proxy2Properties.IsPriority = true
  499. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  500. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  501. go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
  502. err = <-proxy1ResultChan
  503. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  504. return errors.Tracef("unexpected result: %v", err)
  505. }
  506. // proxy2 should match since it's the priority, but not preferred NAT match
  507. err = <-proxy2ResultChan
  508. if err != nil {
  509. return errors.Trace(err)
  510. }
  511. err = <-clientResultChan
  512. if err != nil {
  513. return errors.Trace(err)
  514. }
  515. // Test: many matches
  516. // Reduce test log noise for this phase of the test
  517. logger.SetLogLevelDebug(false)
  518. matchCount := 10000
  519. proxyCount := matchCount
  520. clientCount := matchCount
  521. // Buffered so no goroutine will block reporting result
  522. proxyResultChan = make(chan error, matchCount)
  523. clientResultChan = make(chan error, matchCount)
  524. for proxyCount > 0 || clientCount > 0 {
  525. // Don't simply alternate enqueuing a proxy and a client
  526. if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
  527. go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
  528. proxyCount -= 1
  529. } else if clientCount > 0 {
  530. go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
  531. clientCount -= 1
  532. }
  533. }
  534. for i := 0; i < matchCount; i++ {
  535. err = <-proxyResultChan
  536. if err != nil {
  537. return errors.Trace(err)
  538. }
  539. err = <-clientResultChan
  540. if err != nil {
  541. return errors.Trace(err)
  542. }
  543. }
  544. return nil
  545. }
  546. func randomIPAddress() string {
  547. return fmt.Sprintf("%d.%d.%d.%d",
  548. prng.Range(0, 255),
  549. prng.Range(0, 255),
  550. prng.Range(0, 255),
  551. prng.Range(0, 255))
  552. }
  553. func TestMatcherMultiQueue(t *testing.T) {
  554. err := runTestMatcherMultiQueue()
  555. if err != nil {
  556. t.Errorf(errors.Trace(err).Error())
  557. }
  558. }
  559. func runTestMatcherMultiQueue() error {
  560. // Test: invalid compartment IDs
  561. q := newAnnouncementMultiQueue()
  562. err := q.enqueue(
  563. &announcementEntry{
  564. announcement: &MatchAnnouncement{
  565. Properties: MatchProperties{}}})
  566. if err == nil {
  567. return errors.TraceNew("unexpected success")
  568. }
  569. compartmentID, _ := MakeID()
  570. err = q.enqueue(
  571. &announcementEntry{
  572. announcement: &MatchAnnouncement{
  573. Properties: MatchProperties{
  574. CommonCompartmentIDs: []ID{compartmentID},
  575. PersonalCompartmentIDs: []ID{compartmentID},
  576. }}})
  577. if err == nil {
  578. return errors.TraceNew("unexpected success")
  579. }
  580. // Test: enqueue multiple candidates
  581. var otherCommonCompartmentIDs []ID
  582. var otherPersonalCompartmentIDs []ID
  583. numOtherCompartmentIDs := 10
  584. for i := 0; i < numOtherCompartmentIDs; i++ {
  585. commonCompartmentID, _ := MakeID()
  586. otherCommonCompartmentIDs = append(
  587. otherCommonCompartmentIDs, commonCompartmentID)
  588. personalCompartmentID, _ := MakeID()
  589. otherPersonalCompartmentIDs = append(
  590. otherPersonalCompartmentIDs, personalCompartmentID)
  591. }
  592. numOtherEntries := 10000
  593. for i := 0; i < numOtherEntries; i++ {
  594. ctx, cancel := context.WithDeadline(
  595. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  596. defer cancel()
  597. err := q.enqueue(
  598. &announcementEntry{
  599. ctx: ctx,
  600. announcement: &MatchAnnouncement{
  601. Properties: MatchProperties{
  602. CommonCompartmentIDs: []ID{
  603. otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
  604. NATType: NATTypeSymmetric,
  605. }}})
  606. if err != nil {
  607. return errors.Trace(err)
  608. }
  609. err = q.enqueue(
  610. &announcementEntry{
  611. ctx: ctx,
  612. announcement: &MatchAnnouncement{
  613. Properties: MatchProperties{
  614. PersonalCompartmentIDs: []ID{
  615. otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
  616. NATType: NATTypeSymmetric,
  617. }}})
  618. if err != nil {
  619. return errors.Trace(err)
  620. }
  621. }
  622. var matchingCommonCompartmentIDs []ID
  623. numMatchingCompartmentIDs := 2
  624. numMatchingEntries := 2
  625. var expectedMatches []*announcementEntry
  626. for i := 0; i < numMatchingCompartmentIDs; i++ {
  627. for j := 0; j < numMatchingEntries; j++ {
  628. commonCompartmentID, _ := MakeID()
  629. matchingCommonCompartmentIDs = append(
  630. matchingCommonCompartmentIDs, commonCompartmentID)
  631. ctx, cancel := context.WithDeadline(
  632. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  633. defer cancel()
  634. a := &announcementEntry{
  635. ctx: ctx,
  636. announcement: &MatchAnnouncement{
  637. Properties: MatchProperties{
  638. CommonCompartmentIDs: matchingCommonCompartmentIDs[i : i+1],
  639. NATType: NATTypeNone,
  640. }}}
  641. expectedMatches = append(expectedMatches, a)
  642. err := q.enqueue(a)
  643. if err != nil {
  644. return errors.Trace(err)
  645. }
  646. }
  647. }
  648. // Test: inspect queue state
  649. if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs*numMatchingEntries {
  650. return errors.TraceNew("unexpected total entries count")
  651. }
  652. if len(q.commonCompartmentQueues) !=
  653. numOtherCompartmentIDs+numMatchingCompartmentIDs {
  654. return errors.TraceNew("unexpected compartment queue count")
  655. }
  656. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  657. return errors.TraceNew("unexpected compartment queue count")
  658. }
  659. // Test: find expected matches
  660. iter := q.startMatching(true, matchingCommonCompartmentIDs)
  661. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  662. return errors.TraceNew("unexpected iterator state")
  663. }
  664. unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
  665. if unlimited != numMatchingCompartmentIDs*numMatchingEntries ||
  666. partiallyLimited != 0 ||
  667. strictlyLimited != 0 {
  668. return errors.TraceNew("unexpected NAT counts")
  669. }
  670. match, _ := iter.getNext()
  671. if match == nil {
  672. return errors.TraceNew("unexpected missing match")
  673. }
  674. if match != expectedMatches[0] {
  675. return errors.TraceNew("unexpected match")
  676. }
  677. if !match.queueReference.dequeue() {
  678. return errors.TraceNew("unexpected already dequeued")
  679. }
  680. if match.queueReference.dequeue() {
  681. return errors.TraceNew("unexpected not already dequeued")
  682. }
  683. iter = q.startMatching(true, matchingCommonCompartmentIDs)
  684. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  685. return errors.TraceNew("unexpected iterator state")
  686. }
  687. unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
  688. if unlimited != numMatchingEntries*numMatchingCompartmentIDs-1 ||
  689. partiallyLimited != 0 ||
  690. strictlyLimited != 0 {
  691. return errors.TraceNew("unexpected NAT counts")
  692. }
  693. match, _ = iter.getNext()
  694. if match == nil {
  695. return errors.TraceNew("unexpected missing match")
  696. }
  697. if match != expectedMatches[1] {
  698. return errors.TraceNew("unexpected match")
  699. }
  700. if !match.queueReference.dequeue() {
  701. return errors.TraceNew("unexpected already dequeued")
  702. }
  703. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  704. return errors.TraceNew("unexpected iterator state")
  705. }
  706. // Test: getNext after dequeue
  707. match, _ = iter.getNext()
  708. if match == nil {
  709. return errors.TraceNew("unexpected missing match")
  710. }
  711. if match != expectedMatches[2] {
  712. return errors.TraceNew("unexpected match")
  713. }
  714. if !match.queueReference.dequeue() {
  715. return errors.TraceNew("unexpected already dequeued")
  716. }
  717. match, _ = iter.getNext()
  718. if match == nil {
  719. return errors.TraceNew("unexpected missing match")
  720. }
  721. if match != expectedMatches[3] {
  722. return errors.TraceNew("unexpected match")
  723. }
  724. if !match.queueReference.dequeue() {
  725. return errors.TraceNew("unexpected already dequeued")
  726. }
  727. // Test: reinspect queue state after dequeues
  728. if q.getLen() != numOtherEntries*2 {
  729. return errors.TraceNew("unexpected total entries count")
  730. }
  731. if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
  732. return errors.TraceNew("unexpected compartment queue count")
  733. }
  734. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  735. return errors.TraceNew("unexpected compartment queue count")
  736. }
  737. // Test: priority
  738. q = newAnnouncementMultiQueue()
  739. var commonCompartmentIDs []ID
  740. numCompartmentIDs := 10
  741. for i := 0; i < numCompartmentIDs; i++ {
  742. commonCompartmentID, _ := MakeID()
  743. commonCompartmentIDs = append(
  744. commonCompartmentIDs, commonCompartmentID)
  745. }
  746. priorityProxyID, _ := MakeID()
  747. nonPriorityProxyID, _ := MakeID()
  748. ctx, cancel := context.WithDeadline(
  749. context.Background(), time.Now().Add(10*time.Minute))
  750. defer cancel()
  751. numEntries := 10000
  752. for i := 0; i < numEntries; i++ {
  753. // Enqueue every other announcement as a priority
  754. isPriority := i%2 == 0
  755. proxyID := priorityProxyID
  756. if !isPriority {
  757. proxyID = nonPriorityProxyID
  758. }
  759. err := q.enqueue(
  760. &announcementEntry{
  761. ctx: ctx,
  762. announcement: &MatchAnnouncement{
  763. ProxyID: proxyID,
  764. Properties: MatchProperties{
  765. IsPriority: isPriority,
  766. CommonCompartmentIDs: []ID{
  767. commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
  768. NATType: NATTypeUnknown,
  769. }}})
  770. if err != nil {
  771. return errors.Trace(err)
  772. }
  773. }
  774. iter = q.startMatching(true, commonCompartmentIDs)
  775. for i := 0; i < numEntries; i++ {
  776. match, isPriority := iter.getNext()
  777. if match == nil {
  778. return errors.TraceNew("unexpected missing match")
  779. }
  780. // First half, and only first half, of matches is priority
  781. expectPriority := i < numEntries/2
  782. if isPriority != expectPriority {
  783. return errors.TraceNew("unexpected isPriority")
  784. }
  785. expectedProxyID := priorityProxyID
  786. if !expectPriority {
  787. expectedProxyID = nonPriorityProxyID
  788. }
  789. if match.announcement.ProxyID != expectedProxyID {
  790. return errors.TraceNew("unexpected ProxyID")
  791. }
  792. if !match.queueReference.dequeue() {
  793. return errors.TraceNew("unexpected already dequeued")
  794. }
  795. }
  796. match, _ = iter.getNext()
  797. if match != nil {
  798. return errors.TraceNew("unexpected match")
  799. }
  800. return nil
  801. }
  802. // Benchmark numbers for the previous announcement queue implementation, with
  803. // increasingly slow performance when enqueuing and then finding a new,
  804. // distinct personal compartment ID proxy.
  805. //
  806. // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
  807. // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
  808. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
  809. // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
  810. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
  811. // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
  812. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
  813. // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
  814. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
  815. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
  816. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
  817. // PASS
  818. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
  819. //
  820. // Benchmark numbers for the current implemention, the announcementMultiQueue,
  821. // with constant time performance for the same scenario:
  822. //
  823. // BenchmarkMatcherQueue
  824. // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
  825. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
  826. // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
  827. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
  828. // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
  829. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
  830. // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
  831. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
  832. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
  833. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
  834. // PASS
  835. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
  836. func BenchmarkMatcherQueue(b *testing.B) {
  837. SetAllowCommonASNMatching(true)
  838. defer SetAllowCommonASNMatching(false)
  839. for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
  840. debug.FreeOSMemory()
  841. var m *Matcher
  842. commonCompartmentID, _ := MakeID()
  843. b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
  844. for i := 0; i < b.N; i++ {
  845. // Matcher.Start is not called to start the matchWorker;
  846. // instead, matchOffer is invoked directly.
  847. m = NewMatcher(
  848. &MatcherConfig{
  849. Logger: newTestLogger(),
  850. })
  851. for j := 0; j < size; j++ {
  852. var commonCompartmentIDs, personalCompartmentIDs []ID
  853. if prng.FlipCoin() {
  854. personalCompartmentID, _ := MakeID()
  855. personalCompartmentIDs = []ID{personalCompartmentID}
  856. } else {
  857. commonCompartmentIDs = []ID{commonCompartmentID}
  858. }
  859. announcementEntry := &announcementEntry{
  860. ctx: context.Background(),
  861. limitIP: "127.0.0.1",
  862. announcement: &MatchAnnouncement{
  863. Properties: MatchProperties{
  864. CommonCompartmentIDs: commonCompartmentIDs,
  865. PersonalCompartmentIDs: personalCompartmentIDs,
  866. GeoIPData: common.GeoIPData{},
  867. NetworkType: NetworkTypeWiFi,
  868. NATType: NATTypePortRestrictedCone,
  869. PortMappingTypes: []PortMappingType{},
  870. },
  871. ProxyID: ID{},
  872. },
  873. offerChan: make(chan *MatchOffer, 1),
  874. }
  875. err := m.addAnnouncementEntry(announcementEntry)
  876. if err != nil {
  877. b.Fatalf(errors.Trace(err).Error())
  878. }
  879. }
  880. }
  881. })
  882. b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
  883. queueSize := m.announcementQueue.getLen()
  884. if queueSize != size {
  885. b.Fatalf(errors.Tracef("unexpected queue size: %d", queueSize).Error())
  886. }
  887. for i := 0; i < b.N; i++ {
  888. personalCompartmentID, _ := MakeID()
  889. announcementEntry :=
  890. &announcementEntry{
  891. ctx: context.Background(),
  892. limitIP: "127.0.0.1",
  893. announcement: &MatchAnnouncement{
  894. Properties: MatchProperties{
  895. PersonalCompartmentIDs: []ID{personalCompartmentID},
  896. GeoIPData: common.GeoIPData{},
  897. NetworkType: NetworkTypeWiFi,
  898. NATType: NATTypePortRestrictedCone,
  899. PortMappingTypes: []PortMappingType{},
  900. },
  901. ProxyID: ID{},
  902. },
  903. offerChan: make(chan *MatchOffer, 1),
  904. }
  905. offerEntry := &offerEntry{
  906. ctx: context.Background(),
  907. limitIP: "127.0.0.1",
  908. offer: &MatchOffer{
  909. Properties: MatchProperties{
  910. PersonalCompartmentIDs: []ID{personalCompartmentID},
  911. GeoIPData: common.GeoIPData{},
  912. NetworkType: NetworkTypeWiFi,
  913. NATType: NATTypePortRestrictedCone,
  914. PortMappingTypes: []PortMappingType{},
  915. },
  916. ClientProxyProtocolVersion: ProxyProtocolVersion1,
  917. },
  918. answerChan: make(chan *answerInfo, 1),
  919. }
  920. err := m.addAnnouncementEntry(announcementEntry)
  921. if err != nil {
  922. b.Fatalf(errors.Trace(err).Error())
  923. }
  924. match, _ := m.matchOffer(offerEntry)
  925. if match == nil {
  926. b.Fatalf(errors.TraceNew("unexpected no match").Error())
  927. }
  928. m.removeAnnouncementEntry(false, match)
  929. }
  930. })
  931. }
  932. }