inproxy_test.go 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243
  1. //go:build PSIPHON_ENABLE_INPROXY
  2. /*
  3. * Copyright (c) 2023, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package inproxy
  21. import (
  22. "bytes"
  23. "context"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. _ "net/http/pprof"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. tls "github.com/Psiphon-Labs/psiphon-tls"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  44. "golang.org/x/sync/errgroup"
  45. )
  46. func TestInproxy(t *testing.T) {
  47. err := runTestInproxy(false)
  48. if err != nil {
  49. t.Error(errors.Trace(err).Error())
  50. }
  51. }
  52. func TestInproxyMustUpgrade(t *testing.T) {
  53. err := runTestInproxy(true)
  54. if err != nil {
  55. t.Error(errors.Trace(err).Error())
  56. }
  57. }
  58. func runTestInproxy(doMustUpgrade bool) error {
  59. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  60. numProxies := 5
  61. proxyMaxClients := 3
  62. numClients := 10
  63. bytesToSend := 1 << 20
  64. targetElapsedSeconds := 2
  65. baseAPIParameters := common.APIParameters{
  66. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  67. "client_platform": "test-client-platform",
  68. }
  69. testCompartmentID, _ := MakeID()
  70. testCommonCompartmentIDs := []ID{testCompartmentID}
  71. testNetworkID := "NETWORK-ID-1"
  72. testNetworkType := NetworkTypeUnknown
  73. testNATType := NATTypeUnknown
  74. testSTUNServerAddress := "stun.nextcloud.com:443"
  75. testDisableSTUN := false
  76. testDisablePortMapping := false
  77. testNewTacticsPayload := []byte(prng.HexString(100))
  78. testNewTacticsTag := "new-tactics-tag"
  79. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  80. currentNetworkCtx, currentNetworkCancelFunc := context.WithCancel(context.Background())
  81. defer currentNetworkCancelFunc()
  82. // TODO: test port mapping
  83. stunServerAddressSucceededCount := int32(0)
  84. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  85. stunServerAddressFailedCount := int32(0)
  86. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  87. roundTripperSucceededCount := int32(0)
  88. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  89. roundTripperFailedCount := int32(0)
  90. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  91. noMatch := func(RoundTripper) {}
  92. var receivedProxyMustUpgrade chan struct{}
  93. var receivedClientMustUpgrade chan struct{}
  94. if doMustUpgrade {
  95. receivedProxyMustUpgrade = make(chan struct{})
  96. receivedClientMustUpgrade = make(chan struct{})
  97. // trigger MustUpgrade
  98. minimumProxyProtocolVersion = LatestProtocolVersion + 1
  99. minimumClientProtocolVersion = LatestProtocolVersion + 1
  100. // Minimize test parameters for MustUpgrade case
  101. numProxies = 1
  102. proxyMaxClients = 1
  103. numClients = 1
  104. testDisableSTUN = true
  105. testDisablePortMapping = true
  106. }
  107. testCtx, stopTest := context.WithCancel(context.Background())
  108. defer stopTest()
  109. testGroup := new(errgroup.Group)
  110. // Enable test to run without requiring host firewall exceptions
  111. SetAllowBogonWebRTCConnections(true)
  112. defer SetAllowBogonWebRTCConnections(false)
  113. // Init logging and profiling
  114. logger := newTestLogger()
  115. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  116. go http.Serve(pprofListener, nil)
  117. defer pprofListener.Close()
  118. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  119. // Start echo servers
  120. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  121. if err != nil {
  122. return errors.Trace(err)
  123. }
  124. defer tcpEchoListener.Close()
  125. go runTCPEchoServer(tcpEchoListener)
  126. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  127. quicEchoServer, err := newQuicEchoServer()
  128. if err != nil {
  129. return errors.Trace(err)
  130. }
  131. defer quicEchoServer.Close()
  132. go quicEchoServer.Run()
  133. // Create signed server entry with capability
  134. serverPrivateKey, err := GenerateSessionPrivateKey()
  135. if err != nil {
  136. return errors.Trace(err)
  137. }
  138. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  139. if err != nil {
  140. return errors.Trace(err)
  141. }
  142. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  143. if err != nil {
  144. return errors.Trace(err)
  145. }
  146. serverEntry := make(protocol.ServerEntryFields)
  147. serverEntry["ipAddress"] = "127.0.0.1"
  148. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  149. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  150. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  151. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  152. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  153. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  154. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  155. testServerEntryTag := prng.HexString(16)
  156. serverEntry["tag"] = testServerEntryTag
  157. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  158. protocol.NewServerEntrySignatureKeyPair()
  159. if err != nil {
  160. return errors.Trace(err)
  161. }
  162. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  163. if err != nil {
  164. return errors.Trace(err)
  165. }
  166. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  167. if err != nil {
  168. return errors.Trace(err)
  169. }
  170. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  171. if err != nil {
  172. return errors.Trace(err)
  173. }
  174. // API parameter handlers
  175. apiParameterValidator := func(params common.APIParameters) error {
  176. if len(params) != len(baseAPIParameters) {
  177. return errors.TraceNew("unexpected base API parameter count")
  178. }
  179. for name, value := range params {
  180. if value.(string) != baseAPIParameters[name].(string) {
  181. return errors.Tracef(
  182. "unexpected base API parameter: %v: %v != %v",
  183. name,
  184. value.(string),
  185. baseAPIParameters[name].(string))
  186. }
  187. }
  188. return nil
  189. }
  190. apiParameterLogFieldFormatter := func(
  191. _ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
  192. logFields := common.LogFields{}
  193. logFields.Add(common.LogFields(params))
  194. return logFields
  195. }
  196. // Start broker
  197. logger.WithTrace().Info("START BROKER")
  198. brokerPrivateKey, err := GenerateSessionPrivateKey()
  199. if err != nil {
  200. return errors.Trace(err)
  201. }
  202. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  203. if err != nil {
  204. return errors.Trace(err)
  205. }
  206. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  207. if err != nil {
  208. return errors.Trace(err)
  209. }
  210. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  211. if err != nil {
  212. return errors.Trace(err)
  213. }
  214. defer brokerListener.Close()
  215. brokerConfig := &BrokerConfig{
  216. Logger: logger,
  217. CommonCompartmentIDs: testCommonCompartmentIDs,
  218. APIParameterValidator: apiParameterValidator,
  219. APIParameterLogFieldFormatter: apiParameterLogFieldFormatter,
  220. GetTacticsPayload: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  221. // Exercise both new and unchanged tactics
  222. if prng.FlipCoin() {
  223. return testNewTacticsPayload, testNewTacticsTag, nil
  224. }
  225. return testUnchangedTacticsPayload, "", nil
  226. },
  227. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  228. PrivateKey: brokerPrivateKey,
  229. ObfuscationRootSecret: brokerRootObfuscationSecret,
  230. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  231. AllowProxy: func(common.GeoIPData) bool { return true },
  232. AllowClient: func(common.GeoIPData) bool { return true },
  233. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  234. }
  235. broker, err := NewBroker(brokerConfig)
  236. if err != nil {
  237. return errors.Trace(err)
  238. }
  239. // Enable proxy quality (and otherwise use the default quality parameters)
  240. enableProxyQuality := true
  241. broker.SetProxyQualityParameters(
  242. enableProxyQuality,
  243. proxyQualityTTL,
  244. proxyQualityPendingFailedMatchDeadline,
  245. proxyQualityFailedMatchThreshold)
  246. err = broker.Start()
  247. if err != nil {
  248. return errors.Trace(err)
  249. }
  250. defer broker.Stop()
  251. testGroup.Go(func() error {
  252. err := runHTTPServer(brokerListener, broker)
  253. if testCtx.Err() != nil {
  254. return nil
  255. }
  256. return errors.Trace(err)
  257. })
  258. // Stub server broker request handler (in Psiphon, this will be the
  259. // destination Psiphon server; here, it's not necessary to build this
  260. // handler into the destination echo server)
  261. //
  262. // The stub server broker request handler also triggers a server proxy
  263. // quality request in the other direction.
  264. makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
  265. RoundTripper, common.APIParameters, error) {
  266. return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
  267. }
  268. serverSessionsConfig := &ServerBrokerSessionsConfig{
  269. Logger: logger,
  270. ServerPrivateKey: serverPrivateKey,
  271. ServerRootObfuscationSecret: serverRootObfuscationSecret,
  272. BrokerPublicKeys: []SessionPublicKey{brokerPublicKey},
  273. BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
  274. BrokerRoundTripperMaker: makeServerBrokerClientRoundTripper,
  275. ProxyMetricsValidator: apiParameterValidator,
  276. ProxyMetricsFormatter: apiParameterLogFieldFormatter,
  277. ProxyMetricsPrefix: "",
  278. }
  279. serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
  280. if err != nil {
  281. return errors.Trace(err)
  282. }
  283. err = serverSessions.Start()
  284. if err != nil {
  285. return errors.Trace(err)
  286. }
  287. defer serverSessions.Stop()
  288. // Don't delay reporting quality.
  289. serverSessions.SetProxyQualityRequestParameters(
  290. proxyQualityReporterMaxRequestEntries,
  291. 0,
  292. proxyQualityReporterRequestTimeout,
  293. proxyQualityReporterRequestRetries)
  294. var pendingBrokerServerReportsMutex sync.Mutex
  295. pendingBrokerServerReports := make(map[ID]bool)
  296. addPendingBrokerServerReport := func(connectionID ID) {
  297. pendingBrokerServerReportsMutex.Lock()
  298. defer pendingBrokerServerReportsMutex.Unlock()
  299. pendingBrokerServerReports[connectionID] = true
  300. }
  301. removePendingBrokerServerReport := func(connectionID ID) {
  302. pendingBrokerServerReportsMutex.Lock()
  303. defer pendingBrokerServerReportsMutex.Unlock()
  304. delete(pendingBrokerServerReports, connectionID)
  305. }
  306. hasPendingBrokerServerReports := func() bool {
  307. pendingBrokerServerReportsMutex.Lock()
  308. defer pendingBrokerServerReportsMutex.Unlock()
  309. return len(pendingBrokerServerReports) > 0
  310. }
  311. serverQualityGroup := new(errgroup.Group)
  312. var serverQualityProxyIDsMutex sync.Mutex
  313. serverQualityProxyIDs := make(map[ID]struct{})
  314. testProxyASN := "65537"
  315. testClientASN := "65538"
  316. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  317. handler := func(
  318. brokerVerifiedOriginalClientIP string,
  319. brokerReportedProxyID ID,
  320. brokerMatchedPersonalCompartments bool,
  321. logFields common.LogFields) {
  322. // Mark the report as no longer outstanding
  323. removePendingBrokerServerReport(clientConnectionID)
  324. // Trigger an asynchronous proxy quality request to the broker.
  325. // This roughly follows the Psiphon server functionality, where a
  326. // quality request is made sometime after the Psiphon handshake
  327. // completes, once tunnel quality thresholds are achieved.
  328. serverQualityGroup.Go(func() error {
  329. serverSessions.ReportQuality(
  330. brokerReportedProxyID, testProxyASN, testClientASN)
  331. serverQualityProxyIDsMutex.Lock()
  332. serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
  333. serverQualityProxyIDsMutex.Unlock()
  334. return nil
  335. })
  336. }
  337. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  338. return out, errors.Trace(err)
  339. }
  340. // Check that the tactics round trip succeeds
  341. var pendingProxyTacticsCallbacksMutex sync.Mutex
  342. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  343. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  344. pendingProxyTacticsCallbacksMutex.Lock()
  345. defer pendingProxyTacticsCallbacksMutex.Unlock()
  346. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  347. }
  348. hasPendingProxyTacticsCallbacks := func() bool {
  349. pendingProxyTacticsCallbacksMutex.Lock()
  350. defer pendingProxyTacticsCallbacksMutex.Unlock()
  351. return len(pendingProxyTacticsCallbacks) > 0
  352. }
  353. makeHandleTacticsPayload := func(
  354. proxyPrivateKey SessionPrivateKey,
  355. tacticsNetworkID string) func(_ string, _ []byte) bool {
  356. return func(networkID string, tacticsPayload []byte) bool {
  357. pendingProxyTacticsCallbacksMutex.Lock()
  358. defer pendingProxyTacticsCallbacksMutex.Unlock()
  359. // Check that the correct networkID is passed around; if not,
  360. // skip the delete, which will fail the test
  361. if networkID == tacticsNetworkID {
  362. // Certain state is reset when new tactics are applied -- the
  363. // return true case; exercise both cases
  364. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  365. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  366. return true
  367. }
  368. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  369. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  370. return false
  371. }
  372. }
  373. panic("unexpected tactics payload")
  374. }
  375. }
  376. // Start proxies
  377. logger.WithTrace().Info("START PROXIES")
  378. for i := 0; i < numProxies; i++ {
  379. proxyPrivateKey, err := GenerateSessionPrivateKey()
  380. if err != nil {
  381. return errors.Trace(err)
  382. }
  383. brokerCoordinator := &testBrokerDialCoordinator{
  384. networkID: testNetworkID,
  385. networkType: testNetworkType,
  386. brokerClientPrivateKey: proxyPrivateKey,
  387. brokerPublicKey: brokerPublicKey,
  388. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  389. brokerClientRoundTripper: newHTTPRoundTripper(
  390. brokerListener.Addr().String(), "proxy"),
  391. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  392. brokerClientRoundTripperFailed: roundTripperFailed,
  393. // Minimize the delay before proxies reannounce after dial
  394. // failures, which may occur.
  395. announceDelay: 0,
  396. announceMaxBackoffDelay: 0,
  397. announceDelayJitter: 0.0,
  398. }
  399. webRTCCoordinator := &testWebRTCDialCoordinator{
  400. networkID: testNetworkID,
  401. networkType: testNetworkType,
  402. natType: testNATType,
  403. disableSTUN: testDisableSTUN,
  404. disablePortMapping: testDisablePortMapping,
  405. stunServerAddress: testSTUNServerAddress,
  406. stunServerAddressRFC5780: testSTUNServerAddress,
  407. stunServerAddressSucceeded: stunServerAddressSucceeded,
  408. stunServerAddressFailed: stunServerAddressFailed,
  409. setNATType: func(NATType) {},
  410. setPortMappingTypes: func(PortMappingTypes) {},
  411. bindToDevice: func(int) error { return nil },
  412. // Minimize the delay before proxies reannounce after failed
  413. // connections, which may occur.
  414. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  415. proxyRelayInactivityTimeout: 5 * time.Second,
  416. }
  417. // Each proxy has its own broker client
  418. brokerClient, err := NewBrokerClient(brokerCoordinator)
  419. if err != nil {
  420. return errors.Trace(err)
  421. }
  422. tacticsNetworkID := prng.HexString(32)
  423. runCtx, cancelRun := context.WithCancel(testCtx)
  424. // No deferred cancelRun due to testGroup.Go below
  425. name := fmt.Sprintf("proxy-%d", i)
  426. proxy, err := NewProxy(&ProxyConfig{
  427. Logger: newTestLoggerWithComponent(name),
  428. WaitForNetworkConnectivity: func() bool {
  429. return true
  430. },
  431. GetCurrentNetworkContext: func() context.Context {
  432. return currentNetworkCtx
  433. },
  434. GetBrokerClient: func() (*BrokerClient, error) {
  435. return brokerClient, nil
  436. },
  437. GetBaseAPIParameters: func(bool) (common.APIParameters, string, error) {
  438. return baseAPIParameters, tacticsNetworkID, nil
  439. },
  440. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  441. return webRTCCoordinator, nil
  442. },
  443. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  444. MaxClients: proxyMaxClients,
  445. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  446. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  447. ActivityUpdater: func(connectingClients int32, connectedClients int32,
  448. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  449. fmt.Printf("[%s][%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
  450. time.Now().UTC().Format(time.RFC3339), name,
  451. connectingClients, connectedClients, bytesUp, bytesDown)
  452. },
  453. MustUpgrade: func() {
  454. close(receivedProxyMustUpgrade)
  455. cancelRun()
  456. },
  457. })
  458. if err != nil {
  459. return errors.Trace(err)
  460. }
  461. addPendingProxyTacticsCallback(proxyPrivateKey)
  462. testGroup.Go(func() error {
  463. proxy.Run(runCtx)
  464. return nil
  465. })
  466. }
  467. // Await proxy announcements before starting clients
  468. //
  469. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  470. // plus NAT discovery
  471. //
  472. // - Don't wait for > numProxies announcements due to
  473. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  474. if !doMustUpgrade {
  475. for {
  476. time.Sleep(100 * time.Millisecond)
  477. broker.matcher.announcementQueueMutex.Lock()
  478. n := broker.matcher.announcementQueue.getLen()
  479. broker.matcher.announcementQueueMutex.Unlock()
  480. if n >= numProxies {
  481. break
  482. }
  483. }
  484. }
  485. // Start clients
  486. var completedClientCount atomic.Int64
  487. logger.WithTrace().Info("START CLIENTS")
  488. clientsGroup := new(errgroup.Group)
  489. makeClientFunc := func(
  490. clientNum int,
  491. isTCP bool,
  492. brokerClient *BrokerClient,
  493. webRTCCoordinator WebRTCDialCoordinator) func() error {
  494. var networkProtocol NetworkProtocol
  495. var addr string
  496. var wrapWithQUIC bool
  497. if isTCP {
  498. networkProtocol = NetworkProtocolTCP
  499. addr = tcpEchoListener.Addr().String()
  500. } else {
  501. networkProtocol = NetworkProtocolUDP
  502. addr = quicEchoServer.Addr().String()
  503. wrapWithQUIC = true
  504. }
  505. return func() error {
  506. name := fmt.Sprintf("client-%d", clientNum)
  507. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  508. defer cancelDial()
  509. conn, err := DialClient(
  510. dialCtx,
  511. &ClientConfig{
  512. Logger: newTestLoggerWithComponent(name),
  513. BaseAPIParameters: baseAPIParameters,
  514. BrokerClient: brokerClient,
  515. WebRTCDialCoordinator: webRTCCoordinator,
  516. ReliableTransport: isTCP,
  517. DialNetworkProtocol: networkProtocol,
  518. DialAddress: addr,
  519. PackedDestinationServerEntry: packedDestinationServerEntry,
  520. MustUpgrade: func() {
  521. close(receivedClientMustUpgrade)
  522. cancelDial()
  523. },
  524. })
  525. if err != nil {
  526. return errors.Trace(err)
  527. }
  528. var relayConn net.Conn
  529. relayConn = conn
  530. if wrapWithQUIC {
  531. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  532. if err != nil {
  533. return errors.Trace(err)
  534. }
  535. disablePathMTUDiscovery := true
  536. quicConn, err := quic.Dial(
  537. dialCtx,
  538. conn,
  539. udpAddr,
  540. "test",
  541. "QUICv1",
  542. nil,
  543. quicEchoServer.ObfuscationKey(),
  544. nil,
  545. nil,
  546. disablePathMTUDiscovery,
  547. GetQUICMaxPacketSizeAdjustment(),
  548. false,
  549. false,
  550. common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
  551. )
  552. if err != nil {
  553. return errors.Trace(err)
  554. }
  555. relayConn = quicConn
  556. }
  557. addPendingBrokerServerReport(conn.GetConnectionID())
  558. signalRelayComplete := make(chan struct{})
  559. clientsGroup.Go(func() error {
  560. defer close(signalRelayComplete)
  561. in := conn.InitialRelayPacket()
  562. for in != nil {
  563. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  564. if err != nil {
  565. if out == nil {
  566. return errors.Trace(err)
  567. } else {
  568. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  569. // Proceed with reset session token packet
  570. }
  571. }
  572. if out == nil {
  573. // Relay is complete
  574. break
  575. }
  576. in, err = conn.RelayPacket(testCtx, out)
  577. if err != nil {
  578. return errors.Trace(err)
  579. }
  580. }
  581. return nil
  582. })
  583. sendBytes := prng.Bytes(bytesToSend)
  584. clientsGroup.Go(func() error {
  585. for n := 0; n < bytesToSend; {
  586. m := prng.Range(1024, 32768)
  587. if bytesToSend-n < m {
  588. m = bytesToSend - n
  589. }
  590. _, err := relayConn.Write(sendBytes[n : n+m])
  591. if err != nil {
  592. return errors.Trace(err)
  593. }
  594. n += m
  595. }
  596. fmt.Printf("[%s][%s] %d bytes sent\n",
  597. time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
  598. return nil
  599. })
  600. clientsGroup.Go(func() error {
  601. buf := make([]byte, 32768)
  602. n := 0
  603. for n < bytesToSend {
  604. m, err := relayConn.Read(buf)
  605. if err != nil {
  606. return errors.Trace(err)
  607. }
  608. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  609. return errors.Tracef(
  610. "unexpected bytes: expected at index %d, received at index %d",
  611. bytes.Index(sendBytes, buf[:m]), n)
  612. }
  613. n += m
  614. }
  615. completed := completedClientCount.Add(1)
  616. fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
  617. time.Now().UTC().Format(time.RFC3339), name,
  618. bytesToSend, completed, numClients)
  619. select {
  620. case <-signalRelayComplete:
  621. case <-testCtx.Done():
  622. }
  623. fmt.Printf("[%s][%s] closing\n",
  624. time.Now().UTC().Format(time.RFC3339), name)
  625. relayConn.Close()
  626. conn.Close()
  627. return nil
  628. })
  629. return nil
  630. }
  631. }
  632. newClientBrokerClient := func(
  633. disableWaitToShareSession bool) (*BrokerClient, error) {
  634. clientPrivateKey, err := GenerateSessionPrivateKey()
  635. if err != nil {
  636. return nil, errors.Trace(err)
  637. }
  638. brokerCoordinator := &testBrokerDialCoordinator{
  639. networkID: testNetworkID,
  640. networkType: testNetworkType,
  641. commonCompartmentIDs: testCommonCompartmentIDs,
  642. disableWaitToShareSession: disableWaitToShareSession,
  643. brokerClientPrivateKey: clientPrivateKey,
  644. brokerPublicKey: brokerPublicKey,
  645. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  646. brokerClientRoundTripper: newHTTPRoundTripper(
  647. brokerListener.Addr().String(), "client"),
  648. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  649. brokerClientRoundTripperFailed: roundTripperFailed,
  650. brokerClientNoMatch: noMatch,
  651. }
  652. brokerClient, err := NewBrokerClient(brokerCoordinator)
  653. if err != nil {
  654. return nil, errors.Trace(err)
  655. }
  656. return brokerClient, nil
  657. }
  658. newClientWebRTCDialCoordinator := func(
  659. isMobile bool,
  660. useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
  661. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  662. if err != nil {
  663. return nil, errors.Trace(err)
  664. }
  665. var trafficShapingParameters *TrafficShapingParameters
  666. if useMediaStreams {
  667. trafficShapingParameters = &TrafficShapingParameters{
  668. MinPaddedMessages: 0,
  669. MaxPaddedMessages: 10,
  670. MinPaddingSize: 0,
  671. MaxPaddingSize: 254,
  672. MinDecoyMessages: 0,
  673. MaxDecoyMessages: 10,
  674. MinDecoySize: 1,
  675. MaxDecoySize: 1200,
  676. DecoyMessageProbability: 0.5,
  677. }
  678. } else {
  679. trafficShapingParameters = &TrafficShapingParameters{
  680. MinPaddedMessages: 0,
  681. MaxPaddedMessages: 10,
  682. MinPaddingSize: 0,
  683. MaxPaddingSize: 1500,
  684. MinDecoyMessages: 0,
  685. MaxDecoyMessages: 10,
  686. MinDecoySize: 1,
  687. MaxDecoySize: 1500,
  688. DecoyMessageProbability: 0.5,
  689. }
  690. }
  691. webRTCCoordinator := &testWebRTCDialCoordinator{
  692. networkID: testNetworkID,
  693. networkType: testNetworkType,
  694. natType: testNATType,
  695. disableSTUN: testDisableSTUN,
  696. stunServerAddress: testSTUNServerAddress,
  697. stunServerAddressRFC5780: testSTUNServerAddress,
  698. stunServerAddressSucceeded: stunServerAddressSucceeded,
  699. stunServerAddressFailed: stunServerAddressFailed,
  700. clientRootObfuscationSecret: clientRootObfuscationSecret,
  701. doDTLSRandomization: prng.FlipCoin(),
  702. useMediaStreams: useMediaStreams,
  703. trafficShapingParameters: trafficShapingParameters,
  704. setNATType: func(NATType) {},
  705. setPortMappingTypes: func(PortMappingTypes) {},
  706. bindToDevice: func(int) error { return nil },
  707. // With STUN enabled (testDisableSTUN = false), there are cases
  708. // where the WebRTC peer connection is not successfully
  709. // established. With a short enough timeout here, clients will
  710. // redial and eventually succceed.
  711. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  712. }
  713. if isMobile {
  714. webRTCCoordinator.networkType = NetworkTypeMobile
  715. webRTCCoordinator.disableInboundForMobileNetworks = true
  716. }
  717. return webRTCCoordinator, nil
  718. }
  719. sharedBrokerClient, err := newClientBrokerClient(false)
  720. if err != nil {
  721. return errors.Trace(err)
  722. }
  723. sharedBrokerClientDisableWait, err := newClientBrokerClient(true)
  724. if err != nil {
  725. return errors.Trace(err)
  726. }
  727. for i := 0; i < numClients; i++ {
  728. // Test a mix of TCP and UDP proxying; also test the
  729. // DisableInboundForMobileNetworks code path.
  730. isTCP := i%2 == 0
  731. isMobile := i%4 == 0
  732. useMediaStreams := i%4 < 2
  733. // Exercise BrokerClients shared by multiple clients, but also create
  734. // several broker clients.
  735. var brokerClient *BrokerClient
  736. switch i % 3 {
  737. case 0:
  738. brokerClient = sharedBrokerClient
  739. case 1:
  740. brokerClient = sharedBrokerClientDisableWait
  741. case 2:
  742. brokerClient, err = newClientBrokerClient(true)
  743. if err != nil {
  744. return errors.Trace(err)
  745. }
  746. }
  747. webRTCCoordinator, err := newClientWebRTCDialCoordinator(
  748. isMobile, useMediaStreams)
  749. if err != nil {
  750. return errors.Trace(err)
  751. }
  752. clientsGroup.Go(
  753. makeClientFunc(
  754. i,
  755. isTCP,
  756. brokerClient,
  757. webRTCCoordinator))
  758. }
  759. if doMustUpgrade {
  760. // Await MustUpgrade callbacks
  761. logger.WithTrace().Info("AWAIT MUST UPGRADE")
  762. <-receivedProxyMustUpgrade
  763. <-receivedClientMustUpgrade
  764. _ = clientsGroup.Wait()
  765. } else {
  766. // Await client transfers complete
  767. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  768. err = clientsGroup.Wait()
  769. if err != nil {
  770. return errors.Trace(err)
  771. }
  772. logger.WithTrace().Info("DONE DATA TRANSFER")
  773. if hasPendingBrokerServerReports() {
  774. return errors.TraceNew("unexpected pending broker server requests")
  775. }
  776. if hasPendingProxyTacticsCallbacks() {
  777. return errors.TraceNew("unexpected pending proxy tactics callback")
  778. }
  779. err = serverQualityGroup.Wait()
  780. if err != nil {
  781. return errors.Trace(err)
  782. }
  783. // Inspect the broker's proxy quality state, to verify that the proxy
  784. // quality request was processed.
  785. //
  786. // Limitation: currently we don't check the priority
  787. // announcement _queue_, as announcements may have arrived before the
  788. // quality request, and announcements are promoted between queues.
  789. serverQualityProxyIDsMutex.Lock()
  790. defer serverQualityProxyIDsMutex.Unlock()
  791. for proxyID := range serverQualityProxyIDs {
  792. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
  793. return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
  794. }
  795. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
  796. return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
  797. }
  798. }
  799. // TODO: check that elapsed time is consistent with rate limit (+/-)
  800. // Check if STUN server replay callbacks were triggered
  801. if !testDisableSTUN {
  802. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  803. return errors.TraceNew("unexpected STUN server succeeded count")
  804. }
  805. // Allow for some STUN server failures
  806. if atomic.LoadInt32(&stunServerAddressFailedCount) >= int32(numProxies/2) {
  807. return errors.TraceNew("unexpected STUN server failed count")
  808. }
  809. }
  810. // Check if RoundTripper server replay callbacks were triggered
  811. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  812. return errors.TraceNew("unexpected round tripper succeeded count")
  813. }
  814. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  815. return errors.TraceNew("unexpected round tripper failed count")
  816. }
  817. }
  818. // Await shutdowns
  819. stopTest()
  820. brokerListener.Close()
  821. err = testGroup.Wait()
  822. if err != nil {
  823. return errors.Trace(err)
  824. }
  825. return nil
  826. }
  827. func runHTTPServer(listener net.Listener, broker *Broker) error {
  828. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  829. // For this test, clients set the path to "/client" and proxies
  830. // set the path to "/proxy" and we use that to create stub GeoIP
  831. // data to pass the not-same-ASN condition.
  832. var geoIPData common.GeoIPData
  833. geoIPData.ASN = r.URL.Path
  834. requestPayload, err := ioutil.ReadAll(
  835. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  836. if err != nil {
  837. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  838. http.Error(w, "", http.StatusNotFound)
  839. return
  840. }
  841. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  842. extendTimeout := func(timeout time.Duration) {
  843. // TODO: set insufficient initial timeout, so extension is
  844. // required for success
  845. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  846. }
  847. responsePayload, err := broker.HandleSessionPacket(
  848. r.Context(),
  849. extendTimeout,
  850. nil,
  851. clientIP,
  852. geoIPData,
  853. requestPayload)
  854. if err != nil {
  855. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  856. http.Error(w, "", http.StatusNotFound)
  857. return
  858. }
  859. w.WriteHeader(http.StatusOK)
  860. w.Write(responsePayload)
  861. })
  862. // WriteTimeout will be extended via extendTimeout.
  863. httpServer := &http.Server{
  864. ReadTimeout: 10 * time.Second,
  865. WriteTimeout: 10 * time.Second,
  866. IdleTimeout: 1 * time.Minute,
  867. Handler: handler,
  868. }
  869. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  870. if err != nil {
  871. return errors.Trace(err)
  872. }
  873. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  874. if err != nil {
  875. return errors.Trace(err)
  876. }
  877. tlsConfig := &tls.Config{
  878. Certificates: []tls.Certificate{tlsCert},
  879. }
  880. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  881. return errors.Trace(err)
  882. }
  883. type httpRoundTripper struct {
  884. httpClient *http.Client
  885. endpointAddr string
  886. path string
  887. }
  888. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  889. return &httpRoundTripper{
  890. httpClient: &http.Client{
  891. Transport: &http.Transport{
  892. ForceAttemptHTTP2: true,
  893. MaxIdleConns: 2,
  894. IdleConnTimeout: 1 * time.Minute,
  895. TLSHandshakeTimeout: 10 * time.Second,
  896. TLSClientConfig: &std_tls.Config{
  897. InsecureSkipVerify: true,
  898. },
  899. },
  900. },
  901. endpointAddr: endpointAddr,
  902. path: path,
  903. }
  904. }
  905. func (r *httpRoundTripper) RoundTrip(
  906. ctx context.Context,
  907. roundTripDelay time.Duration,
  908. roundTripTimeout time.Duration,
  909. requestPayload []byte) ([]byte, error) {
  910. if roundTripDelay > 0 {
  911. common.SleepWithContext(ctx, roundTripDelay)
  912. }
  913. requestCtx, requestCancelFunc := context.WithTimeout(ctx, roundTripTimeout)
  914. defer requestCancelFunc()
  915. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  916. request, err := http.NewRequestWithContext(
  917. requestCtx, "POST", url, bytes.NewReader(requestPayload))
  918. if err != nil {
  919. return nil, errors.Trace(err)
  920. }
  921. response, err := r.httpClient.Do(request)
  922. if err != nil {
  923. return nil, errors.Trace(err)
  924. }
  925. defer response.Body.Close()
  926. if response.StatusCode != http.StatusOK {
  927. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  928. }
  929. responsePayload, err := io.ReadAll(response.Body)
  930. if err != nil {
  931. return nil, errors.Trace(err)
  932. }
  933. return responsePayload, nil
  934. }
  935. func (r *httpRoundTripper) Close() error {
  936. r.httpClient.CloseIdleConnections()
  937. return nil
  938. }
  939. func runTCPEchoServer(listener net.Listener) {
  940. for {
  941. conn, err := listener.Accept()
  942. if err != nil {
  943. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  944. return
  945. }
  946. go func(conn net.Conn) {
  947. buf := make([]byte, 32768)
  948. for {
  949. n, err := conn.Read(buf)
  950. if n > 0 {
  951. _, err = conn.Write(buf[:n])
  952. }
  953. if err != nil {
  954. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  955. return
  956. }
  957. }
  958. }(conn)
  959. }
  960. }
  961. type quicEchoServer struct {
  962. listener net.Listener
  963. obfuscationKey string
  964. }
  965. func newQuicEchoServer() (*quicEchoServer, error) {
  966. obfuscationKey := prng.HexString(32)
  967. listener, err := quic.Listen(
  968. nil,
  969. nil,
  970. "127.0.0.1:0",
  971. true,
  972. GetQUICMaxPacketSizeAdjustment(),
  973. obfuscationKey,
  974. false)
  975. if err != nil {
  976. return nil, errors.Trace(err)
  977. }
  978. return &quicEchoServer{
  979. listener: listener,
  980. obfuscationKey: obfuscationKey,
  981. }, nil
  982. }
  983. func (q *quicEchoServer) ObfuscationKey() string {
  984. return q.obfuscationKey
  985. }
  986. func (q *quicEchoServer) Close() error {
  987. return q.listener.Close()
  988. }
  989. func (q *quicEchoServer) Addr() net.Addr {
  990. return q.listener.Addr()
  991. }
  992. func (q *quicEchoServer) Run() {
  993. for {
  994. conn, err := q.listener.Accept()
  995. if err != nil {
  996. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  997. return
  998. }
  999. go func(conn net.Conn) {
  1000. buf := make([]byte, 32768)
  1001. for {
  1002. n, err := conn.Read(buf)
  1003. if n > 0 {
  1004. _, err = conn.Write(buf[:n])
  1005. }
  1006. if err != nil {
  1007. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1008. return
  1009. }
  1010. }
  1011. }(conn)
  1012. }
  1013. }