controller_test.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. socks "github.com/Psiphon-Inc/goptlib"
  35. )
  36. func TestMain(m *testing.M) {
  37. flag.Parse()
  38. os.Remove(DATA_STORE_FILENAME)
  39. initDisruptor()
  40. setEmitDiagnosticNotices(true)
  41. os.Exit(m.Run())
  42. }
  43. // Note: untunneled upgrade tests must execute before
  44. // the other tests to ensure no tunnel is established.
  45. // We need a way to reset the datastore after it's been
  46. // initialized in order to clear out its data entries
  47. // and be able to arbitrarily order the tests.
  48. func TestUntunneledUpgradeDownload(t *testing.T) {
  49. controllerRun(t,
  50. &controllerRunConfig{
  51. protocol: "",
  52. clientIsLatestVersion: false,
  53. disableUntunneledUpgrade: false,
  54. disableEstablishing: true,
  55. tunnelPoolSize: 1,
  56. disruptNetwork: false,
  57. useHostNameTransformer: false,
  58. runDuration: 0,
  59. })
  60. }
  61. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  62. controllerRun(t,
  63. &controllerRunConfig{
  64. protocol: "",
  65. clientIsLatestVersion: false,
  66. disableUntunneledUpgrade: false,
  67. disableEstablishing: true,
  68. tunnelPoolSize: 1,
  69. disruptNetwork: true,
  70. useHostNameTransformer: false,
  71. runDuration: 0,
  72. })
  73. }
  74. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  75. controllerRun(t,
  76. &controllerRunConfig{
  77. protocol: "",
  78. clientIsLatestVersion: true,
  79. disableUntunneledUpgrade: false,
  80. disableEstablishing: true,
  81. tunnelPoolSize: 1,
  82. disruptNetwork: false,
  83. useHostNameTransformer: false,
  84. runDuration: 0,
  85. })
  86. }
  87. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  88. controllerRun(t,
  89. &controllerRunConfig{
  90. protocol: "",
  91. clientIsLatestVersion: true,
  92. disableUntunneledUpgrade: true,
  93. disableEstablishing: false,
  94. tunnelPoolSize: 1,
  95. disruptNetwork: false,
  96. useHostNameTransformer: false,
  97. runDuration: 0,
  98. })
  99. }
  100. func TestImpairedProtocols(t *testing.T) {
  101. // This test sets a tunnelPoolSize of 40 and runs
  102. // the session for 1 minute with network disruption
  103. // on. All 40 tunnels being disrupted every 10
  104. // seconds (followed by ssh keep alive probe timeout)
  105. // should be sufficient to trigger at least one
  106. // impaired protocol classification.
  107. controllerRun(t,
  108. &controllerRunConfig{
  109. protocol: "",
  110. clientIsLatestVersion: true,
  111. disableUntunneledUpgrade: true,
  112. disableEstablishing: false,
  113. tunnelPoolSize: 40,
  114. disruptNetwork: true,
  115. useHostNameTransformer: false,
  116. runDuration: 1 * time.Minute,
  117. })
  118. }
  119. func TestSSH(t *testing.T) {
  120. controllerRun(t,
  121. &controllerRunConfig{
  122. protocol: TUNNEL_PROTOCOL_SSH,
  123. clientIsLatestVersion: false,
  124. disableUntunneledUpgrade: true,
  125. disableEstablishing: false,
  126. tunnelPoolSize: 1,
  127. disruptNetwork: false,
  128. useHostNameTransformer: false,
  129. runDuration: 0,
  130. })
  131. }
  132. func TestObfuscatedSSH(t *testing.T) {
  133. controllerRun(t,
  134. &controllerRunConfig{
  135. protocol: TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  136. clientIsLatestVersion: false,
  137. disableUntunneledUpgrade: true,
  138. disableEstablishing: false,
  139. tunnelPoolSize: 1,
  140. disruptNetwork: false,
  141. useHostNameTransformer: false,
  142. runDuration: 0,
  143. })
  144. }
  145. func TestUnfrontedMeek(t *testing.T) {
  146. controllerRun(t,
  147. &controllerRunConfig{
  148. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  149. clientIsLatestVersion: false,
  150. disableUntunneledUpgrade: true,
  151. disableEstablishing: false,
  152. tunnelPoolSize: 1,
  153. disruptNetwork: false,
  154. useHostNameTransformer: false,
  155. runDuration: 0,
  156. })
  157. }
  158. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  159. controllerRun(t,
  160. &controllerRunConfig{
  161. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  162. clientIsLatestVersion: true,
  163. disableUntunneledUpgrade: true,
  164. disableEstablishing: false,
  165. tunnelPoolSize: 1,
  166. disruptNetwork: false,
  167. useHostNameTransformer: true,
  168. runDuration: 0,
  169. })
  170. }
  171. func TestFrontedMeek(t *testing.T) {
  172. controllerRun(t,
  173. &controllerRunConfig{
  174. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  175. clientIsLatestVersion: false,
  176. disableUntunneledUpgrade: true,
  177. disableEstablishing: false,
  178. tunnelPoolSize: 1,
  179. disruptNetwork: false,
  180. useHostNameTransformer: false,
  181. runDuration: 0,
  182. })
  183. }
  184. func TestFrontedMeekWithTransformer(t *testing.T) {
  185. controllerRun(t,
  186. &controllerRunConfig{
  187. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  188. clientIsLatestVersion: true,
  189. disableUntunneledUpgrade: true,
  190. disableEstablishing: false,
  191. tunnelPoolSize: 1,
  192. disruptNetwork: false,
  193. useHostNameTransformer: true,
  194. runDuration: 0,
  195. })
  196. }
  197. func TestFrontedMeekHTTP(t *testing.T) {
  198. controllerRun(t,
  199. &controllerRunConfig{
  200. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  201. clientIsLatestVersion: true,
  202. disableUntunneledUpgrade: true,
  203. disableEstablishing: false,
  204. tunnelPoolSize: 1,
  205. disruptNetwork: false,
  206. useHostNameTransformer: false,
  207. runDuration: 0,
  208. })
  209. }
  210. func TestUnfrontedMeekHTTPS(t *testing.T) {
  211. controllerRun(t,
  212. &controllerRunConfig{
  213. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  214. clientIsLatestVersion: false,
  215. disableUntunneledUpgrade: true,
  216. disableEstablishing: false,
  217. tunnelPoolSize: 1,
  218. disruptNetwork: false,
  219. useHostNameTransformer: false,
  220. runDuration: 0,
  221. })
  222. }
  223. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  224. controllerRun(t,
  225. &controllerRunConfig{
  226. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  227. clientIsLatestVersion: true,
  228. disableUntunneledUpgrade: true,
  229. disableEstablishing: false,
  230. tunnelPoolSize: 1,
  231. disruptNetwork: false,
  232. useHostNameTransformer: true,
  233. runDuration: 0,
  234. })
  235. }
  236. type controllerRunConfig struct {
  237. protocol string
  238. clientIsLatestVersion bool
  239. disableUntunneledUpgrade bool
  240. disableEstablishing bool
  241. tunnelPoolSize int
  242. disruptNetwork bool
  243. useHostNameTransformer bool
  244. runDuration time.Duration
  245. }
  246. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  247. configFileContents, err := ioutil.ReadFile("controller_test.config")
  248. if err != nil {
  249. // Skip, don't fail, if config file is not present
  250. t.Skipf("error loading configuration file: %s", err)
  251. }
  252. config, err := LoadConfig(configFileContents)
  253. if err != nil {
  254. t.Fatalf("error processing configuration file: %s", err)
  255. }
  256. if runConfig.clientIsLatestVersion {
  257. config.ClientVersion = "999999999"
  258. }
  259. if runConfig.disableEstablishing {
  260. // Clear remote server list so tunnel cannot be established.
  261. // TODO: also delete all server entries in the datastore.
  262. config.RemoteServerListUrl = ""
  263. }
  264. config.TunnelPoolSize = runConfig.tunnelPoolSize
  265. if runConfig.disableUntunneledUpgrade {
  266. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  267. config.UpgradeDownloadClientVersionHeader = ""
  268. }
  269. if runConfig.disruptNetwork {
  270. config.UpstreamProxyUrl = disruptorProxyURL
  271. }
  272. if runConfig.useHostNameTransformer {
  273. config.HostNameTransformer = &TestHostNameTransformer{}
  274. }
  275. os.Remove(config.UpgradeDownloadFilename)
  276. config.TunnelProtocol = runConfig.protocol
  277. err = InitDataStore(config)
  278. if err != nil {
  279. t.Fatalf("error initializing datastore: %s", err)
  280. }
  281. controller, err := NewController(config)
  282. if err != nil {
  283. t.Fatalf("error creating controller: %s", err)
  284. }
  285. // Monitor notices for "Tunnels" with count > 1, the
  286. // indication of tunnel establishment success.
  287. // Also record the selected HTTP proxy port to use
  288. // when fetching websites through the tunnel.
  289. httpProxyPort := 0
  290. tunnelEstablished := make(chan struct{}, 1)
  291. upgradeDownloaded := make(chan struct{}, 1)
  292. confirmedLatestVersion := make(chan struct{}, 1)
  293. var clientUpgradeDownloadedBytesCount int32
  294. var impairedProtocolCount int32
  295. var impairedProtocolClassification = struct {
  296. sync.RWMutex
  297. classification map[string]int
  298. }{classification: make(map[string]int)}
  299. SetNoticeOutput(NewNoticeReceiver(
  300. func(notice []byte) {
  301. // TODO: log notices without logging server IPs:
  302. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  303. noticeType, payload, err := GetNotice(notice)
  304. if err != nil {
  305. return
  306. }
  307. switch noticeType {
  308. case "ListeningHttpProxyPort":
  309. httpProxyPort = int(payload["port"].(float64))
  310. case "ConnectingServer":
  311. serverProtocol := payload["protocol"].(string)
  312. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  313. // TODO: wrong goroutine for t.FatalNow()
  314. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  315. }
  316. case "Tunnels":
  317. count := int(payload["count"].(float64))
  318. if count > 0 {
  319. if runConfig.disableEstablishing {
  320. // TODO: wrong goroutine for t.FatalNow()
  321. t.Fatalf("tunnel established unexpectedly")
  322. } else {
  323. select {
  324. case tunnelEstablished <- *new(struct{}):
  325. default:
  326. }
  327. }
  328. }
  329. case "ClientUpgradeDownloadedBytes":
  330. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  331. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  332. case "ClientUpgradeDownloaded":
  333. select {
  334. case upgradeDownloaded <- *new(struct{}):
  335. default:
  336. }
  337. case "ClientIsLatestVersion":
  338. select {
  339. case confirmedLatestVersion <- *new(struct{}):
  340. default:
  341. }
  342. case "ImpairedProtocolClassification":
  343. classification := payload["classification"].(map[string]interface{})
  344. impairedProtocolClassification.Lock()
  345. impairedProtocolClassification.classification = make(map[string]int)
  346. for k, v := range classification {
  347. count := int(v.(float64))
  348. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  349. atomic.AddInt32(&impairedProtocolCount, 1)
  350. }
  351. impairedProtocolClassification.classification[k] = count
  352. }
  353. impairedProtocolClassification.Unlock()
  354. case "ActiveTunnel":
  355. serverProtocol := payload["protocol"].(string)
  356. classification := make(map[string]int)
  357. impairedProtocolClassification.RLock()
  358. for k, v := range impairedProtocolClassification.classification {
  359. classification[k] = v
  360. }
  361. impairedProtocolClassification.RUnlock()
  362. count, ok := classification[serverProtocol]
  363. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  364. // TODO: wrong goroutine for t.FatalNow()
  365. t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
  366. serverProtocol, classification)
  367. }
  368. }
  369. }))
  370. // Run controller, which establishes tunnels
  371. shutdownBroadcast := make(chan struct{})
  372. controllerWaitGroup := new(sync.WaitGroup)
  373. controllerWaitGroup.Add(1)
  374. go func() {
  375. defer controllerWaitGroup.Done()
  376. controller.Run(shutdownBroadcast)
  377. }()
  378. defer func() {
  379. // Test: shutdown must complete within 20 seconds
  380. close(shutdownBroadcast)
  381. shutdownTimeout := time.NewTimer(20 * time.Second)
  382. shutdownOk := make(chan struct{}, 1)
  383. go func() {
  384. controllerWaitGroup.Wait()
  385. shutdownOk <- *new(struct{})
  386. }()
  387. select {
  388. case <-shutdownOk:
  389. case <-shutdownTimeout.C:
  390. t.Fatalf("controller shutdown timeout exceeded")
  391. }
  392. }()
  393. if !runConfig.disableEstablishing {
  394. // Test: tunnel must be established within 60 seconds
  395. establishTimeout := time.NewTimer(60 * time.Second)
  396. select {
  397. case <-tunnelEstablished:
  398. case <-establishTimeout.C:
  399. t.Fatalf("tunnel establish timeout exceeded")
  400. }
  401. // Test: fetch website through tunnel
  402. // Allow for known race condition described in NewHttpProxy():
  403. time.Sleep(1 * time.Second)
  404. fetchAndVerifyWebsite(t, httpProxyPort)
  405. // Test: run for duration, periodically using the tunnel to
  406. // ensure failed tunnel detection, and ultimately hitting
  407. // impaired protocol checks.
  408. startTime := time.Now()
  409. for {
  410. time.Sleep(1 * time.Second)
  411. useTunnel(t, httpProxyPort)
  412. if startTime.Add(runConfig.runDuration).Before(time.Now()) {
  413. break
  414. }
  415. }
  416. // Test: with disruptNetwork, impaired protocols should be exercised
  417. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  418. count := atomic.LoadInt32(&impairedProtocolCount)
  419. if count <= 0 {
  420. t.Fatalf("unexpected impaired protocol count: %d", count)
  421. } else {
  422. impairedProtocolClassification.RLock()
  423. t.Logf("impaired protocol classification: %+v",
  424. impairedProtocolClassification.classification)
  425. impairedProtocolClassification.RUnlock()
  426. }
  427. }
  428. }
  429. // Test: upgrade check/download must be downloaded within 120 seconds
  430. upgradeTimeout := time.NewTimer(120 * time.Second)
  431. select {
  432. case <-upgradeDownloaded:
  433. // TODO: verify downloaded file
  434. if runConfig.clientIsLatestVersion {
  435. t.Fatalf("upgrade downloaded unexpectedly")
  436. }
  437. // Test: with disruptNetwork, must be multiple download progress notices
  438. if runConfig.disruptNetwork {
  439. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  440. if count <= 1 {
  441. t.Fatalf("unexpected upgrade download progress: %d", count)
  442. }
  443. }
  444. case <-confirmedLatestVersion:
  445. if !runConfig.clientIsLatestVersion {
  446. t.Fatalf("confirmed latest version unexpectedly")
  447. }
  448. case <-upgradeTimeout.C:
  449. t.Fatalf("upgrade download timeout exceeded")
  450. }
  451. }
  452. type TestHostNameTransformer struct {
  453. }
  454. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  455. return "example.com", true
  456. }
  457. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  458. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  459. roundTripTimeout := 10 * time.Second
  460. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  461. expectedResponseSize := 35148
  462. checkResponse := func(responseBody string) bool {
  463. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  464. }
  465. // Test: use HTTP proxy
  466. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  467. if err != nil {
  468. t.Fatalf("error initializing proxied HTTP request: %s", err)
  469. }
  470. httpClient := &http.Client{
  471. Transport: &http.Transport{
  472. Proxy: http.ProxyURL(proxyUrl),
  473. },
  474. Timeout: roundTripTimeout,
  475. }
  476. response, err := httpClient.Get(testUrl)
  477. if err != nil {
  478. t.Fatalf("error sending proxied HTTP request: %s", err)
  479. }
  480. body, err := ioutil.ReadAll(response.Body)
  481. if err != nil {
  482. t.Fatalf("error reading proxied HTTP response: %s", err)
  483. }
  484. response.Body.Close()
  485. if !checkResponse(string(body)) {
  486. t.Fatalf("unexpected proxied HTTP response")
  487. }
  488. // Test: use direct URL proxy
  489. httpClient = &http.Client{
  490. Transport: http.DefaultTransport,
  491. Timeout: roundTripTimeout,
  492. }
  493. response, err = httpClient.Get(
  494. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  495. httpProxyPort, url.QueryEscape(testUrl)))
  496. if err != nil {
  497. t.Fatalf("error sending direct URL request: %s", err)
  498. }
  499. body, err = ioutil.ReadAll(response.Body)
  500. if err != nil {
  501. t.Fatalf("error reading direct URL response: %s", err)
  502. }
  503. response.Body.Close()
  504. if !checkResponse(string(body)) {
  505. t.Fatalf("unexpected direct URL response")
  506. }
  507. // Test: use tunneled URL proxy
  508. response, err = httpClient.Get(
  509. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  510. httpProxyPort, url.QueryEscape(testUrl)))
  511. if err != nil {
  512. t.Fatalf("error sending tunneled URL request: %s", err)
  513. }
  514. body, err = ioutil.ReadAll(response.Body)
  515. if err != nil {
  516. t.Fatalf("error reading tunneled URL response: %s", err)
  517. }
  518. response.Body.Close()
  519. if !checkResponse(string(body)) {
  520. t.Fatalf("unexpected tunneled URL response")
  521. }
  522. }
  523. func useTunnel(t *testing.T, httpProxyPort int) {
  524. // No action on errors as the tunnel is expected to fail sometimes
  525. testUrl := "https://psiphon3.com"
  526. roundTripTimeout := 1 * time.Second
  527. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  528. if err != nil {
  529. return
  530. }
  531. httpClient := &http.Client{
  532. Transport: &http.Transport{
  533. Proxy: http.ProxyURL(proxyUrl),
  534. },
  535. Timeout: roundTripTimeout,
  536. }
  537. response, err := httpClient.Get(testUrl)
  538. if err != nil {
  539. return
  540. }
  541. response.Body.Close()
  542. }
  543. const disruptorProxyAddress = "127.0.0.1:2160"
  544. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  545. const disruptorMaxConnectionBytes = 2000000
  546. const disruptorMaxConnectionTime = 10 * time.Second
  547. func initDisruptor() {
  548. go func() {
  549. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  550. if err != nil {
  551. fmt.Errorf("disruptor proxy listen error: %s", err)
  552. return
  553. }
  554. for {
  555. localConn, err := listener.AcceptSocks()
  556. if err != nil {
  557. fmt.Errorf("disruptor proxy accept error: %s", err)
  558. return
  559. }
  560. go func() {
  561. defer localConn.Close()
  562. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  563. if err != nil {
  564. fmt.Errorf("disruptor proxy dial error: %s", err)
  565. return
  566. }
  567. defer remoteConn.Close()
  568. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  569. if err != nil {
  570. fmt.Errorf("disruptor proxy grant error: %s", err)
  571. return
  572. }
  573. // Cut connection after disruptorMaxConnectionTime
  574. time.AfterFunc(disruptorMaxConnectionTime, func() {
  575. localConn.Close()
  576. remoteConn.Close()
  577. })
  578. // Relay connection, but only up to disruptorMaxConnectionBytes
  579. waitGroup := new(sync.WaitGroup)
  580. waitGroup.Add(1)
  581. go func() {
  582. defer waitGroup.Done()
  583. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  584. }()
  585. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  586. waitGroup.Wait()
  587. }()
  588. }
  589. }()
  590. }