inproxy_test.go 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370
  1. //go:build !PSIPHON_DISABLE_INPROXY
  2. /*
  3. * Copyright (c) 2023, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. package inproxy
  21. import (
  22. "bytes"
  23. "context"
  24. std_tls "crypto/tls"
  25. "encoding/base64"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. _ "net/http/pprof"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. tls "github.com/Psiphon-Labs/psiphon-tls"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/internal/testutils"
  45. "golang.org/x/sync/errgroup"
  46. )
  47. func TestInproxy(t *testing.T) {
  48. err := runTestInproxy(false)
  49. if err != nil {
  50. t.Error(errors.Trace(err).Error())
  51. }
  52. }
  53. func TestInproxyMustUpgrade(t *testing.T) {
  54. err := runTestInproxy(true)
  55. if err != nil {
  56. t.Error(errors.Trace(err).Error())
  57. }
  58. }
  59. func runTestInproxy(doMustUpgrade bool) error {
  60. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  61. numProxies := 5
  62. proxyMaxClients := 3
  63. numClients := 10
  64. bytesToSend := 1 << 20
  65. targetElapsedSeconds := 2
  66. baseAPIParameters := common.APIParameters{
  67. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  68. "client_platform": "test-client-platform",
  69. }
  70. testCompartmentID, _ := MakeID()
  71. testCommonCompartmentIDs := []ID{testCompartmentID}
  72. personalCompartmentID, _ := MakeID()
  73. testPersonalCompartmentIDs := []ID{personalCompartmentID}
  74. testNetworkID := "NETWORK-ID-1"
  75. testNetworkType := NetworkTypeUnknown
  76. testNATType := NATTypeUnknown
  77. testSTUNServerAddress := "stun.voipgate.com:3478"
  78. testDisableSTUN := false
  79. testDisablePortMapping := false
  80. testNewTacticsPayload := []byte(prng.HexString(100))
  81. testNewTacticsTag := "new-tactics-tag"
  82. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  83. currentNetworkCtx, currentNetworkCancelFunc := context.WithCancel(context.Background())
  84. defer currentNetworkCancelFunc()
  85. // TODO: test port mapping
  86. stunServerAddressSucceededCount := int32(0)
  87. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  88. stunServerAddressFailedCount := int32(0)
  89. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  90. roundTripperSucceededCount := int32(0)
  91. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  92. roundTripperFailedCount := int32(0)
  93. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  94. noMatch := func(RoundTripper) {}
  95. // Per-region activity tracking for testing the region activity feature.
  96. var regionTrackingMutex sync.Mutex
  97. seenCommonRegions := make(map[string]bool)
  98. seenPersonalRegions := make(map[string]bool)
  99. var totalCommonRegionBytesUp, totalCommonRegionBytesDown int64
  100. var totalPersonalRegionBytesUp, totalPersonalRegionBytesDown int64
  101. var receivedProxyMustUpgrade chan struct{}
  102. var receivedClientMustUpgrade chan struct{}
  103. if doMustUpgrade {
  104. receivedProxyMustUpgrade = make(chan struct{})
  105. receivedClientMustUpgrade = make(chan struct{})
  106. // trigger MustUpgrade
  107. minimumProxyProtocolVersion = LatestProtocolVersion + 1
  108. minimumClientProtocolVersion = LatestProtocolVersion + 1
  109. // Minimize test parameters for MustUpgrade case
  110. numProxies = 1
  111. proxyMaxClients = 1
  112. numClients = 1
  113. testDisableSTUN = true
  114. testDisablePortMapping = true
  115. }
  116. testCtx, stopTest := context.WithCancel(context.Background())
  117. defer stopTest()
  118. testGroup := new(errgroup.Group)
  119. // Enable test to run without requiring host firewall exceptions
  120. SetAllowBogonWebRTCConnections(true)
  121. defer SetAllowBogonWebRTCConnections(false)
  122. // Init logging and profiling
  123. logger := testutils.NewTestLogger()
  124. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  125. go http.Serve(pprofListener, nil)
  126. defer pprofListener.Close()
  127. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  128. // Start echo servers
  129. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  130. if err != nil {
  131. return errors.Trace(err)
  132. }
  133. defer tcpEchoListener.Close()
  134. go runTCPEchoServer(tcpEchoListener)
  135. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  136. quicEchoServer, err := newQuicEchoServer()
  137. if err != nil {
  138. return errors.Trace(err)
  139. }
  140. defer quicEchoServer.Close()
  141. go quicEchoServer.Run()
  142. // Create signed server entry with capability
  143. serverPrivateKey, err := GenerateSessionPrivateKey()
  144. if err != nil {
  145. return errors.Trace(err)
  146. }
  147. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  148. if err != nil {
  149. return errors.Trace(err)
  150. }
  151. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  152. if err != nil {
  153. return errors.Trace(err)
  154. }
  155. serverEntry := make(protocol.ServerEntryFields)
  156. serverEntry["ipAddress"] = "127.0.0.1"
  157. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  158. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  159. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  160. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  161. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  162. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  163. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  164. testServerEntryTag := prng.HexString(16)
  165. serverEntry["tag"] = testServerEntryTag
  166. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  167. protocol.NewServerEntrySignatureKeyPair()
  168. if err != nil {
  169. return errors.Trace(err)
  170. }
  171. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  172. if err != nil {
  173. return errors.Trace(err)
  174. }
  175. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  176. if err != nil {
  177. return errors.Trace(err)
  178. }
  179. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  180. if err != nil {
  181. return errors.Trace(err)
  182. }
  183. // API parameter handlers
  184. apiParameterValidator := func(params common.APIParameters) error {
  185. if len(params) != len(baseAPIParameters) {
  186. return errors.TraceNew("unexpected base API parameter count")
  187. }
  188. for name, value := range params {
  189. if value.(string) != baseAPIParameters[name].(string) {
  190. return errors.Tracef(
  191. "unexpected base API parameter: %v: %v != %v",
  192. name,
  193. value.(string),
  194. baseAPIParameters[name].(string))
  195. }
  196. }
  197. return nil
  198. }
  199. apiParameterLogFieldFormatter := func(
  200. _ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
  201. logFields := common.LogFields{}
  202. logFields.Add(common.LogFields(params))
  203. return logFields
  204. }
  205. // Start broker
  206. logger.WithTrace().Info("START BROKER")
  207. brokerPrivateKey, err := GenerateSessionPrivateKey()
  208. if err != nil {
  209. return errors.Trace(err)
  210. }
  211. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  212. if err != nil {
  213. return errors.Trace(err)
  214. }
  215. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  216. if err != nil {
  217. return errors.Trace(err)
  218. }
  219. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  220. if err != nil {
  221. return errors.Trace(err)
  222. }
  223. defer brokerListener.Close()
  224. brokerConfig := &BrokerConfig{
  225. Logger: logger,
  226. CommonCompartmentIDs: testCommonCompartmentIDs,
  227. APIParameterValidator: apiParameterValidator,
  228. APIParameterLogFieldFormatter: apiParameterLogFieldFormatter,
  229. GetTacticsPayload: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  230. // Exercise both new and unchanged tactics
  231. if prng.FlipCoin() {
  232. return testNewTacticsPayload, testNewTacticsTag, nil
  233. }
  234. return testUnchangedTacticsPayload, "", nil
  235. },
  236. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  237. PrivateKey: brokerPrivateKey,
  238. ObfuscationRootSecret: brokerRootObfuscationSecret,
  239. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  240. AllowProxy: func(common.GeoIPData) bool { return true },
  241. AllowClient: func(common.GeoIPData) bool { return true },
  242. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  243. AllowMatch: func(common.GeoIPData, common.GeoIPData) bool { return true },
  244. }
  245. broker, err := NewBroker(brokerConfig)
  246. if err != nil {
  247. return errors.Trace(err)
  248. }
  249. // Enable proxy quality (and otherwise use the default quality parameters)
  250. enableProxyQuality := true
  251. broker.SetProxyQualityParameters(
  252. enableProxyQuality,
  253. proxyQualityTTL,
  254. proxyQualityPendingFailedMatchDeadline,
  255. proxyQualityFailedMatchThreshold)
  256. err = broker.Start()
  257. if err != nil {
  258. return errors.Trace(err)
  259. }
  260. defer broker.Stop()
  261. testGroup.Go(func() error {
  262. err := runHTTPServer(brokerListener, broker)
  263. if testCtx.Err() != nil {
  264. return nil
  265. }
  266. return errors.Trace(err)
  267. })
  268. // Stub server broker request handler (in Psiphon, this will be the
  269. // destination Psiphon server; here, it's not necessary to build this
  270. // handler into the destination echo server)
  271. //
  272. // The stub server broker request handler also triggers a server proxy
  273. // quality request in the other direction.
  274. makeServerBrokerClientRoundTripper := func(_ SessionPublicKey) (
  275. RoundTripper, common.APIParameters, error) {
  276. return newHTTPRoundTripper(brokerListener.Addr().String(), "server"), nil, nil
  277. }
  278. serverSessionsConfig := &ServerBrokerSessionsConfig{
  279. Logger: logger,
  280. ServerPrivateKey: serverPrivateKey,
  281. ServerRootObfuscationSecret: serverRootObfuscationSecret,
  282. BrokerPublicKeys: []SessionPublicKey{brokerPublicKey},
  283. BrokerRootObfuscationSecrets: []ObfuscationSecret{brokerRootObfuscationSecret},
  284. BrokerRoundTripperMaker: makeServerBrokerClientRoundTripper,
  285. ProxyMetricsValidator: apiParameterValidator,
  286. ProxyMetricsFormatter: apiParameterLogFieldFormatter,
  287. ProxyMetricsPrefix: "",
  288. }
  289. serverSessions, err := NewServerBrokerSessions(serverSessionsConfig)
  290. if err != nil {
  291. return errors.Trace(err)
  292. }
  293. err = serverSessions.Start()
  294. if err != nil {
  295. return errors.Trace(err)
  296. }
  297. defer serverSessions.Stop()
  298. // Don't delay reporting quality.
  299. serverSessions.SetProxyQualityRequestParameters(
  300. proxyQualityReporterMaxRequestEntries,
  301. 0,
  302. proxyQualityReporterRequestTimeout,
  303. proxyQualityReporterRequestRetries)
  304. var pendingBrokerServerReportsMutex sync.Mutex
  305. pendingBrokerServerReports := make(map[ID]bool)
  306. addPendingBrokerServerReport := func(connectionID ID) {
  307. pendingBrokerServerReportsMutex.Lock()
  308. defer pendingBrokerServerReportsMutex.Unlock()
  309. pendingBrokerServerReports[connectionID] = true
  310. }
  311. removePendingBrokerServerReport := func(connectionID ID) {
  312. pendingBrokerServerReportsMutex.Lock()
  313. defer pendingBrokerServerReportsMutex.Unlock()
  314. delete(pendingBrokerServerReports, connectionID)
  315. }
  316. hasPendingBrokerServerReports := func() bool {
  317. pendingBrokerServerReportsMutex.Lock()
  318. defer pendingBrokerServerReportsMutex.Unlock()
  319. return len(pendingBrokerServerReports) > 0
  320. }
  321. serverQualityGroup := new(errgroup.Group)
  322. var serverQualityProxyIDsMutex sync.Mutex
  323. serverQualityProxyIDs := make(map[ID]struct{})
  324. testProxyASN := "65537"
  325. testClientASN := "65538"
  326. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  327. handler := func(
  328. brokerVerifiedOriginalClientIP string,
  329. brokerReportedProxyID ID,
  330. brokerMatchedPersonalCompartments bool,
  331. logFields common.LogFields) {
  332. // Mark the report as no longer outstanding
  333. removePendingBrokerServerReport(clientConnectionID)
  334. // Trigger an asynchronous proxy quality request to the broker.
  335. // This roughly follows the Psiphon server functionality, where a
  336. // quality request is made sometime after the Psiphon handshake
  337. // completes, once tunnel quality thresholds are achieved.
  338. serverQualityGroup.Go(func() error {
  339. serverSessions.ReportQuality(
  340. brokerReportedProxyID, testProxyASN, testClientASN)
  341. serverQualityProxyIDsMutex.Lock()
  342. serverQualityProxyIDs[brokerReportedProxyID] = struct{}{}
  343. serverQualityProxyIDsMutex.Unlock()
  344. return nil
  345. })
  346. }
  347. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  348. return out, errors.Trace(err)
  349. }
  350. // Check that the tactics round trip succeeds
  351. var pendingProxyTacticsCallbacksMutex sync.Mutex
  352. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  353. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  354. pendingProxyTacticsCallbacksMutex.Lock()
  355. defer pendingProxyTacticsCallbacksMutex.Unlock()
  356. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  357. }
  358. hasPendingProxyTacticsCallbacks := func() bool {
  359. pendingProxyTacticsCallbacksMutex.Lock()
  360. defer pendingProxyTacticsCallbacksMutex.Unlock()
  361. return len(pendingProxyTacticsCallbacks) > 0
  362. }
  363. makeHandleTacticsPayload := func(
  364. proxyPrivateKey SessionPrivateKey,
  365. tacticsNetworkID string) func(_ string, _ bool, _ []byte) bool {
  366. return func(networkID string, _ bool, tacticsPayload []byte) bool {
  367. pendingProxyTacticsCallbacksMutex.Lock()
  368. defer pendingProxyTacticsCallbacksMutex.Unlock()
  369. // Check that the correct networkID is passed around; if not,
  370. // skip the delete, which will fail the test
  371. if networkID == tacticsNetworkID {
  372. // Certain state is reset when new tactics are applied -- the
  373. // return true case; exercise both cases
  374. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  375. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  376. return true
  377. }
  378. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  379. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  380. return false
  381. }
  382. }
  383. panic("unexpected tactics payload")
  384. }
  385. }
  386. // Start proxies
  387. logger.WithTrace().Info("START PROXIES")
  388. for i := 0; i < numProxies; i++ {
  389. proxyPrivateKey, err := GenerateSessionPrivateKey()
  390. if err != nil {
  391. return errors.Trace(err)
  392. }
  393. brokerCoordinator := &testBrokerDialCoordinator{
  394. networkID: testNetworkID,
  395. networkType: testNetworkType,
  396. brokerClientPrivateKey: proxyPrivateKey,
  397. brokerPublicKey: brokerPublicKey,
  398. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  399. personalCompartmentIDs: testPersonalCompartmentIDs,
  400. brokerClientRoundTripper: newHTTPRoundTripper(
  401. brokerListener.Addr().String(), "proxy"),
  402. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  403. brokerClientRoundTripperFailed: roundTripperFailed,
  404. // Minimize the delay before proxies reannounce after dial
  405. // failures, which may occur.
  406. announceDelay: 0,
  407. announceMaxBackoffDelay: 0,
  408. announceDelayJitter: 0.0,
  409. }
  410. webRTCCoordinator := &testWebRTCDialCoordinator{
  411. networkID: testNetworkID,
  412. networkType: testNetworkType,
  413. natType: testNATType,
  414. disableSTUN: testDisableSTUN,
  415. disablePortMapping: testDisablePortMapping,
  416. stunServerAddress: testSTUNServerAddress,
  417. stunServerAddressRFC5780: testSTUNServerAddress,
  418. stunServerAddressSucceeded: stunServerAddressSucceeded,
  419. stunServerAddressFailed: stunServerAddressFailed,
  420. setNATType: func(NATType) {},
  421. setPortMappingTypes: func(PortMappingTypes) {},
  422. bindToDevice: func(int) error { return nil },
  423. // Minimize the delay before proxies reannounce after failed
  424. // connections, which may occur.
  425. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  426. proxyRelayInactivityTimeout: 5 * time.Second,
  427. }
  428. // Each proxy has its own broker client
  429. brokerClient, err := NewBrokerClient(brokerCoordinator)
  430. if err != nil {
  431. return errors.Trace(err)
  432. }
  433. tacticsNetworkID := prng.HexString(32)
  434. runCtx, cancelRun := context.WithCancel(testCtx)
  435. // No deferred cancelRun due to testGroup.Go below
  436. name := fmt.Sprintf("proxy-%d", i)
  437. proxy, err := NewProxy(&ProxyConfig{
  438. Logger: testutils.NewTestLoggerWithComponent(name),
  439. WaitForNetworkConnectivity: func() bool {
  440. return true
  441. },
  442. GetCurrentNetworkContext: func() context.Context {
  443. return currentNetworkCtx
  444. },
  445. GetBrokerClient: func() (*BrokerClient, error) {
  446. return brokerClient, nil
  447. },
  448. GetBaseAPIParameters: func(bool) (common.APIParameters, string, error) {
  449. return baseAPIParameters, tacticsNetworkID, nil
  450. },
  451. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  452. return webRTCCoordinator, nil
  453. },
  454. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  455. MaxCommonClients: proxyMaxClients,
  456. MaxPersonalClients: proxyMaxClients,
  457. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  458. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  459. ActivityUpdater: func(
  460. announcing int32,
  461. connectingClients int32, connectedClients int32,
  462. bytesUp int64, bytesDown int64, bytesDuration time.Duration,
  463. personalRegionActivity map[string]RegionActivitySnapshot,
  464. commonRegionActivity map[string]RegionActivitySnapshot) {
  465. fmt.Printf("[%s][%s] ACTIVITY: %d announcing, %d connecting, %d connected, %d up, %d down\n",
  466. time.Now().UTC().Format(time.RFC3339), name,
  467. announcing, connectingClients, connectedClients, bytesUp, bytesDown)
  468. regionTrackingMutex.Lock()
  469. for region, stats := range commonRegionActivity {
  470. seenCommonRegions[region] = true
  471. totalCommonRegionBytesUp += stats.BytesUp
  472. totalCommonRegionBytesDown += stats.BytesDown
  473. fmt.Printf("[%s][%s] COMMON REGION %s: connecting=%d, connected=%d, up=%d, down=%d\n",
  474. time.Now().UTC().Format(time.RFC3339), name, region,
  475. stats.ConnectingClients, stats.ConnectedClients, stats.BytesUp, stats.BytesDown)
  476. }
  477. for region, stats := range personalRegionActivity {
  478. seenPersonalRegions[region] = true
  479. totalPersonalRegionBytesUp += stats.BytesUp
  480. totalPersonalRegionBytesDown += stats.BytesDown
  481. fmt.Printf("[%s][%s] PERSONAL REGION %s: connecting=%d, connected=%d, up=%d, down=%d\n",
  482. time.Now().UTC().Format(time.RFC3339), name, region,
  483. stats.ConnectingClients, stats.ConnectedClients, stats.BytesUp, stats.BytesDown)
  484. }
  485. regionTrackingMutex.Unlock()
  486. },
  487. MustUpgrade: func() {
  488. close(receivedProxyMustUpgrade)
  489. cancelRun()
  490. },
  491. })
  492. if err != nil {
  493. return errors.Trace(err)
  494. }
  495. addPendingProxyTacticsCallback(proxyPrivateKey)
  496. testGroup.Go(func() error {
  497. proxy.Run(runCtx)
  498. return nil
  499. })
  500. }
  501. // Await proxy announcements before starting clients
  502. //
  503. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  504. // plus NAT discovery
  505. //
  506. // - Don't wait for > numProxies announcements due to
  507. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  508. if !doMustUpgrade {
  509. for {
  510. time.Sleep(100 * time.Millisecond)
  511. broker.matcher.announcementQueueMutex.Lock()
  512. n := broker.matcher.announcementQueue.getLen()
  513. broker.matcher.announcementQueueMutex.Unlock()
  514. if n >= numProxies {
  515. break
  516. }
  517. }
  518. }
  519. // Start clients
  520. var completedClientCount atomic.Int64
  521. logger.WithTrace().Info("START CLIENTS")
  522. clientsGroup := new(errgroup.Group)
  523. makeClientFunc := func(
  524. clientNum int,
  525. isTCP bool,
  526. brokerClient *BrokerClient,
  527. webRTCCoordinator WebRTCDialCoordinator) func() error {
  528. var networkProtocol NetworkProtocol
  529. var addr string
  530. var wrapWithQUIC bool
  531. if isTCP {
  532. networkProtocol = NetworkProtocolTCP
  533. addr = tcpEchoListener.Addr().String()
  534. } else {
  535. networkProtocol = NetworkProtocolUDP
  536. addr = quicEchoServer.Addr().String()
  537. wrapWithQUIC = true
  538. }
  539. return func() error {
  540. name := fmt.Sprintf("client-%d", clientNum)
  541. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  542. defer cancelDial()
  543. conn, err := DialClient(
  544. dialCtx,
  545. &ClientConfig{
  546. Logger: testutils.NewTestLoggerWithComponent(name),
  547. BaseAPIParameters: baseAPIParameters,
  548. BrokerClient: brokerClient,
  549. WebRTCDialCoordinator: webRTCCoordinator,
  550. ReliableTransport: isTCP,
  551. DialNetworkProtocol: networkProtocol,
  552. DialAddress: addr,
  553. PackedDestinationServerEntry: packedDestinationServerEntry,
  554. MustUpgrade: func() {
  555. close(receivedClientMustUpgrade)
  556. cancelDial()
  557. },
  558. })
  559. if err != nil {
  560. return errors.Trace(err)
  561. }
  562. var relayConn net.Conn
  563. relayConn = conn
  564. if wrapWithQUIC {
  565. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  566. if err != nil {
  567. return errors.Trace(err)
  568. }
  569. disablePathMTUDiscovery := true
  570. quicConn, err := quic.Dial(
  571. dialCtx,
  572. conn,
  573. udpAddr,
  574. "test",
  575. "QUICv1",
  576. nil,
  577. quicEchoServer.ObfuscationKey(),
  578. nil,
  579. nil,
  580. disablePathMTUDiscovery,
  581. GetQUICMaxPacketSizeAdjustment(),
  582. false,
  583. false,
  584. common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
  585. )
  586. if err != nil {
  587. return errors.Trace(err)
  588. }
  589. relayConn = quicConn
  590. }
  591. addPendingBrokerServerReport(conn.GetConnectionID())
  592. signalRelayComplete := make(chan struct{})
  593. clientsGroup.Go(func() error {
  594. defer close(signalRelayComplete)
  595. in := conn.InitialRelayPacket()
  596. for in != nil {
  597. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  598. if err != nil {
  599. if out == nil {
  600. return errors.Trace(err)
  601. } else {
  602. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  603. // Proceed with reset session token packet
  604. }
  605. }
  606. if out == nil {
  607. // Relay is complete
  608. break
  609. }
  610. in, err = conn.RelayPacket(testCtx, out)
  611. if err != nil {
  612. return errors.Trace(err)
  613. }
  614. }
  615. return nil
  616. })
  617. sendBytes := prng.Bytes(bytesToSend)
  618. clientsGroup.Go(func() error {
  619. for n := 0; n < bytesToSend; {
  620. m := prng.Range(1024, 32768)
  621. if bytesToSend-n < m {
  622. m = bytesToSend - n
  623. }
  624. _, err := relayConn.Write(sendBytes[n : n+m])
  625. if err != nil {
  626. return errors.Trace(err)
  627. }
  628. n += m
  629. }
  630. fmt.Printf("[%s][%s] %d bytes sent\n",
  631. time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
  632. return nil
  633. })
  634. clientsGroup.Go(func() error {
  635. buf := make([]byte, 32768)
  636. n := 0
  637. for n < bytesToSend {
  638. m, err := relayConn.Read(buf)
  639. if err != nil {
  640. return errors.Trace(err)
  641. }
  642. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  643. return errors.Tracef(
  644. "unexpected bytes: expected at index %d, received at index %d",
  645. bytes.Index(sendBytes, buf[:m]), n)
  646. }
  647. n += m
  648. }
  649. completed := completedClientCount.Add(1)
  650. fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
  651. time.Now().UTC().Format(time.RFC3339), name,
  652. bytesToSend, completed, numClients)
  653. select {
  654. case <-signalRelayComplete:
  655. case <-testCtx.Done():
  656. }
  657. fmt.Printf("[%s][%s] closing\n",
  658. time.Now().UTC().Format(time.RFC3339), name)
  659. relayConn.Close()
  660. conn.Close()
  661. return nil
  662. })
  663. return nil
  664. }
  665. }
  666. newClientBrokerClient := func(
  667. disableWaitToShareSession bool,
  668. commonCompartmentIDs []ID,
  669. personalCompartmentIDs []ID) (*BrokerClient, error) {
  670. clientPrivateKey, err := GenerateSessionPrivateKey()
  671. if err != nil {
  672. return nil, errors.Trace(err)
  673. }
  674. brokerCoordinator := &testBrokerDialCoordinator{
  675. networkID: testNetworkID,
  676. networkType: testNetworkType,
  677. commonCompartmentIDs: commonCompartmentIDs,
  678. personalCompartmentIDs: personalCompartmentIDs,
  679. disableWaitToShareSession: disableWaitToShareSession,
  680. brokerClientPrivateKey: clientPrivateKey,
  681. brokerPublicKey: brokerPublicKey,
  682. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  683. brokerClientRoundTripper: newHTTPRoundTripper(
  684. brokerListener.Addr().String(), "client"),
  685. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  686. brokerClientRoundTripperFailed: roundTripperFailed,
  687. brokerClientNoMatch: noMatch,
  688. }
  689. brokerClient, err := NewBrokerClient(brokerCoordinator)
  690. if err != nil {
  691. return nil, errors.Trace(err)
  692. }
  693. return brokerClient, nil
  694. }
  695. newClientWebRTCDialCoordinator := func(
  696. isMobile bool,
  697. useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
  698. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  699. if err != nil {
  700. return nil, errors.Trace(err)
  701. }
  702. var trafficShapingParameters *TrafficShapingParameters
  703. if useMediaStreams {
  704. trafficShapingParameters = &TrafficShapingParameters{
  705. MinPaddedMessages: 0,
  706. MaxPaddedMessages: 10,
  707. MinPaddingSize: 0,
  708. MaxPaddingSize: 254,
  709. MinDecoyMessages: 0,
  710. MaxDecoyMessages: 10,
  711. MinDecoySize: 1,
  712. MaxDecoySize: 1200,
  713. DecoyMessageProbability: 0.5,
  714. }
  715. } else {
  716. trafficShapingParameters = &TrafficShapingParameters{
  717. MinPaddedMessages: 0,
  718. MaxPaddedMessages: 10,
  719. MinPaddingSize: 0,
  720. MaxPaddingSize: 1500,
  721. MinDecoyMessages: 0,
  722. MaxDecoyMessages: 10,
  723. MinDecoySize: 1,
  724. MaxDecoySize: 1500,
  725. DecoyMessageProbability: 0.5,
  726. }
  727. }
  728. webRTCCoordinator := &testWebRTCDialCoordinator{
  729. networkID: testNetworkID,
  730. networkType: testNetworkType,
  731. natType: testNATType,
  732. disableSTUN: testDisableSTUN,
  733. stunServerAddress: testSTUNServerAddress,
  734. stunServerAddressRFC5780: testSTUNServerAddress,
  735. stunServerAddressSucceeded: stunServerAddressSucceeded,
  736. stunServerAddressFailed: stunServerAddressFailed,
  737. clientRootObfuscationSecret: clientRootObfuscationSecret,
  738. doDTLSRandomization: prng.FlipCoin(),
  739. useMediaStreams: useMediaStreams,
  740. trafficShapingParameters: trafficShapingParameters,
  741. setNATType: func(NATType) {},
  742. setPortMappingTypes: func(PortMappingTypes) {},
  743. bindToDevice: func(int) error { return nil },
  744. // With STUN enabled (testDisableSTUN = false), there are cases
  745. // where the WebRTC peer connection is not successfully
  746. // established. With a short enough timeout here, clients will
  747. // redial and eventually succeed.
  748. webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
  749. }
  750. if isMobile {
  751. webRTCCoordinator.networkType = NetworkTypeMobile
  752. webRTCCoordinator.disableInboundForMobileNetworks = true
  753. }
  754. return webRTCCoordinator, nil
  755. }
  756. sharedCommonBrokerClient, err := newClientBrokerClient(false, testCommonCompartmentIDs, nil)
  757. if err != nil {
  758. return errors.Trace(err)
  759. }
  760. sharedCommonBrokerClientDisableWait, err := newClientBrokerClient(true, testCommonCompartmentIDs, nil)
  761. if err != nil {
  762. return errors.Trace(err)
  763. }
  764. sharedPersonalBrokerClient, err := newClientBrokerClient(false, nil, testPersonalCompartmentIDs)
  765. if err != nil {
  766. return errors.Trace(err)
  767. }
  768. sharedPersonalBrokerClientDisableWait, err := newClientBrokerClient(true, nil, testPersonalCompartmentIDs)
  769. if err != nil {
  770. return errors.Trace(err)
  771. }
  772. for i := 0; i < numClients; i++ {
  773. // Test a mix of TCP and UDP proxying; also test the
  774. // DisableInboundForMobileNetworks code path.
  775. isTCP := i%2 == 0
  776. isMobile := i%4 == 0
  777. useMediaStreams := i%4 < 2
  778. // First half of clients are personal, second half are common.
  779. isPersonalClient := i < numClients/2
  780. // Exercise BrokerClients shared by multiple clients, but also create
  781. // several broker clients.
  782. //
  783. // Per-region testing is handled by the HTTP server alternating regions
  784. // based on request count.
  785. var brokerClient *BrokerClient
  786. switch i % 3 {
  787. case 0:
  788. if isPersonalClient {
  789. brokerClient = sharedPersonalBrokerClient
  790. } else {
  791. brokerClient = sharedCommonBrokerClient
  792. }
  793. case 1:
  794. if isPersonalClient {
  795. brokerClient = sharedPersonalBrokerClientDisableWait
  796. } else {
  797. brokerClient = sharedCommonBrokerClientDisableWait
  798. }
  799. case 2:
  800. if isPersonalClient {
  801. brokerClient, err = newClientBrokerClient(true, nil, testPersonalCompartmentIDs)
  802. } else {
  803. brokerClient, err = newClientBrokerClient(true, testCommonCompartmentIDs, nil)
  804. }
  805. if err != nil {
  806. return errors.Trace(err)
  807. }
  808. }
  809. webRTCCoordinator, err := newClientWebRTCDialCoordinator(
  810. isMobile, useMediaStreams)
  811. if err != nil {
  812. return errors.Trace(err)
  813. }
  814. clientsGroup.Go(
  815. makeClientFunc(
  816. i,
  817. isTCP,
  818. brokerClient,
  819. webRTCCoordinator))
  820. }
  821. if doMustUpgrade {
  822. // Await MustUpgrade callbacks
  823. logger.WithTrace().Info("AWAIT MUST UPGRADE")
  824. <-receivedProxyMustUpgrade
  825. <-receivedClientMustUpgrade
  826. _ = clientsGroup.Wait()
  827. } else {
  828. // Await client transfers complete
  829. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  830. err = clientsGroup.Wait()
  831. if err != nil {
  832. return errors.Trace(err)
  833. }
  834. logger.WithTrace().Info("DONE DATA TRANSFER")
  835. if hasPendingBrokerServerReports() {
  836. return errors.TraceNew("unexpected pending broker server requests")
  837. }
  838. if hasPendingProxyTacticsCallbacks() {
  839. return errors.TraceNew("unexpected pending proxy tactics callback")
  840. }
  841. err = serverQualityGroup.Wait()
  842. if err != nil {
  843. return errors.Trace(err)
  844. }
  845. // Inspect the broker's proxy quality state, to verify that the proxy
  846. // quality request was processed.
  847. //
  848. // Limitation: currently we don't check the priority
  849. // announcement _queue_, as announcements may have arrived before the
  850. // quality request, and announcements are promoted between queues.
  851. serverQualityProxyIDsMutex.Lock()
  852. defer serverQualityProxyIDsMutex.Unlock()
  853. for proxyID := range serverQualityProxyIDs {
  854. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, "") {
  855. return errors.TraceNew("unexpected missing HasQuality (no client ASN)")
  856. }
  857. if !broker.proxyQualityState.HasQuality(proxyID, testProxyASN, testClientASN) {
  858. return errors.TraceNew("unexpected missing HasQuality (with client ASN)")
  859. }
  860. }
  861. // TODO: check that elapsed time is consistent with rate limit (+/-)
  862. // Check if STUN server replay callbacks were triggered
  863. if !testDisableSTUN {
  864. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  865. return errors.TraceNew("unexpected STUN server succeeded count")
  866. }
  867. // Allow for some STUN server failures
  868. if atomic.LoadInt32(&stunServerAddressFailedCount) >= int32(numProxies/2) {
  869. return errors.TraceNew("unexpected STUN server failed count")
  870. }
  871. }
  872. // Check if RoundTripper server replay callbacks were triggered
  873. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  874. return errors.TraceNew("unexpected round tripper succeeded count")
  875. }
  876. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  877. return errors.TraceNew("unexpected round tripper failed count")
  878. }
  879. // Check per-region activity tracking
  880. regionTrackingMutex.Lock()
  881. if !seenCommonRegions["REGION-A"] {
  882. regionTrackingMutex.Unlock()
  883. return errors.TraceNew("expected to see REGION-A in common region activity")
  884. }
  885. if !seenCommonRegions["REGION-B"] {
  886. regionTrackingMutex.Unlock()
  887. return errors.TraceNew("expected to see REGION-B in common region activity")
  888. }
  889. if totalCommonRegionBytesUp == 0 {
  890. regionTrackingMutex.Unlock()
  891. return errors.TraceNew("expected non-zero per-region bytes up")
  892. }
  893. if totalCommonRegionBytesDown == 0 {
  894. regionTrackingMutex.Unlock()
  895. return errors.TraceNew("expected non-zero per-region bytes down")
  896. }
  897. if !seenPersonalRegions["REGION-A"] && !seenPersonalRegions["REGION-B"] {
  898. regionTrackingMutex.Unlock()
  899. return errors.TraceNew("expected to see personal region activity")
  900. }
  901. if totalPersonalRegionBytesUp == 0 {
  902. regionTrackingMutex.Unlock()
  903. return errors.TraceNew("expected non-zero personal per-region bytes up")
  904. }
  905. if totalPersonalRegionBytesDown == 0 {
  906. regionTrackingMutex.Unlock()
  907. return errors.TraceNew("expected non-zero personal per-region bytes down")
  908. }
  909. fmt.Printf("Per-region test passed: seen regions %v, total bytes up=%d, down=%d\n",
  910. seenCommonRegions, totalCommonRegionBytesUp, totalCommonRegionBytesDown)
  911. regionTrackingMutex.Unlock()
  912. }
  913. // Await shutdowns
  914. stopTest()
  915. brokerListener.Close()
  916. err = testGroup.Wait()
  917. if err != nil {
  918. return errors.Trace(err)
  919. }
  920. return nil
  921. }
  922. func runHTTPServer(listener net.Listener, broker *Broker) error {
  923. // Track client regions by RemoteAddr
  924. var clientRegionsMutex sync.Mutex
  925. clientRegions := make(map[string]string)
  926. var clientRegionCount atomic.Int32
  927. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  928. // For this test, clients set the path to "/client" and proxies
  929. // set the path to "/proxy" and we use that to create stub GeoIP
  930. // data to pass the not-same-ASN condition.
  931. //
  932. // For per-region testing, each client gets an alternating sticky region
  933. // (REGION-A or REGION-B) based on its RemoteAddr
  934. var geoIPData common.GeoIPData
  935. geoIPData.ASN = r.URL.Path
  936. path := r.URL.Path
  937. if strings.HasPrefix(path, "/client") {
  938. clientRegionsMutex.Lock()
  939. clientRegion, ok := clientRegions[r.RemoteAddr]
  940. if !ok {
  941. if clientRegionCount.Add(1)%2 == 0 {
  942. clientRegion = "REGION-A"
  943. } else {
  944. clientRegion = "REGION-B"
  945. }
  946. clientRegions[r.RemoteAddr] = clientRegion
  947. }
  948. clientRegionsMutex.Unlock()
  949. geoIPData.Country = clientRegion
  950. } else if strings.HasPrefix(path, "/proxy") {
  951. geoIPData.Country = "PROXY-REGION"
  952. }
  953. requestPayload, err := ioutil.ReadAll(
  954. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  955. if err != nil {
  956. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  957. http.Error(w, "", http.StatusNotFound)
  958. return
  959. }
  960. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  961. extendTimeout := func(timeout time.Duration) {
  962. // TODO: set insufficient initial timeout, so extension is
  963. // required for success
  964. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  965. }
  966. responsePayload, err := broker.HandleSessionPacket(
  967. r.Context(),
  968. extendTimeout,
  969. nil,
  970. clientIP,
  971. geoIPData,
  972. requestPayload)
  973. if err != nil {
  974. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  975. http.Error(w, "", http.StatusNotFound)
  976. return
  977. }
  978. w.WriteHeader(http.StatusOK)
  979. w.Write(responsePayload)
  980. })
  981. // WriteTimeout will be extended via extendTimeout.
  982. httpServer := &http.Server{
  983. ReadTimeout: 10 * time.Second,
  984. WriteTimeout: 10 * time.Second,
  985. IdleTimeout: 1 * time.Minute,
  986. Handler: handler,
  987. }
  988. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  989. if err != nil {
  990. return errors.Trace(err)
  991. }
  992. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  993. if err != nil {
  994. return errors.Trace(err)
  995. }
  996. tlsConfig := &tls.Config{
  997. Certificates: []tls.Certificate{tlsCert},
  998. }
  999. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  1000. return errors.Trace(err)
  1001. }
  // httpRoundTripper is the test round tripper: it POSTs request payloads to
  // https://<endpointAddr>/<path> using httpClient.
  type httpRoundTripper struct {
  	// httpClient performs the requests and owns the idle connections
  	// released by Close.
  	httpClient *http.Client

  	// endpointAddr is the host:port part of the request URL; path is the
  	// URL path component appended after it.
  	endpointAddr, path string
  }
  1007. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  1008. return &httpRoundTripper{
  1009. httpClient: &http.Client{
  1010. Transport: &http.Transport{
  1011. ForceAttemptHTTP2: true,
  1012. MaxIdleConns: 2,
  1013. IdleConnTimeout: 1 * time.Minute,
  1014. TLSHandshakeTimeout: 10 * time.Second,
  1015. TLSClientConfig: &std_tls.Config{
  1016. InsecureSkipVerify: true,
  1017. },
  1018. },
  1019. },
  1020. endpointAddr: endpointAddr,
  1021. path: path,
  1022. }
  1023. }
  1024. func (r *httpRoundTripper) RoundTrip(
  1025. ctx context.Context,
  1026. roundTripDelay time.Duration,
  1027. roundTripTimeout time.Duration,
  1028. requestPayload []byte) ([]byte, error) {
  1029. if roundTripDelay > 0 {
  1030. common.SleepWithContext(ctx, roundTripDelay)
  1031. }
  1032. requestCtx, requestCancelFunc := context.WithTimeout(ctx, roundTripTimeout)
  1033. defer requestCancelFunc()
  1034. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  1035. request, err := http.NewRequestWithContext(
  1036. requestCtx, "POST", url, bytes.NewReader(requestPayload))
  1037. if err != nil {
  1038. return nil, errors.Trace(err)
  1039. }
  1040. response, err := r.httpClient.Do(request)
  1041. if err != nil {
  1042. return nil, errors.Trace(err)
  1043. }
  1044. defer response.Body.Close()
  1045. if response.StatusCode != http.StatusOK {
  1046. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  1047. }
  1048. responsePayload, err := io.ReadAll(response.Body)
  1049. if err != nil {
  1050. return nil, errors.Trace(err)
  1051. }
  1052. return responsePayload, nil
  1053. }
  1054. func (r *httpRoundTripper) Close() error {
  1055. r.httpClient.CloseIdleConnections()
  1056. return nil
  1057. }
  1058. func runTCPEchoServer(listener net.Listener) {
  1059. for {
  1060. conn, err := listener.Accept()
  1061. if err != nil {
  1062. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  1063. return
  1064. }
  1065. go func(conn net.Conn) {
  1066. buf := make([]byte, 32768)
  1067. for {
  1068. n, err := conn.Read(buf)
  1069. if n > 0 {
  1070. _, err = conn.Write(buf[:n])
  1071. }
  1072. if err != nil {
  1073. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  1074. return
  1075. }
  1076. }
  1077. }(conn)
  1078. }
  1079. }
  // quicEchoServer is a test echo server: Run accepts connections from
  // listener and writes back whatever each connection sends.
  type quicEchoServer struct {
  	// listener is the listener created by newQuicEchoServer.
  	listener net.Listener

  	// obfuscationKey is the key the listener was created with; it is
  	// exposed through ObfuscationKey.
  	obfuscationKey string
  }
  1084. func newQuicEchoServer() (*quicEchoServer, error) {
  1085. obfuscationKey := prng.HexString(32)
  1086. listener, err := quic.Listen(
  1087. nil,
  1088. nil,
  1089. "127.0.0.1:0",
  1090. true,
  1091. GetQUICMaxPacketSizeAdjustment(),
  1092. obfuscationKey,
  1093. false)
  1094. if err != nil {
  1095. return nil, errors.Trace(err)
  1096. }
  1097. return &quicEchoServer{
  1098. listener: listener,
  1099. obfuscationKey: obfuscationKey,
  1100. }, nil
  1101. }
  1102. func (q *quicEchoServer) ObfuscationKey() string {
  1103. return q.obfuscationKey
  1104. }
  1105. func (q *quicEchoServer) Close() error {
  1106. return q.listener.Close()
  1107. }
  1108. func (q *quicEchoServer) Addr() net.Addr {
  1109. return q.listener.Addr()
  1110. }
  1111. func (q *quicEchoServer) Run() {
  1112. for {
  1113. conn, err := q.listener.Accept()
  1114. if err != nil {
  1115. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1116. return
  1117. }
  1118. go func(conn net.Conn) {
  1119. buf := make([]byte, 32768)
  1120. for {
  1121. n, err := conn.Read(buf)
  1122. if n > 0 {
  1123. _, err = conn.Write(buf[:n])
  1124. }
  1125. if err != nil {
  1126. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  1127. return
  1128. }
  1129. }
  1130. }(conn)
  1131. }
  1132. }