inproxy_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/tls"
  24. "encoding/base64"
  25. "encoding/json"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "net/http"
  31. "strconv"
  32. "sync"
  33. "sync/atomic"
  34. "testing"
  35. "time"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  41. "golang.org/x/sync/errgroup"
  42. )
  43. func TestInProxy(t *testing.T) {
  44. err := runTestInProxy()
  45. if err != nil {
  46. t.Errorf(errors.Trace(err).Error())
  47. }
  48. }
  49. func runTestInProxy() error {
  50. // Note: use the environment variable PION_LOG_TRACE=all to emit WebRTC logging.
  51. numProxies := 5
  52. proxyMaxClients := 2
  53. numClients := 10
  54. bytesToSend := 1 << 20
  55. messageSize := 1 << 10
  56. targetElapsedSeconds := 2
  57. baseMetrics := common.APIParameters{
  58. "sponsor_id": "test-sponsor-id",
  59. "client_platform": "test-client-platform",
  60. }
  61. testTransportSecret, _ := MakeID()
  62. testCompartmentID, _ := MakeID()
  63. testCommonCompartmentIDs := []ID{testCompartmentID}
  64. testNetworkID := "NETWORK-ID-1"
  65. testNetworkType := NetworkTypeUnknown
  66. testNATType := NATTypeUnknown
  67. testSTUNServerAddress := "stun.nextcloud.com:443"
  68. // TODO: test port mapping
  69. stunServerAddressSucceededCount := int32(0)
  70. stunServerAddressSucceeded := func(bool, string) { atomic.AddInt32(&stunServerAddressSucceededCount, 1) }
  71. stunServerAddressFailedCount := int32(0)
  72. stunServerAddressFailed := func(bool, string) { atomic.AddInt32(&stunServerAddressFailedCount, 1) }
  73. roundTripperSucceededCount := int32(0)
  74. roundTripperSucceded := func(RoundTripper) { atomic.AddInt32(&roundTripperSucceededCount, 1) }
  75. roundTripperFailedCount := int32(0)
  76. roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
  77. testCtx, stopTest := context.WithCancel(context.Background())
  78. defer stopTest()
  79. testGroup := new(errgroup.Group)
  80. // Enable test to run without requiring host firewall exceptions
  81. setAllowLoopbackWebRTCConnections(true)
  82. // Init logging
  83. logger := newTestLogger()
  84. // Start echo servers
  85. tcpEchoListener, err := net.Listen("tcp", "127.0.0.1:0")
  86. if err != nil {
  87. return errors.Trace(err)
  88. }
  89. defer tcpEchoListener.Close()
  90. go runTCPEchoServer(tcpEchoListener)
  91. // QUIC tests UDP proxying, and provides reliable delivery of echoed data
  92. quicEchoServer, err := newQuicEchoServer()
  93. if err != nil {
  94. return errors.Trace(err)
  95. }
  96. defer quicEchoServer.Close()
  97. go quicEchoServer.Run()
  98. // Create signed server entry with capability
  99. serverPrivateKey, err := GenerateSessionPrivateKey()
  100. if err != nil {
  101. return errors.Trace(err)
  102. }
  103. serverPublicKey, err := GetSessionPublicKey(serverPrivateKey)
  104. if err != nil {
  105. return errors.Trace(err)
  106. }
  107. serverRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  108. if err != nil {
  109. return errors.Trace(err)
  110. }
  111. serverEntry := make(protocol.ServerEntryFields)
  112. serverEntry["ipAddress"] = "127.0.0.1"
  113. _, tcpPort, _ := net.SplitHostPort(tcpEchoListener.Addr().String())
  114. _, udpPort, _ := net.SplitHostPort(quicEchoServer.Addr().String())
  115. serverEntry["sshObfuscatedPort"], _ = strconv.Atoi(tcpPort)
  116. serverEntry["sshObfuscatedQUICPort"], _ = strconv.Atoi(udpPort)
  117. serverEntry["capabilities"] = []string{"OSSH", "QUIC", "inproxy"}
  118. serverEntry["inProxySessionPublicKey"] = base64.StdEncoding.EncodeToString(serverPublicKey[:])
  119. serverEntry["inProxySessionRootObfuscationSecret"] = base64.StdEncoding.EncodeToString(serverRootObfuscationSecret[:])
  120. testServerEntryTag := prng.HexString(16)
  121. serverEntry["tag"] = testServerEntryTag
  122. serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey, err :=
  123. protocol.NewServerEntrySignatureKeyPair()
  124. if err != nil {
  125. return errors.Trace(err)
  126. }
  127. err = serverEntry.AddSignature(serverEntrySignaturePublicKey, serverEntrySignaturePrivateKey)
  128. if err != nil {
  129. return errors.Trace(err)
  130. }
  131. serverEntryJSON, err := json.Marshal(serverEntry)
  132. if err != nil {
  133. return errors.Trace(err)
  134. }
  135. // Start broker
  136. brokerPrivateKey, err := GenerateSessionPrivateKey()
  137. if err != nil {
  138. return errors.Trace(err)
  139. }
  140. brokerPublicKey, err := GetSessionPublicKey(brokerPrivateKey)
  141. if err != nil {
  142. return errors.Trace(err)
  143. }
  144. brokerRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  145. if err != nil {
  146. return errors.Trace(err)
  147. }
  148. brokerListener, err := net.Listen("tcp", "127.0.0.1:0")
  149. if err != nil {
  150. return errors.Trace(err)
  151. }
  152. defer brokerListener.Close()
  153. brokerConfig := &BrokerConfig{
  154. Logger: logger,
  155. CommonCompartmentIDs: testCommonCompartmentIDs,
  156. APIParameterValidator: func(params common.APIParameters) error {
  157. if len(params) != len(baseMetrics) {
  158. return errors.TraceNew("unexpected base metrics")
  159. }
  160. for name, value := range params {
  161. if value.(string) != baseMetrics[name].(string) {
  162. return errors.TraceNew("unexpected base metrics")
  163. }
  164. }
  165. return nil
  166. },
  167. APIParameterLogFieldFormatter: func(
  168. geoIPData common.GeoIPData, params common.APIParameters) common.LogFields {
  169. return common.LogFields(params)
  170. },
  171. TransportSecret: TransportSecret(testTransportSecret),
  172. PrivateKey: brokerPrivateKey,
  173. ObfuscationRootSecret: brokerRootObfuscationSecret,
  174. ServerEntrySignaturePublicKey: serverEntrySignaturePublicKey,
  175. IsValidServerEntryTag: func(serverEntryTag string) bool { return serverEntryTag == testServerEntryTag },
  176. AllowProxy: func(common.GeoIPData) bool { return true },
  177. AllowClient: func(common.GeoIPData) bool { return true },
  178. AllowDomainDestination: func(common.GeoIPData) bool { return true },
  179. }
  180. broker, err := NewBroker(brokerConfig)
  181. if err != nil {
  182. return errors.Trace(err)
  183. }
  184. err = broker.Start()
  185. if err != nil {
  186. return errors.Trace(err)
  187. }
  188. defer broker.Stop()
  189. testGroup.Go(func() error {
  190. err := runHTTPServer(brokerListener, broker)
  191. if testCtx.Err() != nil {
  192. return nil
  193. }
  194. return errors.Trace(err)
  195. })
  196. // Stub server broker request handler (in Psiphon, this will be the
  197. // destination Psiphon server; here, it's not necessary to build this
  198. // handler into the destination echo server)
  199. serverSessions, err := NewServerBrokerSessions(
  200. serverPrivateKey, serverRootObfuscationSecret, []SessionPublicKey{brokerPublicKey})
  201. if err != nil {
  202. return errors.Trace(err)
  203. }
  204. var pendingBrokerServerRequestsMutex sync.Mutex
  205. pendingBrokerServerRequests := make(map[ID]bool)
  206. addPendingBrokerServerRequest := func(connectionID ID) {
  207. pendingBrokerServerRequestsMutex.Lock()
  208. defer pendingBrokerServerRequestsMutex.Unlock()
  209. pendingBrokerServerRequests[connectionID] = true
  210. }
  211. hasPendingBrokerServerRequests := func() bool {
  212. pendingBrokerServerRequestsMutex.Lock()
  213. defer pendingBrokerServerRequestsMutex.Unlock()
  214. return len(pendingBrokerServerRequests) > 0
  215. }
  216. handleBrokerServerRequests := func(in []byte, clientConnectionID ID) ([]byte, error) {
  217. handler := func(brokerVerifiedOriginalClientIP string, logFields common.LogFields) {
  218. pendingBrokerServerRequestsMutex.Lock()
  219. defer pendingBrokerServerRequestsMutex.Unlock()
  220. // Mark the request as no longer outstanding
  221. delete(pendingBrokerServerRequests, clientConnectionID)
  222. }
  223. out, err := serverSessions.HandlePacket(logger, in, clientConnectionID, handler)
  224. if err != nil {
  225. return nil, errors.Trace(err)
  226. }
  227. return out, nil
  228. }
  229. // Start proxies
  230. for i := 0; i < numProxies; i++ {
  231. proxyPrivateKey, err := GenerateSessionPrivateKey()
  232. if err != nil {
  233. return errors.Trace(err)
  234. }
  235. dialParams := &testDialParameters{
  236. networkID: testNetworkID,
  237. networkType: testNetworkType,
  238. natType: testNATType,
  239. stunServerAddress: testSTUNServerAddress,
  240. stunServerAddressRFC5780: testSTUNServerAddress,
  241. stunServerAddressSucceeded: stunServerAddressSucceeded,
  242. stunServerAddressFailed: stunServerAddressFailed,
  243. brokerClientPrivateKey: proxyPrivateKey,
  244. brokerPublicKey: brokerPublicKey,
  245. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  246. brokerClientRoundTripper: newHTTPRoundTripper(
  247. brokerListener.Addr().String(), "proxy"),
  248. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  249. brokerClientRoundTripperFailed: roundTripperFailed,
  250. setNATType: func(NATType) {},
  251. setPortMappingTypes: func(PortMappingTypes) {},
  252. bindToDevice: func(int) error { return nil },
  253. }
  254. proxy, err := NewProxy(&ProxyConfig{
  255. Logger: logger,
  256. BaseMetrics: baseMetrics,
  257. DialParameters: dialParams,
  258. MaxClients: proxyMaxClients,
  259. LimitUpstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  260. LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
  261. ActivityUpdater: func(connectingClients int32, connectedClients int32,
  262. bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
  263. fmt.Printf("[%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
  264. time.Now().UTC().Format(time.RFC3339),
  265. connectingClients, connectedClients, bytesUp, bytesDown)
  266. },
  267. })
  268. if err != nil {
  269. return errors.Trace(err)
  270. }
  271. testGroup.Go(func() error {
  272. proxy.Run(testCtx)
  273. return nil
  274. })
  275. }
  276. // Run clients
  277. clientsGroup := new(errgroup.Group)
  278. makeClientFunc := func(
  279. isTCP bool,
  280. isMobile bool,
  281. dialParams DialParameters,
  282. brokerClient *BrokerClient) func() error {
  283. var networkProtocol NetworkProtocol
  284. var addr string
  285. var wrapWithQUIC bool
  286. if isTCP {
  287. networkProtocol = NetworkProtocolTCP
  288. addr = tcpEchoListener.Addr().String()
  289. } else {
  290. networkProtocol = NetworkProtocolUDP
  291. addr = quicEchoServer.Addr().String()
  292. wrapWithQUIC = true
  293. }
  294. return func() error {
  295. dialCtx, cancelDial := context.WithTimeout(testCtx, 30*time.Second)
  296. defer cancelDial()
  297. conn, err := DialClient(
  298. dialCtx,
  299. &ClientConfig{
  300. Logger: logger,
  301. BaseMetrics: baseMetrics,
  302. DialParameters: dialParams,
  303. BrokerClient: brokerClient,
  304. ReliableTransport: isTCP,
  305. DialNetworkProtocol: networkProtocol,
  306. DialAddress: addr,
  307. DestinationServerEntryJSON: serverEntryJSON,
  308. })
  309. if err != nil {
  310. return errors.Trace(err)
  311. }
  312. var relayConn net.Conn
  313. relayConn = conn
  314. if wrapWithQUIC {
  315. quicConn, err := quic.Dial(
  316. dialCtx,
  317. conn,
  318. &net.UDPAddr{Port: 1}, // This address is ignored, but the zero value is not allowed
  319. "test", "QUICv1", nil, quicEchoServer.ObfuscationKey(), nil, nil, true)
  320. if err != nil {
  321. return errors.Trace(err)
  322. }
  323. relayConn = quicConn
  324. }
  325. addPendingBrokerServerRequest(conn.GetConnectionID())
  326. signalRelayComplete := make(chan struct{})
  327. clientsGroup.Go(func() error {
  328. defer close(signalRelayComplete)
  329. in := conn.InitialRelayPacket()
  330. for in != nil {
  331. out, err := handleBrokerServerRequests(in, conn.GetConnectionID())
  332. // In general, trying to use an expired session results in an expected error...
  333. sessionInvalid := err != nil
  334. // ...but no error is expected in this test run.
  335. if err != nil {
  336. fmt.Printf("handleBrokerServerRequests failed: %v\n", err)
  337. }
  338. in, err = conn.RelayPacket(testCtx, out, sessionInvalid)
  339. if err != nil {
  340. return errors.Trace(err)
  341. }
  342. }
  343. return nil
  344. })
  345. sendBytes := prng.Bytes(bytesToSend)
  346. clientsGroup.Go(func() error {
  347. for n := 0; n < bytesToSend; n += messageSize {
  348. m := messageSize
  349. if bytesToSend-n < m {
  350. m = bytesToSend - n
  351. }
  352. _, err := relayConn.Write(sendBytes[n : n+m])
  353. if err != nil {
  354. return errors.Trace(err)
  355. }
  356. }
  357. fmt.Printf("%d bytes sent\n", bytesToSend)
  358. return nil
  359. })
  360. clientsGroup.Go(func() error {
  361. buf := make([]byte, messageSize)
  362. n := 0
  363. for n < bytesToSend {
  364. m, err := relayConn.Read(buf)
  365. if err != nil {
  366. return errors.Trace(err)
  367. }
  368. if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
  369. return errors.Tracef(
  370. "unexpected bytes: expected at index %d, received at index %d",
  371. bytes.Index(sendBytes, buf[:m]), n)
  372. }
  373. n += m
  374. }
  375. fmt.Printf("%d bytes received\n", bytesToSend)
  376. select {
  377. case <-signalRelayComplete:
  378. case <-testCtx.Done():
  379. }
  380. relayConn.Close()
  381. conn.Close()
  382. return nil
  383. })
  384. return nil
  385. }
  386. }
  387. newClientParams := func(isMobile bool) (*testDialParameters, *BrokerClient, error) {
  388. clientPrivateKey, err := GenerateSessionPrivateKey()
  389. if err != nil {
  390. return nil, nil, errors.Trace(err)
  391. }
  392. clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
  393. if err != nil {
  394. return nil, nil, errors.Trace(err)
  395. }
  396. dialParams := &testDialParameters{
  397. commonCompartmentIDs: testCommonCompartmentIDs,
  398. networkID: testNetworkID,
  399. networkType: testNetworkType,
  400. natType: testNATType,
  401. stunServerAddress: testSTUNServerAddress,
  402. stunServerAddressRFC5780: testSTUNServerAddress,
  403. stunServerAddressSucceeded: stunServerAddressSucceeded,
  404. stunServerAddressFailed: stunServerAddressFailed,
  405. brokerClientPrivateKey: clientPrivateKey,
  406. brokerPublicKey: brokerPublicKey,
  407. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  408. brokerClientRoundTripper: newHTTPRoundTripper(
  409. brokerListener.Addr().String(), "client"),
  410. brokerClientRoundTripperSucceeded: roundTripperSucceded,
  411. brokerClientRoundTripperFailed: roundTripperFailed,
  412. clientRootObfuscationSecret: clientRootObfuscationSecret,
  413. doDTLSRandomization: true,
  414. setNATType: func(NATType) {},
  415. setPortMappingTypes: func(PortMappingTypes) {},
  416. bindToDevice: func(int) error { return nil },
  417. }
  418. if isMobile {
  419. dialParams.networkType = NetworkTypeMobile
  420. dialParams.disableInboundForMobleNetworks = true
  421. }
  422. brokerClient, err := NewBrokerClient(dialParams)
  423. if err != nil {
  424. return nil, nil, errors.Trace(err)
  425. }
  426. return dialParams, brokerClient, nil
  427. }
  428. clientDialParams, clientBrokerClient, err := newClientParams(false)
  429. if err != nil {
  430. return errors.Trace(err)
  431. }
  432. clientMobileDialParams, clientMobileBrokerClient, err := newClientParams(true)
  433. if err != nil {
  434. return errors.Trace(err)
  435. }
  436. for i := 0; i < numClients; i++ {
  437. // Test a mix of TCP and UDP proxying; also test the
  438. // DisableInboundForMobleNetworks code path.
  439. isTCP := i%2 == 0
  440. isMobile := i%4 == 0
  441. // Exercise BrokerClients shared by multiple clients, but also create
  442. // several broker clients.
  443. if i%8 == 0 {
  444. clientDialParams, clientBrokerClient, err = newClientParams(false)
  445. if err != nil {
  446. return errors.Trace(err)
  447. }
  448. clientMobileDialParams, clientMobileBrokerClient, err = newClientParams(true)
  449. if err != nil {
  450. return errors.Trace(err)
  451. }
  452. }
  453. dialParams := clientDialParams
  454. brokerClient := clientBrokerClient
  455. if isMobile {
  456. dialParams = clientMobileDialParams
  457. brokerClient = clientMobileBrokerClient
  458. }
  459. clientsGroup.Go(makeClientFunc(isTCP, isMobile, dialParams, brokerClient))
  460. }
  461. // Await client transfers complete
  462. err = clientsGroup.Wait()
  463. if err != nil {
  464. return errors.Trace(err)
  465. }
  466. if hasPendingBrokerServerRequests() {
  467. return errors.TraceNew("unexpected pending broker server requests")
  468. }
  469. // Await shutdowns
  470. stopTest()
  471. brokerListener.Close()
  472. err = testGroup.Wait()
  473. if err != nil {
  474. return errors.Trace(err)
  475. }
  476. // TODO: check that elapsed time is consistent with rate limit (+/-)
  477. // Check if STUN server replay callbacks were triggered
  478. if atomic.LoadInt32(&stunServerAddressSucceededCount) < 1 {
  479. return errors.TraceNew("unexpected STUN server succeeded count")
  480. }
  481. if atomic.LoadInt32(&stunServerAddressFailedCount) > 0 {
  482. return errors.TraceNew("unexpected STUN server failed count")
  483. }
  484. // Check if RoundTripper server replay callbacks were triggered
  485. if atomic.LoadInt32(&roundTripperSucceededCount) < 1 {
  486. return errors.TraceNew("unexpected round tripper succeeded count")
  487. }
  488. if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
  489. return errors.TraceNew("unexpected round tripper failed count")
  490. }
  491. return nil
  492. }
  493. func runHTTPServer(listener net.Listener, broker *Broker) error {
  494. httpServer := &http.Server{
  495. ReadTimeout: BrokerReadTimeout,
  496. WriteTimeout: BrokerWriteTimeout,
  497. IdleTimeout: BrokerIdleTimeout,
  498. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  499. // For this test, clients set the path to "/client" and proxies
  500. // set the path to "/proxy" and we use that to create stub GeoIP
  501. // data to pass the not-same-ASN condition.
  502. var geoIPData common.GeoIPData
  503. geoIPData.ASN = r.URL.Path
  504. // Not an actual HTTP header in this test.
  505. transportSecret := broker.config.TransportSecret
  506. requestPayload, err := ioutil.ReadAll(
  507. http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
  508. if err != nil {
  509. fmt.Printf("runHTTPServer ioutil.ReadAll failed: %v\n", err)
  510. http.Error(w, "", http.StatusNotFound)
  511. return
  512. }
  513. clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
  514. responsePayload, err := broker.HandleSessionPacket(
  515. r.Context(),
  516. transportSecret,
  517. clientIP,
  518. geoIPData,
  519. requestPayload)
  520. if err != nil {
  521. fmt.Printf("runHTTPServer HandleSessionPacket failed: %v", err)
  522. http.Error(w, "", http.StatusNotFound)
  523. return
  524. }
  525. w.WriteHeader(http.StatusOK)
  526. w.Write(responsePayload)
  527. }),
  528. }
  529. certificate, privateKey, err := common.GenerateWebServerCertificate("www.example.com")
  530. if err != nil {
  531. return errors.Trace(err)
  532. }
  533. tlsCert, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
  534. if err != nil {
  535. return errors.Trace(err)
  536. }
  537. tlsConfig := &tls.Config{
  538. Certificates: []tls.Certificate{tlsCert},
  539. }
  540. err = httpServer.Serve(tls.NewListener(listener, tlsConfig))
  541. return errors.Trace(err)
  542. }
  543. type httpRoundTripper struct {
  544. httpClient *http.Client
  545. endpointAddr string
  546. path string
  547. }
  548. func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
  549. return &httpRoundTripper{
  550. httpClient: &http.Client{
  551. Transport: &http.Transport{
  552. ForceAttemptHTTP2: true,
  553. MaxIdleConns: 2,
  554. IdleConnTimeout: BrokerIdleTimeout,
  555. TLSHandshakeTimeout: 1 * time.Second,
  556. TLSClientConfig: &tls.Config{
  557. InsecureSkipVerify: true,
  558. },
  559. },
  560. },
  561. endpointAddr: endpointAddr,
  562. path: path,
  563. }
  564. }
  565. func (r *httpRoundTripper) RoundTrip(
  566. ctx context.Context, requestPayload []byte) ([]byte, error) {
  567. url := fmt.Sprintf("https://%s/%s", r.endpointAddr, r.path)
  568. request, err := http.NewRequestWithContext(
  569. ctx, "POST", url, bytes.NewReader(requestPayload))
  570. if err != nil {
  571. return nil, errors.Trace(err)
  572. }
  573. response, err := r.httpClient.Do(request)
  574. if err != nil {
  575. return nil, errors.Trace(err)
  576. }
  577. defer response.Body.Close()
  578. if response.StatusCode != http.StatusOK {
  579. return nil, errors.Tracef("unexpected response status code: %d", response.StatusCode)
  580. }
  581. responsePayload, err := io.ReadAll(response.Body)
  582. if err != nil {
  583. return nil, errors.Trace(err)
  584. }
  585. return responsePayload, nil
  586. }
  587. func (r *httpRoundTripper) Close() error {
  588. r.httpClient.CloseIdleConnections()
  589. return nil
  590. }
  591. func runTCPEchoServer(listener net.Listener) {
  592. for {
  593. conn, err := listener.Accept()
  594. if err != nil {
  595. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  596. return
  597. }
  598. go func(conn net.Conn) {
  599. buf := make([]byte, 1024)
  600. for {
  601. n, err := conn.Read(buf)
  602. if n > 0 {
  603. _, err = conn.Write(buf[:n])
  604. }
  605. if err != nil {
  606. fmt.Printf("runTCPEchoServer failed: %v\n", errors.Trace(err))
  607. return
  608. }
  609. }
  610. }(conn)
  611. }
  612. }
  613. type quicEchoServer struct {
  614. listener net.Listener
  615. obfuscationKey string
  616. }
  617. func newQuicEchoServer() (*quicEchoServer, error) {
  618. obfuscationKey := prng.HexString(32)
  619. listener, err := quic.Listen(
  620. nil,
  621. nil,
  622. "127.0.0.1:0",
  623. obfuscationKey,
  624. false)
  625. if err != nil {
  626. return nil, errors.Trace(err)
  627. }
  628. return &quicEchoServer{
  629. listener: listener,
  630. obfuscationKey: obfuscationKey,
  631. }, nil
  632. }
  633. func (q *quicEchoServer) ObfuscationKey() string {
  634. return q.obfuscationKey
  635. }
  636. func (q *quicEchoServer) Close() error {
  637. return q.listener.Close()
  638. }
  639. func (q *quicEchoServer) Addr() net.Addr {
  640. return q.listener.Addr()
  641. }
  642. func (q *quicEchoServer) Run() {
  643. for {
  644. conn, err := q.listener.Accept()
  645. if err != nil {
  646. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  647. return
  648. }
  649. go func(conn net.Conn) {
  650. buf := make([]byte, 1024)
  651. for {
  652. n, err := conn.Read(buf)
  653. if n > 0 {
  654. _, err = conn.Write(buf[:n])
  655. }
  656. if err != nil {
  657. fmt.Printf("quicEchoServer failed: %v\n", errors.Trace(err))
  658. return
  659. }
  660. }
  661. }(conn)
  662. }
  663. }