| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220 |
- /*
- * Copyright (c) 2023, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package inproxy
- import (
- "context"
- "fmt"
- "runtime/debug"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- )
- func TestMatcher(t *testing.T) {
- err := runTestMatcher()
- if err != nil {
- t.Error(errors.Trace(err).Error())
- }
- }
- func runTestMatcher() error {
- limitEntryCount := 50
- rateLimitQuantity := 100
- rateLimitInterval := 1000 * time.Millisecond
- logger := newTestLogger()
- m := NewMatcher(
- &MatcherConfig{
- Logger: logger,
- AnnouncementLimitEntryCount: limitEntryCount,
- AnnouncementRateLimitQuantity: rateLimitQuantity,
- AnnouncementRateLimitInterval: rateLimitInterval,
- OfferLimitEntryCount: limitEntryCount,
- OfferRateLimitQuantity: rateLimitQuantity,
- OfferRateLimitInterval: rateLimitInterval,
- ProxyQualityState: NewProxyQuality(),
- })
- err := m.Start()
- if err != nil {
- return errors.Trace(err)
- }
- defer m.Stop()
- makeID := func() ID {
- ID, err := MakeID()
- if err != nil {
- panic(err)
- }
- return ID
- }
- makeAnnouncement := func(properties *MatchProperties) *MatchAnnouncement {
- return &MatchAnnouncement{
- Properties: *properties,
- ProxyID: makeID(),
- ConnectionID: makeID(),
- }
- }
- makeOffer := func(properties *MatchProperties, useMediaStreams bool) *MatchOffer {
- return &MatchOffer{
- Properties: *properties,
- UseMediaStreams: useMediaStreams,
- }
- }
- checkMatchMetrics := func(metrics *MatchMetrics) error {
- if metrics.OfferQueueSize < 1 || metrics.AnnouncementQueueSize < 1 {
- return errors.TraceNew("unexpected match metrics")
- }
- return nil
- }
- proxyIP := randomIPAddress()
- proxyFunc := func(
- resultChan chan error,
- proxyIP string,
- matchProperties *MatchProperties,
- timeout time.Duration,
- waitBeforeAnswer chan struct{},
- answerSuccess bool) {
- ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
- defer cancelFunc()
- announcement := makeAnnouncement(matchProperties)
- offer, matchMetrics, err := m.Announce(ctx, proxyIP, announcement)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- err = checkMatchMetrics(matchMetrics)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- _, ok := negotiateProtocolVersion(
- matchProperties.ProtocolVersion,
- offer.Properties.ProtocolVersion,
- offer.UseMediaStreams)
- if !ok {
- resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
- return
- }
- if waitBeforeAnswer != nil {
- <-waitBeforeAnswer
- }
- if answerSuccess {
- err = m.Answer(
- &MatchAnswer{
- ProxyID: announcement.ProxyID,
- ConnectionID: announcement.ConnectionID,
- })
- } else {
- m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
- }
- resultChan <- errors.Trace(err)
- }
- clientIP := randomIPAddress()
- baseClientFunc := func(
- resultChan chan error,
- clientIP string,
- matchProperties *MatchProperties,
- useMediaStreams bool,
- timeout time.Duration) {
- ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
- defer cancelFunc()
- offer := makeOffer(matchProperties, useMediaStreams)
- _, matchAnnouncement, matchMetrics, err := m.Offer(ctx, clientIP, offer)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- err = checkMatchMetrics(matchMetrics)
- if err != nil {
- resultChan <- errors.Trace(err)
- return
- }
- _, ok := negotiateProtocolVersion(
- matchAnnouncement.Properties.ProtocolVersion,
- offer.Properties.ProtocolVersion,
- offer.UseMediaStreams)
- if !ok {
- resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
- return
- }
- resultChan <- nil
- }
- clientFunc := func(resultChan chan error, clientIP string,
- matchProperties *MatchProperties, timeout time.Duration) {
- baseClientFunc(resultChan, clientIP, matchProperties, false, timeout)
- }
- clientUsingMediaStreamsFunc := func(resultChan chan error, clientIP string,
- matchProperties *MatchProperties, timeout time.Duration) {
- baseClientFunc(resultChan, clientIP, matchProperties, true, timeout)
- }
- // Test: announce timeout
- proxyResultChan := make(chan error)
- matchProperties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- CommonCompartmentIDs: []ID{makeID()},
- }
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- if m.announcementQueue.getLen() != 0 {
- return errors.TraceNew("unexpected queue size")
- }
- // Test: limit announce entries by IP
- time.Sleep(rateLimitInterval)
- maxEntries := limitEntryCount
- maxEntriesProxyResultChan := make(chan error, maxEntries)
- // fill the queue with max entries for one IP; the first one will timeout sooner
- go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- for i := 0; i < maxEntries-1; i++ {
- go proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 100*time.Millisecond, nil, true)
- }
- // await goroutines filling queue
- for {
- time.Sleep(10 * time.Microsecond)
- m.announcementQueueMutex.Lock()
- queueLen := m.announcementQueue.getLen()
- m.announcementQueueMutex.Unlock()
- if queueLen == maxEntries {
- break
- }
- }
- // the next enqueue should fail with "max entries"
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // wait for first entry to timeout
- err = <-maxEntriesProxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // now another enqueue succeeds as expected
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // drain remaining entries
- for i := 0; i < maxEntries-1; i++ {
- err = <-maxEntriesProxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- }
- // Test: offer timeout
- clientResultChan := make(chan error)
- go clientFunc(clientResultChan, clientIP, matchProperties, 1*time.Microsecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- if m.offerQueue.Len() != 0 {
- return errors.TraceNew("unexpected queue size")
- }
- // Test: limit offer entries by IP
- time.Sleep(rateLimitInterval)
- maxEntries = limitEntryCount
- maxEntriesClientResultChan := make(chan error, maxEntries)
- // fill the queue with max entries for one IP; the first one will timeout sooner
- go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- for i := 0; i < maxEntries-1; i++ {
- go clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 100*time.Millisecond)
- }
- // await goroutines filling queue
- for {
- time.Sleep(10 * time.Microsecond)
- m.offerQueueMutex.Lock()
- queueLen := m.offerQueue.Len()
- m.offerQueueMutex.Unlock()
- if queueLen == maxEntries {
- break
- }
- }
- // enqueue should fail with "max entries"
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "max entries for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // wait for first entry to timeout
- err = <-maxEntriesClientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // now another enqueue succeeds as expected
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // drain remaining entries
- for i := 0; i < maxEntries-1; i++ {
- err = <-maxEntriesClientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- }
- // Test: announcement rate limit
- m.SetLimits(
- 0, rateLimitQuantity, rateLimitInterval, []ID{},
- 0, rateLimitQuantity, rateLimitInterval)
- time.Sleep(rateLimitInterval)
- maxEntries = rateLimitQuantity
- maxEntriesProxyResultChan = make(chan error, maxEntries)
- waitGroup := new(sync.WaitGroup)
- for i := 0; i < maxEntries; i++ {
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- proxyFunc(maxEntriesProxyResultChan, proxyIP, matchProperties, 1*time.Microsecond, nil, true)
- }()
- }
- // Use a wait group to ensure all maxEntries have hit the rate limiter
- // without sleeping before the next attempt, as any sleep can increase
- // the rate limiter token count.
- waitGroup.Wait()
- // the next enqueue should fail with "rate exceeded"
- go proxyFunc(proxyResultChan, proxyIP, matchProperties, 10*time.Millisecond, nil, true)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: offer rate limit
- maxEntries = rateLimitQuantity
- maxEntriesClientResultChan = make(chan error, maxEntries)
- waitGroup = new(sync.WaitGroup)
- for i := 0; i < rateLimitQuantity; i++ {
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- clientFunc(maxEntriesClientResultChan, clientIP, matchProperties, 1*time.Microsecond)
- }()
- }
- waitGroup.Wait()
- // enqueue should fail with "rate exceeded"
- go clientFunc(clientResultChan, clientIP, matchProperties, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "rate exceeded for IP") {
- return errors.Tracef("unexpected result: %v", err)
- }
- time.Sleep(rateLimitInterval)
- m.SetLimits(
- limitEntryCount, rateLimitQuantity, rateLimitInterval, []ID{},
- limitEntryCount, rateLimitQuantity, rateLimitInterval)
- // Test: basic match
- commonCompartmentIDs := []ID{makeID()}
- geoIPData1 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- geoIPData2 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: answer error
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, false)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "no answer") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: client is gone
- waitBeforeAnswer := make(chan struct{})
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 100*time.Millisecond, waitBeforeAnswer, true)
- go clientFunc(clientResultChan, clientIP, geoIPData2, 10*time.Millisecond)
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- close(waitBeforeAnswer)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "no pending answer") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: no compartment match
- compartment1 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData1.GeoIPData,
- CommonCompartmentIDs: []ID{makeID()},
- }
- compartment2 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{makeID()},
- }
- compartment3 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- CommonCompartmentIDs: []ID{makeID()},
- }
- compartment4 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{makeID()},
- }
- proxy1ResultChan := make(chan error)
- proxy2ResultChan := make(chan error)
- client1ResultChan := make(chan error)
- client2ResultChan := make(chan error)
- go proxyFunc(proxy1ResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go proxyFunc(proxy2ResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
- go clientFunc(client1ResultChan, clientIP, compartment3, 10*time.Millisecond)
- go clientFunc(client2ResultChan, clientIP, compartment4, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-proxy2ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-client1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-client2ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: common compartment match
- compartment1And3 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- CommonCompartmentIDs: []ID{
- compartment1.CommonCompartmentIDs[0],
- compartment3.CommonCompartmentIDs[0]},
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment1And3, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: personal compartment match
- compartment2And4 := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: geoIPData2.GeoIPData,
- PersonalCompartmentIDs: []ID{
- compartment2.PersonalCompartmentIDs[0],
- compartment4.PersonalCompartmentIDs[0]},
- }
- go proxyFunc(proxyResultChan, proxyIP, compartment2, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, compartment2And4, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: no same-ASN match
- go proxyFunc(proxyResultChan, proxyIP, geoIPData1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, geoIPData1, 10*time.Millisecond)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: downgrade-compatible protocol version match
- protocolVersion1 := &MatchProperties{
- ProtocolVersion: ProtocolVersion1,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- protocolVersion2 := &MatchProperties{
- ProtocolVersion: ProtocolVersion2,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
- go clientFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: no incompatible protocol version match
- go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
- go clientUsingMediaStreamsFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
- err = <-proxyResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- err = <-clientResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // Test: downgrade-compatible protocol version match
- // Test: proxy preferred NAT match
- client1Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C1", ASN: "A1"},
- NATType: NATTypeFullCone,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- client2Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C2", ASN: "A2"},
- NATType: NATTypeSymmetric,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- proxy1Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C3", ASN: "A3"},
- NATType: NATTypeNone,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- proxy2Properties := &MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- GeoIPData: common.GeoIPData{Country: "C4", ASN: "A4"},
- NATType: NATTypeSymmetric,
- CommonCompartmentIDs: commonCompartmentIDs,
- }
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
- go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure both proxies are enqueued
- go clientFunc(clientResultChan, clientIP, client1Properties, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // proxy2 should match since it's the preferred NAT match
- err = <-proxy2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: client preferred NAT match
- // Limitation: the current Matcher.matchAllOffers logic matches the first
- // enqueued client against the best proxy match, regardless of whether
- // there is another client in the queue that's a better match for that
- // proxy. As a result, this test only passes when the preferred matching
- // client is enqueued first, and the test is currently of limited utility.
- go clientFunc(client2ResultChan, clientIP, client2Properties, 20*time.Millisecond)
- time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
- go clientFunc(client1ResultChan, clientIP, client1Properties, 20*time.Millisecond)
- time.Sleep(5 * time.Millisecond) // Hack to ensure client is enqueued
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 20*time.Millisecond, nil, true)
- err = <-proxy1ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-client1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // client2 should match since it's the preferred NAT match
- err = <-client2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: priority supercedes preferred NAT match
- go proxyFunc(proxy1ResultChan, proxyIP, proxy1Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
- proxy2Properties.IsPriority = true
- go proxyFunc(proxy2ResultChan, proxyIP, proxy2Properties, 10*time.Millisecond, nil, true)
- time.Sleep(5 * time.Millisecond) // Hack to ensure proxy is enqueued
- go clientFunc(clientResultChan, clientIP, client2Properties, 10*time.Millisecond)
- err = <-proxy1ResultChan
- if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
- return errors.Tracef("unexpected result: %v", err)
- }
- // proxy2 should match since it's the priority, but not preferred NAT match
- err = <-proxy2ResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- // Test: many matches
- // Reduce test log noise for this phase of the test
- logger.SetLogLevelDebug(false)
- matchCount := 10000
- proxyCount := matchCount
- clientCount := matchCount
- // Buffered so no goroutine will block reporting result
- proxyResultChan = make(chan error, matchCount)
- clientResultChan = make(chan error, matchCount)
- for proxyCount > 0 || clientCount > 0 {
- // Don't simply alternate enqueuing a proxy and a client
- if proxyCount > 0 && (clientCount == 0 || prng.FlipCoin()) {
- go proxyFunc(proxyResultChan, randomIPAddress(), geoIPData1, 10*time.Second, nil, true)
- proxyCount -= 1
- } else if clientCount > 0 {
- go clientFunc(clientResultChan, randomIPAddress(), geoIPData2, 10*time.Second)
- clientCount -= 1
- }
- }
- for i := 0; i < matchCount; i++ {
- err = <-proxyResultChan
- if err != nil {
- return errors.Trace(err)
- }
- err = <-clientResultChan
- if err != nil {
- return errors.Trace(err)
- }
- }
- return nil
- }
- func randomIPAddress() string {
- return fmt.Sprintf("%d.%d.%d.%d",
- prng.Range(0, 255),
- prng.Range(0, 255),
- prng.Range(0, 255),
- prng.Range(0, 255))
- }
- func TestMatcherMultiQueue(t *testing.T) {
- err := runTestMatcherMultiQueue()
- if err != nil {
- t.Error(errors.Trace(err).Error())
- }
- }
- func runTestMatcherMultiQueue() error {
- // Test: invalid compartment IDs
- q := newAnnouncementMultiQueue()
- err := q.enqueue(
- &announcementEntry{
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{}}})
- if err == nil {
- return errors.TraceNew("unexpected success")
- }
- compartmentID, _ := MakeID()
- err = q.enqueue(
- &announcementEntry{
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: []ID{compartmentID},
- PersonalCompartmentIDs: []ID{compartmentID},
- }}})
- if err == nil {
- return errors.TraceNew("unexpected success")
- }
- // Test: enqueue multiple candidates
- var otherCommonCompartmentIDs []ID
- var otherPersonalCompartmentIDs []ID
- numOtherCompartmentIDs := 10
- for i := 0; i < numOtherCompartmentIDs; i++ {
- commonCompartmentID, _ := MakeID()
- otherCommonCompartmentIDs = append(
- otherCommonCompartmentIDs, commonCompartmentID)
- personalCompartmentID, _ := MakeID()
- otherPersonalCompartmentIDs = append(
- otherPersonalCompartmentIDs, personalCompartmentID)
- }
- numOtherEntries := 10000
- for i := 0; i < numOtherEntries; i++ {
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
- defer cancel()
- err := q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: []ID{
- otherCommonCompartmentIDs[i%numOtherCompartmentIDs]},
- NATType: NATTypeSymmetric,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- err = q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- PersonalCompartmentIDs: []ID{
- otherPersonalCompartmentIDs[i%numOtherCompartmentIDs]},
- NATType: NATTypeSymmetric,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- }
- var matchingCommonCompartmentIDs []ID
- numMatchingCompartmentIDs := 2
- numMatchingEntries := 2
- var expectedMatches []*announcementEntry
- for i := 0; i < numMatchingCompartmentIDs; i++ {
- for j := 0; j < numMatchingEntries; j++ {
- commonCompartmentID, _ := MakeID()
- matchingCommonCompartmentIDs = append(
- matchingCommonCompartmentIDs, commonCompartmentID)
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(time.Duration(i+1)*time.Minute))
- defer cancel()
- a := &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: matchingCommonCompartmentIDs[i : i+1],
- NATType: NATTypeNone,
- }}}
- expectedMatches = append(expectedMatches, a)
- err := q.enqueue(a)
- if err != nil {
- return errors.Trace(err)
- }
- }
- }
- // Test: inspect queue state
- if q.getLen() != numOtherEntries*2+numMatchingCompartmentIDs*numMatchingEntries {
- return errors.TraceNew("unexpected total entries count")
- }
- if len(q.commonCompartmentQueues) !=
- numOtherCompartmentIDs+numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- // Test: find expected matches
- iter := q.startMatching(true, matchingCommonCompartmentIDs)
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- unlimited, partiallyLimited, strictlyLimited := iter.getNATCounts()
- if unlimited != numMatchingCompartmentIDs*numMatchingEntries ||
- partiallyLimited != 0 ||
- strictlyLimited != 0 {
- return errors.TraceNew("unexpected NAT counts")
- }
- match, _ := iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[0] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- if match.queueReference.dequeue() {
- return errors.TraceNew("unexpected not already dequeued")
- }
- iter = q.startMatching(true, matchingCommonCompartmentIDs)
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- unlimited, partiallyLimited, strictlyLimited = iter.getNATCounts()
- if unlimited != numMatchingEntries*numMatchingCompartmentIDs-1 ||
- partiallyLimited != 0 ||
- strictlyLimited != 0 {
- return errors.TraceNew("unexpected NAT counts")
- }
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[1] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- if len(iter.compartmentQueues) != numMatchingCompartmentIDs {
- return errors.TraceNew("unexpected iterator state")
- }
- // Test: getNext after dequeue
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[2] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- match, _ = iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- if match != expectedMatches[3] {
- return errors.TraceNew("unexpected match")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- // Test: reinspect queue state after dequeues
- if q.getLen() != numOtherEntries*2 {
- return errors.TraceNew("unexpected total entries count")
- }
- if len(q.commonCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- if len(q.personalCompartmentQueues) != numOtherCompartmentIDs {
- return errors.TraceNew("unexpected compartment queue count")
- }
- // Test: priority
- q = newAnnouncementMultiQueue()
- var commonCompartmentIDs []ID
- numCompartmentIDs := 10
- for i := 0; i < numCompartmentIDs; i++ {
- commonCompartmentID, _ := MakeID()
- commonCompartmentIDs = append(
- commonCompartmentIDs, commonCompartmentID)
- }
- priorityProxyID, _ := MakeID()
- nonPriorityProxyID, _ := MakeID()
- ctx, cancel := context.WithDeadline(
- context.Background(), time.Now().Add(10*time.Minute))
- defer cancel()
- numEntries := 10000
- for i := 0; i < numEntries; i++ {
- // Enqueue every other announcement as a priority
- isPriority := i%2 == 0
- proxyID := priorityProxyID
- if !isPriority {
- proxyID = nonPriorityProxyID
- }
- err := q.enqueue(
- &announcementEntry{
- ctx: ctx,
- announcement: &MatchAnnouncement{
- ProxyID: proxyID,
- Properties: MatchProperties{
- IsPriority: isPriority,
- CommonCompartmentIDs: []ID{
- commonCompartmentIDs[prng.Intn(numCompartmentIDs)]},
- NATType: NATTypeUnknown,
- }}})
- if err != nil {
- return errors.Trace(err)
- }
- }
- iter = q.startMatching(true, commonCompartmentIDs)
- for i := 0; i < numEntries; i++ {
- match, isPriority := iter.getNext()
- if match == nil {
- return errors.TraceNew("unexpected missing match")
- }
- // First half, and only first half, of matches is priority
- expectPriority := i < numEntries/2
- if isPriority != expectPriority {
- return errors.TraceNew("unexpected isPriority")
- }
- expectedProxyID := priorityProxyID
- if !expectPriority {
- expectedProxyID = nonPriorityProxyID
- }
- if match.announcement.ProxyID != expectedProxyID {
- return errors.TraceNew("unexpected ProxyID")
- }
- if !match.queueReference.dequeue() {
- return errors.TraceNew("unexpected already dequeued")
- }
- }
- match, _ = iter.getNext()
- if match != nil {
- return errors.TraceNew("unexpected match")
- }
- return nil
- }
- // Benchmark numbers for the previous announcement queue implementation, with
- // increasingly slow performance when enqueuing and then finding a new,
- // distinct personal compartment ID proxy.
- //
- // pkg: github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy
- // BenchmarkMatcherQueue/insert_100_announcements-24 17528 68304 ns/op
- // BenchmarkMatcherQueue/match_last_of_100_announcements-24 521719 2243 ns/op
- // BenchmarkMatcherQueue/insert_10000_announcements-24 208 5780227 ns/op
- // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 6796 177587 ns/op
- // BenchmarkMatcherQueue/insert_100000_announcements-24 21 50859464 ns/op
- // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 538 2249389 ns/op
- // BenchmarkMatcherQueue/insert_1000000_announcements-24 3 499685555 ns/op
- // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 33 34299751 ns/op
- // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 2606017042 ns/op
- // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 6 179171125 ns/op
- // PASS
- // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 17.585s
- //
- // Benchmark numbers for the current implemention, the announcementMultiQueue,
- // with constant time performance for the same scenario:
- //
- // BenchmarkMatcherQueue
- // BenchmarkMatcherQueue/insert_100_announcements-24 15422 77187 ns/op
- // BenchmarkMatcherQueue/match_last_of_100_announcements-24 965152 1217 ns/op
- // BenchmarkMatcherQueue/insert_10000_announcements-24 168 7322661 ns/op
- // BenchmarkMatcherQueue/match_last_of_10000_announcements-24 906748 1211 ns/op
- // BenchmarkMatcherQueue/insert_100000_announcements-24 16 64770370 ns/op
- // BenchmarkMatcherQueue/match_last_of_100000_announcements-24 972342 1243 ns/op
- // BenchmarkMatcherQueue/insert_1000000_announcements-24 2 701046271 ns/op
- // BenchmarkMatcherQueue/match_last_of_1000000_announcements-24 988050 1230 ns/op
- // BenchmarkMatcherQueue/insert_4999999_announcements-24 1 4523888833 ns/op
- // BenchmarkMatcherQueue/match_last_of_4999999_announcements-24 963894 1186 ns/op
- // PASS
- // ok github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy 22.439s
- func BenchmarkMatcherQueue(b *testing.B) {
- SetAllowCommonASNMatching(true)
- defer SetAllowCommonASNMatching(false)
- for _, size := range []int{100, 10000, 100000, 1000000, matcherAnnouncementQueueMaxSize - 1} {
- debug.FreeOSMemory()
- var m *Matcher
- commonCompartmentID, _ := MakeID()
- b.Run(fmt.Sprintf("insert %d announcements", size), func(b *testing.B) {
- for i := 0; i < b.N; i++ {
- // Matcher.Start is not called to start the matchWorker;
- // instead, matchOffer is invoked directly.
- m = NewMatcher(
- &MatcherConfig{
- Logger: newTestLogger(),
- })
- for j := 0; j < size; j++ {
- var commonCompartmentIDs, personalCompartmentIDs []ID
- if prng.FlipCoin() {
- personalCompartmentID, _ := MakeID()
- personalCompartmentIDs = []ID{personalCompartmentID}
- } else {
- commonCompartmentIDs = []ID{commonCompartmentID}
- }
- announcementEntry := &announcementEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- CommonCompartmentIDs: commonCompartmentIDs,
- PersonalCompartmentIDs: personalCompartmentIDs,
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- ProxyID: ID{},
- },
- offerChan: make(chan *MatchOffer, 1),
- }
- err := m.addAnnouncementEntry(announcementEntry)
- if err != nil {
- b.Fatal(errors.Trace(err).Error())
- }
- }
- }
- })
- b.Run(fmt.Sprintf("match last of %d announcements", size), func(b *testing.B) {
- queueSize := m.announcementQueue.getLen()
- if queueSize != size {
- b.Fatal(errors.Tracef("unexpected queue size: %d", queueSize).Error())
- }
- for i := 0; i < b.N; i++ {
- personalCompartmentID, _ := MakeID()
- announcementEntry :=
- &announcementEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- announcement: &MatchAnnouncement{
- Properties: MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- PersonalCompartmentIDs: []ID{personalCompartmentID},
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- ProxyID: ID{},
- },
- offerChan: make(chan *MatchOffer, 1),
- }
- offerEntry := &offerEntry{
- ctx: context.Background(),
- limitIP: "127.0.0.1",
- offer: &MatchOffer{
- Properties: MatchProperties{
- ProtocolVersion: LatestProtocolVersion,
- PersonalCompartmentIDs: []ID{personalCompartmentID},
- GeoIPData: common.GeoIPData{},
- NetworkType: NetworkTypeWiFi,
- NATType: NATTypePortRestrictedCone,
- PortMappingTypes: []PortMappingType{},
- },
- },
- answerChan: make(chan *answerInfo, 1),
- }
- err := m.addAnnouncementEntry(announcementEntry)
- if err != nil {
- b.Fatal(errors.Trace(err).Error())
- }
- match, _ := m.matchOffer(offerEntry)
- if match == nil {
- b.Fatal(errors.TraceNew("unexpected no match").Error())
- }
- m.removeAnnouncementEntry(false, match)
- }
- })
- }
- }
|