inproxy_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247
  1. //go:build PSIPHON_ENABLE_INPROXY
  2. /*
  3. * Copyright (c) 2023, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package inproxy
  21. import (
  22. "bytes"
  23. "context"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. _ "net/http/pprof"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. tls "github.com/Psiphon-Labs/psiphon-tls"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/internal/testutils"
  45. "golang.org/x/sync/errgroup"
  46. )
  47. func TestInproxy(t *testing.T) {
  48. err := runTestInproxy(false)
  49. if err != nil {
  50. t.Error(errors.Trace(err).Error())
  51. }
  52. }
  53. func TestInproxyMustUpgrade(t *testing.T) {
  54. err := runTestInproxy(true)
  55. if err != nil {
  56. t.Error(errors.Trace(err).Error())
  57. }
  58. }
  59. func runTestInproxy(doMustUpgrade bool) error {
  60. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  61. numProxies := 5
  62. proxyMaxClients := 3
  63. numClients := 10
  64. bytesToSend := 1 << 20
  65. targetElapsedSeconds := 2
  66. baseAPIParameters := common.APIParameters{
  67. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  68. "client_platform": "test-client-platform",
  69. }
  70. testCompartmentID, _ := MakeID()
  71. testCommonCompartmentIDs := []ID{testCompartmentID}
  72. testNetworkID := "NETWORK-ID-1"
  73. testNetworkType := NetworkTypeUnknown
  74. testNATType := NATTypeUnknown
  75. testSTUNServerAddress := "stun.voipgate.com:3478"
  76. testDisableSTUN := false
  77. testDisablePortMapping := false
  78. testNewTacticsPayload := []byte(prng.HexString(100))
  79. testNewTacticsTag := "new-tactics-tag"
  80. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  81. currentNetworkCtx, currentNetworkCancelFunc := context.WithCancel(context.Background())
  82. defer currentNetworkCancelFunc()
  83. // TODO: test port mapping
  84. stunServerAddressSucceededCount := int32(0)
  85. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  86. stunServerAddressFailedCount := int32(0)
  87. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  88. roundTripperSucceededCount := int32(0)
  89. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  90. roundTripperFailedCount := int32(0)
  91. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  92. noMatch := func(RoundTripper) {}
  93. var receivedProxyMustUpgrade chan struct{}
  94. var receivedClientMustUpgrade chan struct{}
  95. if doMustUpgrade {
  96. receivedProxyMustUpgrade = make(chan struct{})
  97. receivedClientMustUpgrade = make(chan struct{})
  98. // trigger MustUpgrade
  99. minimumProxyProtocolVersion = LatestProtocolVersion + 1
  100. minimumClientProtocolVersion = LatestProtocolVersion + 1
  101. // Minimize test parameters for MustUpgrade case
  102. numProxies = 1
  103. proxyMaxClients = 1
  104. numClients = 1
  105. testDisableSTUN = true
  106. testDisablePortMapping = true
  107. }
  108. testCtx, stopTest := context.WithCancel(context.Background())
  109. defer stopTest()
  110. testGroup := new(errgroup.Group)
  111. // Enable test to run without requiring host firewall exceptions
  112. SetAllowBogonWebRTCConnections(true)
  113. defer SetAllowBogonWebRTCConnections(false)
  114. // Init logging and profiling
  115. logger := testutils.NewTestLogger()
  116. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  117. go http.Serve(pprofListener, nil)
  118. defer pprofListener.Close()
  119. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  120. // Start echo servers
  121. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  122. if err != nil {
  123. return errors.Trace(err)
  124. }
  125. defer tcpEchoListener.Close()
  126. go runTCPEchoServer(tcpEchoListener)
  127. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  128. quicEchoServer, err := newQuicEchoServer()
  129. if err != nil {
  130. return errors.Trace(err)
  131. }
  132. defer quicEchoServer.Close()
  133. go quicEchoServer.Run()
  134. // Create signed server entry with capability
  135. serverPrivateKey, err := GenerateSessionPrivateKey()
  136. if err != nil {
  137. return errors.Trace(err)
  138. }
  139. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  140. if err != nil {
  141. return errors.Trace(err)
  142. }
  143. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  144. if err != nil {
  145. return errors.Trace(err)
  146. }
  147. serverEntry := make(protocol.ServerEntryFields)
  148. serverEntry["ipAddress"] = "127.0.0.1"
  149. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  150. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  151. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  152. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  153. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  154. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  155. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  156. testServerEntryTag := prng.HexString(16)
  157. serverEntry["tag"] = testServerEntryTag
  158. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  159. protocol.NewServerEntrySignatureKeyPair()
  160. if err != nil {
  161. return errors.Trace(err)
  162. }
  163. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  164. if err != nil {
  165. return errors.Trace(err)
  166. }
  167. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  168. if err != nil {
  169. return errors.Trace(err)
  170. }
  171. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  172. if err != nil {
  173. return errors.Trace(err)
  174. }
  175. // API parameter handlers
  176. apiParameterValidator := func(params common.APIParameters) error {
  177. if len(params) != len(baseAPIParameters) {
  178. return errors.TraceNew("unexpected base API parameter count")
  179. }
  180. for name, value := range params {
  181. if value.(string) != baseAPIParameters[name].(string) {
  182. return errors.Tracef(
  183. "unexpected base API parameter: %v: %v != %v",
  184. name,
  185. value.(string),
  186. baseAPIParameters[name].(string))
  187. }
  188. }
  189. return nil
  190. }
  191. apiParameterLogFieldFormatter := func(
  192. _ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
  193. logFields := common.LogFields{}
  194. logFields.Add(common.LogFields(params))
  195. return logFields
  196. }
  197. // Start broker
  198. logger.WithTrace().Info("START BROKER")
  199. brokerPrivateKey, err := GenerateSessionPrivateKey()
  200. if err != nil {
  201. return errors.Trace(err)
  202. }
  203. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  204. if err != nil {
  205. return errors.Trace(err)
  206. }
  207. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  208. if err != nil {
  209. return errors.Trace(err)
  210. }
  211. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  212. if err != nil {
  213. return errors.Trace(err)
  214. }
  215. defer brokerListener.Close()
  216. brokerConfig := &BrokerConfig{
  217. Logger: logger,
  218. CommonCompartmentIDs: testCommonCompartmentIDs,
  219. APIParameterValidator: apiParameterValidator,
  220. APIParameterLogFieldFormatter: apiParameterLogFieldFormatter,
  221. GetTacticsPayload: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  222. // Exercise both new and unchanged tactics
  223. if prng.FlipCoin() {
  224. return testNewTacticsPayload, testNewTacticsTag, nil
  225. }
  226. return testUnchangedTacticsPayload, "", nil
  227. },
  228. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  229. PrivateKey: brokerPrivateKey,
  230. ObfuscationRootSecret: brokerRootObfuscationSecret,
  231. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  232. AllowProxy: func(common.GeoIPData) bool { return true },
  233. AllowClient: func(common.GeoIPData) bool { return true },
  234. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  235. AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
  236. }
  237. broker, err := NewBroker(brokerConfig)
  238. if err != nil {
  239. return errors.Trace(err)
  240. }
  241. // Enable proxy quality (and otherwise use the default quality parameters)
  242. enableProxyQuality := true
  243. broker.SetProxyQualityParameters(
  244. enableProxyQuality,
  245. proxyQualityTTL,
  246. proxyQualityPendingFailedMatchDeadline,
  247. proxyQualityFailedMatchThreshold)
  248. err = broker.Start()
  249. if err != nil {
  250. return errors.Trace(err)
  251. }
  252. defer broker.Stop()
  253. testGroup.Go(func() error {
  254. err := runHTTPServer(brokerListener, broker)
  255. if testCtx.Err() != nil {
  256. return nil
  257. }
  258. return errors.Trace(err)
  259. })
  260. // Stub server broker request handler (in Psiphon, this will be the
  261. // destination Psiphon server; here, it's not necessary to build this
  262. // handler into the destination echo server)
  263. //
  264. // The stub server broker request handler also triggers a server proxy
  265. // quality request in the other direction.
  266. makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
  267. RoundTripper, common.APIParameters, error) {
  268. return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
  269. }
  270. serverSessionsConfig := &ServerBrokerSessionsConfig{
  271. Logger: logger,
  272. ServerPrivateKey: serverPrivateKey,
  273. ServerRootObfuscationSecret: serverRootObfuscationSecret,
  274. BrokerPublicKeys: []SessionPublicKey{brokerPublicKey},
  275. BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
  276. BrokerRoundTripperMaker: makeServerBrokerClientRoundTripper,
  277. ProxyMetricsValidator: apiParameterValidator,
  278. ProxyMetricsFormatter: apiParameterLogFieldFormatter,
  279. ProxyMetricsPrefix: "",
  280. }
  281. serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
  282. if err != nil {
  283. return errors.Trace(err)
  284. }
  285. err = serverSessions.Start()
  286. if err != nil {
  287. return errors.Trace(err)
  288. }
  289. defer serverSessions.Stop()
  290. // Don't delay reporting quality.
  291. serverSessions.SetProxyQualityRequestParameters(
  292. proxyQualityReporterMaxRequestEntries,
  293. 0,
  294. proxyQualityReporterRequestTimeout,
  295. proxyQualityReporterRequestRetries)
  296. var pendingBrokerServerReportsMutex sync.Mutex
  297. pendingBrokerServerReports := make(map[ID]bool)
  298. addPendingBrokerServerReport := func(connectionID ID) {
  299. pendingBrokerServerReportsMutex.Lock()
  300. defer pendingBrokerServerReportsMutex.Unlock()
  301. pendingBrokerServerReports[connectionID] = true
  302. }
  303. removePendingBrokerServerReport := func(connectionID ID) {
  304. pendingBrokerServerReportsMutex.Lock()
  305. defer pendingBrokerServerReportsMutex.Unlock()
  306. delete(pendingBrokerServerReports, connectionID)
  307. }
  308. hasPendingBrokerServerReports := func() bool {
  309. pendingBrokerServerReportsMutex.Lock()
  310. defer pendingBrokerServerReportsMutex.Unlock()
  311. return len(pendingBrokerServerReports) > 0
  312. }
  313. serverQualityGroup := new(errgroup.Group)
  314. var serverQualityProxyIDsMutex sync.Mutex
  315. serverQualityProxyIDs := make(map[ID]struct{})
  316. testProxyASN := "65537"
  317. testClientASN := "65538"
  318. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  319. handler := func(
  320. brokerVerifiedOriginalClientIP string,
  321. brokerReportedProxyID ID,
  322. brokerMatchedPersonalCompartments bool,
  323. logFields common.LogFields) {
  324. // Mark the report as no longer outstanding
  325. removePendingBrokerServerReport(clientConnectionID)
  326. // Trigger an asynchronous proxy quality request to the broker.
  327. // This roughly follows the Psiphon server functionality, where a
  328. // quality request is made sometime after the Psiphon handshake
  329. // completes, once tunnel quality thresholds are achieved.
  330. serverQualityGroup.Go(func() error {
  331. serverSessions.ReportQuality(
  332. brokerReportedProxyID, testProxyASN, testClientASN)
  333. serverQualityProxyIDsMutex.Lock()
  334. serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
  335. serverQualityProxyIDsMutex.Unlock()
  336. return nil
  337. })
  338. }
  339. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  340. return out, errors.Trace(err)
  341. }
  342. // Check that the tactics round trip succeeds
  343. var pendingProxyTacticsCallbacksMutex sync.Mutex
  344. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  345. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  346. pendingProxyTacticsCallbacksMutex.Lock()
  347. defer pendingProxyTacticsCallbacksMutex.Unlock()
  348. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  349. }
  350. hasPendingProxyTacticsCallbacks := func() bool {
  351. pendingProxyTacticsCallbacksMutex.Lock()
  352. defer pendingProxyTacticsCallbacksMutex.Unlock()
  353. return len(pendingProxyTacticsCallbacks) > 0
  354. }
  355. makeHandleTacticsPayload := func(
  356. proxyPrivateKey SessionPrivateKey,
  357. tacticsNetworkID string) func(_ string, _ bool, _ []byte) bool {
  358. return func(networkID string, _ bool, tacticsPayload []byte) bool {
  359. pendingProxyTacticsCallbacksMutex.Lock()
  360. defer pendingProxyTacticsCallbacksMutex.Unlock()
  361. // Check that the correct networkID is passed around; if not,
  362. // skip the delete, which will fail the test
  363. if networkID == tacticsNetworkID {
  364. // Certain state is reset when new tactics are applied -- the
  365. // return true case; exercise both cases
  366. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  367. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  368. return true
  369. }
  370. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  371. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  372. return false
  373. }
  374. }
  375. panic("unexpected tactics payload")
  376. }
  377. }
  378. // Start proxies
  379. logger.WithTrace().Info("START PROXIES")
  380. for i := 0; i < numProxies; i++ {
  381. proxyPrivateKey, err := GenerateSessionPrivateKey()
  382. if err != nil {
  383. return errors.Trace(err)
  384. }
  385. brokerCoordinator := &testBrokerDialCoordinator{
  386. networkID: testNetworkID,
  387. networkType: testNetworkType,
  388. brokerClientPrivateKey: proxyPrivateKey,
  389. brokerPublicKey: brokerPublicKey,
  390. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  391. brokerClientRoundTripper: newHTTPRoundTripper(
  392. brokerListener.Addr().String(), "proxy"),
  393. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  394. brokerClientRoundTripperFailed: roundTripperFailed,
  395. // Minimize the delay before proxies reannounce after dial
  396. // failures, which may occur.
  397. announceDelay: 0,
  398. announceMaxBackoffDelay: 0,
  399. announceDelayJitter: 0.0,
  400. }
  401. webRTCCoordinator := &testWebRTCDialCoordinator{
  402. networkID: testNetworkID,
  403. networkType: testNetworkType,
  404. natType: testNATType,
  405. disableSTUN: testDisableSTUN,
  406. disablePortMapping: testDisablePortMapping,
  407. stunServerAddress: testSTUNServerAddress,
  408. stunServerAddressRFC5780: testSTUNServerAddress,
  409. stunServerAddressSucceeded: stunServerAddressSucceeded,
  410. stunServerAddressFailed: stunServerAddressFailed,
  411. setNATType: func(NATType) {},
  412. setPortMappingTypes: func(PortMappingTypes) {},
  413. bindToDevice: func(int) error { return nil },
  414. // Minimize the delay before proxies reannounce after failed
  415. // connections, which may occur.
  416. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  417. proxyRelayInactivityTimeout: 5 * time.Second,
  418. }
  419. // Each proxy has its own broker client
  420. brokerClient, err := NewBrokerClient(brokerCoordinator)
  421. if err != nil {
  422. return errors.Trace(err)
  423. }
  424. tacticsNetworkID := prng.HexString(32)
  425. runCtx, cancelRun := context.WithCancel(testCtx)
  426. // No deferred cancelRun due to testGroup.Go below
  427. name := fmt.Sprintf("proxy-%d", i)
  428. proxy, err := NewProxy(&ProxyConfig{
  429. Logger: testutils.NewTestLoggerWithComponent(name),
  430. WaitForNetworkConnectivity: func() bool {
  431. return true
  432. },
  433. GetCurrentNetworkContext: func() context.Context {
  434. return currentNetworkCtx
  435. },
  436. GetBrokerClient: func() (*BrokerClient, error) {
  437. return brokerClient, nil
  438. },
  439. GetBaseAPIParameters: func(bool) (common.APIParameters, string, error) {
  440. return baseAPIParameters, tacticsNetworkID, nil
  441. },
  442. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  443. return webRTCCoordinator, nil
  444. },
  445. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  446. MaxClients: proxyMaxClients,
  447. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  448. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  449. ActivityUpdater: func(
  450. announcing int32,
  451. connectingClients int32, connectedClients int32,
  452. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  453. fmt.Printf("[%s][%s] ACTIVITY: %d announcing, %d connecting, %d connected, %d up, %d down\n",
  454. time.Now().UTC().Format(time.RFC3339), name,
  455. announcing, connectingClients, connectedClients, bytesUp, bytesDown)
  456. },
  457. MustUpgrade: func() {
  458. close(receivedProxyMustUpgrade)
  459. cancelRun()
  460. },
  461. })
  462. if err != nil {
  463. return errors.Trace(err)
  464. }
  465. addPendingProxyTacticsCallback(proxyPrivateKey)
  466. testGroup.Go(func() error {
  467. proxy.Run(runCtx)
  468. return nil
  469. })
  470. }
  471. // Await proxy announcements before starting clients
  472. //
  473. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  474. // plus NAT discovery
  475. //
  476. // - Don't wait for > numProxies announcements due to
  477. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  478. if !doMustUpgrade {
  479. for {
  480. time.Sleep(100 * time.Millisecond)
  481. broker.matcher.announcementQueueMutex.Lock()
  482. n := broker.matcher.announcementQueue.getLen()
  483. broker.matcher.announcementQueueMutex.Unlock()
  484. if n >= numProxies {
  485. break
  486. }
  487. }
  488. }
  489. // Start clients
  490. var completedClientCount atomic.Int64
  491. logger.WithTrace().Info("START CLIENTS")
  492. clientsGroup := new(errgroup.Group)
  493. makeClientFunc := func(
  494. clientNum int,
  495. isTCP bool,
  496. brokerClient *BrokerClient,
  497. webRTCCoordinator WebRTCDialCoordinator) func() error {
  498. var networkProtocol NetworkProtocol
  499. var addr string
  500. var wrapWithQUIC bool
  501. if isTCP {
  502. networkProtocol = NetworkProtocolTCP
  503. addr = tcpEchoListener.Addr().String()
  504. } else {
  505. networkProtocol = NetworkProtocolUDP
  506. addr = quicEchoServer.Addr().String()
  507. wrapWithQUIC = true
  508. }
  509. return func() error {
  510. name := fmt.Sprintf("client-%d", clientNum)
  511. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  512. defer cancelDial()
  513. conn, err := DialClient(
  514. dialCtx,
  515. &ClientConfig{
  516. Logger: testutils.NewTestLoggerWithComponent(name),
  517. BaseAPIParameters: baseAPIParameters,
  518. BrokerClient: brokerClient,
  519. WebRTCDialCoordinator: webRTCCoordinator,
  520. ReliableTransport: isTCP,
  521. DialNetworkProtocol: networkProtocol,
  522. DialAddress: addr,
  523. PackedDestinationServerEntry: packedDestinationServerEntry,
  524. MustUpgrade: func() {
  525. close(receivedClientMustUpgrade)
  526. cancelDial()
  527. },
  528. })
  529. if err != nil {
  530. return errors.Trace(err)
  531. }
  532. var relayConn net.Conn
  533. relayConn = conn
  534. if wrapWithQUIC {
  535. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  536. if err != nil {
  537. return errors.Trace(err)
  538. }
  539. disablePathMTUDiscovery := true
  540. quicConn, err := quic.Dial(
  541. dialCtx,
  542. conn,
  543. udpAddr,
  544. "test",
  545. "QUICv1",
  546. nil,
  547. quicEchoServer.ObfuscationKey(),
  548. nil,
  549. nil,
  550. disablePathMTUDiscovery,
  551. GetQUICMaxPacketSizeAdjustment(),
  552. false,
  553. false,
  554. common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
  555. )
  556. if err != nil {
  557. return errors.Trace(err)
  558. }
  559. relayConn = quicConn
  560. }
  561. addPendingBrokerServerReport(conn.GetConnectionID())
  562. signalRelayComplete := make(chan struct{})
  563. clientsGroup.Go(func() error {
  564. defer close(signalRelayComplete)
  565. in := conn.InitialRelayPacket()
  566. for in != nil {
  567. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  568. if err != nil {
  569. if out == nil {
  570. return errors.Trace(err)
  571. } else {
  572. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  573. // Proceed with reset session token packet
  574. }
  575. }
  576. if out == nil {
  577. // Relay is complete
  578. break
  579. }
  580. in, err = conn.RelayPacket(testCtx, out)
  581. if err != nil {
  582. return errors.Trace(err)
  583. }
  584. }
  585. return nil
  586. })
  587. sendBytes := prng.Bytes(bytesToSend)
  588. clientsGroup.Go(func() error {
  589. for n := 0; n < bytesToSend; {
  590. m := prng.Range(1024, 32768)
  591. if bytesToSend-n < m {
  592. m = bytesToSend - n
  593. }
  594. _, err := relayConn.Write(sendBytes[n : n+m])
  595. if err != nil {
  596. return errors.Trace(err)
  597. }
  598. n += m
  599. }
  600. fmt.Printf("[%s][%s] %d bytes sent\n",
  601. time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
  602. return nil
  603. })
  604. clientsGroup.Go(func() error {
  605. buf := make([]byte, 32768)
  606. n := 0
  607. for n < bytesToSend {
  608. m, err := relayConn.Read(buf)
  609. if err != nil {
  610. return errors.Trace(err)
  611. }
  612. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  613. return errors.Tracef(
  614. "unexpected bytes: expected at index %d, received at index %d",
  615. bytes.Index(sendBytes, buf[:m]), n)
  616. }
  617. n += m
  618. }
  619. completed := completedClientCount.Add(1)
  620. fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
  621. time.Now().UTC().Format(time.RFC3339), name,
  622. bytesToSend, completed, numClients)
  623. select {
  624. case <-signalRelayComplete:
  625. case <-testCtx.Done():
  626. }
  627. fmt.Printf("[%s][%s] closing\n",
  628. time.Now().UTC().Format(time.RFC3339), name)
  629. relayConn.Close()
  630. conn.Close()
  631. return nil
  632. })
  633. return nil
  634. }
  635. }
  636. newClientBrokerClient := func(
  637. disableWaitToShareSession bool) (*BrokerClient, error) {
  638. clientPrivateKey, err := GenerateSessionPrivateKey()
  639. if err != nil {
  640. return nil, errors.Trace(err)
  641. }
  642. brokerCoordinator := &testBrokerDialCoordinator{
  643. networkID: testNetworkID,
  644. networkType: testNetworkType,
  645. commonCompartmentIDs: testCommonCompartmentIDs,
  646. disableWaitToShareSession: disableWaitToShareSession,
  647. brokerClientPrivateKey: clientPrivateKey,
  648. brokerPublicKey: brokerPublicKey,
  649. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  650. brokerClientRoundTripper: newHTTPRoundTripper(
  651. brokerListener.Addr().String(), "client"),
  652. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  653. brokerClientRoundTripperFailed: roundTripperFailed,
  654. brokerClientNoMatch: noMatch,
  655. }
  656. brokerClient, err := NewBrokerClient(brokerCoordinator)
  657. if err != nil {
  658. return nil, errors.Trace(err)
  659. }
  660. return brokerClient, nil
  661. }
  662. newClientWebRTCDialCoordinator := func(
  663. isMobile bool,
  664. useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
  665. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  666. if err != nil {
  667. return nil, errors.Trace(err)
  668. }
  669. var trafficShapingParameters *TrafficShapingParameters
  670. if useMediaStreams {
  671. trafficShapingParameters = &TrafficShapingParameters{
  672. MinPaddedMessages: 0,
  673. MaxPaddedMessages: 10,
  674. MinPaddingSize: 0,
  675. MaxPaddingSize: 254,
  676. MinDecoyMessages: 0,
  677. MaxDecoyMessages: 10,
  678. MinDecoySize: 1,
  679. MaxDecoySize: 1200,
  680. DecoyMessageProbability: 0.5,
  681. }
  682. } else {
  683. trafficShapingParameters = &TrafficShapingParameters{
  684. MinPaddedMessages: 0,
  685. MaxPaddedMessages: 10,
  686. MinPaddingSize: 0,
  687. MaxPaddingSize: 1500,
  688. MinDecoyMessages: 0,
  689. MaxDecoyMessages: 10,
  690. MinDecoySize: 1,
  691. MaxDecoySize: 1500,
  692. DecoyMessageProbability: 0.5,
  693. }
  694. }
  695. webRTCCoordinator := &testWebRTCDialCoordinator{
  696. networkID: testNetworkID,
  697. networkType: testNetworkType,
  698. natType: testNATType,
  699. disableSTUN: testDisableSTUN,
  700. stunServerAddress: testSTUNServerAddress,
  701. stunServerAddressRFC5780: testSTUNServerAddress,
  702. stunServerAddressSucceeded: stunServerAddressSucceeded,
  703. stunServerAddressFailed: stunServerAddressFailed,
  704. clientRootObfuscationSecret: clientRootObfuscationSecret,
  705. doDTLSRandomization: prng.FlipCoin(),
  706. useMediaStreams: useMediaStreams,
  707. trafficShapingParameters: trafficShapingParameters,
  708. setNATType: func(NATType) {},
  709. setPortMappingTypes: func(PortMappingTypes) {},
  710. bindToDevice: func(int) error { return nil },
  711. // With STUN enabled (testDisableSTUN = false), there are cases
  712. // where the WebRTC peer connection is not successfully
  713. // established. With a short enough timeout here, clients will
  714. // redial and eventually succeed.
  715. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  716. }
  717. if isMobile {
  718. webRTCCoordinator.networkType = NetworkTypeMobile
  719. webRTCCoordinator.disableInboundForMobileNetworks = true
  720. }
  721. return webRTCCoordinator, nil
  722. }
  723. sharedBrokerClient, err := newClientBrokerClient(false)
  724. if err != nil {
  725. return errors.Trace(err)
  726. }
  727. sharedBrokerClientDisableWait, err := newClientBrokerClient(true)
  728. if err != nil {
  729. return errors.Trace(err)
  730. }
  731. for i := 0; i < numClients; i++ {
  732. // Test a mix of TCP and UDP proxying; also test the
  733. // DisableInboundForMobileNetworks code path.
  734. isTCP := i%2 == 0
  735. isMobile := i%4 == 0
  736. useMediaStreams := i%4 < 2
  737. // Exercise BrokerClients shared by multiple clients, but also create
  738. // several broker clients.
  739. var brokerClient *BrokerClient
  740. switch i % 3 {
  741. case 0:
  742. brokerClient = sharedBrokerClient
  743. case 1:
  744. brokerClient = sharedBrokerClientDisableWait
  745. case 2:
  746. brokerClient, err = newClientBrokerClient(true)
  747. if err != nil {
  748. return errors.Trace(err)
  749. }
  750. }
  751. webRTCCoordinator, err := newClientWebRTCDialCoordinator(
  752. isMobile, useMediaStreams)
  753. if err != nil {
  754. return errors.Trace(err)
  755. }
  756. clientsGroup.Go(
  757. makeClientFunc(
  758. i,
  759. isTCP,
  760. brokerClient,
  761. webRTCCoordinator))
  762. }
  763. if doMustUpgrade {
  764. // Await MustUpgrade callbacks
  765. logger.WithTrace().Info("AWAIT MUST UPGRADE")
  766. <-receivedProxyMustUpgrade
  767. <-receivedClientMustUpgrade
  768. _ = clientsGroup.Wait()
  769. } else {
  770. // Await client transfers complete
  771. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  772. err = clientsGroup.Wait()
  773. if err != nil {
  774. return errors.Trace(err)
  775. }
  776. logger.WithTrace().Info("DONE DATA TRANSFER")
  777. if hasPendingBrokerServerReports() {
  778. return errors.TraceNew("unexpected pending broker server requests")
  779. }
  780. if hasPendingProxyTacticsCallbacks() {
  781. return errors.TraceNew("unexpected pending proxy tactics callback")
  782. }
  783. err = serverQualityGroup.Wait()
  784. if err != nil {
  785. return errors.Trace(err)
  786. }
  787. // Inspect the broker's proxy quality state, to verify that the proxy
  788. // quality request was processed.
  789. //
  790. // Limitation: currently we don't check the priority
  791. // announcement _queue_, as announcements may have arrived before the
  792. // quality request, and announcements are promoted between queues.
  793. serverQualityProxyIDsMutex.Lock()
  794. defer serverQualityProxyIDsMutex.Unlock()
  795. for proxyID := range serverQualityProxyIDs {
  796. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
  797. return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
  798. }
  799. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
  800. return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
  801. }
  802. }
  803. // TODO: check that elapsed time is consistent with rate limit (+/-)
  804. // Check if STUN server replay callbacks were triggered
  805. if !testDisableSTUN {
  806. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  807. return errors.TraceNew("unexpected STUN server succeeded count")
  808. }
  809. // Allow for some STUN server failures
  810. if atomic.LoadInt32(&stunServerAddressFailedCount) >= int32(numProxies/2) {
  811. return errors.TraceNew("unexpected STUN server failed count")
  812. }
  813. }
  814. // Check if RoundTripper server replay callbacks were triggered
  815. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  816. return errors.TraceNew("unexpected round tripper succeeded count")
  817. }
  818. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  819. return errors.TraceNew("unexpected round tripper failed count")
  820. }
  821. }
  822. // Await shutdowns
  823. stopTest()
  824. brokerListener.Close()
  825. err = testGroup.Wait()
  826. if err != nil {
  827. return errors.Trace(err)
  828. }
  829. return nil
  830. }
  831. func runHTTPServer(listener net.Listener, broker *Broker) error {
  832. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  833. // For this test, clients set the path to "/client" and proxies
  834. // set the path to "/proxy" and we use that to create stub GeoIP
  835. // data to pass the not-same-ASN condition.
  836. var geoIPData common.GeoIPData
  837. geoIPData.ASN = r.URL.Path
  838. requestPayload, err := ioutil.ReadAll(
  839. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  840. if err != nil {
  841. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  842. http.Error(w, "", http.StatusNotFound)
  843. return
  844. }
  845. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  846. extendTimeout := func(timeout time.Duration) {
  847. // TODO: set insufficient initial timeout, so extension is
  848. // required for success
  849. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  850. }
  851. responsePayload, err := broker.HandleSessionPacket(
  852. r.Context(),
  853. extendTimeout,
  854. nil,
  855. clientIP,
  856. geoIPData,
  857. requestPayload)
  858. if err != nil {
  859. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  860. http.Error(w, "", http.StatusNotFound)
  861. return
  862. }
  863. w.WriteHeader(http.StatusOK)
  864. w.Write(responsePayload)
  865. })
  866. // WriteTimeout will be extended via extendTimeout.
  867. httpServer := &http.Server{
  868. ReadTimeout: 10 * time.Second,
  869. WriteTimeout: 10 * time.Second,
  870. IdleTimeout: 1 * time.Minute,
  871. Handler: handler,
  872. }
  873. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  874. if err != nil {
  875. return errors.Trace(err)
  876. }
  877. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  878. if err != nil {
  879. return errors.Trace(err)
  880. }
  881. tlsConfig := &tls.Config{
  882. Certificates: []tls.Certificate{tlsCert},
  883. }
  884. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  885. return errors.Trace(err)
  886. }
  887. type httpRoundTripper struct {
  888. httpClient *http.Client
  889. endpointAddr string
  890. path string
  891. }
  892. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  893. return &httpRoundTripper{
  894. httpClient: &http.Client{
  895. Transport: &http.Transport{
  896. ForceAttemptHTTP2: true,
  897. MaxIdleConns: 2,
  898. IdleConnTimeout: 1 * time.Minute,
  899. TLSHandshakeTimeout: 10 * time.Second,
  900. TLSClientConfig: &std_tls.Config{
  901. InsecureSkipVerify: true,
  902. },
  903. },
  904. },
  905. endpointAddr: endpointAddr,
  906. path: path,
  907. }
  908. }
  909. func (r *httpRoundTripper) RoundTrip(
  910. ctx context.Context,
  911. roundTripDelay time.Duration,
  912. roundTripTimeout time.Duration,
  913. requestPayload []byte) ([]byte, error) {
  914. if roundTripDelay > 0 {
  915. common.SleepWithContext(ctx, roundTripDelay)
  916. }
  917. requestCtx, requestCancelFunc := context.WithTimeout(ctx, roundTripTimeout)
  918. defer requestCancelFunc()
  919. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  920. request, err := http.NewRequestWithContext(
  921. requestCtx, "POST", url, bytes.NewReader(requestPayload))
  922. if err != nil {
  923. return nil, errors.Trace(err)
  924. }
  925. response, err := r.httpClient.Do(request)
  926. if err != nil {
  927. return nil, errors.Trace(err)
  928. }
  929. defer response.Body.Close()
  930. if response.StatusCode != http.StatusOK {
  931. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  932. }
  933. responsePayload, err := io.ReadAll(response.Body)
  934. if err != nil {
  935. return nil, errors.Trace(err)
  936. }
  937. return responsePayload, nil
  938. }
  939. func (r *httpRoundTripper) Close() error {
  940. r.httpClient.CloseIdleConnections()
  941. return nil
  942. }
  943. func runTCPEchoServer(listener net.Listener) {
  944. for {
  945. conn, err := listener.Accept()
  946. if err != nil {
  947. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  948. return
  949. }
  950. go func(conn net.Conn) {
  951. buf := make([]byte, 32768)
  952. for {
  953. n, err := conn.Read(buf)
  954. if n > 0 {
  955. _, err = conn.Write(buf[:n])
  956. }
  957. if err != nil {
  958. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  959. return
  960. }
  961. }
  962. }(conn)
  963. }
  964. }
  965. type quicEchoServer struct {
  966. listener net.Listener
  967. obfuscationKey string
  968. }
  969. func newQuicEchoServer() (*quicEchoServer, error) {
  970. obfuscationKey := prng.HexString(32)
  971. listener, err := quic.Listen(
  972. nil,
  973. nil,
  974. "127.0.0.1:0",
  975. true,
  976. GetQUICMaxPacketSizeAdjustment(),
  977. obfuscationKey,
  978. false)
  979. if err != nil {
  980. return nil, errors.Trace(err)
  981. }
  982. return &quicEchoServer{
  983. listener: listener,
  984. obfuscationKey: obfuscationKey,
  985. }, nil
  986. }
  987. func (q *quicEchoServer) ObfuscationKey() string {
  988. return q.obfuscationKey
  989. }
  990. func (q *quicEchoServer) Close() error {
  991. return q.listener.Close()
  992. }
  993. func (q *quicEchoServer) Addr() net.Addr {
  994. return q.listener.Addr()
  995. }
  996. func (q *quicEchoServer) Run() {
  997. for {
  998. conn, err := q.listener.Accept()
  999. if err != nil {
  1000. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1001. return
  1002. }
  1003. go func(conn net.Conn) {
  1004. buf := make([]byte, 32768)
  1005. for {
  1006. n, err := conn.Read(buf)
  1007. if n > 0 {
  1008. _, err = conn.Write(buf[:n])
  1009. }
  1010. if err != nil {
  1011. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1012. return
  1013. }
  1014. }
  1015. }(conn)
  1016. }
  1017. }