inproxy_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245
  1. //go:build PSIPHON_ENABLE_INPROXY
  2. /*
  3. * Copyright (c) 2023, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package inproxy
  21. import (
  22. "bytes"
  23. "context"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. _ "net/http/pprof"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. tls "github.com/Psiphon-Labs/psiphon-tls"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/internal/testutils"
  45. "golang.org/x/sync/errgroup"
  46. )
  47. func TestInproxy(t *testing.T) {
  48. err := runTestInproxy(false)
  49. if err != nil {
  50. t.Error(errors.Trace(err).Error())
  51. }
  52. }
  53. func TestInproxyMustUpgrade(t *testing.T) {
  54. err := runTestInproxy(true)
  55. if err != nil {
  56. t.Error(errors.Trace(err).Error())
  57. }
  58. }
  59. func runTestInproxy(doMustUpgrade bool) error {
  60. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  61. numProxies := 5
  62. proxyMaxClients := 3
  63. numClients := 10
  64. bytesToSend := 1 << 20
  65. targetElapsedSeconds := 2
  66. baseAPIParameters := common.APIParameters{
  67. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  68. "client_platform": "test-client-platform",
  69. }
  70. testCompartmentID, _ := MakeID()
  71. testCommonCompartmentIDs := []ID{testCompartmentID}
  72. testNetworkID := "NETWORK-ID-1"
  73. testNetworkType := NetworkTypeUnknown
  74. testNATType := NATTypeUnknown
  75. testSTUNServerAddress := "stun.voipgate.com:3478"
  76. testDisableSTUN := false
  77. testDisablePortMapping := false
  78. testNewTacticsPayload := []byte(prng.HexString(100))
  79. testNewTacticsTag := "new-tactics-tag"
  80. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  81. currentNetworkCtx, currentNetworkCancelFunc := context.WithCancel(context.Background())
  82. defer currentNetworkCancelFunc()
  83. // TODO: test port mapping
  84. stunServerAddressSucceededCount := int32(0)
  85. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  86. stunServerAddressFailedCount := int32(0)
  87. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  88. roundTripperSucceededCount := int32(0)
  89. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  90. roundTripperFailedCount := int32(0)
  91. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  92. noMatch := func(RoundTripper) {}
  93. var receivedProxyMustUpgrade chan struct{}
  94. var receivedClientMustUpgrade chan struct{}
  95. if doMustUpgrade {
  96. receivedProxyMustUpgrade = make(chan struct{})
  97. receivedClientMustUpgrade = make(chan struct{})
  98. // trigger MustUpgrade
  99. minimumProxyProtocolVersion = LatestProtocolVersion + 1
  100. minimumClientProtocolVersion = LatestProtocolVersion + 1
  101. // Minimize test parameters for MustUpgrade case
  102. numProxies = 1
  103. proxyMaxClients = 1
  104. numClients = 1
  105. testDisableSTUN = true
  106. testDisablePortMapping = true
  107. }
  108. testCtx, stopTest := context.WithCancel(context.Background())
  109. defer stopTest()
  110. testGroup := new(errgroup.Group)
  111. // Enable test to run without requiring host firewall exceptions
  112. SetAllowBogonWebRTCConnections(true)
  113. defer SetAllowBogonWebRTCConnections(false)
  114. // Init logging and profiling
  115. logger := testutils.NewTestLogger()
  116. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  117. go http.Serve(pprofListener, nil)
  118. defer pprofListener.Close()
  119. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  120. // Start echo servers
  121. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  122. if err != nil {
  123. return errors.Trace(err)
  124. }
  125. defer tcpEchoListener.Close()
  126. go runTCPEchoServer(tcpEchoListener)
  127. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  128. quicEchoServer, err := newQuicEchoServer()
  129. if err != nil {
  130. return errors.Trace(err)
  131. }
  132. defer quicEchoServer.Close()
  133. go quicEchoServer.Run()
  134. // Create signed server entry with capability
  135. serverPrivateKey, err := GenerateSessionPrivateKey()
  136. if err != nil {
  137. return errors.Trace(err)
  138. }
  139. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  140. if err != nil {
  141. return errors.Trace(err)
  142. }
  143. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  144. if err != nil {
  145. return errors.Trace(err)
  146. }
  147. serverEntry := make(protocol.ServerEntryFields)
  148. serverEntry["ipAddress"] = "127.0.0.1"
  149. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  150. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  151. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  152. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  153. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  154. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  155. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  156. testServerEntryTag := prng.HexString(16)
  157. serverEntry["tag"] = testServerEntryTag
  158. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  159. protocol.NewServerEntrySignatureKeyPair()
  160. if err != nil {
  161. return errors.Trace(err)
  162. }
  163. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  164. if err != nil {
  165. return errors.Trace(err)
  166. }
  167. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  168. if err != nil {
  169. return errors.Trace(err)
  170. }
  171. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  172. if err != nil {
  173. return errors.Trace(err)
  174. }
  175. // API parameter handlers
  176. apiParameterValidator := func(params common.APIParameters) error {
  177. if len(params) != len(baseAPIParameters) {
  178. return errors.TraceNew("unexpected base API parameter count")
  179. }
  180. for name, value := range params {
  181. if value.(string) != baseAPIParameters[name].(string) {
  182. return errors.Tracef(
  183. "unexpected base API parameter: %v: %v != %v",
  184. name,
  185. value.(string),
  186. baseAPIParameters[name].(string))
  187. }
  188. }
  189. return nil
  190. }
  191. apiParameterLogFieldFormatter := func(
  192. _ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
  193. logFields := common.LogFields{}
  194. logFields.Add(common.LogFields(params))
  195. return logFields
  196. }
  197. // Start broker
  198. logger.WithTrace().Info("START BROKER")
  199. brokerPrivateKey, err := GenerateSessionPrivateKey()
  200. if err != nil {
  201. return errors.Trace(err)
  202. }
  203. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  204. if err != nil {
  205. return errors.Trace(err)
  206. }
  207. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  208. if err != nil {
  209. return errors.Trace(err)
  210. }
  211. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  212. if err != nil {
  213. return errors.Trace(err)
  214. }
  215. defer brokerListener.Close()
  216. brokerConfig := &BrokerConfig{
  217. Logger: logger,
  218. CommonCompartmentIDs: testCommonCompartmentIDs,
  219. APIParameterValidator: apiParameterValidator,
  220. APIParameterLogFieldFormatter: apiParameterLogFieldFormatter,
  221. GetTacticsPayload: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  222. // Exercise both new and unchanged tactics
  223. if prng.FlipCoin() {
  224. return testNewTacticsPayload, testNewTacticsTag, nil
  225. }
  226. return testUnchangedTacticsPayload, "", nil
  227. },
  228. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  229. PrivateKey: brokerPrivateKey,
  230. ObfuscationRootSecret: brokerRootObfuscationSecret,
  231. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  232. AllowProxy: func(common.GeoIPData) bool { return true },
  233. AllowClient: func(common.GeoIPData) bool { return true },
  234. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  235. AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
  236. }
  237. broker, err := NewBroker(brokerConfig)
  238. if err != nil {
  239. return errors.Trace(err)
  240. }
  241. // Enable proxy quality (and otherwise use the default quality parameters)
  242. enableProxyQuality := true
  243. broker.SetProxyQualityParameters(
  244. enableProxyQuality,
  245. proxyQualityTTL,
  246. proxyQualityPendingFailedMatchDeadline,
  247. proxyQualityFailedMatchThreshold)
  248. err = broker.Start()
  249. if err != nil {
  250. return errors.Trace(err)
  251. }
  252. defer broker.Stop()
  253. testGroup.Go(func() error {
  254. err := runHTTPServer(brokerListener, broker)
  255. if testCtx.Err() != nil {
  256. return nil
  257. }
  258. return errors.Trace(err)
  259. })
  260. // Stub server broker request handler (in Psiphon, this will be the
  261. // destination Psiphon server; here, it's not necessary to build this
  262. // handler into the destination echo server)
  263. //
  264. // The stub server broker request handler also triggers a server proxy
  265. // quality request in the other direction.
  266. makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
  267. RoundTripper, common.APIParameters, error) {
  268. return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
  269. }
  270. serverSessionsConfig := &ServerBrokerSessionsConfig{
  271. Logger: logger,
  272. ServerPrivateKey: serverPrivateKey,
  273. ServerRootObfuscationSecret: serverRootObfuscationSecret,
  274. BrokerPublicKeys: []SessionPublicKey{brokerPublicKey},
  275. BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
  276. BrokerRoundTripperMaker: makeServerBrokerClientRoundTripper,
  277. ProxyMetricsValidator: apiParameterValidator,
  278. ProxyMetricsFormatter: apiParameterLogFieldFormatter,
  279. ProxyMetricsPrefix: "",
  280. }
  281. serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
  282. if err != nil {
  283. return errors.Trace(err)
  284. }
  285. err = serverSessions.Start()
  286. if err != nil {
  287. return errors.Trace(err)
  288. }
  289. defer serverSessions.Stop()
  290. // Don't delay reporting quality.
  291. serverSessions.SetProxyQualityRequestParameters(
  292. proxyQualityReporterMaxRequestEntries,
  293. 0,
  294. proxyQualityReporterRequestTimeout,
  295. proxyQualityReporterRequestRetries)
  296. var pendingBrokerServerReportsMutex sync.Mutex
  297. pendingBrokerServerReports := make(map[ID]bool)
  298. addPendingBrokerServerReport := func(connectionID ID) {
  299. pendingBrokerServerReportsMutex.Lock()
  300. defer pendingBrokerServerReportsMutex.Unlock()
  301. pendingBrokerServerReports[connectionID] = true
  302. }
  303. removePendingBrokerServerReport := func(connectionID ID) {
  304. pendingBrokerServerReportsMutex.Lock()
  305. defer pendingBrokerServerReportsMutex.Unlock()
  306. delete(pendingBrokerServerReports, connectionID)
  307. }
  308. hasPendingBrokerServerReports := func() bool {
  309. pendingBrokerServerReportsMutex.Lock()
  310. defer pendingBrokerServerReportsMutex.Unlock()
  311. return len(pendingBrokerServerReports) > 0
  312. }
  313. serverQualityGroup := new(errgroup.Group)
  314. var serverQualityProxyIDsMutex sync.Mutex
  315. serverQualityProxyIDs := make(map[ID]struct{})
  316. testProxyASN := "65537"
  317. testClientASN := "65538"
  318. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  319. handler := func(
  320. brokerVerifiedOriginalClientIP string,
  321. brokerReportedProxyID ID,
  322. brokerMatchedPersonalCompartments bool,
  323. logFields common.LogFields) {
  324. // Mark the report as no longer outstanding
  325. removePendingBrokerServerReport(clientConnectionID)
  326. // Trigger an asynchronous proxy quality request to the broker.
  327. // This roughly follows the Psiphon server functionality, where a
  328. // quality request is made sometime after the Psiphon handshake
  329. // completes, once tunnel quality thresholds are achieved.
  330. serverQualityGroup.Go(func() error {
  331. serverSessions.ReportQuality(
  332. brokerReportedProxyID, testProxyASN, testClientASN)
  333. serverQualityProxyIDsMutex.Lock()
  334. serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
  335. serverQualityProxyIDsMutex.Unlock()
  336. return nil
  337. })
  338. }
  339. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  340. return out, errors.Trace(err)
  341. }
  342. // Check that the tactics round trip succeeds
  343. var pendingProxyTacticsCallbacksMutex sync.Mutex
  344. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  345. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  346. pendingProxyTacticsCallbacksMutex.Lock()
  347. defer pendingProxyTacticsCallbacksMutex.Unlock()
  348. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  349. }
  350. hasPendingProxyTacticsCallbacks := func() bool {
  351. pendingProxyTacticsCallbacksMutex.Lock()
  352. defer pendingProxyTacticsCallbacksMutex.Unlock()
  353. return len(pendingProxyTacticsCallbacks) > 0
  354. }
  355. makeHandleTacticsPayload := func(
  356. proxyPrivateKey SessionPrivateKey,
  357. tacticsNetworkID string) func(_ string, _ bool, _ []byte) bool {
  358. return func(networkID string, _ bool, tacticsPayload []byte) bool {
  359. pendingProxyTacticsCallbacksMutex.Lock()
  360. defer pendingProxyTacticsCallbacksMutex.Unlock()
  361. // Check that the correct networkID is passed around; if not,
  362. // skip the delete, which will fail the test
  363. if networkID == tacticsNetworkID {
  364. // Certain state is reset when new tactics are applied -- the
  365. // return true case; exercise both cases
  366. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  367. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  368. return true
  369. }
  370. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  371. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  372. return false
  373. }
  374. }
  375. panic("unexpected tactics payload")
  376. }
  377. }
  378. // Start proxies
  379. logger.WithTrace().Info("START PROXIES")
  380. for i := 0; i < numProxies; i++ {
  381. proxyPrivateKey, err := GenerateSessionPrivateKey()
  382. if err != nil {
  383. return errors.Trace(err)
  384. }
  385. brokerCoordinator := &testBrokerDialCoordinator{
  386. networkID: testNetworkID,
  387. networkType: testNetworkType,
  388. brokerClientPrivateKey: proxyPrivateKey,
  389. brokerPublicKey: brokerPublicKey,
  390. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  391. brokerClientRoundTripper: newHTTPRoundTripper(
  392. brokerListener.Addr().String(), "proxy"),
  393. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  394. brokerClientRoundTripperFailed: roundTripperFailed,
  395. // Minimize the delay before proxies reannounce after dial
  396. // failures, which may occur.
  397. announceDelay: 0,
  398. announceMaxBackoffDelay: 0,
  399. announceDelayJitter: 0.0,
  400. }
  401. webRTCCoordinator := &testWebRTCDialCoordinator{
  402. networkID: testNetworkID,
  403. networkType: testNetworkType,
  404. natType: testNATType,
  405. disableSTUN: testDisableSTUN,
  406. disablePortMapping: testDisablePortMapping,
  407. stunServerAddress: testSTUNServerAddress,
  408. stunServerAddressRFC5780: testSTUNServerAddress,
  409. stunServerAddressSucceeded: stunServerAddressSucceeded,
  410. stunServerAddressFailed: stunServerAddressFailed,
  411. setNATType: func(NATType) {},
  412. setPortMappingTypes: func(PortMappingTypes) {},
  413. bindToDevice: func(int) error { return nil },
  414. // Minimize the delay before proxies reannounce after failed
  415. // connections, which may occur.
  416. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  417. proxyRelayInactivityTimeout: 5 * time.Second,
  418. }
  419. // Each proxy has its own broker client
  420. brokerClient, err := NewBrokerClient(brokerCoordinator)
  421. if err != nil {
  422. return errors.Trace(err)
  423. }
  424. tacticsNetworkID := prng.HexString(32)
  425. runCtx, cancelRun := context.WithCancel(testCtx)
  426. // No deferred cancelRun due to testGroup.Go below
  427. name := fmt.Sprintf("proxy-%d", i)
  428. proxy, err := NewProxy(&ProxyConfig{
  429. Logger: testutils.NewTestLoggerWithComponent(name),
  430. WaitForNetworkConnectivity: func() bool {
  431. return true
  432. },
  433. GetCurrentNetworkContext: func() context.Context {
  434. return currentNetworkCtx
  435. },
  436. GetBrokerClient: func() (*BrokerClient, error) {
  437. return brokerClient, nil
  438. },
  439. GetBaseAPIParameters: func(bool) (common.APIParameters, string, error) {
  440. return baseAPIParameters, tacticsNetworkID, nil
  441. },
  442. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  443. return webRTCCoordinator, nil
  444. },
  445. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  446. MaxClients: proxyMaxClients,
  447. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  448. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  449. ActivityUpdater: func(connectingClients int32, connectedClients int32,
  450. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  451. fmt.Printf("[%s][%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
  452. time.Now().UTC().Format(time.RFC3339), name,
  453. connectingClients, connectedClients, bytesUp, bytesDown)
  454. },
  455. MustUpgrade: func() {
  456. close(receivedProxyMustUpgrade)
  457. cancelRun()
  458. },
  459. })
  460. if err != nil {
  461. return errors.Trace(err)
  462. }
  463. addPendingProxyTacticsCallback(proxyPrivateKey)
  464. testGroup.Go(func() error {
  465. proxy.Run(runCtx)
  466. return nil
  467. })
  468. }
  469. // Await proxy announcements before starting clients
  470. //
  471. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  472. // plus NAT discovery
  473. //
  474. // - Don't wait for > numProxies announcements due to
  475. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  476. if !doMustUpgrade {
  477. for {
  478. time.Sleep(100 * time.Millisecond)
  479. broker.matcher.announcementQueueMutex.Lock()
  480. n := broker.matcher.announcementQueue.getLen()
  481. broker.matcher.announcementQueueMutex.Unlock()
  482. if n >= numProxies {
  483. break
  484. }
  485. }
  486. }
  487. // Start clients
  488. var completedClientCount atomic.Int64
  489. logger.WithTrace().Info("START CLIENTS")
  490. clientsGroup := new(errgroup.Group)
  491. makeClientFunc := func(
  492. clientNum int,
  493. isTCP bool,
  494. brokerClient *BrokerClient,
  495. webRTCCoordinator WebRTCDialCoordinator) func() error {
  496. var networkProtocol NetworkProtocol
  497. var addr string
  498. var wrapWithQUIC bool
  499. if isTCP {
  500. networkProtocol = NetworkProtocolTCP
  501. addr = tcpEchoListener.Addr().String()
  502. } else {
  503. networkProtocol = NetworkProtocolUDP
  504. addr = quicEchoServer.Addr().String()
  505. wrapWithQUIC = true
  506. }
  507. return func() error {
  508. name := fmt.Sprintf("client-%d", clientNum)
  509. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  510. defer cancelDial()
  511. conn, err := DialClient(
  512. dialCtx,
  513. &ClientConfig{
  514. Logger: testutils.NewTestLoggerWithComponent(name),
  515. BaseAPIParameters: baseAPIParameters,
  516. BrokerClient: brokerClient,
  517. WebRTCDialCoordinator: webRTCCoordinator,
  518. ReliableTransport: isTCP,
  519. DialNetworkProtocol: networkProtocol,
  520. DialAddress: addr,
  521. PackedDestinationServerEntry: packedDestinationServerEntry,
  522. MustUpgrade: func() {
  523. close(receivedClientMustUpgrade)
  524. cancelDial()
  525. },
  526. })
  527. if err != nil {
  528. return errors.Trace(err)
  529. }
  530. var relayConn net.Conn
  531. relayConn = conn
  532. if wrapWithQUIC {
  533. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  534. if err != nil {
  535. return errors.Trace(err)
  536. }
  537. disablePathMTUDiscovery := true
  538. quicConn, err := quic.Dial(
  539. dialCtx,
  540. conn,
  541. udpAddr,
  542. "test",
  543. "QUICv1",
  544. nil,
  545. quicEchoServer.ObfuscationKey(),
  546. nil,
  547. nil,
  548. disablePathMTUDiscovery,
  549. GetQUICMaxPacketSizeAdjustment(),
  550. false,
  551. false,
  552. common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
  553. )
  554. if err != nil {
  555. return errors.Trace(err)
  556. }
  557. relayConn = quicConn
  558. }
  559. addPendingBrokerServerReport(conn.GetConnectionID())
  560. signalRelayComplete := make(chan struct{})
  561. clientsGroup.Go(func() error {
  562. defer close(signalRelayComplete)
  563. in := conn.InitialRelayPacket()
  564. for in != nil {
  565. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  566. if err != nil {
  567. if out == nil {
  568. return errors.Trace(err)
  569. } else {
  570. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  571. // Proceed with reset session token packet
  572. }
  573. }
  574. if out == nil {
  575. // Relay is complete
  576. break
  577. }
  578. in, err = conn.RelayPacket(testCtx, out)
  579. if err != nil {
  580. return errors.Trace(err)
  581. }
  582. }
  583. return nil
  584. })
  585. sendBytes := prng.Bytes(bytesToSend)
  586. clientsGroup.Go(func() error {
  587. for n := 0; n < bytesToSend; {
  588. m := prng.Range(1024, 32768)
  589. if bytesToSend-n < m {
  590. m = bytesToSend - n
  591. }
  592. _, err := relayConn.Write(sendBytes[n : n+m])
  593. if err != nil {
  594. return errors.Trace(err)
  595. }
  596. n += m
  597. }
  598. fmt.Printf("[%s][%s] %d bytes sent\n",
  599. time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
  600. return nil
  601. })
  602. clientsGroup.Go(func() error {
  603. buf := make([]byte, 32768)
  604. n := 0
  605. for n < bytesToSend {
  606. m, err := relayConn.Read(buf)
  607. if err != nil {
  608. return errors.Trace(err)
  609. }
  610. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  611. return errors.Tracef(
  612. "unexpected bytes: expected at index %d, received at index %d",
  613. bytes.Index(sendBytes, buf[:m]), n)
  614. }
  615. n += m
  616. }
  617. completed := completedClientCount.Add(1)
  618. fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
  619. time.Now().UTC().Format(time.RFC3339), name,
  620. bytesToSend, completed, numClients)
  621. select {
  622. case <-signalRelayComplete:
  623. case <-testCtx.Done():
  624. }
  625. fmt.Printf("[%s][%s] closing\n",
  626. time.Now().UTC().Format(time.RFC3339), name)
  627. relayConn.Close()
  628. conn.Close()
  629. return nil
  630. })
  631. return nil
  632. }
  633. }
  634. newClientBrokerClient := func(
  635. disableWaitToShareSession bool) (*BrokerClient, error) {
  636. clientPrivateKey, err := GenerateSessionPrivateKey()
  637. if err != nil {
  638. return nil, errors.Trace(err)
  639. }
  640. brokerCoordinator := &testBrokerDialCoordinator{
  641. networkID: testNetworkID,
  642. networkType: testNetworkType,
  643. commonCompartmentIDs: testCommonCompartmentIDs,
  644. disableWaitToShareSession: disableWaitToShareSession,
  645. brokerClientPrivateKey: clientPrivateKey,
  646. brokerPublicKey: brokerPublicKey,
  647. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  648. brokerClientRoundTripper: newHTTPRoundTripper(
  649. brokerListener.Addr().String(), "client"),
  650. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  651. brokerClientRoundTripperFailed: roundTripperFailed,
  652. brokerClientNoMatch: noMatch,
  653. }
  654. brokerClient, err := NewBrokerClient(brokerCoordinator)
  655. if err != nil {
  656. return nil, errors.Trace(err)
  657. }
  658. return brokerClient, nil
  659. }
  660. newClientWebRTCDialCoordinator := func(
  661. isMobile bool,
  662. useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
  663. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  664. if err != nil {
  665. return nil, errors.Trace(err)
  666. }
  667. var trafficShapingParameters *TrafficShapingParameters
  668. if useMediaStreams {
  669. trafficShapingParameters = &TrafficShapingParameters{
  670. MinPaddedMessages: 0,
  671. MaxPaddedMessages: 10,
  672. MinPaddingSize: 0,
  673. MaxPaddingSize: 254,
  674. MinDecoyMessages: 0,
  675. MaxDecoyMessages: 10,
  676. MinDecoySize: 1,
  677. MaxDecoySize: 1200,
  678. DecoyMessageProbability: 0.5,
  679. }
  680. } else {
  681. trafficShapingParameters = &TrafficShapingParameters{
  682. MinPaddedMessages: 0,
  683. MaxPaddedMessages: 10,
  684. MinPaddingSize: 0,
  685. MaxPaddingSize: 1500,
  686. MinDecoyMessages: 0,
  687. MaxDecoyMessages: 10,
  688. MinDecoySize: 1,
  689. MaxDecoySize: 1500,
  690. DecoyMessageProbability: 0.5,
  691. }
  692. }
  693. webRTCCoordinator := &testWebRTCDialCoordinator{
  694. networkID: testNetworkID,
  695. networkType: testNetworkType,
  696. natType: testNATType,
  697. disableSTUN: testDisableSTUN,
  698. stunServerAddress: testSTUNServerAddress,
  699. stunServerAddressRFC5780: testSTUNServerAddress,
  700. stunServerAddressSucceeded: stunServerAddressSucceeded,
  701. stunServerAddressFailed: stunServerAddressFailed,
  702. clientRootObfuscationSecret: clientRootObfuscationSecret,
  703. doDTLSRandomization: prng.FlipCoin(),
  704. useMediaStreams: useMediaStreams,
  705. trafficShapingParameters: trafficShapingParameters,
  706. setNATType: func(NATType) {},
  707. setPortMappingTypes: func(PortMappingTypes) {},
  708. bindToDevice: func(int) error { return nil },
  709. // With STUN enabled (testDisableSTUN = false), there are cases
  710. // where the WebRTC peer connection is not successfully
  711. // established. With a short enough timeout here, clients will
  712. // redial and eventually succeed.
  713. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  714. }
  715. if isMobile {
  716. webRTCCoordinator.networkType = NetworkTypeMobile
  717. webRTCCoordinator.disableInboundForMobileNetworks = true
  718. }
  719. return webRTCCoordinator, nil
  720. }
  721. sharedBrokerClient, err := newClientBrokerClient(false)
  722. if err != nil {
  723. return errors.Trace(err)
  724. }
  725. sharedBrokerClientDisableWait, err := newClientBrokerClient(true)
  726. if err != nil {
  727. return errors.Trace(err)
  728. }
  729. for i := 0; i < numClients; i++ {
  730. // Test a mix of TCP and UDP proxying; also test the
  731. // DisableInboundForMobileNetworks code path.
  732. isTCP := i%2 == 0
  733. isMobile := i%4 == 0
  734. useMediaStreams := i%4 < 2
  735. // Exercise BrokerClients shared by multiple clients, but also create
  736. // several broker clients.
  737. var brokerClient *BrokerClient
  738. switch i % 3 {
  739. case 0:
  740. brokerClient = sharedBrokerClient
  741. case 1:
  742. brokerClient = sharedBrokerClientDisableWait
  743. case 2:
  744. brokerClient, err = newClientBrokerClient(true)
  745. if err != nil {
  746. return errors.Trace(err)
  747. }
  748. }
  749. webRTCCoordinator, err := newClientWebRTCDialCoordinator(
  750. isMobile, useMediaStreams)
  751. if err != nil {
  752. return errors.Trace(err)
  753. }
  754. clientsGroup.Go(
  755. makeClientFunc(
  756. i,
  757. isTCP,
  758. brokerClient,
  759. webRTCCoordinator))
  760. }
  761. if doMustUpgrade {
  762. // Await MustUpgrade callbacks
  763. logger.WithTrace().Info("AWAIT MUST UPGRADE")
  764. <-receivedProxyMustUpgrade
  765. <-receivedClientMustUpgrade
  766. _ = clientsGroup.Wait()
  767. } else {
  768. // Await client transfers complete
  769. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  770. err = clientsGroup.Wait()
  771. if err != nil {
  772. return errors.Trace(err)
  773. }
  774. logger.WithTrace().Info("DONE DATA TRANSFER")
  775. if hasPendingBrokerServerReports() {
  776. return errors.TraceNew("unexpected pending broker server requests")
  777. }
  778. if hasPendingProxyTacticsCallbacks() {
  779. return errors.TraceNew("unexpected pending proxy tactics callback")
  780. }
  781. err = serverQualityGroup.Wait()
  782. if err != nil {
  783. return errors.Trace(err)
  784. }
  785. // Inspect the broker's proxy quality state, to verify that the proxy
  786. // quality request was processed.
  787. //
  788. // Limitation: currently we don't check the priority
  789. // announcement _queue_, as announcements may have arrived before the
  790. // quality request, and announcements are promoted between queues.
  791. serverQualityProxyIDsMutex.Lock()
  792. defer serverQualityProxyIDsMutex.Unlock()
  793. for proxyID := range serverQualityProxyIDs {
  794. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
  795. return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
  796. }
  797. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
  798. return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
  799. }
  800. }
  801. // TODO: check that elapsed time is consistent with rate limit (+/-)
  802. // Check if STUN server replay callbacks were triggered
  803. if !testDisableSTUN {
  804. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  805. return errors.TraceNew("unexpected STUN server succeeded count")
  806. }
  807. // Allow for some STUN server failures
  808. if atomic.LoadInt32(&stunServerAddressFailedCount) >= int32(numProxies/2) {
  809. return errors.TraceNew("unexpected STUN server failed count")
  810. }
  811. }
  812. // Check if RoundTripper server replay callbacks were triggered
  813. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  814. return errors.TraceNew("unexpected round tripper succeeded count")
  815. }
  816. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  817. return errors.TraceNew("unexpected round tripper failed count")
  818. }
  819. }
  820. // Await shutdowns
  821. stopTest()
  822. brokerListener.Close()
  823. err = testGroup.Wait()
  824. if err != nil {
  825. return errors.Trace(err)
  826. }
  827. return nil
  828. }
  829. func runHTTPServer(listener net.Listener, broker *Broker) error {
  830. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  831. // For this test, clients set the path to "/client" and proxies
  832. // set the path to "/proxy" and we use that to create stub GeoIP
  833. // data to pass the not-same-ASN condition.
  834. var geoIPData common.GeoIPData
  835. geoIPData.ASN = r.URL.Path
  836. requestPayload, err := ioutil.ReadAll(
  837. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  838. if err != nil {
  839. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  840. http.Error(w, "", http.StatusNotFound)
  841. return
  842. }
  843. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  844. extendTimeout := func(timeout time.Duration) {
  845. // TODO: set insufficient initial timeout, so extension is
  846. // required for success
  847. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  848. }
  849. responsePayload, err := broker.HandleSessionPacket(
  850. r.Context(),
  851. extendTimeout,
  852. nil,
  853. clientIP,
  854. geoIPData,
  855. requestPayload)
  856. if err != nil {
  857. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  858. http.Error(w, "", http.StatusNotFound)
  859. return
  860. }
  861. w.WriteHeader(http.StatusOK)
  862. w.Write(responsePayload)
  863. })
  864. // WriteTimeout will be extended via extendTimeout.
  865. httpServer := &http.Server{
  866. ReadTimeout: 10 * time.Second,
  867. WriteTimeout: 10 * time.Second,
  868. IdleTimeout: 1 * time.Minute,
  869. Handler: handler,
  870. }
  871. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  872. if err != nil {
  873. return errors.Trace(err)
  874. }
  875. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  876. if err != nil {
  877. return errors.Trace(err)
  878. }
  879. tlsConfig := &tls.Config{
  880. Certificates: []tls.Certificate{tlsCert},
  881. }
  882. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  883. return errors.Trace(err)
  884. }
// httpRoundTripper is a test round tripper that POSTs request payloads over
// HTTPS and returns the response bodies; see RoundTrip.
type httpRoundTripper struct {
	// httpClient performs the requests; its idle connections are released
	// by Close.
	httpClient *http.Client
	// endpointAddr is the host portion of the request URL.
	endpointAddr string
	// path is appended after "https://<endpointAddr>/" to form the request
	// URL.
	path string
}
  890. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  891. return &httpRoundTripper{
  892. httpClient: &http.Client{
  893. Transport: &http.Transport{
  894. ForceAttemptHTTP2: true,
  895. MaxIdleConns: 2,
  896. IdleConnTimeout: 1 * time.Minute,
  897. TLSHandshakeTimeout: 10 * time.Second,
  898. TLSClientConfig: &std_tls.Config{
  899. InsecureSkipVerify: true,
  900. },
  901. },
  902. },
  903. endpointAddr: endpointAddr,
  904. path: path,
  905. }
  906. }
  907. func (r *httpRoundTripper) RoundTrip(
  908. ctx context.Context,
  909. roundTripDelay time.Duration,
  910. roundTripTimeout time.Duration,
  911. requestPayload []byte) ([]byte, error) {
  912. if roundTripDelay > 0 {
  913. common.SleepWithContext(ctx, roundTripDelay)
  914. }
  915. requestCtx, requestCancelFunc := context.WithTimeout(ctx, roundTripTimeout)
  916. defer requestCancelFunc()
  917. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  918. request, err := http.NewRequestWithContext(
  919. requestCtx, "POST", url, bytes.NewReader(requestPayload))
  920. if err != nil {
  921. return nil, errors.Trace(err)
  922. }
  923. response, err := r.httpClient.Do(request)
  924. if err != nil {
  925. return nil, errors.Trace(err)
  926. }
  927. defer response.Body.Close()
  928. if response.StatusCode != http.StatusOK {
  929. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  930. }
  931. responsePayload, err := io.ReadAll(response.Body)
  932. if err != nil {
  933. return nil, errors.Trace(err)
  934. }
  935. return responsePayload, nil
  936. }
// Close closes the HTTP client's idle connections. It always returns nil.
func (r *httpRoundTripper) Close() error {
	r.httpClient.CloseIdleConnections()
	return nil
}
  941. func runTCPEchoServer(listener net.Listener) {
  942. for {
  943. conn, err := listener.Accept()
  944. if err != nil {
  945. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  946. return
  947. }
  948. go func(conn net.Conn) {
  949. buf := make([]byte, 32768)
  950. for {
  951. n, err := conn.Read(buf)
  952. if n > 0 {
  953. _, err = conn.Write(buf[:n])
  954. }
  955. if err != nil {
  956. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  957. return
  958. }
  959. }
  960. }(conn)
  961. }
  962. }
// quicEchoServer is a test echo server: Run echoes back the data received
// on each connection accepted from listener.
type quicEchoServer struct {
	// listener is the listener returned by quic.Listen in
	// newQuicEchoServer.
	listener net.Listener
	// obfuscationKey is the key that was passed to quic.Listen; it is
	// exposed via ObfuscationKey.
	obfuscationKey string
}
  967. func newQuicEchoServer() (*quicEchoServer, error) {
  968. obfuscationKey := prng.HexString(32)
  969. listener, err := quic.Listen(
  970. nil,
  971. nil,
  972. "127.0.0.1:0",
  973. true,
  974. GetQUICMaxPacketSizeAdjustment(),
  975. obfuscationKey,
  976. false)
  977. if err != nil {
  978. return nil, errors.Trace(err)
  979. }
  980. return &quicEchoServer{
  981. listener: listener,
  982. obfuscationKey: obfuscationKey,
  983. }, nil
  984. }
// ObfuscationKey returns the obfuscation key the server's listener was
// created with.
func (q *quicEchoServer) ObfuscationKey() string {
	return q.obfuscationKey
}
// Close closes the server's listener and returns the listener's Close
// result.
func (q *quicEchoServer) Close() error {
	return q.listener.Close()
}
// Addr returns the address of the server's listener.
func (q *quicEchoServer) Addr() net.Addr {
	return q.listener.Addr()
}
  994. func (q *quicEchoServer) Run() {
  995. for {
  996. conn, err := q.listener.Accept()
  997. if err != nil {
  998. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  999. return
  1000. }
  1001. go func(conn net.Conn) {
  1002. buf := make([]byte, 32768)
  1003. for {
  1004. n, err := conn.Read(buf)
  1005. if n > 0 {
  1006. _, err = conn.Write(buf[:n])
  1007. }
  1008. if err != nil {
  1009. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1010. return
  1011. }
  1012. }
  1013. }(conn)
  1014. }
  1015. }