inproxy_test.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/tls"
  24. "encoding/base64"
  25. "fmt"
  26. "io"
  27. "io/ioutil"
  28. "net"
  29. "net/http"
  30. _ "net/http/pprof"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "testing"
  36. "time"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  42. "golang.org/x/sync/errgroup"
  43. )
  44. func TestInproxy(t *testing.T) {
  45. err := runTestInproxy()
  46. if err != nil {
  47. t.Errorf(errors.Trace(err).Error())
  48. }
  49. }
  50. func runTestInproxy() error {
  51. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  52. numProxies := 5
  53. proxyMaxClients := 3
  54. numClients := 10
  55. bytesToSend := 1 << 20
  56. targetElapsedSeconds := 2
  57. baseAPIParameters := common.APIParameters{
  58. "sponsor_id": strings.ToUpper(prng.HexString(8)),
  59. "client_platform": "test-client-platform",
  60. }
  61. testCompartmentID, _ := MakeID()
  62. testCommonCompartmentIDs := []ID{testCompartmentID}
  63. testNetworkID := "NETWORK-ID-1"
  64. testNetworkType := NetworkTypeUnknown
  65. testNATType := NATTypeUnknown
  66. testSTUNServerAddress := "stun.nextcloud.com:443"
  67. testDisableSTUN := false
  68. testNewTacticsPayload := []byte(prng.HexString(100))
  69. testNewTacticsTag := "new-tactics-tag"
  70. testUnchangedTacticsPayload := []byte(prng.HexString(100))
  71. // TODO: test port mapping
  72. stunServerAddressSucceededCount := int32(0)
  73. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  74. stunServerAddressFailedCount := int32(0)
  75. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  76. roundTripperSucceededCount := int32(0)
  77. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  78. roundTripperFailedCount := int32(0)
  79. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  80. testCtx, stopTest := context.WithCancel(context.Background())
  81. defer stopTest()
  82. testGroup := new(errgroup.Group)
  83. // Enable test to run without requiring host firewall exceptions
  84. SetAllowBogonWebRTCConnections(true)
  85. // Init logging and profiling
  86. logger := newTestLogger()
  87. pprofListener, err := net.Listen("tcp", "127.0.0.1:0")
  88. go http.Serve(pprofListener, nil)
  89. defer pprofListener.Close()
  90. logger.WithTrace().Info(fmt.Sprintf("PPROF: http://%s/debug/pprof", pprofListener.Addr()))
  91. // Start echo servers
  92. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  93. if err != nil {
  94. return errors.Trace(err)
  95. }
  96. defer tcpEchoListener.Close()
  97. go runTCPEchoServer(tcpEchoListener)
  98. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  99. quicEchoServer, err := newQuicEchoServer()
  100. if err != nil {
  101. return errors.Trace(err)
  102. }
  103. defer quicEchoServer.Close()
  104. go quicEchoServer.Run()
  105. // Create signed server entry with capability
  106. serverPrivateKey, err := GenerateSessionPrivateKey()
  107. if err != nil {
  108. return errors.Trace(err)
  109. }
  110. serverPublicKey, err := serverPrivateKey.GetPublicKey()
  111. if err != nil {
  112. return errors.Trace(err)
  113. }
  114. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  115. if err != nil {
  116. return errors.Trace(err)
  117. }
  118. serverEntry := make(protocol.ServerEntryFields)
  119. serverEntry["ipAddress"] = "127.0.0.1"
  120. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  121. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  122. serverEntry["inproxyOSSHPort"], _ = strconv.Atoi(tcpPort)
  123. serverEntry["inproxyQUICPort"], _ = strconv.Atoi(udpPort)
  124. serverEntry["capabilities"] = []string{"INPROXY-WEBRTC-OSSH", "INPROXY-WEBRTC-QUIC-OSSH"}
  125. serverEntry["inproxySessionPublicKey"] = base64.RawStdEncoding.EncodeToString(serverPublicKey[:])
  126. serverEntry["inproxySessionRootObfuscationSecret"] = base64.RawStdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  127. testServerEntryTag := prng.HexString(16)
  128. serverEntry["tag"] = testServerEntryTag
  129. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  130. protocol.NewServerEntrySignatureKeyPair()
  131. if err != nil {
  132. return errors.Trace(err)
  133. }
  134. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  135. if err != nil {
  136. return errors.Trace(err)
  137. }
  138. packedServerEntryFields, err := protocol.EncodePackedServerEntryFields(serverEntry)
  139. if err != nil {
  140. return errors.Trace(err)
  141. }
  142. packedDestinationServerEntry, err := protocol.CBOREncoding.Marshal(packedServerEntryFields)
  143. if err != nil {
  144. return errors.Trace(err)
  145. }
  146. // Start broker
  147. logger.WithTrace().Info("START BROKER")
  148. brokerPrivateKey, err := GenerateSessionPrivateKey()
  149. if err != nil {
  150. return errors.Trace(err)
  151. }
  152. brokerPublicKey, err := brokerPrivateKey.GetPublicKey()
  153. if err != nil {
  154. return errors.Trace(err)
  155. }
  156. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  157. if err != nil {
  158. return errors.Trace(err)
  159. }
  160. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  161. if err != nil {
  162. return errors.Trace(err)
  163. }
  164. defer brokerListener.Close()
  165. brokerConfig := &BrokerConfig{
  166. Logger: logger,
  167. CommonCompartmentIDs: testCommonCompartmentIDs,
  168. APIParameterValidator: func(params common.APIParameters) error {
  169. if len(params) != len(baseAPIParameters) {
  170. return errors.TraceNew("unexpected base API parameter count")
  171. }
  172. for name, value := range params {
  173. if value.(string) != baseAPIParameters[name].(string) {
  174. return errors.Tracef(
  175. "unexpected base API parameter: %v: %v != %v",
  176. name,
  177. value.(string),
  178. baseAPIParameters[name].(string))
  179. }
  180. }
  181. return nil
  182. },
  183. APIParameterLogFieldFormatter: func(
  184. geoIPData common.GeoIPData, params common.APIParameters) common.LogFields {
  185. return common.LogFields(params)
  186. },
  187. GetTactics: func(_ common.GeoIPData, _ common.APIParameters) ([]byte, string, error) {
  188. // Exercise both new and unchanged tactics
  189. if prng.FlipCoin() {
  190. return testNewTacticsPayload, testNewTacticsTag, nil
  191. }
  192. return testUnchangedTacticsPayload, "", nil
  193. },
  194. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  195. PrivateKey: brokerPrivateKey,
  196. ObfuscationRootSecret: brokerRootObfuscationSecret,
  197. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  198. AllowProxy: func(common.GeoIPData) bool { return true },
  199. AllowClient: func(common.GeoIPData) bool { return true },
  200. AllowDomainFrontedDestinations: func(common.GeoIPData) bool { return true },
  201. }
  202. broker, err := NewBroker(brokerConfig)
  203. if err != nil {
  204. return errors.Trace(err)
  205. }
  206. err = broker.Start()
  207. if err != nil {
  208. return errors.Trace(err)
  209. }
  210. defer broker.Stop()
  211. testGroup.Go(func() error {
  212. err := runHTTPServer(brokerListener, broker)
  213. if testCtx.Err() != nil {
  214. return nil
  215. }
  216. return errors.Trace(err)
  217. })
  218. // Stub server broker request handler (in Psiphon, this will be the
  219. // destination Psiphon server; here, it's not necessary to build this
  220. // handler into the destination echo server)
  221. serverSessions, err := NewServerBrokerSessions(
  222. serverPrivateKey, serverRootObfuscationSecret, []SessionPublicKey{brokerPublicKey})
  223. if err != nil {
  224. return errors.Trace(err)
  225. }
  226. var pendingBrokerServerReportsMutex sync.Mutex
  227. pendingBrokerServerReports := make(map[ID]bool)
  228. addPendingBrokerServerReport := func(connectionID ID) {
  229. pendingBrokerServerReportsMutex.Lock()
  230. defer pendingBrokerServerReportsMutex.Unlock()
  231. pendingBrokerServerReports[connectionID] = true
  232. }
  233. hasPendingBrokerServerReports := func() bool {
  234. pendingBrokerServerReportsMutex.Lock()
  235. defer pendingBrokerServerReportsMutex.Unlock()
  236. return len(pendingBrokerServerReports) > 0
  237. }
  238. handleBrokerServerReports := func(in []byte, clientConnectionID ID) ([]byte, error) {
  239. handler := func(brokerVerifiedOriginalClientIP string, logFields common.LogFields) {
  240. pendingBrokerServerReportsMutex.Lock()
  241. defer pendingBrokerServerReportsMutex.Unlock()
  242. // Mark the report as no longer outstanding
  243. delete(pendingBrokerServerReports, clientConnectionID)
  244. }
  245. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  246. return out, errors.Trace(err)
  247. }
  248. // Check that the tactics round trip succeeds
  249. var pendingProxyTacticsCallbacksMutex sync.Mutex
  250. pendingProxyTacticsCallbacks := make(map[SessionPrivateKey]bool)
  251. addPendingProxyTacticsCallback := func(proxyPrivateKey SessionPrivateKey) {
  252. pendingProxyTacticsCallbacksMutex.Lock()
  253. defer pendingProxyTacticsCallbacksMutex.Unlock()
  254. pendingProxyTacticsCallbacks[proxyPrivateKey] = true
  255. }
  256. hasPendingProxyTacticsCallbacks := func() bool {
  257. pendingProxyTacticsCallbacksMutex.Lock()
  258. defer pendingProxyTacticsCallbacksMutex.Unlock()
  259. return len(pendingProxyTacticsCallbacks) > 0
  260. }
  261. makeHandleTacticsPayload := func(
  262. proxyPrivateKey SessionPrivateKey,
  263. tacticsNetworkID string) func(_ string, _ []byte) bool {
  264. return func(networkID string, tacticsPayload []byte) bool {
  265. pendingProxyTacticsCallbacksMutex.Lock()
  266. defer pendingProxyTacticsCallbacksMutex.Unlock()
  267. // Check that the correct networkID is passed around; if not,
  268. // skip the delete, which will fail the test
  269. if networkID == tacticsNetworkID {
  270. // Certain state is reset when new tactics are applied -- the
  271. // return true case; exercise both cases
  272. if bytes.Equal(tacticsPayload, testNewTacticsPayload) {
  273. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  274. return true
  275. }
  276. if bytes.Equal(tacticsPayload, testUnchangedTacticsPayload) {
  277. delete(pendingProxyTacticsCallbacks, proxyPrivateKey)
  278. return false
  279. }
  280. }
  281. panic("unexpected tactics payload")
  282. }
  283. }
  284. // Start proxies
  285. logger.WithTrace().Info("START PROXIES")
  286. for i := 0; i < numProxies; i++ {
  287. proxyPrivateKey, err := GenerateSessionPrivateKey()
  288. if err != nil {
  289. return errors.Trace(err)
  290. }
  291. brokerCoordinator := &testBrokerDialCoordinator{
  292. networkID: testNetworkID,
  293. networkType: testNetworkType,
  294. brokerClientPrivateKey: proxyPrivateKey,
  295. brokerPublicKey: brokerPublicKey,
  296. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  297. brokerClientRoundTripper: newHTTPRoundTripper(
  298. brokerListener.Addr().String(), "proxy"),
  299. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  300. brokerClientRoundTripperFailed: roundTripperFailed,
  301. }
  302. webRTCCoordinator := &testWebRTCDialCoordinator{
  303. networkID: testNetworkID,
  304. networkType: testNetworkType,
  305. natType: testNATType,
  306. disableSTUN: testDisableSTUN,
  307. stunServerAddress: testSTUNServerAddress,
  308. stunServerAddressRFC5780: testSTUNServerAddress,
  309. stunServerAddressSucceeded: stunServerAddressSucceeded,
  310. stunServerAddressFailed: stunServerAddressFailed,
  311. setNATType: func(NATType) {},
  312. setPortMappingTypes: func(PortMappingTypes) {},
  313. bindToDevice: func(int) error { return nil },
  314. }
  315. // Each proxy has its own broker client
  316. brokerClient, err := NewBrokerClient(brokerCoordinator)
  317. if err != nil {
  318. return errors.Trace(err)
  319. }
  320. tacticsNetworkID := prng.HexString(32)
  321. proxy, err := NewProxy(&ProxyConfig{
  322. Logger: logger,
  323. GetBrokerClient: func() (*BrokerClient, error) {
  324. return brokerClient, nil
  325. },
  326. GetBaseAPIParameters: func() (common.APIParameters, string, error) {
  327. return baseAPIParameters, tacticsNetworkID, nil
  328. },
  329. MakeWebRTCDialCoordinator: func() (WebRTCDialCoordinator, error) {
  330. return webRTCCoordinator, nil
  331. },
  332. HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
  333. MaxClients: proxyMaxClients,
  334. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  335. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  336. ActivityUpdater: func(connectingClients int32, connectedClients int32,
  337. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  338. fmt.Printf("[%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
  339. time.Now().UTC().Format(time.RFC3339),
  340. connectingClients, connectedClients, bytesUp, bytesDown)
  341. },
  342. })
  343. if err != nil {
  344. return errors.Trace(err)
  345. }
  346. addPendingProxyTacticsCallback(proxyPrivateKey)
  347. testGroup.Go(func() error {
  348. proxy.Run(testCtx)
  349. return nil
  350. })
  351. }
  352. // Await proxy announcements before starting clients
  353. //
  354. // - Announcements may delay due to proxyAnnounceRetryDelay in Proxy.Run,
  355. // plus NAT discovery
  356. //
  357. // - Don't wait for > numProxies announcements due to
  358. // InitiatorSessions.NewRoundTrip waitToShareSession limitation
  359. for {
  360. time.Sleep(100 * time.Millisecond)
  361. broker.matcher.announcementQueueMutex.Lock()
  362. n := broker.matcher.announcementQueue.Len()
  363. broker.matcher.announcementQueueMutex.Unlock()
  364. if n >= numProxies {
  365. break
  366. }
  367. }
  368. // Start clients
  369. logger.WithTrace().Info("START CLIENTS")
  370. clientsGroup := new(errgroup.Group)
  371. makeClientFunc := func(
  372. isTCP bool,
  373. isMobile bool,
  374. brokerClient *BrokerClient,
  375. webRTCCoordinator WebRTCDialCoordinator) func() error {
  376. var networkProtocol NetworkProtocol
  377. var addr string
  378. var wrapWithQUIC bool
  379. if isTCP {
  380. networkProtocol = NetworkProtocolTCP
  381. addr = tcpEchoListener.Addr().String()
  382. } else {
  383. networkProtocol = NetworkProtocolUDP
  384. addr = quicEchoServer.Addr().String()
  385. wrapWithQUIC = true
  386. }
  387. return func() error {
  388. dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
  389. defer cancelDial()
  390. conn, err := DialClient(
  391. dialCtx,
  392. &ClientConfig{
  393. Logger: logger,
  394. BaseAPIParameters: baseAPIParameters,
  395. BrokerClient: brokerClient,
  396. WebRTCDialCoordinator: webRTCCoordinator,
  397. ReliableTransport: isTCP,
  398. DialNetworkProtocol: networkProtocol,
  399. DialAddress: addr,
  400. PackedDestinationServerEntry: packedDestinationServerEntry,
  401. })
  402. if err != nil {
  403. return errors.Trace(err)
  404. }
  405. var relayConn net.Conn
  406. relayConn = conn
  407. if wrapWithQUIC {
  408. quicConn, err := quic.Dial(
  409. dialCtx,
  410. conn,
  411. &net.UDPAddr{Port: 1}, // This address is ignored, but the zero value is not allowed
  412. "test", "QUICv1", nil, quicEchoServer.ObfuscationKey(), nil, nil, true)
  413. if err != nil {
  414. return errors.Trace(err)
  415. }
  416. relayConn = quicConn
  417. }
  418. addPendingBrokerServerReport(conn.GetConnectionID())
  419. signalRelayComplete := make(chan struct{})
  420. clientsGroup.Go(func() error {
  421. defer close(signalRelayComplete)
  422. in := conn.InitialRelayPacket()
  423. for in != nil {
  424. out, err := handleBrokerServerReports(in, conn.GetConnectionID())
  425. if err != nil {
  426. if out == nil {
  427. return errors.Trace(err)
  428. } else {
  429. fmt.Printf("HandlePacket returned packet and error: %v\n", err)
  430. // Proceed with reset session token packet
  431. }
  432. }
  433. if out == nil {
  434. // Relay is complete
  435. break
  436. }
  437. in, err = conn.RelayPacket(testCtx, out)
  438. if err != nil {
  439. return errors.Trace(err)
  440. }
  441. }
  442. return nil
  443. })
  444. sendBytes := prng.Bytes(bytesToSend)
  445. clientsGroup.Go(func() error {
  446. for n := 0; n < bytesToSend; {
  447. m := prng.Range(1024, 32768)
  448. if bytesToSend-n < m {
  449. m = bytesToSend - n
  450. }
  451. _, err := relayConn.Write(sendBytes[n : n+m])
  452. if err != nil {
  453. return errors.Trace(err)
  454. }
  455. n += m
  456. }
  457. fmt.Printf("%d bytes sent\n", bytesToSend)
  458. return nil
  459. })
  460. clientsGroup.Go(func() error {
  461. buf := make([]byte, 32768)
  462. n := 0
  463. for n < bytesToSend {
  464. m, err := relayConn.Read(buf)
  465. if err != nil {
  466. return errors.Trace(err)
  467. }
  468. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  469. return errors.Tracef(
  470. "unexpected bytes: expected at index %d, received at index %d",
  471. bytes.Index(sendBytes, buf[:m]), n)
  472. }
  473. n += m
  474. }
  475. fmt.Printf("%d bytes received\n", bytesToSend)
  476. select {
  477. case <-signalRelayComplete:
  478. case <-testCtx.Done():
  479. }
  480. relayConn.Close()
  481. conn.Close()
  482. return nil
  483. })
  484. return nil
  485. }
  486. }
  487. newClientParams := func(isMobile bool) (*BrokerClient, *testWebRTCDialCoordinator, error) {
  488. clientPrivateKey, err := GenerateSessionPrivateKey()
  489. if err != nil {
  490. return nil, nil, errors.Trace(err)
  491. }
  492. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  493. if err != nil {
  494. return nil, nil, errors.Trace(err)
  495. }
  496. brokerCoordinator := &testBrokerDialCoordinator{
  497. networkID: testNetworkID,
  498. networkType: testNetworkType,
  499. commonCompartmentIDs: testCommonCompartmentIDs,
  500. brokerClientPrivateKey: clientPrivateKey,
  501. brokerPublicKey: brokerPublicKey,
  502. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  503. brokerClientRoundTripper: newHTTPRoundTripper(
  504. brokerListener.Addr().String(), "client"),
  505. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  506. brokerClientRoundTripperFailed: roundTripperFailed,
  507. }
  508. webRTCCoordinator := &testWebRTCDialCoordinator{
  509. networkID: testNetworkID,
  510. networkType: testNetworkType,
  511. natType: testNATType,
  512. disableSTUN: testDisableSTUN,
  513. stunServerAddress: testSTUNServerAddress,
  514. stunServerAddressRFC5780: testSTUNServerAddress,
  515. stunServerAddressSucceeded: stunServerAddressSucceeded,
  516. stunServerAddressFailed: stunServerAddressFailed,
  517. clientRootObfuscationSecret: clientRootObfuscationSecret,
  518. doDTLSRandomization: prng.FlipCoin(),
  519. trafficShapingParameters: &DataChannelTrafficShapingParameters{
  520. MinPaddedMessages: 0,
  521. MaxPaddedMessages: 10,
  522. MinPaddingSize: 0,
  523. MaxPaddingSize: 1500,
  524. MinDecoyMessages: 0,
  525. MaxDecoyMessages: 10,
  526. MinDecoySize: 1,
  527. MaxDecoySize: 1500,
  528. DecoyMessageProbability: 0.5,
  529. },
  530. setNATType: func(NATType) {},
  531. setPortMappingTypes: func(PortMappingTypes) {},
  532. bindToDevice: func(int) error { return nil },
  533. // With STUN enabled (testDisableSTUN = false), there are cases
  534. // where the WebRTC Data Channel is not successfully established.
  535. // With a short enough timeout here, clients will redial and
  536. // eventually succceed.
  537. webRTCAwaitDataChannelTimeout: 5 * time.Second,
  538. }
  539. if isMobile {
  540. webRTCCoordinator.networkType = NetworkTypeMobile
  541. webRTCCoordinator.disableInboundForMobleNetworks = true
  542. }
  543. brokerClient, err := NewBrokerClient(brokerCoordinator)
  544. if err != nil {
  545. return nil, nil, errors.Trace(err)
  546. }
  547. return brokerClient, webRTCCoordinator, nil
  548. }
  549. clientBrokerClient, clientWebRTCCoordinator, err := newClientParams(false)
  550. if err != nil {
  551. return errors.Trace(err)
  552. }
  553. clientMobileBrokerClient, clientMobileWebRTCCoordinator, err := newClientParams(true)
  554. if err != nil {
  555. return errors.Trace(err)
  556. }
  557. for i := 0; i < numClients; i++ {
  558. // Test a mix of TCP and UDP proxying; also test the
  559. // DisableInboundForMobleNetworks code path.
  560. isTCP := i%2 == 0
  561. isMobile := i%4 == 0
  562. // Exercise BrokerClients shared by multiple clients, but also create
  563. // several broker clients.
  564. if i%8 == 0 {
  565. clientBrokerClient, clientWebRTCCoordinator, err = newClientParams(false)
  566. if err != nil {
  567. return errors.Trace(err)
  568. }
  569. clientMobileBrokerClient, clientMobileWebRTCCoordinator, err = newClientParams(true)
  570. if err != nil {
  571. return errors.Trace(err)
  572. }
  573. }
  574. brokerClient := clientBrokerClient
  575. webRTCCoordinator := clientWebRTCCoordinator
  576. if isMobile {
  577. brokerClient = clientMobileBrokerClient
  578. webRTCCoordinator = clientMobileWebRTCCoordinator
  579. }
  580. clientsGroup.Go(makeClientFunc(isTCP, isMobile, brokerClient, webRTCCoordinator))
  581. }
  582. // Await client transfers complete
  583. logger.WithTrace().Info("AWAIT DATA TRANSFER")
  584. err = clientsGroup.Wait()
  585. if err != nil {
  586. return errors.Trace(err)
  587. }
  588. logger.WithTrace().Info("DONE DATA TRANSFER")
  589. if hasPendingBrokerServerReports() {
  590. return errors.TraceNew("unexpected pending broker server requests")
  591. }
  592. if hasPendingProxyTacticsCallbacks() {
  593. return errors.TraceNew("unexpected pending proxy tactics callback")
  594. }
  595. // TODO: check that elapsed time is consistent with rate limit (+/-)
  596. // Check if STUN server replay callbacks were triggered
  597. if !testDisableSTUN {
  598. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  599. return errors.TraceNew("unexpected STUN server succeeded count")
  600. }
  601. }
  602. if atomic.LoadInt32(&stunServerAddressFailedCount) > 0 {
  603. return errors.TraceNew("unexpected STUN server failed count")
  604. }
  605. // Check if RoundTripper server replay callbacks were triggered
  606. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  607. return errors.TraceNew("unexpected round tripper succeeded count")
  608. }
  609. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  610. return errors.TraceNew("unexpected round tripper failed count")
  611. }
  612. // Await shutdowns
  613. stopTest()
  614. brokerListener.Close()
  615. err = testGroup.Wait()
  616. if err != nil {
  617. return errors.Trace(err)
  618. }
  619. return nil
  620. }
  621. func runHTTPServer(listener net.Listener, broker *Broker) error {
  622. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  623. // For this test, clients set the path to "/client" and proxies
  624. // set the path to "/proxy" and we use that to create stub GeoIP
  625. // data to pass the not-same-ASN condition.
  626. var geoIPData common.GeoIPData
  627. geoIPData.ASN = r.URL.Path
  628. requestPayload, err := ioutil.ReadAll(
  629. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  630. if err != nil {
  631. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  632. http.Error(w, "", http.StatusNotFound)
  633. return
  634. }
  635. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  636. extendTimeout := func(timeout time.Duration) {
  637. // TODO: set insufficient initial timeout, so extension is
  638. // required for success
  639. http.NewResponseController(w).SetWriteDeadline(time.Now().Add(timeout))
  640. }
  641. responsePayload, err := broker.HandleSessionPacket(
  642. r.Context(),
  643. extendTimeout,
  644. nil,
  645. clientIP,
  646. geoIPData,
  647. requestPayload)
  648. if err != nil {
  649. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v\n", err)
  650. http.Error(w, "", http.StatusNotFound)
  651. return
  652. }
  653. w.WriteHeader(http.StatusOK)
  654. w.Write(responsePayload)
  655. })
  656. // WriteTimeout will be extended via extendTimeout.
  657. httpServer := &http.Server{
  658. ReadTimeout: 10 * time.Second,
  659. WriteTimeout: 10 * time.Second,
  660. IdleTimeout: 1 * time.Minute,
  661. Handler: handler,
  662. }
  663. certificate, privateKey, _, err := common.GenerateWebServerCertificate("www.example.com")
  664. if err != nil {
  665. return errors.Trace(err)
  666. }
  667. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  668. if err != nil {
  669. return errors.Trace(err)
  670. }
  671. tlsConfig := &tls.Config{
  672. Certificates: []tls.Certificate{tlsCert},
  673. }
  674. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  675. return errors.Trace(err)
  676. }
  677. type httpRoundTripper struct {
  678. httpClient *http.Client
  679. endpointAddr string
  680. path string
  681. }
  682. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  683. return &httpRoundTripper{
  684. httpClient: &http.Client{
  685. Transport: &http.Transport{
  686. ForceAttemptHTTP2: true,
  687. MaxIdleConns: 2,
  688. IdleConnTimeout: 1 * time.Minute,
  689. TLSHandshakeTimeout: 10 * time.Second,
  690. TLSClientConfig: &tls.Config{
  691. InsecureSkipVerify: true,
  692. },
  693. },
  694. },
  695. endpointAddr: endpointAddr,
  696. path: path,
  697. }
  698. }
  699. func (r *httpRoundTripper) RoundTrip(
  700. ctx context.Context, requestPayload []byte) ([]byte, error) {
  701. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  702. request, err := http.NewRequestWithContext(
  703. ctx, "POST", url, bytes.NewReader(requestPayload))
  704. if err != nil {
  705. return nil, errors.Trace(err)
  706. }
  707. response, err := r.httpClient.Do(request)
  708. if err != nil {
  709. return nil, errors.Trace(err)
  710. }
  711. defer response.Body.Close()
  712. if response.StatusCode != http.StatusOK {
  713. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  714. }
  715. responsePayload, err := io.ReadAll(response.Body)
  716. if err != nil {
  717. return nil, errors.Trace(err)
  718. }
  719. return responsePayload, nil
  720. }
  721. func (r *httpRoundTripper) Close() error {
  722. r.httpClient.CloseIdleConnections()
  723. return nil
  724. }
  725. func runTCPEchoServer(listener net.Listener) {
  726. for {
  727. conn, err := listener.Accept()
  728. if err != nil {
  729. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  730. return
  731. }
  732. go func(conn net.Conn) {
  733. buf := make([]byte, 32768)
  734. for {
  735. n, err := conn.Read(buf)
  736. if n > 0 {
  737. _, err = conn.Write(buf[:n])
  738. }
  739. if err != nil {
  740. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  741. return
  742. }
  743. }
  744. }(conn)
  745. }
  746. }
  747. type quicEchoServer struct {
  748. listener net.Listener
  749. obfuscationKey string
  750. }
  751. func newQuicEchoServer() (*quicEchoServer, error) {
  752. obfuscationKey := prng.HexString(32)
  753. listener, err := quic.Listen(
  754. nil,
  755. nil,
  756. "127.0.0.1:0",
  757. obfuscationKey,
  758. false)
  759. if err != nil {
  760. return nil, errors.Trace(err)
  761. }
  762. return &quicEchoServer{
  763. listener: listener,
  764. obfuscationKey: obfuscationKey,
  765. }, nil
  766. }
  767. func (q *quicEchoServer) ObfuscationKey() string {
  768. return q.obfuscationKey
  769. }
  770. func (q *quicEchoServer) Close() error {
  771. return q.listener.Close()
  772. }
  773. func (q *quicEchoServer) Addr() net.Addr {
  774. return q.listener.Addr()
  775. }
  776. func (q *quicEchoServer) Run() {
  777. for {
  778. conn, err := q.listener.Accept()
  779. if err != nil {
  780. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  781. return
  782. }
  783. go func(conn net.Conn) {
  784. buf := make([]byte, 32768)
  785. for {
  786. n, err := conn.Read(buf)
  787. if n > 0 {
  788. _, err = conn.Write(buf[:n])
  789. }
  790. if err != nil {
  791. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  792. return
  793. }
  794. }
  795. }(conn)
  796. }
  797. }