controller_test.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. socks "github.com/Psiphon-Inc/goptlib"
  35. )
  36. func TestMain(m *testing.M) {
  37. flag.Parse()
  38. os.Remove(DATA_STORE_FILENAME)
  39. initDisruptor()
  40. setEmitDiagnosticNotices(true)
  41. os.Exit(m.Run())
  42. }
  43. // Test case notes/limitations/dependencies:
  44. //
  45. // * Untunneled upgrade tests must execute before
  46. // the other tests to ensure no tunnel is established.
  47. // We need a way to reset the datastore after it's been
  48. // initialized in order to clear out its data entries
  49. // and be able to arbitrarily order the tests.
  50. //
  51. // * The resumable download tests using disruptNetwork
  52. // depend on the download object being larger than the
  53. // disruptorMax limits so that the disruptor will actually
  54. // interrupt the first download attempt. Specifically, the
  55. // upgrade and remote server list at the URLs specified in
  56. // controller_test.config.enc.
  57. //
  58. // * The protocol tests assume there is at least one server
  59. // supporting each protocol in the server list at the URL
  60. // specified in controller_test.config.enc, and that these
  61. // servers are not overloaded.
  62. //
  63. // * fetchAndVerifyWebsite depends on the target URL being
  64. // available and responding.
  65. //
  66. func TestUntunneledUpgradeDownload(t *testing.T) {
  67. controllerRun(t,
  68. &controllerRunConfig{
  69. expectNoServerEntries: true,
  70. protocol: "",
  71. clientIsLatestVersion: false,
  72. disableUntunneledUpgrade: false,
  73. disableEstablishing: true,
  74. tunnelPoolSize: 1,
  75. disruptNetwork: false,
  76. useHostNameTransformer: false,
  77. runDuration: 0,
  78. })
  79. }
  80. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  81. controllerRun(t,
  82. &controllerRunConfig{
  83. expectNoServerEntries: true,
  84. protocol: "",
  85. clientIsLatestVersion: false,
  86. disableUntunneledUpgrade: false,
  87. disableEstablishing: true,
  88. tunnelPoolSize: 1,
  89. disruptNetwork: true,
  90. useHostNameTransformer: false,
  91. runDuration: 0,
  92. })
  93. }
  94. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  95. controllerRun(t,
  96. &controllerRunConfig{
  97. expectNoServerEntries: true,
  98. protocol: "",
  99. clientIsLatestVersion: true,
  100. disableUntunneledUpgrade: false,
  101. disableEstablishing: true,
  102. tunnelPoolSize: 1,
  103. disruptNetwork: false,
  104. useHostNameTransformer: false,
  105. runDuration: 0,
  106. })
  107. }
  108. func TestUntunneledResumableFetchRemoveServerList(t *testing.T) {
  109. controllerRun(t,
  110. &controllerRunConfig{
  111. expectNoServerEntries: true,
  112. protocol: "",
  113. clientIsLatestVersion: true,
  114. disableUntunneledUpgrade: false,
  115. disableEstablishing: false,
  116. tunnelPoolSize: 1,
  117. disruptNetwork: true,
  118. useHostNameTransformer: false,
  119. runDuration: 0,
  120. })
  121. }
  122. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  123. controllerRun(t,
  124. &controllerRunConfig{
  125. expectNoServerEntries: false,
  126. protocol: "",
  127. clientIsLatestVersion: true,
  128. disableUntunneledUpgrade: true,
  129. disableEstablishing: false,
  130. tunnelPoolSize: 1,
  131. disruptNetwork: false,
  132. useHostNameTransformer: false,
  133. runDuration: 0,
  134. })
  135. }
  136. func TestImpairedProtocols(t *testing.T) {
  137. // This test sets a tunnelPoolSize of 40 and runs
  138. // the session for 1 minute with network disruption
  139. // on. All 40 tunnels being disrupted every 10
  140. // seconds (followed by ssh keep alive probe timeout)
  141. // should be sufficient to trigger at least one
  142. // impaired protocol classification.
  143. controllerRun(t,
  144. &controllerRunConfig{
  145. expectNoServerEntries: false,
  146. protocol: "",
  147. clientIsLatestVersion: true,
  148. disableUntunneledUpgrade: true,
  149. disableEstablishing: false,
  150. tunnelPoolSize: 40,
  151. disruptNetwork: true,
  152. useHostNameTransformer: false,
  153. runDuration: 1 * time.Minute,
  154. })
  155. }
  156. func TestSSH(t *testing.T) {
  157. controllerRun(t,
  158. &controllerRunConfig{
  159. expectNoServerEntries: false,
  160. protocol: TUNNEL_PROTOCOL_SSH,
  161. clientIsLatestVersion: false,
  162. disableUntunneledUpgrade: true,
  163. disableEstablishing: false,
  164. tunnelPoolSize: 1,
  165. disruptNetwork: false,
  166. useHostNameTransformer: false,
  167. runDuration: 0,
  168. })
  169. }
  170. func TestObfuscatedSSH(t *testing.T) {
  171. controllerRun(t,
  172. &controllerRunConfig{
  173. expectNoServerEntries: false,
  174. protocol: TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  175. clientIsLatestVersion: false,
  176. disableUntunneledUpgrade: true,
  177. disableEstablishing: false,
  178. tunnelPoolSize: 1,
  179. disruptNetwork: false,
  180. useHostNameTransformer: false,
  181. runDuration: 0,
  182. })
  183. }
  184. func TestUnfrontedMeek(t *testing.T) {
  185. controllerRun(t,
  186. &controllerRunConfig{
  187. expectNoServerEntries: false,
  188. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  189. clientIsLatestVersion: false,
  190. disableUntunneledUpgrade: true,
  191. disableEstablishing: false,
  192. tunnelPoolSize: 1,
  193. disruptNetwork: false,
  194. useHostNameTransformer: false,
  195. runDuration: 0,
  196. })
  197. }
  198. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  199. controllerRun(t,
  200. &controllerRunConfig{
  201. expectNoServerEntries: false,
  202. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  203. clientIsLatestVersion: true,
  204. disableUntunneledUpgrade: true,
  205. disableEstablishing: false,
  206. tunnelPoolSize: 1,
  207. disruptNetwork: false,
  208. useHostNameTransformer: true,
  209. runDuration: 0,
  210. })
  211. }
  212. func TestFrontedMeek(t *testing.T) {
  213. controllerRun(t,
  214. &controllerRunConfig{
  215. expectNoServerEntries: false,
  216. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  217. clientIsLatestVersion: false,
  218. disableUntunneledUpgrade: true,
  219. disableEstablishing: false,
  220. tunnelPoolSize: 1,
  221. disruptNetwork: false,
  222. useHostNameTransformer: false,
  223. runDuration: 0,
  224. })
  225. }
  226. func TestFrontedMeekWithTransformer(t *testing.T) {
  227. controllerRun(t,
  228. &controllerRunConfig{
  229. expectNoServerEntries: false,
  230. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  231. clientIsLatestVersion: true,
  232. disableUntunneledUpgrade: true,
  233. disableEstablishing: false,
  234. tunnelPoolSize: 1,
  235. disruptNetwork: false,
  236. useHostNameTransformer: true,
  237. runDuration: 0,
  238. })
  239. }
  240. func TestFrontedMeekHTTP(t *testing.T) {
  241. controllerRun(t,
  242. &controllerRunConfig{
  243. expectNoServerEntries: false,
  244. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  245. clientIsLatestVersion: true,
  246. disableUntunneledUpgrade: true,
  247. disableEstablishing: false,
  248. tunnelPoolSize: 1,
  249. disruptNetwork: false,
  250. useHostNameTransformer: false,
  251. runDuration: 0,
  252. })
  253. }
  254. func TestUnfrontedMeekHTTPS(t *testing.T) {
  255. controllerRun(t,
  256. &controllerRunConfig{
  257. expectNoServerEntries: false,
  258. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  259. clientIsLatestVersion: false,
  260. disableUntunneledUpgrade: true,
  261. disableEstablishing: false,
  262. tunnelPoolSize: 1,
  263. disruptNetwork: false,
  264. useHostNameTransformer: false,
  265. runDuration: 0,
  266. })
  267. }
  268. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  269. controllerRun(t,
  270. &controllerRunConfig{
  271. expectNoServerEntries: false,
  272. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  273. clientIsLatestVersion: true,
  274. disableUntunneledUpgrade: true,
  275. disableEstablishing: false,
  276. tunnelPoolSize: 1,
  277. disruptNetwork: false,
  278. useHostNameTransformer: true,
  279. runDuration: 0,
  280. })
  281. }
  282. type controllerRunConfig struct {
  283. expectNoServerEntries bool
  284. protocol string
  285. clientIsLatestVersion bool
  286. disableUntunneledUpgrade bool
  287. disableEstablishing bool
  288. tunnelPoolSize int
  289. disruptNetwork bool
  290. useHostNameTransformer bool
  291. runDuration time.Duration
  292. }
  293. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  294. configFileContents, err := ioutil.ReadFile("controller_test.config")
  295. if err != nil {
  296. // Skip, don't fail, if config file is not present
  297. t.Skipf("error loading configuration file: %s", err)
  298. }
  299. config, err := LoadConfig(configFileContents)
  300. if err != nil {
  301. t.Fatalf("error processing configuration file: %s", err)
  302. }
  303. if runConfig.clientIsLatestVersion {
  304. config.ClientVersion = "999999999"
  305. }
  306. if runConfig.disableEstablishing {
  307. // Clear remote server list so tunnel cannot be established.
  308. // TODO: also delete all server entries in the datastore.
  309. config.RemoteServerListUrl = ""
  310. }
  311. config.TunnelPoolSize = runConfig.tunnelPoolSize
  312. if runConfig.disableUntunneledUpgrade {
  313. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  314. config.UpgradeDownloadClientVersionHeader = ""
  315. }
  316. if runConfig.disruptNetwork {
  317. config.UpstreamProxyUrl = disruptorProxyURL
  318. }
  319. if runConfig.useHostNameTransformer {
  320. config.HostNameTransformer = &TestHostNameTransformer{}
  321. }
  322. os.Remove(config.UpgradeDownloadFilename)
  323. config.TunnelProtocol = runConfig.protocol
  324. err = InitDataStore(config)
  325. if err != nil {
  326. t.Fatalf("error initializing datastore: %s", err)
  327. }
  328. serverEntryCount := CountServerEntries("", "")
  329. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  330. // TODO: replace expectNoServerEntries with resetServerEntries
  331. // so tests can run in arbitrary order
  332. t.Fatalf("unexpected server entries")
  333. }
  334. controller, err := NewController(config)
  335. if err != nil {
  336. t.Fatalf("error creating controller: %s", err)
  337. }
  338. // Monitor notices for "Tunnels" with count > 1, the
  339. // indication of tunnel establishment success.
  340. // Also record the selected HTTP proxy port to use
  341. // when fetching websites through the tunnel.
  342. httpProxyPort := 0
  343. tunnelEstablished := make(chan struct{}, 1)
  344. upgradeDownloaded := make(chan struct{}, 1)
  345. remoteServerListDownloaded := make(chan struct{}, 1)
  346. confirmedLatestVersion := make(chan struct{}, 1)
  347. var clientUpgradeDownloadedBytesCount int32
  348. var remoteServerListDownloadedBytesCount int32
  349. var impairedProtocolCount int32
  350. var impairedProtocolClassification = struct {
  351. sync.RWMutex
  352. classification map[string]int
  353. }{classification: make(map[string]int)}
  354. SetNoticeOutput(NewNoticeReceiver(
  355. func(notice []byte) {
  356. // TODO: log notices without logging server IPs:
  357. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  358. noticeType, payload, err := GetNotice(notice)
  359. if err != nil {
  360. return
  361. }
  362. switch noticeType {
  363. case "ListeningHttpProxyPort":
  364. httpProxyPort = int(payload["port"].(float64))
  365. case "ConnectingServer":
  366. serverProtocol := payload["protocol"].(string)
  367. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  368. // TODO: wrong goroutine for t.FatalNow()
  369. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  370. }
  371. case "Tunnels":
  372. count := int(payload["count"].(float64))
  373. if count > 0 {
  374. if runConfig.disableEstablishing {
  375. // TODO: wrong goroutine for t.FatalNow()
  376. t.Fatalf("tunnel established unexpectedly")
  377. } else {
  378. select {
  379. case tunnelEstablished <- *new(struct{}):
  380. default:
  381. }
  382. }
  383. }
  384. case "ClientUpgradeDownloadedBytes":
  385. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  386. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  387. case "ClientUpgradeDownloaded":
  388. select {
  389. case upgradeDownloaded <- *new(struct{}):
  390. default:
  391. }
  392. case "ClientIsLatestVersion":
  393. select {
  394. case confirmedLatestVersion <- *new(struct{}):
  395. default:
  396. }
  397. case "RemoteServerListDownloadedBytes":
  398. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  399. t.Logf("RemoteServerListDownloadedBytes: %d", int(payload["bytes"].(float64)))
  400. case "RemoteServerListDownloaded":
  401. select {
  402. case remoteServerListDownloaded <- *new(struct{}):
  403. default:
  404. }
  405. case "ImpairedProtocolClassification":
  406. classification := payload["classification"].(map[string]interface{})
  407. impairedProtocolClassification.Lock()
  408. impairedProtocolClassification.classification = make(map[string]int)
  409. for k, v := range classification {
  410. count := int(v.(float64))
  411. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  412. atomic.AddInt32(&impairedProtocolCount, 1)
  413. }
  414. impairedProtocolClassification.classification[k] = count
  415. }
  416. impairedProtocolClassification.Unlock()
  417. case "ActiveTunnel":
  418. serverProtocol := payload["protocol"].(string)
  419. classification := make(map[string]int)
  420. impairedProtocolClassification.RLock()
  421. for k, v := range impairedProtocolClassification.classification {
  422. classification[k] = v
  423. }
  424. impairedProtocolClassification.RUnlock()
  425. count, ok := classification[serverProtocol]
  426. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  427. // TODO: wrong goroutine for t.FatalNow()
  428. t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
  429. serverProtocol, classification)
  430. }
  431. }
  432. }))
  433. // Run controller, which establishes tunnels
  434. shutdownBroadcast := make(chan struct{})
  435. controllerWaitGroup := new(sync.WaitGroup)
  436. controllerWaitGroup.Add(1)
  437. go func() {
  438. defer controllerWaitGroup.Done()
  439. controller.Run(shutdownBroadcast)
  440. }()
  441. defer func() {
  442. // Test: shutdown must complete within 20 seconds
  443. close(shutdownBroadcast)
  444. shutdownTimeout := time.NewTimer(20 * time.Second)
  445. shutdownOk := make(chan struct{}, 1)
  446. go func() {
  447. controllerWaitGroup.Wait()
  448. shutdownOk <- *new(struct{})
  449. }()
  450. select {
  451. case <-shutdownOk:
  452. case <-shutdownTimeout.C:
  453. t.Fatalf("controller shutdown timeout exceeded")
  454. }
  455. }()
  456. if !runConfig.disableEstablishing {
  457. // Test: tunnel must be established within 120 seconds
  458. establishTimeout := time.NewTimer(120 * time.Second)
  459. select {
  460. case <-tunnelEstablished:
  461. case <-establishTimeout.C:
  462. t.Fatalf("tunnel establish timeout exceeded")
  463. }
  464. // Test: if starting with no server entries, a fetch remote
  465. // server list must have succeeded. With disruptNetwork, the
  466. // fetch must have been resumed at least once.
  467. if serverEntryCount == 0 {
  468. select {
  469. case <-remoteServerListDownloaded:
  470. default:
  471. t.Fatalf("expected remote server list downloaded")
  472. }
  473. if runConfig.disruptNetwork {
  474. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  475. if count <= 1 {
  476. t.Fatalf("unexpected remote server list download progress: %d", count)
  477. }
  478. }
  479. }
  480. // Test: fetch website through tunnel
  481. // Allow for known race condition described in NewHttpProxy():
  482. time.Sleep(1 * time.Second)
  483. fetchAndVerifyWebsite(t, httpProxyPort)
  484. // Test: run for duration, periodically using the tunnel to
  485. // ensure failed tunnel detection, and ultimately hitting
  486. // impaired protocol checks.
  487. startTime := time.Now()
  488. for {
  489. time.Sleep(1 * time.Second)
  490. useTunnel(t, httpProxyPort)
  491. if startTime.Add(runConfig.runDuration).Before(time.Now()) {
  492. break
  493. }
  494. }
  495. // Test: with disruptNetwork, impaired protocols should be exercised
  496. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  497. count := atomic.LoadInt32(&impairedProtocolCount)
  498. if count <= 0 {
  499. t.Fatalf("unexpected impaired protocol count: %d", count)
  500. } else {
  501. impairedProtocolClassification.RLock()
  502. t.Logf("impaired protocol classification: %+v",
  503. impairedProtocolClassification.classification)
  504. impairedProtocolClassification.RUnlock()
  505. }
  506. }
  507. }
  508. // Test: upgrade check/download must be downloaded within 180 seconds
  509. upgradeTimeout := time.NewTimer(180 * time.Second)
  510. select {
  511. case <-upgradeDownloaded:
  512. // TODO: verify downloaded file
  513. if runConfig.clientIsLatestVersion {
  514. t.Fatalf("upgrade downloaded unexpectedly")
  515. }
  516. // Test: with disruptNetwork, must be multiple download progress notices
  517. if runConfig.disruptNetwork {
  518. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  519. if count <= 1 {
  520. t.Fatalf("unexpected upgrade download progress: %d", count)
  521. }
  522. }
  523. case <-confirmedLatestVersion:
  524. if !runConfig.clientIsLatestVersion {
  525. t.Fatalf("confirmed latest version unexpectedly")
  526. }
  527. case <-upgradeTimeout.C:
  528. t.Fatalf("upgrade download timeout exceeded")
  529. }
  530. }
  531. type TestHostNameTransformer struct {
  532. }
  533. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  534. return "example.com", true
  535. }
  536. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  537. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  538. roundTripTimeout := 10 * time.Second
  539. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  540. expectedResponseSize := 35148
  541. checkResponse := func(responseBody string) bool {
  542. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  543. }
  544. // Test: use HTTP proxy
  545. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  546. if err != nil {
  547. t.Fatalf("error initializing proxied HTTP request: %s", err)
  548. }
  549. httpClient := &http.Client{
  550. Transport: &http.Transport{
  551. Proxy: http.ProxyURL(proxyUrl),
  552. },
  553. Timeout: roundTripTimeout,
  554. }
  555. response, err := httpClient.Get(testUrl)
  556. if err != nil {
  557. t.Fatalf("error sending proxied HTTP request: %s", err)
  558. }
  559. body, err := ioutil.ReadAll(response.Body)
  560. if err != nil {
  561. t.Fatalf("error reading proxied HTTP response: %s", err)
  562. }
  563. response.Body.Close()
  564. if !checkResponse(string(body)) {
  565. t.Fatalf("unexpected proxied HTTP response")
  566. }
  567. // Test: use direct URL proxy
  568. httpClient = &http.Client{
  569. Transport: http.DefaultTransport,
  570. Timeout: roundTripTimeout,
  571. }
  572. response, err = httpClient.Get(
  573. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  574. httpProxyPort, url.QueryEscape(testUrl)))
  575. if err != nil {
  576. t.Fatalf("error sending direct URL request: %s", err)
  577. }
  578. body, err = ioutil.ReadAll(response.Body)
  579. if err != nil {
  580. t.Fatalf("error reading direct URL response: %s", err)
  581. }
  582. response.Body.Close()
  583. if !checkResponse(string(body)) {
  584. t.Fatalf("unexpected direct URL response")
  585. }
  586. // Test: use tunneled URL proxy
  587. response, err = httpClient.Get(
  588. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  589. httpProxyPort, url.QueryEscape(testUrl)))
  590. if err != nil {
  591. t.Fatalf("error sending tunneled URL request: %s", err)
  592. }
  593. body, err = ioutil.ReadAll(response.Body)
  594. if err != nil {
  595. t.Fatalf("error reading tunneled URL response: %s", err)
  596. }
  597. response.Body.Close()
  598. if !checkResponse(string(body)) {
  599. t.Fatalf("unexpected tunneled URL response")
  600. }
  601. }
  602. func useTunnel(t *testing.T, httpProxyPort int) {
  603. // No action on errors as the tunnel is expected to fail sometimes
  604. testUrl := "https://psiphon3.com"
  605. roundTripTimeout := 1 * time.Second
  606. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  607. if err != nil {
  608. return
  609. }
  610. httpClient := &http.Client{
  611. Transport: &http.Transport{
  612. Proxy: http.ProxyURL(proxyUrl),
  613. },
  614. Timeout: roundTripTimeout,
  615. }
  616. response, err := httpClient.Get(testUrl)
  617. if err != nil {
  618. return
  619. }
  620. response.Body.Close()
  621. }
  622. const disruptorProxyAddress = "127.0.0.1:2160"
  623. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  624. const disruptorMaxConnectionBytes = 625000
  625. const disruptorMaxConnectionTime = 10 * time.Second
  626. func initDisruptor() {
  627. go func() {
  628. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  629. if err != nil {
  630. fmt.Errorf("disruptor proxy listen error: %s", err)
  631. return
  632. }
  633. for {
  634. localConn, err := listener.AcceptSocks()
  635. if err != nil {
  636. fmt.Errorf("disruptor proxy accept error: %s", err)
  637. return
  638. }
  639. go func() {
  640. defer localConn.Close()
  641. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  642. if err != nil {
  643. fmt.Errorf("disruptor proxy dial error: %s", err)
  644. return
  645. }
  646. defer remoteConn.Close()
  647. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  648. if err != nil {
  649. fmt.Errorf("disruptor proxy grant error: %s", err)
  650. return
  651. }
  652. // Cut connection after disruptorMaxConnectionTime
  653. time.AfterFunc(disruptorMaxConnectionTime, func() {
  654. localConn.Close()
  655. remoteConn.Close()
  656. })
  657. // Relay connection, but only up to disruptorMaxConnectionBytes
  658. waitGroup := new(sync.WaitGroup)
  659. waitGroup.Add(1)
  660. go func() {
  661. defer waitGroup.Done()
  662. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  663. }()
  664. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  665. waitGroup.Wait()
  666. }()
  667. }
  668. }()
  669. }