| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266 |
- /*
- * Copyright (c) 2023, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package inproxy
- import (
- "context"
- "fmt"
- "runtime/debug"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/internal/testutils"
- )
- func TestMatcher(t *testing.T) {
- err := runTestMatcher()
- if err != nil {
- t.Error(errors.Trace(err).Error())
- }
- }
- func runTestMatcher() error {
- limitEntryCount := 50
- rateLimitQuantity := 100
- rateLimitInterval := 1000 * time.Millisecond
- logger := testutils.NewTestLogger()
- m := NewMatcher(
- &MatcherConfig{
- Logger: logger,
- AnnouncementLimitEntryCount: limitEntryCount,
- AnnouncementRateLimitQuantity: rateLimitQuantity,
- AnnouncementRateLimitInterval: rateLimitInterval,
- OfferLimitEntryCount: limitEntryCount,
- OfferRateLimitQuantity: rateLimitQuantity,
- OfferRateLimitInterval: rateLimitInterval,
- ProxyQualityState: NewProxyQuality(),
- AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
- })
- err := m.Start()
- if err != nil {
- return errors.Trace(err)
- }
- defer m.Stop()
- makeID := func() ID {
- ID, err := MakeID()
- if err != nil {
- panic(err)
- }
- return ID
- }
- makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
- return &MatchAnnouncement{
- Properties: *properties,
- ProxyID: makeID(),
- ConnectionID: makeID(),
- }
- }
- makeOffer := func(properties *MatchProperties, useMediaStreams bool) *MatchOffer {
- return &MatchOffer{
- Properties: *properties,
- UseMediaStreams: useMediaStreams,
- }
- }
- checkMatchMetrics := func(metrics *MatchMetrics) error {
- if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
- return errors.TraceNew("unexpected match metrics")
- }
- return nil
- }
- proxyIP := randomIPAddress()
- proxyFunc := func(
- resultChan chan error,
- proxyIP string,
- matchProperties *MatchProperties,
- timeout time.Duration,
- waitBeforeAnswer chan struct{},
- answerSuccess bool) {
- ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
- defer cancelFunc()
- announcement := makeAnnouncement(matchProperties)
- offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- err = checkMatchMetrics(matchMetrics)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- _, ok := negotiateProtocolVersion(
- matchProperties.ProtocolVersion,
- offer.Properties.ProtocolVersion,
- offer.UseMediaStreams)
- if !ok {
- resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
- return
- }
- if waitBeforeAnswer != nil {
- <-waitBeforeAnswer
- }
- if answerSuccess {
- err = m.Answer(
- &MatchAnswer{
- ProxyID: announcement.ProxyID,
- ConnectionID: announcement.ConnectionID,
- })
- } else {
- m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
- }
- resultChan <- errors.Trace(err)
- }
- clientIP := randomIPAddress()
- baseClientFunc := func(
- resultChan chan error,
- clientIP string,
- matchProperties *MatchProperties,
- useMediaStreams bool,
- timeout time.Duration) {
- ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
- defer cancelFunc()
- offer := makeOffer(matchProperties, useMediaStreams)
- _, matchAnnouncement, matchMetrics, err := m.Offer(ctx, clientIP, offer)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- err = checkMatchMetrics(matchMetrics)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- _, ok := negotiateProtocolVersion(
- matchAnnouncement.Properties.ProtocolVersion,
- offer.Properties.ProtocolVersion,
- offer.UseMediaStreams)
- if !ok {
- resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
- return
- }
- resultChan <- nil
- }
- clientFunc := func(resultChan chan error, clientIP string,
- matchProperties *MatchProperties, timeout time.Duration) {
- baseClientFunc(resultChan, clientIP, matchProperties, false, timeout)
- }
- clientUsingMediaStreamsFunc := func(resultChan chan error, clientIP string,
- matchProperties *MatchProperties, timeout time.Duration) {
- baseClientFunc(resultChan, clientIP, matchProperties, true, timeout)
- }
- // Test: announce timeout
- proxyResultChan := make(chan error)
- matchProperties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- CommonCompartmentIDs: []ID{makeID()},
- }
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- if m.announcementQueue.getLen() != 0 {
- return errors.TraceNew("unexpected queue size")
- }
- // Test: limit announce entries by IP
- time.Sleep(rateLimitInterval)
- maxEntries := limitEntryCount
- maxEntriesProxyResultChan := make(chan error, maxEntries)
- // fill the queue with max entries for one IP; the first one will timeout sooner
- go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- for i := 0; i < maxEntries-1; i++ {
- go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
- }
- // await goroutines filling queue
- for {
- time.Sleep(10 * time.Microsecond)
- m.announcementQueueMutex.Lock()
- queueLen := m.announcementQueue.getLen()
- m.announcementQueueMutex.Unlock()
- if queueLen == maxEntries {
- break
- }
- }
- // the next enqueue should fail with "max entries"
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // wait for first entry to timeout
- err = <-maxEntriesProxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // now another enqueue succeeds as expected
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // drain remaining entries
- for i := 0; i < maxEntries-1; i++ {
- err = <-maxEntriesProxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- }
- // Test: offer timeout
- clientResultChan := make(chan error)
- go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- if m.offerQueue.Len() != 0 {
- return errors.TraceNew("unexpected queue size")
- }
- // Test: limit offer entries by IP
- time.Sleep(rateLimitInterval)
- maxEntries = limitEntryCount
- maxEntriesClientResultChan := make(chan error, maxEntries)
- // fill the queue with max entries for one IP; the first one will timeout sooner
- go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- for i := 0; i < maxEntries-1; i++ {
- go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
- }
- // await goroutines filling queue
- for {
- time.Sleep(10 * time.Microsecond)
- m.offerQueueMutex.Lock()
- queueLen := m.offerQueue.Len()
- m.offerQueueMutex.Unlock()
- if queueLen == maxEntries {
- break
- }
- }
- // enqueue should fail with "max entries"
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // wait for first entry to timeout
- err = <-maxEntriesClientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // now another enqueue succeeds as expected
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // drain remaining entries
- for i := 0; i < maxEntries-1; i++ {
- err = <-maxEntriesClientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- }
- // Test: announcement rate limit
- m.SetLimits(
- 0, rateLimitQuantity, rateLimitInterval, []ID{},
- 0, rateLimitQuantity, rateLimitInterval)
- time.Sleep(rateLimitInterval)
- maxEntries = rateLimitQuantity
- maxEntriesProxyResultChan = make(chan error, maxEntries)
- waitGroup := new(sync.WaitGroup)
- for i := 0; i < maxEntries; i++ {
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
- }()
- }
- // Use a wait group to ensure all maxEntries have hit the rate limiter
- // without sleeping before the next attempt, as any sleep can increase
- // the rate limiter token count.
- waitGroup.Wait()
- // the next enqueue should fail with "rate exceeded"
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: offer rate limit
- maxEntries = rateLimitQuantity
- maxEntriesClientResultChan = make(chan error, maxEntries)
- waitGroup = new(sync.WaitGroup)
- for i := 0; i < rateLimitQuantity; i++ {
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
- }()
- }
- waitGroup.Wait()
- // enqueue should fail with "rate exceeded"
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- time.Sleep(rateLimitInterval)
- m.SetLimits(
- limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
- limitEntryCount, rateLimitQuantity, rateLimitInterval)
- // Test: basic match
- commonCompartmentIDs := []ID{makeID()}
- geoIPData1 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- geoIPData2 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: answer error
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: client is gone
- waitBeforeAnswer := make(chan struct{})
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- close(waitBeforeAnswer)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: no compartment match
- compartment1 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData1.GeoIPData,
- CommonCompartmentIDs: []ID{makeID()},
- }
- compartment2 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{makeID()},
- }
- compartment3 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- CommonCompartmentIDs: []ID{makeID()},
- }
- compartment4 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{makeID()},
- }
- proxy1ResultChan := make(chan error)
- proxy2ResultChan := make(chan error)
- client1ResultChan := make(chan error)
- client2ResultChan := make(chan error)
- go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
- go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
- go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-proxy2ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-client1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-client2ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: common compartment match
- compartment1And3 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- CommonCompartmentIDs: []ID{
- compartment1.CommonCompartmentIDs[0],
- compartment3.CommonCompartmentIDs[0]},
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: personal compartment match
- compartment2And4 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{
- compartment2.PersonalCompartmentIDs[0],
- compartment4.PersonalCompartmentIDs[0]},
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: no same-ASN match
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: AllowMatch disallow
- m.config.AllowMatch = func(proxy common.GeoIPData, client common.GeoIPData) bool {
- return proxy != geoIPData1.GeoIPData && client != geoIPData2.GeoIPData
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: AllowMatch allow
- m.config.AllowMatch = func(proxy common.GeoIPData, client common.GeoIPData) bool {
- return proxy == geoIPData1.GeoIPData && client == geoIPData2.GeoIPData
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- m.config.AllowMatch = func(proxy common.GeoIPData, client common.GeoIPData) bool {
- return true
- }
- // Test: downgrade-compatible protocol version match
- protocolVersion1 := &MatchProperties{
- ProtocolVersion: ProtocolVersion1,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- protocolVersion2 := &MatchProperties{
- ProtocolVersion: ProtocolVersion2,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: no incompatible protocol version match
- go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
- go clientUsingMediaStreamsFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: downgrade-compatible protocol version match
- // Test: proxy preferred NAT match
- client1Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- NATType: NATTypeFullCone,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- client2Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- NATType: NATTypeSymmetric,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- proxy1Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
- NATType: NATTypeNone,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- proxy2Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
- NATType: NATTypeSymmetric,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
- go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
- go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // proxy2 should match since it's the preferred NAT match
- err = <-proxy2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: client preferred NAT match
- // Limitation: the current Matcher.matchAllOffers logic matches the first
- // enqueued client against the best proxy match, regardless of whether
- // there is another client in the queue that's a better match for that
- // proxy. As a result, this test only passes when the preferred matching
- // client is enqueued first, and the test is currently of limited utility.
- go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
- time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
- go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
- time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
- err = <-proxy1ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-client1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // client2 should match since it's the preferred NAT match
- err = <-client2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: priority supercedes preferred NAT match
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
- proxy2Properties.IsPriority = true
- go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
- go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // proxy2 should match since it's the priority, but not preferred NAT match
- err = <-proxy2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: many matches
- // Reduce test log noise for this phase of the test
- logger.SetLogLevelDebug(false)
- matchCount := 10000
- proxyCount := matchCount
- clientCount := matchCount
- // Buffered so no goroutine will block reporting result
- proxyResultChan = make(chan error, matchCount)
- clientResultChan = make(chan error, matchCount)
- for proxyCount > 0 || clientCount > 0 {
- // Don't simply alternate enqueuing a proxy and a client
- if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
- go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
- proxyCount -= 1
- } else if clientCount > 0 {
- go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
- clientCount -= 1
- }
- }
- for i := 0; i < matchCount; i++ {
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- }
- return nil
- }
- func randomIPAddress() string {
- return fmt.Sprintf("%d.%d.%d.%d",
- prng.Range(0, 255),
- prng.Range(0, 255),
- prng.Range(0, 255),
- prng.Range(0, 255))
- }
- func TestMatcherMultiQueue(t *testing.T) {
- err := runTestMatcherMultiQueue()
- if err != nil {
- t.Error(errors.Trace(err).Error())
- }
- }
- func runTestMatcherMultiQueue() error {
- // Test: invalid compartment IDs
- q := newAnnouncementMultiQueue()
- err := q.enqueue(
- &announcementEntry{
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{}}})
- if err == nil {
- return errors.TraceNew("unexpected success")
- }
- compartmentID, _ := MakeID()
- err = q.enqueue(
- &announcementEntry{
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: []ID{compartmentID},
- PersonalCompartmentIDs: []ID{compartmentID},
- }}})
- if err == nil {
- return errors.TraceNew("unexpected success")
- }
- // Test: enqueue multiple candidates
- var otherCommonCompartmentIDs []ID
- var otherPersonalCompartmentIDs []ID
- numOtherCompartmentIDs := 10
- for i := 0; i < numOtherCompartmentIDs; i++ {
- commonCompartmentID, _ := MakeID()
- otherCommonCompartmentIDs = append(
- otherCommonCompartmentIDs, commonCompartmentID)
- personalCompartmentID, _ := MakeID()
- otherPersonalCompartmentIDs = append(
- otherPersonalCompartmentIDs, personalCompartmentID)
- }
- numOtherEntries := 10000
- for i := 0; i < numOtherEntries; i++ {
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
- defer cancel()
- err := q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: []ID{
- otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
- NATType: NATTypeSymmetric,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- err = q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- PersonalCompartmentIDs: []ID{
- otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
- NATType: NATTypeSymmetric,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- }
- var matchingCommonCompartmentIDs []ID
- numMatchingCompartmentIDs := 2
- numMatchingEntries := 2
- var expectedMatches []*announcementEntry
- for i := 0; i < numMatchingCompartmentIDs; i++ {
- for j := 0; j < numMatchingEntries; j++ {
- commonCompartmentID, _ := MakeID()
- matchingCommonCompartmentIDs = append(
- matchingCommonCompartmentIDs, commonCompartmentID)
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
- defer cancel()
- a := &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: matchingCommonCompartmentIDs[i : i+1],
- NATType: NATTypeNone,
- }}}
- expectedMatches = append(expectedMatches, a)
- err := q.enqueue(a)
- if err != nil {
- return errors.Trace(err)
- }
- }
- }
- // Test: inspect queue state
- if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs*numMatchingEntries {
- return errors.TraceNew("unexpected total entries count")
- }
- if len(q.commonCompartmentQueues) !=
- numOtherCompartmentIDs+numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- // Test: find expected matches
- iter := q.startMatching(true, matchingCommonCompartmentIDs)
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
- if unlimited != numMatchingCompartmentIDs*numMatchingEntries ||
- partiallyLimited != 0 ||
- strictlyLimited != 0 {
- return errors.TraceNew("unexpected NAT counts")
- }
- match, _ := iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[0] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- if match.queueReference.dequeue() {
- return errors.TraceNew("unexpected not already dequeued")
- }
- iter = q.startMatching(true, matchingCommonCompartmentIDs)
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
- if unlimited != numMatchingEntries*numMatchingCompartmentIDs-1 ||
- partiallyLimited != 0 ||
- strictlyLimited != 0 {
- return errors.TraceNew("unexpected NAT counts")
- }
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[1] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- // Test: getNext after dequeue
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[2] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[3] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- // Test: reinspect queue state after dequeues
- if q.getLen() != numOtherEntries*2 {
- return errors.TraceNew("unexpected total entries count")
- }
- if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- // Test: priority
- q = newAnnouncementMultiQueue()
- var commonCompartmentIDs []ID
- numCompartmentIDs := 10
- for i := 0; i < numCompartmentIDs; i++ {
- commonCompartmentID, _ := MakeID()
- commonCompartmentIDs = append(
- commonCompartmentIDs, commonCompartmentID)
- }
- priorityProxyID, _ := MakeID()
- nonPriorityProxyID, _ := MakeID()
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(10*time.Minute))
- defer cancel()
- numEntries := 10000
- for i := 0; i < numEntries; i++ {
- // Enqueue every other announcement as a priority
- isPriority := i%2 == 0
- proxyID := priorityProxyID
- if !isPriority {
- proxyID = nonPriorityProxyID
- }
- err := q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- ProxyID: proxyID,
- Properties: MatchProperties{
- IsPriority: isPriority,
- CommonCompartmentIDs: []ID{
- commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
- NATType: NATTypeUnknown,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- }
- iter = q.startMatching(true, commonCompartmentIDs)
- for i := 0; i < numEntries; i++ {
- match, isPriority := iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- // First half, and only first half, of matches is priority
- expectPriority := i < numEntries/2
- if isPriority != expectPriority {
- return errors.TraceNew("unexpected isPriority")
- }
- expectedProxyID := priorityProxyID
- if !expectPriority {
- expectedProxyID = nonPriorityProxyID
- }
- if match.announcement.ProxyID != expectedProxyID {
- return errors.TraceNew("unexpected ProxyID")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- }
- match, _ = iter.getNext()
- if match != nil {
- return errors.TraceNew("unexpected match")
- }
- return nil
- }
- // Benchmark numbers for the previous announcement queue implementation, with
- // increasingly slow performance when enqueuing and then finding a new,
- // distinct personal compartment ID proxy.
- //
- // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
- // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
- // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
- // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
- // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
- // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
- // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
- // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
- // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
- // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
- // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
- // PASS
- // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
- //
- // Benchmark numbers for the current implementation, the announcementMultiQueue,
- // with constant time performance for the same scenario:
- //
- // BenchmarkMatcherQueue
- // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
- // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
- // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
- // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
- // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
- // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
- // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
- // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
- // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
- // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
- // PASS
- // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
- func BenchmarkMatcherQueue(b *testing.B) {
- SetAllowCommonASNMatching(true)
- defer SetAllowCommonASNMatching(false)
- for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
- debug.FreeOSMemory()
- var m *Matcher
- commonCompartmentID, _ := MakeID()
- b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
- for i := 0; i < b.N; i++ {
- // Matcher.Start is not called to start the matchWorker;
- // instead, matchOffer is invoked directly.
- m = NewMatcher(
- &MatcherConfig{
- Logger: testutils.NewTestLogger(),
- AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
- })
- for j := 0; j < size; j++ {
- var commonCompartmentIDs, personalCompartmentIDs []ID
- if prng.FlipCoin() {
- personalCompartmentID, _ := MakeID()
- personalCompartmentIDs = []ID{personalCompartmentID}
- } else {
- commonCompartmentIDs = []ID{commonCompartmentID}
- }
- announcementEntry := &announcementEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: commonCompartmentIDs,
- PersonalCompartmentIDs: personalCompartmentIDs,
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- ProxyID: ID{},
- },
- offerChan: make(chan *MatchOffer, 1),
- }
- err := m.addAnnouncementEntry(announcementEntry)
- if err != nil {
- b.Fatal(errors.Trace(err).Error())
- }
- }
- }
- })
- b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
- queueSize := m.announcementQueue.getLen()
- if queueSize != size {
- b.Fatal(errors.Tracef("unexpected queue size: %d", queueSize).Error())
- }
- for i := 0; i < b.N; i++ {
- personalCompartmentID, _ := MakeID()
- announcementEntry :=
- &announcementEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- PersonalCompartmentIDs: []ID{personalCompartmentID},
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- ProxyID: ID{},
- },
- offerChan: make(chan *MatchOffer, 1),
- }
- offerEntry := &offerEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- offer: &MatchOffer{
- Properties: MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- PersonalCompartmentIDs: []ID{personalCompartmentID},
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- },
- answerChan: make(chan *answerInfo, 1),
- }
- err := m.addAnnouncementEntry(announcementEntry)
- if err != nil {
- b.Fatal(errors.Trace(err).Error())
- }
- match, _ := m.matchOffer(offerEntry)
- if match == nil {
- b.Fatal(errors.TraceNew("unexpected no match").Error())
- }
- m.removeAnnouncementEntry(false, match)
- }
- })
- }
- }
|