controller_test.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. socks "github.com/Psiphon-Inc/goptlib"
  35. )
  36. func TestMain(m *testing.M) {
  37. flag.Parse()
  38. os.Remove(DATA_STORE_FILENAME)
  39. initDisruptor()
  40. setEmitDiagnosticNotices(true)
  41. os.Exit(m.Run())
  42. }
  43. // Test case notes/limitations/dependencies:
  44. //
  45. // * Untunneled upgrade tests must execute before
  46. // the other tests to ensure no tunnel is established.
  47. // We need a way to reset the datastore after it's been
  48. // initialized in order to clear out its data entries
  49. // and be able to arbitrarily order the tests.
  50. //
  51. // * The resumable download tests using disruptNetwork
  52. // depend on the download object being larger than the
  53. // disruptorMax limits so that the disruptor will actually
  54. // interrupt the first download attempt. Specifically, the
  55. // upgrade and remote server list at the URLs specified in
  56. // controller_test.config.enc.
  57. //
  58. // * The protocol tests assume there is at least one server
  59. // supporting each protocol in the server list at the URL
  60. // specified in controller_test.config.enc, and that these
  61. // servers are not overloaded.
  62. //
  63. // * fetchAndVerifyWebsite depends on the target URL being
  64. // available and responding.
  65. //
  66. func TestUntunneledUpgradeDownload(t *testing.T) {
  67. controllerRun(t,
  68. &controllerRunConfig{
  69. expectNoServerEntries: true,
  70. protocol: "",
  71. clientIsLatestVersion: false,
  72. disableUntunneledUpgrade: false,
  73. disableEstablishing: true,
  74. disableApi: false,
  75. tunnelPoolSize: 1,
  76. disruptNetwork: false,
  77. useHostNameTransformer: false,
  78. runDuration: 0,
  79. })
  80. }
  81. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  82. controllerRun(t,
  83. &controllerRunConfig{
  84. expectNoServerEntries: true,
  85. protocol: "",
  86. clientIsLatestVersion: false,
  87. disableUntunneledUpgrade: false,
  88. disableEstablishing: true,
  89. disableApi: false,
  90. tunnelPoolSize: 1,
  91. disruptNetwork: true,
  92. useHostNameTransformer: false,
  93. runDuration: 0,
  94. })
  95. }
  96. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  97. controllerRun(t,
  98. &controllerRunConfig{
  99. expectNoServerEntries: true,
  100. protocol: "",
  101. clientIsLatestVersion: true,
  102. disableUntunneledUpgrade: false,
  103. disableEstablishing: true,
  104. disableApi: false,
  105. tunnelPoolSize: 1,
  106. disruptNetwork: false,
  107. useHostNameTransformer: false,
  108. runDuration: 0,
  109. })
  110. }
  111. func TestUntunneledResumableFetchRemoveServerList(t *testing.T) {
  112. controllerRun(t,
  113. &controllerRunConfig{
  114. expectNoServerEntries: true,
  115. protocol: "",
  116. clientIsLatestVersion: true,
  117. disableUntunneledUpgrade: false,
  118. disableEstablishing: false,
  119. disableApi: false,
  120. tunnelPoolSize: 1,
  121. disruptNetwork: true,
  122. useHostNameTransformer: false,
  123. runDuration: 0,
  124. })
  125. }
  126. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  127. controllerRun(t,
  128. &controllerRunConfig{
  129. expectNoServerEntries: false,
  130. protocol: "",
  131. clientIsLatestVersion: true,
  132. disableUntunneledUpgrade: true,
  133. disableEstablishing: false,
  134. disableApi: false,
  135. tunnelPoolSize: 1,
  136. disruptNetwork: false,
  137. useHostNameTransformer: false,
  138. runDuration: 0,
  139. })
  140. }
  141. func TestImpairedProtocols(t *testing.T) {
  142. // This test sets a tunnelPoolSize of 40 and runs
  143. // the session for 1 minute with network disruption
  144. // on. All 40 tunnels being disrupted every 10
  145. // seconds (followed by ssh keep alive probe timeout)
  146. // should be sufficient to trigger at least one
  147. // impaired protocol classification.
  148. controllerRun(t,
  149. &controllerRunConfig{
  150. expectNoServerEntries: false,
  151. protocol: "",
  152. clientIsLatestVersion: true,
  153. disableUntunneledUpgrade: true,
  154. disableEstablishing: false,
  155. disableApi: false,
  156. tunnelPoolSize: 40,
  157. disruptNetwork: true,
  158. useHostNameTransformer: false,
  159. runDuration: 1 * time.Minute,
  160. })
  161. }
  162. func TestSSH(t *testing.T) {
  163. controllerRun(t,
  164. &controllerRunConfig{
  165. expectNoServerEntries: false,
  166. protocol: TUNNEL_PROTOCOL_SSH,
  167. clientIsLatestVersion: false,
  168. disableUntunneledUpgrade: true,
  169. disableEstablishing: false,
  170. disableApi: false,
  171. tunnelPoolSize: 1,
  172. disruptNetwork: false,
  173. useHostNameTransformer: false,
  174. runDuration: 0,
  175. })
  176. }
  177. func TestObfuscatedSSH(t *testing.T) {
  178. controllerRun(t,
  179. &controllerRunConfig{
  180. expectNoServerEntries: false,
  181. protocol: TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  182. clientIsLatestVersion: false,
  183. disableUntunneledUpgrade: true,
  184. disableEstablishing: false,
  185. disableApi: false,
  186. tunnelPoolSize: 1,
  187. disruptNetwork: false,
  188. useHostNameTransformer: false,
  189. runDuration: 0,
  190. })
  191. }
  192. func TestUnfrontedMeek(t *testing.T) {
  193. controllerRun(t,
  194. &controllerRunConfig{
  195. expectNoServerEntries: false,
  196. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  197. clientIsLatestVersion: false,
  198. disableUntunneledUpgrade: true,
  199. disableEstablishing: false,
  200. disableApi: false,
  201. tunnelPoolSize: 1,
  202. disruptNetwork: false,
  203. useHostNameTransformer: false,
  204. runDuration: 0,
  205. })
  206. }
  207. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  208. controllerRun(t,
  209. &controllerRunConfig{
  210. expectNoServerEntries: false,
  211. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  212. clientIsLatestVersion: true,
  213. disableUntunneledUpgrade: true,
  214. disableEstablishing: false,
  215. disableApi: false,
  216. tunnelPoolSize: 1,
  217. disruptNetwork: false,
  218. useHostNameTransformer: true,
  219. runDuration: 0,
  220. })
  221. }
  222. func TestFrontedMeek(t *testing.T) {
  223. controllerRun(t,
  224. &controllerRunConfig{
  225. expectNoServerEntries: false,
  226. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  227. clientIsLatestVersion: false,
  228. disableUntunneledUpgrade: true,
  229. disableEstablishing: false,
  230. disableApi: false,
  231. tunnelPoolSize: 1,
  232. disruptNetwork: false,
  233. useHostNameTransformer: false,
  234. runDuration: 0,
  235. })
  236. }
  237. func TestFrontedMeekWithTransformer(t *testing.T) {
  238. controllerRun(t,
  239. &controllerRunConfig{
  240. expectNoServerEntries: false,
  241. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  242. clientIsLatestVersion: true,
  243. disableUntunneledUpgrade: true,
  244. disableEstablishing: false,
  245. disableApi: false,
  246. tunnelPoolSize: 1,
  247. disruptNetwork: false,
  248. useHostNameTransformer: true,
  249. runDuration: 0,
  250. })
  251. }
  252. func TestFrontedMeekHTTP(t *testing.T) {
  253. controllerRun(t,
  254. &controllerRunConfig{
  255. expectNoServerEntries: false,
  256. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  257. clientIsLatestVersion: true,
  258. disableUntunneledUpgrade: true,
  259. disableEstablishing: false,
  260. disableApi: false,
  261. tunnelPoolSize: 1,
  262. disruptNetwork: false,
  263. useHostNameTransformer: false,
  264. runDuration: 0,
  265. })
  266. }
  267. func TestUnfrontedMeekHTTPS(t *testing.T) {
  268. controllerRun(t,
  269. &controllerRunConfig{
  270. expectNoServerEntries: false,
  271. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  272. clientIsLatestVersion: false,
  273. disableUntunneledUpgrade: true,
  274. disableEstablishing: false,
  275. disableApi: false,
  276. tunnelPoolSize: 1,
  277. disruptNetwork: false,
  278. useHostNameTransformer: false,
  279. runDuration: 0,
  280. })
  281. }
  282. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  283. controllerRun(t,
  284. &controllerRunConfig{
  285. expectNoServerEntries: false,
  286. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  287. clientIsLatestVersion: true,
  288. disableUntunneledUpgrade: true,
  289. disableEstablishing: false,
  290. disableApi: false,
  291. tunnelPoolSize: 1,
  292. disruptNetwork: false,
  293. useHostNameTransformer: true,
  294. runDuration: 0,
  295. })
  296. }
  297. func TestDisabledApi(t *testing.T) {
  298. controllerRun(t,
  299. &controllerRunConfig{
  300. expectNoServerEntries: false,
  301. protocol: "",
  302. clientIsLatestVersion: true,
  303. disableUntunneledUpgrade: true,
  304. disableEstablishing: false,
  305. disableApi: true,
  306. tunnelPoolSize: 1,
  307. disruptNetwork: false,
  308. useHostNameTransformer: false,
  309. runDuration: 0,
  310. })
  311. }
  312. type controllerRunConfig struct {
  313. expectNoServerEntries bool
  314. protocol string
  315. clientIsLatestVersion bool
  316. disableUntunneledUpgrade bool
  317. disableEstablishing bool
  318. tunnelPoolSize int
  319. disruptNetwork bool
  320. useHostNameTransformer bool
  321. runDuration time.Duration
  322. disableApi bool
  323. }
  324. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  325. configFileContents, err := ioutil.ReadFile("controller_test.config")
  326. if err != nil {
  327. // Skip, don't fail, if config file is not present
  328. t.Skipf("error loading configuration file: %s", err)
  329. }
  330. config, err := LoadConfig(configFileContents)
  331. if err != nil {
  332. t.Fatalf("error processing configuration file: %s", err)
  333. }
  334. if runConfig.clientIsLatestVersion {
  335. config.ClientVersion = "999999999"
  336. }
  337. if runConfig.disableEstablishing {
  338. // Clear remote server list so tunnel cannot be established.
  339. // TODO: also delete all server entries in the datastore.
  340. config.RemoteServerListUrl = ""
  341. }
  342. if runConfig.disableApi {
  343. config.DisableApi = true
  344. }
  345. config.TunnelPoolSize = runConfig.tunnelPoolSize
  346. if runConfig.disableUntunneledUpgrade {
  347. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  348. config.UpgradeDownloadClientVersionHeader = ""
  349. }
  350. if runConfig.disruptNetwork {
  351. config.UpstreamProxyUrl = disruptorProxyURL
  352. }
  353. if runConfig.useHostNameTransformer {
  354. config.HostNameTransformer = &TestHostNameTransformer{}
  355. }
  356. os.Remove(config.UpgradeDownloadFilename)
  357. config.TunnelProtocol = runConfig.protocol
  358. err = InitDataStore(config)
  359. if err != nil {
  360. t.Fatalf("error initializing datastore: %s", err)
  361. }
  362. serverEntryCount := CountServerEntries("", "")
  363. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  364. // TODO: replace expectNoServerEntries with resetServerEntries
  365. // so tests can run in arbitrary order
  366. t.Fatalf("unexpected server entries")
  367. }
  368. controller, err := NewController(config)
  369. if err != nil {
  370. t.Fatalf("error creating controller: %s", err)
  371. }
  372. // Monitor notices for "Tunnels" with count > 1, the
  373. // indication of tunnel establishment success.
  374. // Also record the selected HTTP proxy port to use
  375. // when fetching websites through the tunnel.
  376. httpProxyPort := 0
  377. tunnelEstablished := make(chan struct{}, 1)
  378. upgradeDownloaded := make(chan struct{}, 1)
  379. remoteServerListDownloaded := make(chan struct{}, 1)
  380. confirmedLatestVersion := make(chan struct{}, 1)
  381. var clientUpgradeDownloadedBytesCount int32
  382. var remoteServerListDownloadedBytesCount int32
  383. var impairedProtocolCount int32
  384. var impairedProtocolClassification = struct {
  385. sync.RWMutex
  386. classification map[string]int
  387. }{classification: make(map[string]int)}
  388. SetNoticeOutput(NewNoticeReceiver(
  389. func(notice []byte) {
  390. // TODO: log notices without logging server IPs:
  391. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  392. noticeType, payload, err := GetNotice(notice)
  393. if err != nil {
  394. return
  395. }
  396. switch noticeType {
  397. case "ListeningHttpProxyPort":
  398. httpProxyPort = int(payload["port"].(float64))
  399. case "ConnectingServer":
  400. serverProtocol := payload["protocol"].(string)
  401. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  402. // TODO: wrong goroutine for t.FatalNow()
  403. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  404. }
  405. case "Tunnels":
  406. count := int(payload["count"].(float64))
  407. if count > 0 {
  408. if runConfig.disableEstablishing {
  409. // TODO: wrong goroutine for t.FatalNow()
  410. t.Fatalf("tunnel established unexpectedly")
  411. } else {
  412. select {
  413. case tunnelEstablished <- *new(struct{}):
  414. default:
  415. }
  416. }
  417. }
  418. case "ClientUpgradeDownloadedBytes":
  419. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  420. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  421. case "ClientUpgradeDownloaded":
  422. select {
  423. case upgradeDownloaded <- *new(struct{}):
  424. default:
  425. }
  426. case "ClientIsLatestVersion":
  427. select {
  428. case confirmedLatestVersion <- *new(struct{}):
  429. default:
  430. }
  431. case "RemoteServerListDownloadedBytes":
  432. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  433. t.Logf("RemoteServerListDownloadedBytes: %d", int(payload["bytes"].(float64)))
  434. case "RemoteServerListDownloaded":
  435. select {
  436. case remoteServerListDownloaded <- *new(struct{}):
  437. default:
  438. }
  439. case "ImpairedProtocolClassification":
  440. classification := payload["classification"].(map[string]interface{})
  441. impairedProtocolClassification.Lock()
  442. impairedProtocolClassification.classification = make(map[string]int)
  443. for k, v := range classification {
  444. count := int(v.(float64))
  445. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  446. atomic.AddInt32(&impairedProtocolCount, 1)
  447. }
  448. impairedProtocolClassification.classification[k] = count
  449. }
  450. impairedProtocolClassification.Unlock()
  451. case "ActiveTunnel":
  452. serverProtocol := payload["protocol"].(string)
  453. classification := make(map[string]int)
  454. impairedProtocolClassification.RLock()
  455. for k, v := range impairedProtocolClassification.classification {
  456. classification[k] = v
  457. }
  458. impairedProtocolClassification.RUnlock()
  459. count, ok := classification[serverProtocol]
  460. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  461. // TODO: wrong goroutine for t.FatalNow()
  462. t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
  463. serverProtocol, classification)
  464. }
  465. }
  466. }))
  467. // Run controller, which establishes tunnels
  468. shutdownBroadcast := make(chan struct{})
  469. controllerWaitGroup := new(sync.WaitGroup)
  470. controllerWaitGroup.Add(1)
  471. go func() {
  472. defer controllerWaitGroup.Done()
  473. controller.Run(shutdownBroadcast)
  474. }()
  475. defer func() {
  476. // Test: shutdown must complete within 20 seconds
  477. close(shutdownBroadcast)
  478. shutdownTimeout := time.NewTimer(20 * time.Second)
  479. shutdownOk := make(chan struct{}, 1)
  480. go func() {
  481. controllerWaitGroup.Wait()
  482. shutdownOk <- *new(struct{})
  483. }()
  484. select {
  485. case <-shutdownOk:
  486. case <-shutdownTimeout.C:
  487. t.Fatalf("controller shutdown timeout exceeded")
  488. }
  489. }()
  490. if !runConfig.disableEstablishing {
  491. // Test: tunnel must be established within 120 seconds
  492. establishTimeout := time.NewTimer(120 * time.Second)
  493. select {
  494. case <-tunnelEstablished:
  495. case <-establishTimeout.C:
  496. t.Fatalf("tunnel establish timeout exceeded")
  497. }
  498. // Test: if starting with no server entries, a fetch remote
  499. // server list must have succeeded. With disruptNetwork, the
  500. // fetch must have been resumed at least once.
  501. if serverEntryCount == 0 {
  502. select {
  503. case <-remoteServerListDownloaded:
  504. default:
  505. t.Fatalf("expected remote server list downloaded")
  506. }
  507. if runConfig.disruptNetwork {
  508. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  509. if count <= 1 {
  510. t.Fatalf("unexpected remote server list download progress: %d", count)
  511. }
  512. }
  513. }
  514. // Test: fetch website through tunnel
  515. // Allow for known race condition described in NewHttpProxy():
  516. time.Sleep(1 * time.Second)
  517. fetchAndVerifyWebsite(t, httpProxyPort)
  518. // Test: run for duration, periodically using the tunnel to
  519. // ensure failed tunnel detection, and ultimately hitting
  520. // impaired protocol checks.
  521. startTime := time.Now()
  522. for {
  523. time.Sleep(1 * time.Second)
  524. useTunnel(t, httpProxyPort)
  525. if startTime.Add(runConfig.runDuration).Before(time.Now()) {
  526. break
  527. }
  528. }
  529. // Test: with disruptNetwork, impaired protocols should be exercised
  530. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  531. count := atomic.LoadInt32(&impairedProtocolCount)
  532. if count <= 0 {
  533. t.Fatalf("unexpected impaired protocol count: %d", count)
  534. } else {
  535. impairedProtocolClassification.RLock()
  536. t.Logf("impaired protocol classification: %+v",
  537. impairedProtocolClassification.classification)
  538. impairedProtocolClassification.RUnlock()
  539. }
  540. }
  541. }
  542. // Test: upgrade check/download must be downloaded within 180 seconds
  543. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  544. if expectUpgrade {
  545. upgradeTimeout := time.NewTimer(180 * time.Second)
  546. select {
  547. case <-upgradeDownloaded:
  548. // TODO: verify downloaded file
  549. if runConfig.clientIsLatestVersion {
  550. t.Fatalf("upgrade downloaded unexpectedly")
  551. }
  552. // Test: with disruptNetwork, must be multiple download progress notices
  553. if runConfig.disruptNetwork {
  554. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  555. if count <= 1 {
  556. t.Fatalf("unexpected upgrade download progress: %d", count)
  557. }
  558. }
  559. case <-confirmedLatestVersion:
  560. if !runConfig.clientIsLatestVersion {
  561. t.Fatalf("confirmed latest version unexpectedly")
  562. }
  563. case <-upgradeTimeout.C:
  564. t.Fatalf("upgrade download timeout exceeded")
  565. }
  566. }
  567. }
  568. type TestHostNameTransformer struct {
  569. }
  570. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  571. return "example.com", true
  572. }
  573. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  574. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  575. roundTripTimeout := 10 * time.Second
  576. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  577. expectedResponseSize := 35148
  578. checkResponse := func(responseBody string) bool {
  579. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  580. }
  581. // Test: use HTTP proxy
  582. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  583. if err != nil {
  584. t.Fatalf("error initializing proxied HTTP request: %s", err)
  585. }
  586. httpClient := &http.Client{
  587. Transport: &http.Transport{
  588. Proxy: http.ProxyURL(proxyUrl),
  589. },
  590. Timeout: roundTripTimeout,
  591. }
  592. response, err := httpClient.Get(testUrl)
  593. if err != nil {
  594. t.Fatalf("error sending proxied HTTP request: %s", err)
  595. }
  596. body, err := ioutil.ReadAll(response.Body)
  597. if err != nil {
  598. t.Fatalf("error reading proxied HTTP response: %s", err)
  599. }
  600. response.Body.Close()
  601. if !checkResponse(string(body)) {
  602. t.Fatalf("unexpected proxied HTTP response")
  603. }
  604. // Test: use direct URL proxy
  605. httpClient = &http.Client{
  606. Transport: http.DefaultTransport,
  607. Timeout: roundTripTimeout,
  608. }
  609. response, err = httpClient.Get(
  610. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  611. httpProxyPort, url.QueryEscape(testUrl)))
  612. if err != nil {
  613. t.Fatalf("error sending direct URL request: %s", err)
  614. }
  615. body, err = ioutil.ReadAll(response.Body)
  616. if err != nil {
  617. t.Fatalf("error reading direct URL response: %s", err)
  618. }
  619. response.Body.Close()
  620. if !checkResponse(string(body)) {
  621. t.Fatalf("unexpected direct URL response")
  622. }
  623. // Test: use tunneled URL proxy
  624. response, err = httpClient.Get(
  625. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  626. httpProxyPort, url.QueryEscape(testUrl)))
  627. if err != nil {
  628. t.Fatalf("error sending tunneled URL request: %s", err)
  629. }
  630. body, err = ioutil.ReadAll(response.Body)
  631. if err != nil {
  632. t.Fatalf("error reading tunneled URL response: %s", err)
  633. }
  634. response.Body.Close()
  635. if !checkResponse(string(body)) {
  636. t.Fatalf("unexpected tunneled URL response")
  637. }
  638. }
  639. func useTunnel(t *testing.T, httpProxyPort int) {
  640. // No action on errors as the tunnel is expected to fail sometimes
  641. testUrl := "https://psiphon3.com"
  642. roundTripTimeout := 1 * time.Second
  643. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  644. if err != nil {
  645. return
  646. }
  647. httpClient := &http.Client{
  648. Transport: &http.Transport{
  649. Proxy: http.ProxyURL(proxyUrl),
  650. },
  651. Timeout: roundTripTimeout,
  652. }
  653. response, err := httpClient.Get(testUrl)
  654. if err != nil {
  655. return
  656. }
  657. response.Body.Close()
  658. }
  659. const disruptorProxyAddress = "127.0.0.1:2160"
  660. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  661. const disruptorMaxConnectionBytes = 625000
  662. const disruptorMaxConnectionTime = 10 * time.Second
  663. func initDisruptor() {
  664. go func() {
  665. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  666. if err != nil {
  667. fmt.Errorf("disruptor proxy listen error: %s", err)
  668. return
  669. }
  670. for {
  671. localConn, err := listener.AcceptSocks()
  672. if err != nil {
  673. fmt.Errorf("disruptor proxy accept error: %s", err)
  674. return
  675. }
  676. go func() {
  677. defer localConn.Close()
  678. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  679. if err != nil {
  680. fmt.Errorf("disruptor proxy dial error: %s", err)
  681. return
  682. }
  683. defer remoteConn.Close()
  684. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  685. if err != nil {
  686. fmt.Errorf("disruptor proxy grant error: %s", err)
  687. return
  688. }
  689. // Cut connection after disruptorMaxConnectionTime
  690. time.AfterFunc(disruptorMaxConnectionTime, func() {
  691. localConn.Close()
  692. remoteConn.Close()
  693. })
  694. // Relay connection, but only up to disruptorMaxConnectionBytes
  695. waitGroup := new(sync.WaitGroup)
  696. waitGroup.Add(1)
  697. go func() {
  698. defer waitGroup.Done()
  699. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  700. }()
  701. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  702. waitGroup.Wait()
  703. }()
  704. }
  705. }()
  706. }