matcher_test.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. "fmt"
  23. "runtime/debug"
  24. "strings"
  25. "sync"
  26. "testing"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  31. )
  32. func TestMatcher(t *testing.T) {
  33. err := runTestMatcher()
  34. if err != nil {
  35. t.Error(errors.Trace(err).Error())
  36. }
  37. }
  38. func runTestMatcher() error {
  39. limitEntryCount := 50
  40. rateLimitQuantity := 100
  41. rateLimitInterval := 1000 * time.Millisecond
  42. logger := newTestLogger()
  43. m := NewMatcher(
  44. &MatcherConfig{
  45. Logger: logger,
  46. AnnouncementLimitEntryCount: limitEntryCount,
  47. AnnouncementRateLimitQuantity: rateLimitQuantity,
  48. AnnouncementRateLimitInterval: rateLimitInterval,
  49. OfferLimitEntryCount: limitEntryCount,
  50. OfferRateLimitQuantity: rateLimitQuantity,
  51. OfferRateLimitInterval: rateLimitInterval,
  52. ProxyQualityState: NewProxyQuality(),
  53. })
  54. err := m.Start()
  55. if err != nil {
  56. return errors.Trace(err)
  57. }
  58. defer m.Stop()
  59. makeID := func() ID {
  60. ID, err := MakeID()
  61. if err != nil {
  62. panic(err)
  63. }
  64. return ID
  65. }
  66. makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
  67. return &MatchAnnouncement{
  68. Properties: *properties,
  69. ProxyID: makeID(),
  70. ConnectionID: makeID(),
  71. }
  72. }
  73. makeOffer := func(properties *MatchProperties, useMediaStreams bool) *MatchOffer {
  74. return &MatchOffer{
  75. Properties: *properties,
  76. UseMediaStreams: useMediaStreams,
  77. }
  78. }
  79. checkMatchMetrics := func(metrics *MatchMetrics) error {
  80. if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
  81. return errors.TraceNew("unexpected match metrics")
  82. }
  83. return nil
  84. }
  85. proxyIP := randomIPAddress()
  86. proxyFunc := func(
  87. resultChan chan error,
  88. proxyIP string,
  89. matchProperties *MatchProperties,
  90. timeout time.Duration,
  91. waitBeforeAnswer chan struct{},
  92. answerSuccess bool) {
  93. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  94. defer cancelFunc()
  95. announcement := makeAnnouncement(matchProperties)
  96. offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
  97. if err != nil {
  98. resultChan <- errors.Trace(err)
  99. return
  100. }
  101. err = checkMatchMetrics(matchMetrics)
  102. if err != nil {
  103. resultChan <- errors.Trace(err)
  104. return
  105. }
  106. _, ok := negotiateProtocolVersion(
  107. matchProperties.ProtocolVersion,
  108. offer.Properties.ProtocolVersion,
  109. offer.UseMediaStreams)
  110. if !ok {
  111. resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
  112. return
  113. }
  114. if waitBeforeAnswer != nil {
  115. <-waitBeforeAnswer
  116. }
  117. if answerSuccess {
  118. err = m.Answer(
  119. &MatchAnswer{
  120. ProxyID: announcement.ProxyID,
  121. ConnectionID: announcement.ConnectionID,
  122. })
  123. } else {
  124. m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
  125. }
  126. resultChan <- errors.Trace(err)
  127. }
  128. clientIP := randomIPAddress()
  129. baseClientFunc := func(
  130. resultChan chan error,
  131. clientIP string,
  132. matchProperties *MatchProperties,
  133. useMediaStreams bool,
  134. timeout time.Duration) {
  135. ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
  136. defer cancelFunc()
  137. offer := makeOffer(matchProperties, useMediaStreams)
  138. _, matchAnnouncement, matchMetrics, err := m.Offer(ctx, clientIP, offer)
  139. if err != nil {
  140. resultChan <- errors.Trace(err)
  141. return
  142. }
  143. err = checkMatchMetrics(matchMetrics)
  144. if err != nil {
  145. resultChan <- errors.Trace(err)
  146. return
  147. }
  148. _, ok := negotiateProtocolVersion(
  149. matchAnnouncement.Properties.ProtocolVersion,
  150. offer.Properties.ProtocolVersion,
  151. offer.UseMediaStreams)
  152. if !ok {
  153. resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
  154. return
  155. }
  156. resultChan <- nil
  157. }
  158. clientFunc := func(resultChan chan error, clientIP string,
  159. matchProperties *MatchProperties, timeout time.Duration) {
  160. baseClientFunc(resultChan, clientIP, matchProperties, false, timeout)
  161. }
  162. clientUsingMediaStreamsFunc := func(resultChan chan error, clientIP string,
  163. matchProperties *MatchProperties, timeout time.Duration) {
  164. baseClientFunc(resultChan, clientIP, matchProperties, true, timeout)
  165. }
  166. // Test: announce timeout
  167. proxyResultChan := make(chan error)
  168. matchProperties := &MatchProperties{
  169. ProtocolVersion: LatestProtocolVersion,
  170. CommonCompartmentIDs: []ID{makeID()},
  171. }
  172. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  173. err = <-proxyResultChan
  174. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  175. return errors.Tracef("unexpected result: %v", err)
  176. }
  177. if m.announcementQueue.getLen() != 0 {
  178. return errors.TraceNew("unexpected queue size")
  179. }
  180. // Test: limit announce entries by IP
  181. time.Sleep(rateLimitInterval)
  182. maxEntries := limitEntryCount
  183. maxEntriesProxyResultChan := make(chan error, maxEntries)
  184. // fill the queue with max entries for one IP; the first one will timeout sooner
  185. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  186. for i := 0; i < maxEntries-1; i++ {
  187. go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
  188. }
  189. // await goroutines filling queue
  190. for {
  191. time.Sleep(10 * time.Microsecond)
  192. m.announcementQueueMutex.Lock()
  193. queueLen := m.announcementQueue.getLen()
  194. m.announcementQueueMutex.Unlock()
  195. if queueLen == maxEntries {
  196. break
  197. }
  198. }
  199. // the next enqueue should fail with "max entries"
  200. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  201. err = <-proxyResultChan
  202. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  203. return errors.Tracef("unexpected result: %v", err)
  204. }
  205. // wait for first entry to timeout
  206. err = <-maxEntriesProxyResultChan
  207. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  208. return errors.Tracef("unexpected result: %v", err)
  209. }
  210. // now another enqueue succeeds as expected
  211. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  212. err = <-proxyResultChan
  213. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  214. return errors.Tracef("unexpected result: %v", err)
  215. }
  216. // drain remaining entries
  217. for i := 0; i < maxEntries-1; i++ {
  218. err = <-maxEntriesProxyResultChan
  219. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  220. return errors.Tracef("unexpected result: %v", err)
  221. }
  222. }
  223. // Test: offer timeout
  224. clientResultChan := make(chan error)
  225. go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  226. err = <-clientResultChan
  227. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  228. return errors.Tracef("unexpected result: %v", err)
  229. }
  230. if m.offerQueue.Len() != 0 {
  231. return errors.TraceNew("unexpected queue size")
  232. }
  233. // Test: limit offer entries by IP
  234. time.Sleep(rateLimitInterval)
  235. maxEntries = limitEntryCount
  236. maxEntriesClientResultChan := make(chan error, maxEntries)
  237. // fill the queue with max entries for one IP; the first one will timeout sooner
  238. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  239. for i := 0; i < maxEntries-1; i++ {
  240. go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
  241. }
  242. // await goroutines filling queue
  243. for {
  244. time.Sleep(10 * time.Microsecond)
  245. m.offerQueueMutex.Lock()
  246. queueLen := m.offerQueue.Len()
  247. m.offerQueueMutex.Unlock()
  248. if queueLen == maxEntries {
  249. break
  250. }
  251. }
  252. // enqueue should fail with "max entries"
  253. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  254. err = <-clientResultChan
  255. if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
  256. return errors.Tracef("unexpected result: %v", err)
  257. }
  258. // wait for first entry to timeout
  259. err = <-maxEntriesClientResultChan
  260. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  261. return errors.Tracef("unexpected result: %v", err)
  262. }
  263. // now another enqueue succeeds as expected
  264. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  265. err = <-clientResultChan
  266. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  267. return errors.Tracef("unexpected result: %v", err)
  268. }
  269. // drain remaining entries
  270. for i := 0; i < maxEntries-1; i++ {
  271. err = <-maxEntriesClientResultChan
  272. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  273. return errors.Tracef("unexpected result: %v", err)
  274. }
  275. }
  276. // Test: announcement rate limit
  277. m.SetLimits(
  278. 0, rateLimitQuantity, rateLimitInterval, []ID{},
  279. 0, rateLimitQuantity, rateLimitInterval)
  280. time.Sleep(rateLimitInterval)
  281. maxEntries = rateLimitQuantity
  282. maxEntriesProxyResultChan = make(chan error, maxEntries)
  283. waitGroup := new(sync.WaitGroup)
  284. for i := 0; i < maxEntries; i++ {
  285. waitGroup.Add(1)
  286. go func() {
  287. defer waitGroup.Done()
  288. proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
  289. }()
  290. }
  291. // Use a wait group to ensure all maxEntries have hit the rate limiter
  292. // without sleeping before the next attempt, as any sleep can increase
  293. // the rate limiter token count.
  294. waitGroup.Wait()
  295. // the next enqueue should fail with "rate exceeded"
  296. go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
  297. err = <-proxyResultChan
  298. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  299. return errors.Tracef("unexpected result: %v", err)
  300. }
  301. // Test: offer rate limit
  302. maxEntries = rateLimitQuantity
  303. maxEntriesClientResultChan = make(chan error, maxEntries)
  304. waitGroup = new(sync.WaitGroup)
  305. for i := 0; i < rateLimitQuantity; i++ {
  306. waitGroup.Add(1)
  307. go func() {
  308. defer waitGroup.Done()
  309. clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
  310. }()
  311. }
  312. waitGroup.Wait()
  313. // enqueue should fail with "rate exceeded"
  314. go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
  315. err = <-clientResultChan
  316. if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
  317. return errors.Tracef("unexpected result: %v", err)
  318. }
  319. time.Sleep(rateLimitInterval)
  320. m.SetLimits(
  321. limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
  322. limitEntryCount, rateLimitQuantity, rateLimitInterval)
  323. // Test: basic match
  324. commonCompartmentIDs := []ID{makeID()}
  325. geoIPData1 := &MatchProperties{
  326. ProtocolVersion: LatestProtocolVersion,
  327. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  328. CommonCompartmentIDs: commonCompartmentIDs,
  329. }
  330. geoIPData2 := &MatchProperties{
  331. ProtocolVersion: LatestProtocolVersion,
  332. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  333. CommonCompartmentIDs: commonCompartmentIDs,
  334. }
  335. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  336. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  337. err = <-proxyResultChan
  338. if err != nil {
  339. return errors.Trace(err)
  340. }
  341. err = <-clientResultChan
  342. if err != nil {
  343. return errors.Trace(err)
  344. }
  345. // Test: answer error
  346. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
  347. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  348. err = <-proxyResultChan
  349. if err != nil {
  350. return errors.Trace(err)
  351. }
  352. err = <-clientResultChan
  353. if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
  354. return errors.Tracef("unexpected result: %v", err)
  355. }
  356. // Test: client is gone
  357. waitBeforeAnswer := make(chan struct{})
  358. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
  359. go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
  360. err = <-clientResultChan
  361. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  362. return errors.Tracef("unexpected result: %v", err)
  363. }
  364. close(waitBeforeAnswer)
  365. err = <-proxyResultChan
  366. if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
  367. return errors.Tracef("unexpected result: %v", err)
  368. }
  369. // Test: no compartment match
  370. compartment1 := &MatchProperties{
  371. ProtocolVersion: LatestProtocolVersion,
  372. GeoIPData: geoIPData1.GeoIPData,
  373. CommonCompartmentIDs: []ID{makeID()},
  374. }
  375. compartment2 := &MatchProperties{
  376. ProtocolVersion: LatestProtocolVersion,
  377. GeoIPData: geoIPData2.GeoIPData,
  378. PersonalCompartmentIDs: []ID{makeID()},
  379. }
  380. compartment3 := &MatchProperties{
  381. ProtocolVersion: LatestProtocolVersion,
  382. GeoIPData: geoIPData2.GeoIPData,
  383. CommonCompartmentIDs: []ID{makeID()},
  384. }
  385. compartment4 := &MatchProperties{
  386. ProtocolVersion: LatestProtocolVersion,
  387. GeoIPData: geoIPData2.GeoIPData,
  388. PersonalCompartmentIDs: []ID{makeID()},
  389. }
  390. proxy1ResultChan := make(chan error)
  391. proxy2ResultChan := make(chan error)
  392. client1ResultChan := make(chan error)
  393. client2ResultChan := make(chan error)
  394. go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  395. go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  396. go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
  397. go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
  398. err = <-proxy1ResultChan
  399. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  400. return errors.Tracef("unexpected result: %v", err)
  401. }
  402. err = <-proxy2ResultChan
  403. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  404. return errors.Tracef("unexpected result: %v", err)
  405. }
  406. err = <-client1ResultChan
  407. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  408. return errors.Tracef("unexpected result: %v", err)
  409. }
  410. err = <-client2ResultChan
  411. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  412. return errors.Tracef("unexpected result: %v", err)
  413. }
  414. // Test: common compartment match
  415. compartment1And3 := &MatchProperties{
  416. ProtocolVersion: LatestProtocolVersion,
  417. GeoIPData: geoIPData2.GeoIPData,
  418. CommonCompartmentIDs: []ID{
  419. compartment1.CommonCompartmentIDs[0],
  420. compartment3.CommonCompartmentIDs[0]},
  421. }
  422. go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
  423. go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
  424. err = <-proxyResultChan
  425. if err != nil {
  426. return errors.Trace(err)
  427. }
  428. err = <-clientResultChan
  429. if err != nil {
  430. return errors.Trace(err)
  431. }
  432. // Test: personal compartment match
  433. compartment2And4 := &MatchProperties{
  434. ProtocolVersion: LatestProtocolVersion,
  435. GeoIPData: geoIPData2.GeoIPData,
  436. PersonalCompartmentIDs: []ID{
  437. compartment2.PersonalCompartmentIDs[0],
  438. compartment4.PersonalCompartmentIDs[0]},
  439. }
  440. go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
  441. go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
  442. err = <-proxyResultChan
  443. if err != nil {
  444. return errors.Trace(err)
  445. }
  446. err = <-clientResultChan
  447. if err != nil {
  448. return errors.Trace(err)
  449. }
  450. // Test: no same-ASN match
  451. go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
  452. go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
  453. err = <-proxyResultChan
  454. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  455. return errors.Tracef("unexpected result: %v", err)
  456. }
  457. err = <-clientResultChan
  458. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  459. return errors.Tracef("unexpected result: %v", err)
  460. }
  461. // Test: downgrade-compatible protocol version match
  462. protocolVersion1 := &MatchProperties{
  463. ProtocolVersion: ProtocolVersion1,
  464. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  465. CommonCompartmentIDs: commonCompartmentIDs,
  466. }
  467. protocolVersion2 := &MatchProperties{
  468. ProtocolVersion: ProtocolVersion2,
  469. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  470. CommonCompartmentIDs: commonCompartmentIDs,
  471. }
  472. go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
  473. go clientFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
  474. err = <-proxyResultChan
  475. if err != nil {
  476. return errors.Trace(err)
  477. }
  478. err = <-clientResultChan
  479. if err != nil {
  480. return errors.Trace(err)
  481. }
  482. // Test: no incompatible protocol version match
  483. go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
  484. go clientUsingMediaStreamsFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
  485. err = <-proxyResultChan
  486. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  487. return errors.Tracef("unexpected result: %v", err)
  488. }
  489. err = <-clientResultChan
  490. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  491. return errors.Tracef("unexpected result: %v", err)
  492. }
  493. // Test: downgrade-compatible protocol version match
  494. // Test: proxy preferred NAT match
  495. client1Properties := &MatchProperties{
  496. ProtocolVersion: LatestProtocolVersion,
  497. GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
  498. NATType: NATTypeFullCone,
  499. CommonCompartmentIDs: commonCompartmentIDs,
  500. }
  501. client2Properties := &MatchProperties{
  502. ProtocolVersion: LatestProtocolVersion,
  503. GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
  504. NATType: NATTypeSymmetric,
  505. CommonCompartmentIDs: commonCompartmentIDs,
  506. }
  507. proxy1Properties := &MatchProperties{
  508. ProtocolVersion: LatestProtocolVersion,
  509. GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
  510. NATType: NATTypeNone,
  511. CommonCompartmentIDs: commonCompartmentIDs,
  512. }
  513. proxy2Properties := &MatchProperties{
  514. ProtocolVersion: LatestProtocolVersion,
  515. GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
  516. NATType: NATTypeSymmetric,
  517. CommonCompartmentIDs: commonCompartmentIDs,
  518. }
  519. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  520. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  521. time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
  522. go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
  523. err = <-proxy1ResultChan
  524. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  525. return errors.Tracef("unexpected result: %v", err)
  526. }
  527. // proxy2 should match since it's the preferred NAT match
  528. err = <-proxy2ResultChan
  529. if err != nil {
  530. return errors.Trace(err)
  531. }
  532. err = <-clientResultChan
  533. if err != nil {
  534. return errors.Trace(err)
  535. }
  536. // Test: client preferred NAT match
  537. // Limitation: the current Matcher.matchAllOffers logic matches the first
  538. // enqueued client against the best proxy match, regardless of whether
  539. // there is another client in the queue that's a better match for that
  540. // proxy. As a result, this test only passes when the preferred matching
  541. // client is enqueued first, and the test is currently of limited utility.
  542. go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
  543. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  544. go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
  545. time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
  546. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
  547. err = <-proxy1ResultChan
  548. if err != nil {
  549. return errors.Trace(err)
  550. }
  551. err = <-client1ResultChan
  552. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  553. return errors.Tracef("unexpected result: %v", err)
  554. }
  555. // client2 should match since it's the preferred NAT match
  556. err = <-client2ResultChan
  557. if err != nil {
  558. return errors.Trace(err)
  559. }
  560. // Test: priority supercedes preferred NAT match
  561. go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
  562. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  563. proxy2Properties.IsPriority = true
  564. go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
  565. time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
  566. go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
  567. err = <-proxy1ResultChan
  568. if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
  569. return errors.Tracef("unexpected result: %v", err)
  570. }
  571. // proxy2 should match since it's the priority, but not preferred NAT match
  572. err = <-proxy2ResultChan
  573. if err != nil {
  574. return errors.Trace(err)
  575. }
  576. err = <-clientResultChan
  577. if err != nil {
  578. return errors.Trace(err)
  579. }
  580. // Test: many matches
  581. // Reduce test log noise for this phase of the test
  582. logger.SetLogLevelDebug(false)
  583. matchCount := 10000
  584. proxyCount := matchCount
  585. clientCount := matchCount
  586. // Buffered so no goroutine will block reporting result
  587. proxyResultChan = make(chan error, matchCount)
  588. clientResultChan = make(chan error, matchCount)
  589. for proxyCount > 0 || clientCount > 0 {
  590. // Don't simply alternate enqueuing a proxy and a client
  591. if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
  592. go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
  593. proxyCount -= 1
  594. } else if clientCount > 0 {
  595. go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
  596. clientCount -= 1
  597. }
  598. }
  599. for i := 0; i < matchCount; i++ {
  600. err = <-proxyResultChan
  601. if err != nil {
  602. return errors.Trace(err)
  603. }
  604. err = <-clientResultChan
  605. if err != nil {
  606. return errors.Trace(err)
  607. }
  608. }
  609. return nil
  610. }
  611. func randomIPAddress() string {
  612. return fmt.Sprintf("%d.%d.%d.%d",
  613. prng.Range(0, 255),
  614. prng.Range(0, 255),
  615. prng.Range(0, 255),
  616. prng.Range(0, 255))
  617. }
  618. func TestMatcherMultiQueue(t *testing.T) {
  619. err := runTestMatcherMultiQueue()
  620. if err != nil {
  621. t.Error(errors.Trace(err).Error())
  622. }
  623. }
  624. func runTestMatcherMultiQueue() error {
  625. // Test: invalid compartment IDs
  626. q := newAnnouncementMultiQueue()
  627. err := q.enqueue(
  628. &announcementEntry{
  629. announcement: &MatchAnnouncement{
  630. Properties: MatchProperties{}}})
  631. if err == nil {
  632. return errors.TraceNew("unexpected success")
  633. }
  634. compartmentID, _ := MakeID()
  635. err = q.enqueue(
  636. &announcementEntry{
  637. announcement: &MatchAnnouncement{
  638. Properties: MatchProperties{
  639. CommonCompartmentIDs: []ID{compartmentID},
  640. PersonalCompartmentIDs: []ID{compartmentID},
  641. }}})
  642. if err == nil {
  643. return errors.TraceNew("unexpected success")
  644. }
  645. // Test: enqueue multiple candidates
  646. var otherCommonCompartmentIDs []ID
  647. var otherPersonalCompartmentIDs []ID
  648. numOtherCompartmentIDs := 10
  649. for i := 0; i < numOtherCompartmentIDs; i++ {
  650. commonCompartmentID, _ := MakeID()
  651. otherCommonCompartmentIDs = append(
  652. otherCommonCompartmentIDs, commonCompartmentID)
  653. personalCompartmentID, _ := MakeID()
  654. otherPersonalCompartmentIDs = append(
  655. otherPersonalCompartmentIDs, personalCompartmentID)
  656. }
  657. numOtherEntries := 10000
  658. for i := 0; i < numOtherEntries; i++ {
  659. ctx, cancel := context.WithDeadline(
  660. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  661. defer cancel()
  662. err := q.enqueue(
  663. &announcementEntry{
  664. ctx: ctx,
  665. announcement: &MatchAnnouncement{
  666. Properties: MatchProperties{
  667. CommonCompartmentIDs: []ID{
  668. otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
  669. NATType: NATTypeSymmetric,
  670. }}})
  671. if err != nil {
  672. return errors.Trace(err)
  673. }
  674. err = q.enqueue(
  675. &announcementEntry{
  676. ctx: ctx,
  677. announcement: &MatchAnnouncement{
  678. Properties: MatchProperties{
  679. PersonalCompartmentIDs: []ID{
  680. otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
  681. NATType: NATTypeSymmetric,
  682. }}})
  683. if err != nil {
  684. return errors.Trace(err)
  685. }
  686. }
  687. var matchingCommonCompartmentIDs []ID
  688. numMatchingCompartmentIDs := 2
  689. numMatchingEntries := 2
  690. var expectedMatches []*announcementEntry
  691. for i := 0; i < numMatchingCompartmentIDs; i++ {
  692. for j := 0; j < numMatchingEntries; j++ {
  693. commonCompartmentID, _ := MakeID()
  694. matchingCommonCompartmentIDs = append(
  695. matchingCommonCompartmentIDs, commonCompartmentID)
  696. ctx, cancel := context.WithDeadline(
  697. context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
  698. defer cancel()
  699. a := &announcementEntry{
  700. ctx: ctx,
  701. announcement: &MatchAnnouncement{
  702. Properties: MatchProperties{
  703. CommonCompartmentIDs: matchingCommonCompartmentIDs[i : i+1],
  704. NATType: NATTypeNone,
  705. }}}
  706. expectedMatches = append(expectedMatches, a)
  707. err := q.enqueue(a)
  708. if err != nil {
  709. return errors.Trace(err)
  710. }
  711. }
  712. }
  713. // Test: inspect queue state
  714. if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs*numMatchingEntries {
  715. return errors.TraceNew("unexpected total entries count")
  716. }
  717. if len(q.commonCompartmentQueues) !=
  718. numOtherCompartmentIDs+numMatchingCompartmentIDs {
  719. return errors.TraceNew("unexpected compartment queue count")
  720. }
  721. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  722. return errors.TraceNew("unexpected compartment queue count")
  723. }
  724. // Test: find expected matches
  725. iter := q.startMatching(true, matchingCommonCompartmentIDs)
  726. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  727. return errors.TraceNew("unexpected iterator state")
  728. }
  729. unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
  730. if unlimited != numMatchingCompartmentIDs*numMatchingEntries ||
  731. partiallyLimited != 0 ||
  732. strictlyLimited != 0 {
  733. return errors.TraceNew("unexpected NAT counts")
  734. }
  735. match, _ := iter.getNext()
  736. if match == nil {
  737. return errors.TraceNew("unexpected missing match")
  738. }
  739. if match != expectedMatches[0] {
  740. return errors.TraceNew("unexpected match")
  741. }
  742. if !match.queueReference.dequeue() {
  743. return errors.TraceNew("unexpected already dequeued")
  744. }
  745. if match.queueReference.dequeue() {
  746. return errors.TraceNew("unexpected not already dequeued")
  747. }
  748. iter = q.startMatching(true, matchingCommonCompartmentIDs)
  749. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  750. return errors.TraceNew("unexpected iterator state")
  751. }
  752. unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
  753. if unlimited != numMatchingEntries*numMatchingCompartmentIDs-1 ||
  754. partiallyLimited != 0 ||
  755. strictlyLimited != 0 {
  756. return errors.TraceNew("unexpected NAT counts")
  757. }
  758. match, _ = iter.getNext()
  759. if match == nil {
  760. return errors.TraceNew("unexpected missing match")
  761. }
  762. if match != expectedMatches[1] {
  763. return errors.TraceNew("unexpected match")
  764. }
  765. if !match.queueReference.dequeue() {
  766. return errors.TraceNew("unexpected already dequeued")
  767. }
  768. if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
  769. return errors.TraceNew("unexpected iterator state")
  770. }
  771. // Test: getNext after dequeue
  772. match, _ = iter.getNext()
  773. if match == nil {
  774. return errors.TraceNew("unexpected missing match")
  775. }
  776. if match != expectedMatches[2] {
  777. return errors.TraceNew("unexpected match")
  778. }
  779. if !match.queueReference.dequeue() {
  780. return errors.TraceNew("unexpected already dequeued")
  781. }
  782. match, _ = iter.getNext()
  783. if match == nil {
  784. return errors.TraceNew("unexpected missing match")
  785. }
  786. if match != expectedMatches[3] {
  787. return errors.TraceNew("unexpected match")
  788. }
  789. if !match.queueReference.dequeue() {
  790. return errors.TraceNew("unexpected already dequeued")
  791. }
  792. // Test: reinspect queue state after dequeues
  793. if q.getLen() != numOtherEntries*2 {
  794. return errors.TraceNew("unexpected total entries count")
  795. }
  796. if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
  797. return errors.TraceNew("unexpected compartment queue count")
  798. }
  799. if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
  800. return errors.TraceNew("unexpected compartment queue count")
  801. }
  802. // Test: priority
  803. q = newAnnouncementMultiQueue()
  804. var commonCompartmentIDs []ID
  805. numCompartmentIDs := 10
  806. for i := 0; i < numCompartmentIDs; i++ {
  807. commonCompartmentID, _ := MakeID()
  808. commonCompartmentIDs = append(
  809. commonCompartmentIDs, commonCompartmentID)
  810. }
  811. priorityProxyID, _ := MakeID()
  812. nonPriorityProxyID, _ := MakeID()
  813. ctx, cancel := context.WithDeadline(
  814. context.Background(), time.Now().Add(10*time.Minute))
  815. defer cancel()
  816. numEntries := 10000
  817. for i := 0; i < numEntries; i++ {
  818. // Enqueue every other announcement as a priority
  819. isPriority := i%2 == 0
  820. proxyID := priorityProxyID
  821. if !isPriority {
  822. proxyID = nonPriorityProxyID
  823. }
  824. err := q.enqueue(
  825. &announcementEntry{
  826. ctx: ctx,
  827. announcement: &MatchAnnouncement{
  828. ProxyID: proxyID,
  829. Properties: MatchProperties{
  830. IsPriority: isPriority,
  831. CommonCompartmentIDs: []ID{
  832. commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
  833. NATType: NATTypeUnknown,
  834. }}})
  835. if err != nil {
  836. return errors.Trace(err)
  837. }
  838. }
  839. iter = q.startMatching(true, commonCompartmentIDs)
  840. for i := 0; i < numEntries; i++ {
  841. match, isPriority := iter.getNext()
  842. if match == nil {
  843. return errors.TraceNew("unexpected missing match")
  844. }
  845. // First half, and only first half, of matches is priority
  846. expectPriority := i < numEntries/2
  847. if isPriority != expectPriority {
  848. return errors.TraceNew("unexpected isPriority")
  849. }
  850. expectedProxyID := priorityProxyID
  851. if !expectPriority {
  852. expectedProxyID = nonPriorityProxyID
  853. }
  854. if match.announcement.ProxyID != expectedProxyID {
  855. return errors.TraceNew("unexpected ProxyID")
  856. }
  857. if !match.queueReference.dequeue() {
  858. return errors.TraceNew("unexpected already dequeued")
  859. }
  860. }
  861. match, _ = iter.getNext()
  862. if match != nil {
  863. return errors.TraceNew("unexpected match")
  864. }
  865. return nil
  866. }
  867. // Benchmark numbers for the previous announcement queue implementation, with
  868. // increasingly slow performance when enqueuing and then finding a new,
  869. // distinct personal compartment ID proxy.
  870. //
  871. // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
  872. // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
  873. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
  874. // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
  875. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
  876. // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
  877. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
  878. // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
  879. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
  880. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
  881. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
  882. // PASS
  883. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
  884. //
  885. // Benchmark numbers for the current implementation, the announcementMultiQueue,
  886. // with constant time performance for the same scenario:
  887. //
  888. // BenchmarkMatcherQueue
  889. // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
  890. // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
  891. // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
  892. // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
  893. // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
  894. // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
  895. // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
  896. // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
  897. // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
  898. // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
  899. // PASS
  900. // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
  901. func BenchmarkMatcherQueue(b *testing.B) {
  902. SetAllowCommonASNMatching(true)
  903. defer SetAllowCommonASNMatching(false)
  904. for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
  905. debug.FreeOSMemory()
  906. var m *Matcher
  907. commonCompartmentID, _ := MakeID()
  908. b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
  909. for i := 0; i < b.N; i++ {
  910. // Matcher.Start is not called to start the matchWorker;
  911. // instead, matchOffer is invoked directly.
  912. m = NewMatcher(
  913. &MatcherConfig{
  914. Logger: newTestLogger(),
  915. })
  916. for j := 0; j < size; j++ {
  917. var commonCompartmentIDs, personalCompartmentIDs []ID
  918. if prng.FlipCoin() {
  919. personalCompartmentID, _ := MakeID()
  920. personalCompartmentIDs = []ID{personalCompartmentID}
  921. } else {
  922. commonCompartmentIDs = []ID{commonCompartmentID}
  923. }
  924. announcementEntry := &announcementEntry{
  925. ctx: context.Background(),
  926. limitIP: "127.0.0.1",
  927. announcement: &MatchAnnouncement{
  928. Properties: MatchProperties{
  929. CommonCompartmentIDs: commonCompartmentIDs,
  930. PersonalCompartmentIDs: personalCompartmentIDs,
  931. GeoIPData: common.GeoIPData{},
  932. NetworkType: NetworkTypeWiFi,
  933. NATType: NATTypePortRestrictedCone,
  934. PortMappingTypes: []PortMappingType{},
  935. },
  936. ProxyID: ID{},
  937. },
  938. offerChan: make(chan *MatchOffer, 1),
  939. }
  940. err := m.addAnnouncementEntry(announcementEntry)
  941. if err != nil {
  942. b.Fatal(errors.Trace(err).Error())
  943. }
  944. }
  945. }
  946. })
  947. b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
  948. queueSize := m.announcementQueue.getLen()
  949. if queueSize != size {
  950. b.Fatal(errors.Tracef("unexpected queue size: %d", queueSize).Error())
  951. }
  952. for i := 0; i < b.N; i++ {
  953. personalCompartmentID, _ := MakeID()
  954. announcementEntry :=
  955. &announcementEntry{
  956. ctx: context.Background(),
  957. limitIP: "127.0.0.1",
  958. announcement: &MatchAnnouncement{
  959. Properties: MatchProperties{
  960. ProtocolVersion: LatestProtocolVersion,
  961. PersonalCompartmentIDs: []ID{personalCompartmentID},
  962. GeoIPData: common.GeoIPData{},
  963. NetworkType: NetworkTypeWiFi,
  964. NATType: NATTypePortRestrictedCone,
  965. PortMappingTypes: []PortMappingType{},
  966. },
  967. ProxyID: ID{},
  968. },
  969. offerChan: make(chan *MatchOffer, 1),
  970. }
  971. offerEntry := &offerEntry{
  972. ctx: context.Background(),
  973. limitIP: "127.0.0.1",
  974. offer: &MatchOffer{
  975. Properties: MatchProperties{
  976. ProtocolVersion: LatestProtocolVersion,
  977. PersonalCompartmentIDs: []ID{personalCompartmentID},
  978. GeoIPData: common.GeoIPData{},
  979. NetworkType: NetworkTypeWiFi,
  980. NATType: NATTypePortRestrictedCone,
  981. PortMappingTypes: []PortMappingType{},
  982. },
  983. },
  984. answerChan: make(chan *answerInfo, 1),
  985. }
  986. err := m.addAnnouncementEntry(announcementEntry)
  987. if err != nil {
  988. b.Fatal(errors.Trace(err).Error())
  989. }
  990. match, _ := m.matchOffer(offerEntry)
  991. if match == nil {
  992. b.Fatal(errors.TraceNew("unexpected no match").Error())
  993. }
  994. m.removeAnnouncementEntry(false, match)
  995. }
  996. })
  997. }
  998. }