inproxy_test.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244
  1. //go:build PSIPHON_ENABLE_INPROXY
  2. /*
  3. * Copyright (c) 2023, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package inproxy
  21. import (
  22. "bytes"
  23. "context"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. _ "net/http/pprof"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. tls "github.com/Psiphon-Labs/psiphon-tls"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  44. "golang.org/x/sync/errgroup"
  45. )
  46. func TestInproxy(t *testing.T) {
  47. err := runTestInproxy(false)
  48. if err != nil {
  49. t.Error(errors.Trace(err).Error())
  50. }
  51. }
  52. func TestInproxyMustUpgrade(t *testing.T) {
  53. err := runTestInproxy(true)
  54. if err != nil {
  55. t.Error(errors.Trace(err).Error())
  56. }
  57. }
  58. func runTestInproxy(doMustUpgrade bool) error {
  59. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  60. numProxies := 5
  61. proxyMaxClients := 3
  62. numClients := 10
  63. bytesToSend := 1 << 20
  64. targetElapsedSeconds := 2
  65. baseAPIParameters := common.APIParameters{
  66. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  67. "client_platform": "test-client-platform",
  68. }
  69. testCompartmentID, _ := MakeID()
  70. testCommonCompartmentIDs := []ID{testCompartmentID}
  71. testNetworkID := "NETWORK-ID-1"
  72. testNetworkType := NetworkTypeUnknown
  73. testNATType := NATTypeUnknown
  74. testSTUNServerAddress := "stun.nextcloud.com:443"
  75. testDisableSTUN := false
  76. testDisablePortMapping := false
  77. testNewTacticsPayload := []byte(prng.HexString(100))
  78. testNewTacticsTag := "new-tactics-tag"
  79. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  80. currentNetworkCtx, currentNetworkCancelFunc := context.WithCancel(context.Background())
  81. defer currentNetworkCancelFunc()
  82. // TODO: test port mapping
  83. stunServerAddressSucceededCount := int32(0)
  84. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  85. stunServerAddressFailedCount := int32(0)
  86. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  87. roundTripperSucceededCount := int32(0)
  88. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  89. roundTripperFailedCount := int32(0)
  90. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  91. noMatch := func(RoundTripper) {}
  92. var receivedProxyMustUpgrade chan struct{}
  93. var receivedClientMustUpgrade chan struct{}
  94. if doMustUpgrade {
  95. receivedProxyMustUpgrade = make(chan struct{})
  96. receivedClientMustUpgrade = make(chan struct{})
  97. // trigger MustUpgrade
  98. minimumProxyProtocolVersion = LatestProtocolVersion + 1
  99. minimumClientProtocolVersion = LatestProtocolVersion + 1
  100. // Minimize test parameters for MustUpgrade case
  101. numProxies = 1
  102. proxyMaxClients = 1
  103. numClients = 1
  104. testDisableSTUN = true
  105. testDisablePortMapping = true
  106. }
  107. testCtx, stopTest := context.WithCancel(context.Background())
  108. defer stopTest()
  109. testGroup := new(errgroup.Group)
  110. // Enable test to run without requiring host firewall exceptions
  111. SetAllowBogonWebRTCConnections(true)
  112. defer SetAllowBogonWebRTCConnections(false)
  113. // Init logging and profiling
  114. logger := newTestLogger()
  115. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  116. go http.Serve(pprofListener, nil)
  117. defer pprofListener.Close()
  118. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  119. // Start echo servers
  120. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  121. if err != nil {
  122. return errors.Trace(err)
  123. }
  124. defer tcpEchoListener.Close()
  125. go runTCPEchoServer(tcpEchoListener)
  126. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  127. quicEchoServer, err := newQuicEchoServer()
  128. if err != nil {
  129. return errors.Trace(err)
  130. }
  131. defer quicEchoServer.Close()
  132. go quicEchoServer.Run()
  133. // Create signed server entry with capability
  134. serverPrivateKey, err := GenerateSessionPrivateKey()
  135. if err != nil {
  136. return errors.Trace(err)
  137. }
  138. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  139. if err != nil {
  140. return errors.Trace(err)
  141. }
  142. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  143. if err != nil {
  144. return errors.Trace(err)
  145. }
  146. serverEntry := make(protocol.ServerEntryFields)
  147. serverEntry["ipAddress"] = "127.0.0.1"
  148. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  149. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  150. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  151. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  152. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  153. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  154. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  155. testServerEntryTag := prng.HexString(16)
  156. serverEntry["tag"] = testServerEntryTag
  157. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  158. protocol.NewServerEntrySignatureKeyPair()
  159. if err != nil {
  160. return errors.Trace(err)
  161. }
  162. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  163. if err != nil {
  164. return errors.Trace(err)
  165. }
  166. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  167. if err != nil {
  168. return errors.Trace(err)
  169. }
  170. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  171. if err != nil {
  172. return errors.Trace(err)
  173. }
  174. // API parameter handlers
  175. apiParameterValidator := func(params common.APIParameters) error {
  176. if len(params) != len(baseAPIParameters) {
  177. return errors.TraceNew("unexpected base API parameter count")
  178. }
  179. for name, value := range params {
  180. if value.(string) != baseAPIParameters[name].(string) {
  181. return errors.Tracef(
  182. "unexpected base API parameter: %v: %v != %v",
  183. name,
  184. value.(string),
  185. baseAPIParameters[name].(string))
  186. }
  187. }
  188. return nil
  189. }
  190. apiParameterLogFieldFormatter := func(
  191. _ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
  192. logFields := common.LogFields{}
  193. logFields.Add(common.LogFields(params))
  194. return logFields
  195. }
  196. // Start broker
  197. logger.WithTrace().Info("START BROKER")
  198. brokerPrivateKey, err := GenerateSessionPrivateKey()
  199. if err != nil {
  200. return errors.Trace(err)
  201. }
  202. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  203. if err != nil {
  204. return errors.Trace(err)
  205. }
  206. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  207. if err != nil {
  208. return errors.Trace(err)
  209. }
  210. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  211. if err != nil {
  212. return errors.Trace(err)
  213. }
  214. defer brokerListener.Close()
  215. brokerConfig := &BrokerConfig{
  216. Logger: logger,
  217. CommonCompartmentIDs: testCommonCompartmentIDs,
  218. APIParameterValidator: apiParameterValidator,
  219. APIParameterLogFieldFormatter: apiParameterLogFieldFormatter,
  220. GetTacticsPayload: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  221. // Exercise both new and unchanged tactics
  222. if prng.FlipCoin() {
  223. return testNewTacticsPayload, testNewTacticsTag, nil
  224. }
  225. return testUnchangedTacticsPayload, "", nil
  226. },
  227. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  228. PrivateKey: brokerPrivateKey,
  229. ObfuscationRootSecret: brokerRootObfuscationSecret,
  230. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  231. AllowProxy: func(common.GeoIPData) bool { return true },
  232. AllowClient: func(common.GeoIPData) bool { return true },
  233. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  234. AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
  235. }
  236. broker, err := NewBroker(brokerConfig)
  237. if err != nil {
  238. return errors.Trace(err)
  239. }
  240. // Enable proxy quality (and otherwise use the default quality parameters)
  241. enableProxyQuality := true
  242. broker.SetProxyQualityParameters(
  243. enableProxyQuality,
  244. proxyQualityTTL,
  245. proxyQualityPendingFailedMatchDeadline,
  246. proxyQualityFailedMatchThreshold)
  247. err = broker.Start()
  248. if err != nil {
  249. return errors.Trace(err)
  250. }
  251. defer broker.Stop()
  252. testGroup.Go(func() error {
  253. err := runHTTPServer(brokerListener, broker)
  254. if testCtx.Err() != nil {
  255. return nil
  256. }
  257. return errors.Trace(err)
  258. })
  259. // Stub server broker request handler (in Psiphon, this will be the
  260. // destination Psiphon server; here, it's not necessary to build this
  261. // handler into the destination echo server)
  262. //
  263. // The stub server broker request handler also triggers a server proxy
  264. // quality request in the other direction.
  265. makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
  266. RoundTripper, common.APIParameters, error) {
  267. return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
  268. }
  269. serverSessionsConfig := &ServerBrokerSessionsConfig{
  270. Logger: logger,
  271. ServerPrivateKey: serverPrivateKey,
  272. ServerRootObfuscationSecret: serverRootObfuscationSecret,
  273. BrokerPublicKeys: []SessionPublicKey{brokerPublicKey},
  274. BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
  275. BrokerRoundTripperMaker: makeServerBrokerClientRoundTripper,
  276. ProxyMetricsValidator: apiParameterValidator,
  277. ProxyMetricsFormatter: apiParameterLogFieldFormatter,
  278. ProxyMetricsPrefix: "",
  279. }
  280. serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
  281. if err != nil {
  282. return errors.Trace(err)
  283. }
  284. err = serverSessions.Start()
  285. if err != nil {
  286. return errors.Trace(err)
  287. }
  288. defer serverSessions.Stop()
  289. // Don't delay reporting quality.
  290. serverSessions.SetProxyQualityRequestParameters(
  291. proxyQualityReporterMaxRequestEntries,
  292. 0,
  293. proxyQualityReporterRequestTimeout,
  294. proxyQualityReporterRequestRetries)
  295. var pendingBrokerServerReportsMutex sync.Mutex
  296. pendingBrokerServerReports := make(map[ID]bool)
  297. addPendingBrokerServerReport := func(connectionID ID) {
  298. pendingBrokerServerReportsMutex.Lock()
  299. defer pendingBrokerServerReportsMutex.Unlock()
  300. pendingBrokerServerReports[connectionID] = true
  301. }
  302. removePendingBrokerServerReport := func(connectionID ID) {
  303. pendingBrokerServerReportsMutex.Lock()
  304. defer pendingBrokerServerReportsMutex.Unlock()
  305. delete(pendingBrokerServerReports, connectionID)
  306. }
  307. hasPendingBrokerServerReports := func() bool {
  308. pendingBrokerServerReportsMutex.Lock()
  309. defer pendingBrokerServerReportsMutex.Unlock()
  310. return len(pendingBrokerServerReports) > 0
  311. }
  312. serverQualityGroup := new(errgroup.Group)
  313. var serverQualityProxyIDsMutex sync.Mutex
  314. serverQualityProxyIDs := make(map[ID]struct{})
  315. testProxyASN := "65537"
  316. testClientASN := "65538"
  317. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  318. handler := func(
  319. brokerVerifiedOriginalClientIP string,
  320. brokerReportedProxyID ID,
  321. brokerMatchedPersonalCompartments bool,
  322. logFields common.LogFields) {
  323. // Mark the report as no longer outstanding
  324. removePendingBrokerServerReport(clientConnectionID)
  325. // Trigger an asynchronous proxy quality request to the broker.
  326. // This roughly follows the Psiphon server functionality, where a
  327. // quality request is made sometime after the Psiphon handshake
  328. // completes, once tunnel quality thresholds are achieved.
  329. serverQualityGroup.Go(func() error {
  330. serverSessions.ReportQuality(
  331. brokerReportedProxyID, testProxyASN, testClientASN)
  332. serverQualityProxyIDsMutex.Lock()
  333. serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
  334. serverQualityProxyIDsMutex.Unlock()
  335. return nil
  336. })
  337. }
  338. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  339. return out, errors.Trace(err)
  340. }
  341. // Check that the tactics round trip succeeds
  342. var pendingProxyTacticsCallbacksMutex sync.Mutex
  343. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  344. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  345. pendingProxyTacticsCallbacksMutex.Lock()
  346. defer pendingProxyTacticsCallbacksMutex.Unlock()
  347. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  348. }
  349. hasPendingProxyTacticsCallbacks := func() bool {
  350. pendingProxyTacticsCallbacksMutex.Lock()
  351. defer pendingProxyTacticsCallbacksMutex.Unlock()
  352. return len(pendingProxyTacticsCallbacks) > 0
  353. }
  354. makeHandleTacticsPayload := func(
  355. proxyPrivateKey SessionPrivateKey,
  356. tacticsNetworkID string) func(_ string, _ []byte) bool {
  357. return func(networkID string, tacticsPayload []byte) bool {
  358. pendingProxyTacticsCallbacksMutex.Lock()
  359. defer pendingProxyTacticsCallbacksMutex.Unlock()
  360. // Check that the correct networkID is passed around; if not,
  361. // skip the delete, which will fail the test
  362. if networkID == tacticsNetworkID {
  363. // Certain state is reset when new tactics are applied -- the
  364. // return true case; exercise both cases
  365. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  366. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  367. return true
  368. }
  369. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  370. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  371. return false
  372. }
  373. }
  374. panic("unexpected tactics payload")
  375. }
  376. }
  377. // Start proxies
  378. logger.WithTrace().Info("START PROXIES")
  379. for i := 0; i < numProxies; i++ {
  380. proxyPrivateKey, err := GenerateSessionPrivateKey()
  381. if err != nil {
  382. return errors.Trace(err)
  383. }
  384. brokerCoordinator := &testBrokerDialCoordinator{
  385. networkID: testNetworkID,
  386. networkType: testNetworkType,
  387. brokerClientPrivateKey: proxyPrivateKey,
  388. brokerPublicKey: brokerPublicKey,
  389. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  390. brokerClientRoundTripper: newHTTPRoundTripper(
  391. brokerListener.Addr().String(), "proxy"),
  392. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  393. brokerClientRoundTripperFailed: roundTripperFailed,
  394. // Minimize the delay before proxies reannounce after dial
  395. // failures, which may occur.
  396. announceDelay: 0,
  397. announceMaxBackoffDelay: 0,
  398. announceDelayJitter: 0.0,
  399. }
  400. webRTCCoordinator := &testWebRTCDialCoordinator{
  401. networkID: testNetworkID,
  402. networkType: testNetworkType,
  403. natType: testNATType,
  404. disableSTUN: testDisableSTUN,
  405. disablePortMapping: testDisablePortMapping,
  406. stunServerAddress: testSTUNServerAddress,
  407. stunServerAddressRFC5780: testSTUNServerAddress,
  408. stunServerAddressSucceeded: stunServerAddressSucceeded,
  409. stunServerAddressFailed: stunServerAddressFailed,
  410. setNATType: func(NATType) {},
  411. setPortMappingTypes: func(PortMappingTypes) {},
  412. bindToDevice: func(int) error { return nil },
  413. // Minimize the delay before proxies reannounce after failed
  414. // connections, which may occur.
  415. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  416. proxyRelayInactivityTimeout: 5 * time.Second,
  417. }
  418. // Each proxy has its own broker client
  419. brokerClient, err := NewBrokerClient(brokerCoordinator)
  420. if err != nil {
  421. return errors.Trace(err)
  422. }
  423. tacticsNetworkID := prng.HexString(32)
  424. runCtx, cancelRun := context.WithCancel(testCtx)
  425. // No deferred cancelRun due to testGroup.Go below
  426. name := fmt.Sprintf("proxy-%d", i)
  427. proxy, err := NewProxy(&ProxyConfig{
  428. Logger: newTestLoggerWithComponent(name),
  429. WaitForNetworkConnectivity: func() bool {
  430. return true
  431. },
  432. GetCurrentNetworkContext: func() context.Context {
  433. return currentNetworkCtx
  434. },
  435. GetBrokerClient: func() (*BrokerClient, error) {
  436. return brokerClient, nil
  437. },
  438. GetBaseAPIParameters: func(bool) (common.APIParameters, string, error) {
  439. return baseAPIParameters, tacticsNetworkID, nil
  440. },
  441. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  442. return webRTCCoordinator, nil
  443. },
  444. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  445. MaxClients: proxyMaxClients,
  446. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  447. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  448. ActivityUpdater: func(connectingClients int32, connectedClients int32,
  449. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  450. fmt.Printf("[%s][%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
  451. time.Now().UTC().Format(time.RFC3339), name,
  452. connectingClients, connectedClients, bytesUp, bytesDown)
  453. },
  454. MustUpgrade: func() {
  455. close(receivedProxyMustUpgrade)
  456. cancelRun()
  457. },
  458. })
  459. if err != nil {
  460. return errors.Trace(err)
  461. }
  462. addPendingProxyTacticsCallback(proxyPrivateKey)
  463. testGroup.Go(func() error {
  464. proxy.Run(runCtx)
  465. return nil
  466. })
  467. }
  468. // Await proxy announcements before starting clients
  469. //
  470. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  471. // plus NAT discovery
  472. //
  473. // - Don't wait for > numProxies announcements due to
  474. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  475. if !doMustUpgrade {
  476. for {
  477. time.Sleep(100 * time.Millisecond)
  478. broker.matcher.announcementQueueMutex.Lock()
  479. n := broker.matcher.announcementQueue.getLen()
  480. broker.matcher.announcementQueueMutex.Unlock()
  481. if n >= numProxies {
  482. break
  483. }
  484. }
  485. }
  486. // Start clients
  487. var completedClientCount atomic.Int64
  488. logger.WithTrace().Info("START CLIENTS")
  489. clientsGroup := new(errgroup.Group)
  490. makeClientFunc := func(
  491. clientNum int,
  492. isTCP bool,
  493. brokerClient *BrokerClient,
  494. webRTCCoordinator WebRTCDialCoordinator) func() error {
  495. var networkProtocol NetworkProtocol
  496. var addr string
  497. var wrapWithQUIC bool
  498. if isTCP {
  499. networkProtocol = NetworkProtocolTCP
  500. addr = tcpEchoListener.Addr().String()
  501. } else {
  502. networkProtocol = NetworkProtocolUDP
  503. addr = quicEchoServer.Addr().String()
  504. wrapWithQUIC = true
  505. }
  506. return func() error {
  507. name := fmt.Sprintf("client-%d", clientNum)
  508. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  509. defer cancelDial()
  510. conn, err := DialClient(
  511. dialCtx,
  512. &ClientConfig{
  513. Logger: newTestLoggerWithComponent(name),
  514. BaseAPIParameters: baseAPIParameters,
  515. BrokerClient: brokerClient,
  516. WebRTCDialCoordinator: webRTCCoordinator,
  517. ReliableTransport: isTCP,
  518. DialNetworkProtocol: networkProtocol,
  519. DialAddress: addr,
  520. PackedDestinationServerEntry: packedDestinationServerEntry,
  521. MustUpgrade: func() {
  522. close(receivedClientMustUpgrade)
  523. cancelDial()
  524. },
  525. })
  526. if err != nil {
  527. return errors.Trace(err)
  528. }
  529. var relayConn net.Conn
  530. relayConn = conn
  531. if wrapWithQUIC {
  532. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  533. if err != nil {
  534. return errors.Trace(err)
  535. }
  536. disablePathMTUDiscovery := true
  537. quicConn, err := quic.Dial(
  538. dialCtx,
  539. conn,
  540. udpAddr,
  541. "test",
  542. "QUICv1",
  543. nil,
  544. quicEchoServer.ObfuscationKey(),
  545. nil,
  546. nil,
  547. disablePathMTUDiscovery,
  548. GetQUICMaxPacketSizeAdjustment(),
  549. false,
  550. false,
  551. common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
  552. )
  553. if err != nil {
  554. return errors.Trace(err)
  555. }
  556. relayConn = quicConn
  557. }
  558. addPendingBrokerServerReport(conn.GetConnectionID())
  559. signalRelayComplete := make(chan struct{})
  560. clientsGroup.Go(func() error {
  561. defer close(signalRelayComplete)
  562. in := conn.InitialRelayPacket()
  563. for in != nil {
  564. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  565. if err != nil {
  566. if out == nil {
  567. return errors.Trace(err)
  568. } else {
  569. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  570. // Proceed with reset session token packet
  571. }
  572. }
  573. if out == nil {
  574. // Relay is complete
  575. break
  576. }
  577. in, err = conn.RelayPacket(testCtx, out)
  578. if err != nil {
  579. return errors.Trace(err)
  580. }
  581. }
  582. return nil
  583. })
  584. sendBytes := prng.Bytes(bytesToSend)
  585. clientsGroup.Go(func() error {
  586. for n := 0; n < bytesToSend; {
  587. m := prng.Range(1024, 32768)
  588. if bytesToSend-n < m {
  589. m = bytesToSend - n
  590. }
  591. _, err := relayConn.Write(sendBytes[n : n+m])
  592. if err != nil {
  593. return errors.Trace(err)
  594. }
  595. n += m
  596. }
  597. fmt.Printf("[%s][%s] %d bytes sent\n",
  598. time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
  599. return nil
  600. })
  601. clientsGroup.Go(func() error {
  602. buf := make([]byte, 32768)
  603. n := 0
  604. for n < bytesToSend {
  605. m, err := relayConn.Read(buf)
  606. if err != nil {
  607. return errors.Trace(err)
  608. }
  609. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  610. return errors.Tracef(
  611. "unexpected bytes: expected at index %d, received at index %d",
  612. bytes.Index(sendBytes, buf[:m]), n)
  613. }
  614. n += m
  615. }
  616. completed := completedClientCount.Add(1)
  617. fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
  618. time.Now().UTC().Format(time.RFC3339), name,
  619. bytesToSend, completed, numClients)
  620. select {
  621. case <-signalRelayComplete:
  622. case <-testCtx.Done():
  623. }
  624. fmt.Printf("[%s][%s] closing\n",
  625. time.Now().UTC().Format(time.RFC3339), name)
  626. relayConn.Close()
  627. conn.Close()
  628. return nil
  629. })
  630. return nil
  631. }
  632. }
  633. newClientBrokerClient := func(
  634. disableWaitToShareSession bool) (*BrokerClient, error) {
  635. clientPrivateKey, err := GenerateSessionPrivateKey()
  636. if err != nil {
  637. return nil, errors.Trace(err)
  638. }
  639. brokerCoordinator := &testBrokerDialCoordinator{
  640. networkID: testNetworkID,
  641. networkType: testNetworkType,
  642. commonCompartmentIDs: testCommonCompartmentIDs,
  643. disableWaitToShareSession: disableWaitToShareSession,
  644. brokerClientPrivateKey: clientPrivateKey,
  645. brokerPublicKey: brokerPublicKey,
  646. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  647. brokerClientRoundTripper: newHTTPRoundTripper(
  648. brokerListener.Addr().String(), "client"),
  649. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  650. brokerClientRoundTripperFailed: roundTripperFailed,
  651. brokerClientNoMatch: noMatch,
  652. }
  653. brokerClient, err := NewBrokerClient(brokerCoordinator)
  654. if err != nil {
  655. return nil, errors.Trace(err)
  656. }
  657. return brokerClient, nil
  658. }
  659. newClientWebRTCDialCoordinator := func(
  660. isMobile bool,
  661. useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
  662. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  663. if err != nil {
  664. return nil, errors.Trace(err)
  665. }
  666. var trafficShapingParameters *TrafficShapingParameters
  667. if useMediaStreams {
  668. trafficShapingParameters = &TrafficShapingParameters{
  669. MinPaddedMessages: 0,
  670. MaxPaddedMessages: 10,
  671. MinPaddingSize: 0,
  672. MaxPaddingSize: 254,
  673. MinDecoyMessages: 0,
  674. MaxDecoyMessages: 10,
  675. MinDecoySize: 1,
  676. MaxDecoySize: 1200,
  677. DecoyMessageProbability: 0.5,
  678. }
  679. } else {
  680. trafficShapingParameters = &TrafficShapingParameters{
  681. MinPaddedMessages: 0,
  682. MaxPaddedMessages: 10,
  683. MinPaddingSize: 0,
  684. MaxPaddingSize: 1500,
  685. MinDecoyMessages: 0,
  686. MaxDecoyMessages: 10,
  687. MinDecoySize: 1,
  688. MaxDecoySize: 1500,
  689. DecoyMessageProbability: 0.5,
  690. }
  691. }
  692. webRTCCoordinator := &testWebRTCDialCoordinator{
  693. networkID: testNetworkID,
  694. networkType: testNetworkType,
  695. natType: testNATType,
  696. disableSTUN: testDisableSTUN,
  697. stunServerAddress: testSTUNServerAddress,
  698. stunServerAddressRFC5780: testSTUNServerAddress,
  699. stunServerAddressSucceeded: stunServerAddressSucceeded,
  700. stunServerAddressFailed: stunServerAddressFailed,
  701. clientRootObfuscationSecret: clientRootObfuscationSecret,
  702. doDTLSRandomization: prng.FlipCoin(),
  703. useMediaStreams: useMediaStreams,
  704. trafficShapingParameters: trafficShapingParameters,
  705. setNATType: func(NATType) {},
  706. setPortMappingTypes: func(PortMappingTypes) {},
  707. bindToDevice: func(int) error { return nil },
  708. // With STUN enabled (testDisableSTUN = false), there are cases
  709. // where the WebRTC peer connection is not successfully
  710. // established. With a short enough timeout here, clients will
  711. // redial and eventually succceed.
  712. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  713. }
  714. if isMobile {
  715. webRTCCoordinator.networkType = NetworkTypeMobile
  716. webRTCCoordinator.disableInboundForMobileNetworks = true
  717. }
  718. return webRTCCoordinator, nil
  719. }
  720. sharedBrokerClient, err := newClientBrokerClient(false)
  721. if err != nil {
  722. return errors.Trace(err)
  723. }
  724. sharedBrokerClientDisableWait, err := newClientBrokerClient(true)
  725. if err != nil {
  726. return errors.Trace(err)
  727. }
  728. for i := 0; i < numClients; i++ {
  729. // Test a mix of TCP and UDP proxying; also test the
  730. // DisableInboundForMobileNetworks code path.
  731. isTCP := i%2 == 0
  732. isMobile := i%4 == 0
  733. useMediaStreams := i%4 < 2
  734. // Exercise BrokerClients shared by multiple clients, but also create
  735. // several broker clients.
  736. var brokerClient *BrokerClient
  737. switch i % 3 {
  738. case 0:
  739. brokerClient = sharedBrokerClient
  740. case 1:
  741. brokerClient = sharedBrokerClientDisableWait
  742. case 2:
  743. brokerClient, err = newClientBrokerClient(true)
  744. if err != nil {
  745. return errors.Trace(err)
  746. }
  747. }
  748. webRTCCoordinator, err := newClientWebRTCDialCoordinator(
  749. isMobile, useMediaStreams)
  750. if err != nil {
  751. return errors.Trace(err)
  752. }
  753. clientsGroup.Go(
  754. makeClientFunc(
  755. i,
  756. isTCP,
  757. brokerClient,
  758. webRTCCoordinator))
  759. }
  760. if doMustUpgrade {
  761. // Await MustUpgrade callbacks
  762. logger.WithTrace().Info("AWAIT MUST UPGRADE")
  763. <-receivedProxyMustUpgrade
  764. <-receivedClientMustUpgrade
  765. _ = clientsGroup.Wait()
  766. } else {
  767. // Await client transfers complete
  768. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  769. err = clientsGroup.Wait()
  770. if err != nil {
  771. return errors.Trace(err)
  772. }
  773. logger.WithTrace().Info("DONE DATA TRANSFER")
  774. if hasPendingBrokerServerReports() {
  775. return errors.TraceNew("unexpected pending broker server requests")
  776. }
  777. if hasPendingProxyTacticsCallbacks() {
  778. return errors.TraceNew("unexpected pending proxy tactics callback")
  779. }
  780. err = serverQualityGroup.Wait()
  781. if err != nil {
  782. return errors.Trace(err)
  783. }
  784. // Inspect the broker's proxy quality state, to verify that the proxy
  785. // quality request was processed.
  786. //
  787. // Limitation: currently we don't check the priority
  788. // announcement _queue_, as announcements may have arrived before the
  789. // quality request, and announcements are promoted between queues.
  790. serverQualityProxyIDsMutex.Lock()
  791. defer serverQualityProxyIDsMutex.Unlock()
  792. for proxyID := range serverQualityProxyIDs {
  793. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
  794. return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
  795. }
  796. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
  797. return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
  798. }
  799. }
  800. // TODO: check that elapsed time is consistent with rate limit (+/-)
  801. // Check if STUN server replay callbacks were triggered
  802. if !testDisableSTUN {
  803. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  804. return errors.TraceNew("unexpected STUN server succeeded count")
  805. }
  806. // Allow for some STUN server failures
  807. if atomic.LoadInt32(&stunServerAddressFailedCount) >= int32(numProxies/2) {
  808. return errors.TraceNew("unexpected STUN server failed count")
  809. }
  810. }
  811. // Check if RoundTripper server replay callbacks were triggered
  812. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  813. return errors.TraceNew("unexpected round tripper succeeded count")
  814. }
  815. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  816. return errors.TraceNew("unexpected round tripper failed count")
  817. }
  818. }
  819. // Await shutdowns
  820. stopTest()
  821. brokerListener.Close()
  822. err = testGroup.Wait()
  823. if err != nil {
  824. return errors.Trace(err)
  825. }
  826. return nil
  827. }
  828. func runHTTPServer(listener net.Listener, broker *Broker) error {
  829. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  830. // For this test, clients set the path to "/client" and proxies
  831. // set the path to "/proxy" and we use that to create stub GeoIP
  832. // data to pass the not-same-ASN condition.
  833. var geoIPData common.GeoIPData
  834. geoIPData.ASN = r.URL.Path
  835. requestPayload, err := ioutil.ReadAll(
  836. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  837. if err != nil {
  838. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  839. http.Error(w, "", http.StatusNotFound)
  840. return
  841. }
  842. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  843. extendTimeout := func(timeout time.Duration) {
  844. // TODO: set insufficient initial timeout, so extension is
  845. // required for success
  846. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  847. }
  848. responsePayload, err := broker.HandleSessionPacket(
  849. r.Context(),
  850. extendTimeout,
  851. nil,
  852. clientIP,
  853. geoIPData,
  854. requestPayload)
  855. if err != nil {
  856. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  857. http.Error(w, "", http.StatusNotFound)
  858. return
  859. }
  860. w.WriteHeader(http.StatusOK)
  861. w.Write(responsePayload)
  862. })
  863. // WriteTimeout will be extended via extendTimeout.
  864. httpServer := &http.Server{
  865. ReadTimeout: 10 * time.Second,
  866. WriteTimeout: 10 * time.Second,
  867. IdleTimeout: 1 * time.Minute,
  868. Handler: handler,
  869. }
  870. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  871. if err != nil {
  872. return errors.Trace(err)
  873. }
  874. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  875. if err != nil {
  876. return errors.Trace(err)
  877. }
  878. tlsConfig := &tls.Config{
  879. Certificates: []tls.Certificate{tlsCert},
  880. }
  881. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  882. return errors.Trace(err)
  883. }
  884. type httpRoundTripper struct {
  885. httpClient *http.Client
  886. endpointAddr string
  887. path string
  888. }
  889. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  890. return &httpRoundTripper{
  891. httpClient: &http.Client{
  892. Transport: &http.Transport{
  893. ForceAttemptHTTP2: true,
  894. MaxIdleConns: 2,
  895. IdleConnTimeout: 1 * time.Minute,
  896. TLSHandshakeTimeout: 10 * time.Second,
  897. TLSClientConfig: &std_tls.Config{
  898. InsecureSkipVerify: true,
  899. },
  900. },
  901. },
  902. endpointAddr: endpointAddr,
  903. path: path,
  904. }
  905. }
  906. func (r *httpRoundTripper) RoundTrip(
  907. ctx context.Context,
  908. roundTripDelay time.Duration,
  909. roundTripTimeout time.Duration,
  910. requestPayload []byte) ([]byte, error) {
  911. if roundTripDelay > 0 {
  912. common.SleepWithContext(ctx, roundTripDelay)
  913. }
  914. requestCtx, requestCancelFunc := context.WithTimeout(ctx, roundTripTimeout)
  915. defer requestCancelFunc()
  916. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  917. request, err := http.NewRequestWithContext(
  918. requestCtx, "POST", url, bytes.NewReader(requestPayload))
  919. if err != nil {
  920. return nil, errors.Trace(err)
  921. }
  922. response, err := r.httpClient.Do(request)
  923. if err != nil {
  924. return nil, errors.Trace(err)
  925. }
  926. defer response.Body.Close()
  927. if response.StatusCode != http.StatusOK {
  928. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  929. }
  930. responsePayload, err := io.ReadAll(response.Body)
  931. if err != nil {
  932. return nil, errors.Trace(err)
  933. }
  934. return responsePayload, nil
  935. }
  936. func (r *httpRoundTripper) Close() error {
  937. r.httpClient.CloseIdleConnections()
  938. return nil
  939. }
  940. func runTCPEchoServer(listener net.Listener) {
  941. for {
  942. conn, err := listener.Accept()
  943. if err != nil {
  944. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  945. return
  946. }
  947. go func(conn net.Conn) {
  948. buf := make([]byte, 32768)
  949. for {
  950. n, err := conn.Read(buf)
  951. if n > 0 {
  952. _, err = conn.Write(buf[:n])
  953. }
  954. if err != nil {
  955. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  956. return
  957. }
  958. }
  959. }(conn)
  960. }
  961. }
  962. type quicEchoServer struct {
  963. listener net.Listener
  964. obfuscationKey string
  965. }
  966. func newQuicEchoServer() (*quicEchoServer, error) {
  967. obfuscationKey := prng.HexString(32)
  968. listener, err := quic.Listen(
  969. nil,
  970. nil,
  971. "127.0.0.1:0",
  972. true,
  973. GetQUICMaxPacketSizeAdjustment(),
  974. obfuscationKey,
  975. false)
  976. if err != nil {
  977. return nil, errors.Trace(err)
  978. }
  979. return &quicEchoServer{
  980. listener: listener,
  981. obfuscationKey: obfuscationKey,
  982. }, nil
  983. }
  984. func (q *quicEchoServer) ObfuscationKey() string {
  985. return q.obfuscationKey
  986. }
  987. func (q *quicEchoServer) Close() error {
  988. return q.listener.Close()
  989. }
  990. func (q *quicEchoServer) Addr() net.Addr {
  991. return q.listener.Addr()
  992. }
  993. func (q *quicEchoServer) Run() {
  994. for {
  995. conn, err := q.listener.Accept()
  996. if err != nil {
  997. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  998. return
  999. }
  1000. go func(conn net.Conn) {
  1001. buf := make([]byte, 32768)
  1002. for {
  1003. n, err := conn.Read(buf)
  1004. if n > 0 {
  1005. _, err = conn.Write(buf[:n])
  1006. }
  1007. if err != nil {
  1008. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1009. return
  1010. }
  1011. }
  1012. }(conn)
  1013. }
  1014. }