controller_test.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. socks "github.com/Psiphon-Inc/goptlib"
  35. )
  36. func TestMain(m *testing.M) {
  37. flag.Parse()
  38. os.Remove(DATA_STORE_FILENAME)
  39. initDisruptor()
  40. SetEmitDiagnosticNotices(true)
  41. os.Exit(m.Run())
  42. }
// Test case notes/limitations/dependencies:
//
// * Untunneled upgrade tests must execute before
//   the other tests to ensure no tunnel is established.
//   We need a way to reset the datastore after it's been
//   initialized in order to clear out its data entries
//   and be able to arbitrarily order the tests.
//
// * The resumable download tests using disruptNetwork
//   depend on the download object being larger than the
//   disruptorMax limits so that the disruptor will actually
//   interrupt the first download attempt. Specifically, the
//   upgrade and remote server list at the URLs specified in
//   controller_test.config.enc.
//
// * The protocol tests assume there is at least one server
//   supporting each protocol in the server list at the URL
//   specified in controller_test.config.enc, and that these
//   servers are not overloaded.
//
// * fetchAndVerifyWebsite depends on the target URL being
//   available and responding.
//
  66. func TestUntunneledUpgradeDownload(t *testing.T) {
  67. controllerRun(t,
  68. &controllerRunConfig{
  69. expectNoServerEntries: true,
  70. protocol: "",
  71. clientIsLatestVersion: false,
  72. disableUntunneledUpgrade: false,
  73. disableEstablishing: true,
  74. disableApi: false,
  75. tunnelPoolSize: 1,
  76. disruptNetwork: false,
  77. useHostNameTransformer: false,
  78. runDuration: 0,
  79. })
  80. }
  81. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  82. controllerRun(t,
  83. &controllerRunConfig{
  84. expectNoServerEntries: true,
  85. protocol: "",
  86. clientIsLatestVersion: false,
  87. disableUntunneledUpgrade: false,
  88. disableEstablishing: true,
  89. disableApi: false,
  90. tunnelPoolSize: 1,
  91. disruptNetwork: true,
  92. useHostNameTransformer: false,
  93. runDuration: 0,
  94. })
  95. }
  96. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  97. controllerRun(t,
  98. &controllerRunConfig{
  99. expectNoServerEntries: true,
  100. protocol: "",
  101. clientIsLatestVersion: true,
  102. disableUntunneledUpgrade: false,
  103. disableEstablishing: true,
  104. disableApi: false,
  105. tunnelPoolSize: 1,
  106. disruptNetwork: false,
  107. useHostNameTransformer: false,
  108. runDuration: 0,
  109. })
  110. }
  111. func TestUntunneledResumableFetchRemoveServerList(t *testing.T) {
  112. controllerRun(t,
  113. &controllerRunConfig{
  114. expectNoServerEntries: true,
  115. protocol: "",
  116. clientIsLatestVersion: true,
  117. disableUntunneledUpgrade: false,
  118. disableEstablishing: false,
  119. disableApi: false,
  120. tunnelPoolSize: 1,
  121. disruptNetwork: true,
  122. useHostNameTransformer: false,
  123. runDuration: 0,
  124. })
  125. }
  126. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  127. controllerRun(t,
  128. &controllerRunConfig{
  129. expectNoServerEntries: false,
  130. protocol: "",
  131. clientIsLatestVersion: true,
  132. disableUntunneledUpgrade: true,
  133. disableEstablishing: false,
  134. disableApi: false,
  135. tunnelPoolSize: 1,
  136. disruptNetwork: false,
  137. useHostNameTransformer: false,
  138. runDuration: 0,
  139. })
  140. }
  141. func TestImpairedProtocols(t *testing.T) {
  142. // This test sets a tunnelPoolSize of 40 and runs
  143. // the session for 1 minute with network disruption
  144. // on. All 40 tunnels being disrupted every 10
  145. // seconds (followed by ssh keep alive probe timeout)
  146. // should be sufficient to trigger at least one
  147. // impaired protocol classification.
  148. controllerRun(t,
  149. &controllerRunConfig{
  150. expectNoServerEntries: false,
  151. protocol: "",
  152. clientIsLatestVersion: true,
  153. disableUntunneledUpgrade: true,
  154. disableEstablishing: false,
  155. disableApi: false,
  156. tunnelPoolSize: 40,
  157. disruptNetwork: true,
  158. useHostNameTransformer: false,
  159. runDuration: 1 * time.Minute,
  160. })
  161. }
  162. func TestSSH(t *testing.T) {
  163. controllerRun(t,
  164. &controllerRunConfig{
  165. expectNoServerEntries: false,
  166. protocol: TUNNEL_PROTOCOL_SSH,
  167. clientIsLatestVersion: false,
  168. disableUntunneledUpgrade: true,
  169. disableEstablishing: false,
  170. disableApi: false,
  171. tunnelPoolSize: 1,
  172. disruptNetwork: false,
  173. useHostNameTransformer: false,
  174. runDuration: 0,
  175. })
  176. }
  177. func TestObfuscatedSSH(t *testing.T) {
  178. controllerRun(t,
  179. &controllerRunConfig{
  180. expectNoServerEntries: false,
  181. protocol: TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  182. clientIsLatestVersion: false,
  183. disableUntunneledUpgrade: true,
  184. disableEstablishing: false,
  185. disableApi: false,
  186. tunnelPoolSize: 1,
  187. disruptNetwork: false,
  188. useHostNameTransformer: false,
  189. runDuration: 0,
  190. })
  191. }
  192. func TestUnfrontedMeek(t *testing.T) {
  193. controllerRun(t,
  194. &controllerRunConfig{
  195. expectNoServerEntries: false,
  196. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  197. clientIsLatestVersion: false,
  198. disableUntunneledUpgrade: true,
  199. disableEstablishing: false,
  200. disableApi: false,
  201. tunnelPoolSize: 1,
  202. disruptNetwork: false,
  203. useHostNameTransformer: false,
  204. runDuration: 0,
  205. })
  206. }
  207. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  208. controllerRun(t,
  209. &controllerRunConfig{
  210. expectNoServerEntries: false,
  211. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  212. clientIsLatestVersion: true,
  213. disableUntunneledUpgrade: true,
  214. disableEstablishing: false,
  215. disableApi: false,
  216. tunnelPoolSize: 1,
  217. disruptNetwork: false,
  218. useHostNameTransformer: true,
  219. runDuration: 0,
  220. })
  221. }
  222. func TestFrontedMeek(t *testing.T) {
  223. controllerRun(t,
  224. &controllerRunConfig{
  225. expectNoServerEntries: false,
  226. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  227. clientIsLatestVersion: false,
  228. disableUntunneledUpgrade: true,
  229. disableEstablishing: false,
  230. disableApi: false,
  231. tunnelPoolSize: 1,
  232. disruptNetwork: false,
  233. useHostNameTransformer: false,
  234. runDuration: 0,
  235. })
  236. }
  237. func TestFrontedMeekWithTransformer(t *testing.T) {
  238. controllerRun(t,
  239. &controllerRunConfig{
  240. expectNoServerEntries: false,
  241. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
  242. clientIsLatestVersion: true,
  243. disableUntunneledUpgrade: true,
  244. disableEstablishing: false,
  245. disableApi: false,
  246. tunnelPoolSize: 1,
  247. disruptNetwork: false,
  248. useHostNameTransformer: true,
  249. runDuration: 0,
  250. })
  251. }
  252. func TestFrontedMeekHTTP(t *testing.T) {
  253. controllerRun(t,
  254. &controllerRunConfig{
  255. expectNoServerEntries: false,
  256. protocol: TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  257. clientIsLatestVersion: true,
  258. disableUntunneledUpgrade: true,
  259. disableEstablishing: false,
  260. disableApi: false,
  261. tunnelPoolSize: 1,
  262. disruptNetwork: false,
  263. useHostNameTransformer: false,
  264. runDuration: 0,
  265. })
  266. }
  267. func TestUnfrontedMeekHTTPS(t *testing.T) {
  268. controllerRun(t,
  269. &controllerRunConfig{
  270. expectNoServerEntries: false,
  271. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  272. clientIsLatestVersion: false,
  273. disableUntunneledUpgrade: true,
  274. disableEstablishing: false,
  275. disableApi: false,
  276. tunnelPoolSize: 1,
  277. disruptNetwork: false,
  278. useHostNameTransformer: false,
  279. runDuration: 0,
  280. })
  281. }
  282. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  283. controllerRun(t,
  284. &controllerRunConfig{
  285. expectNoServerEntries: false,
  286. protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  287. clientIsLatestVersion: true,
  288. disableUntunneledUpgrade: true,
  289. disableEstablishing: false,
  290. disableApi: false,
  291. tunnelPoolSize: 1,
  292. disruptNetwork: false,
  293. useHostNameTransformer: true,
  294. runDuration: 0,
  295. })
  296. }
  297. func TestDisabledApi(t *testing.T) {
  298. controllerRun(t,
  299. &controllerRunConfig{
  300. expectNoServerEntries: false,
  301. protocol: "",
  302. clientIsLatestVersion: true,
  303. disableUntunneledUpgrade: true,
  304. disableEstablishing: false,
  305. disableApi: true,
  306. tunnelPoolSize: 1,
  307. disruptNetwork: false,
  308. useHostNameTransformer: false,
  309. runDuration: 0,
  310. })
  311. }
  312. type controllerRunConfig struct {
  313. expectNoServerEntries bool
  314. protocol string
  315. clientIsLatestVersion bool
  316. disableUntunneledUpgrade bool
  317. disableEstablishing bool
  318. tunnelPoolSize int
  319. disruptNetwork bool
  320. useHostNameTransformer bool
  321. runDuration time.Duration
  322. disableApi bool
  323. }
  324. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  325. configFileContents, err := ioutil.ReadFile("controller_test.config")
  326. if err != nil {
  327. // Skip, don't fail, if config file is not present
  328. t.Skipf("error loading configuration file: %s", err)
  329. }
  330. config, err := LoadConfig(configFileContents)
  331. if err != nil {
  332. t.Fatalf("error processing configuration file: %s", err)
  333. }
  334. if runConfig.clientIsLatestVersion {
  335. config.ClientVersion = "999999999"
  336. }
  337. if runConfig.disableEstablishing {
  338. // Clear remote server list so tunnel cannot be established.
  339. // TODO: also delete all server entries in the datastore.
  340. config.RemoteServerListUrl = ""
  341. }
  342. if runConfig.disableApi {
  343. config.DisableApi = true
  344. }
  345. config.TunnelPoolSize = runConfig.tunnelPoolSize
  346. if runConfig.disableUntunneledUpgrade {
  347. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  348. config.UpgradeDownloadClientVersionHeader = ""
  349. }
  350. if runConfig.disruptNetwork {
  351. config.UpstreamProxyUrl = disruptorProxyURL
  352. }
  353. if runConfig.useHostNameTransformer {
  354. config.HostNameTransformer = &TestHostNameTransformer{}
  355. }
  356. // Override client retry throttle values to speed up automated
  357. // tests and ensure tests complete within fixed deadlines.
  358. fetchRemoteServerListRetryPeriodSeconds := 0
  359. config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
  360. downloadUpgradeRetryPeriodSeconds := 0
  361. config.DownloadUpgradeRetryPeriodSeconds = &downloadUpgradeRetryPeriodSeconds
  362. establishTunnelPausePeriodSeconds := 1
  363. config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
  364. os.Remove(config.UpgradeDownloadFilename)
  365. config.TunnelProtocol = runConfig.protocol
  366. err = InitDataStore(config)
  367. if err != nil {
  368. t.Fatalf("error initializing datastore: %s", err)
  369. }
  370. serverEntryCount := CountServerEntries("", "")
  371. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  372. // TODO: replace expectNoServerEntries with resetServerEntries
  373. // so tests can run in arbitrary order
  374. t.Fatalf("unexpected server entries")
  375. }
  376. controller, err := NewController(config)
  377. if err != nil {
  378. t.Fatalf("error creating controller: %s", err)
  379. }
  380. // Monitor notices for "Tunnels" with count > 1, the
  381. // indication of tunnel establishment success.
  382. // Also record the selected HTTP proxy port to use
  383. // when fetching websites through the tunnel.
  384. httpProxyPort := 0
  385. tunnelEstablished := make(chan struct{}, 1)
  386. upgradeDownloaded := make(chan struct{}, 1)
  387. remoteServerListDownloaded := make(chan struct{}, 1)
  388. confirmedLatestVersion := make(chan struct{}, 1)
  389. var clientUpgradeDownloadedBytesCount int32
  390. var remoteServerListDownloadedBytesCount int32
  391. var impairedProtocolCount int32
  392. var impairedProtocolClassification = struct {
  393. sync.RWMutex
  394. classification map[string]int
  395. }{classification: make(map[string]int)}
  396. SetNoticeOutput(NewNoticeReceiver(
  397. func(notice []byte) {
  398. // TODO: log notices without logging server IPs:
  399. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  400. noticeType, payload, err := GetNotice(notice)
  401. if err != nil {
  402. return
  403. }
  404. switch noticeType {
  405. case "ListeningHttpProxyPort":
  406. httpProxyPort = int(payload["port"].(float64))
  407. case "ConnectingServer":
  408. serverProtocol := payload["protocol"].(string)
  409. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  410. // TODO: wrong goroutine for t.FatalNow()
  411. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  412. }
  413. case "Tunnels":
  414. count := int(payload["count"].(float64))
  415. if count > 0 {
  416. if runConfig.disableEstablishing {
  417. // TODO: wrong goroutine for t.FatalNow()
  418. t.Fatalf("tunnel established unexpectedly")
  419. } else {
  420. select {
  421. case tunnelEstablished <- *new(struct{}):
  422. default:
  423. }
  424. }
  425. }
  426. case "ClientUpgradeDownloadedBytes":
  427. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  428. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  429. case "ClientUpgradeDownloaded":
  430. select {
  431. case upgradeDownloaded <- *new(struct{}):
  432. default:
  433. }
  434. case "ClientIsLatestVersion":
  435. select {
  436. case confirmedLatestVersion <- *new(struct{}):
  437. default:
  438. }
  439. case "RemoteServerListDownloadedBytes":
  440. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  441. t.Logf("RemoteServerListDownloadedBytes: %d", int(payload["bytes"].(float64)))
  442. case "RemoteServerListDownloaded":
  443. select {
  444. case remoteServerListDownloaded <- *new(struct{}):
  445. default:
  446. }
  447. case "ImpairedProtocolClassification":
  448. classification := payload["classification"].(map[string]interface{})
  449. impairedProtocolClassification.Lock()
  450. impairedProtocolClassification.classification = make(map[string]int)
  451. for k, v := range classification {
  452. count := int(v.(float64))
  453. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  454. atomic.AddInt32(&impairedProtocolCount, 1)
  455. }
  456. impairedProtocolClassification.classification[k] = count
  457. }
  458. impairedProtocolClassification.Unlock()
  459. case "ActiveTunnel":
  460. serverProtocol := payload["protocol"].(string)
  461. classification := make(map[string]int)
  462. impairedProtocolClassification.RLock()
  463. for k, v := range impairedProtocolClassification.classification {
  464. classification[k] = v
  465. }
  466. impairedProtocolClassification.RUnlock()
  467. count, ok := classification[serverProtocol]
  468. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  469. // TODO: wrong goroutine for t.FatalNow()
  470. t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
  471. serverProtocol, classification)
  472. }
  473. }
  474. }))
  475. // Run controller, which establishes tunnels
  476. shutdownBroadcast := make(chan struct{})
  477. controllerWaitGroup := new(sync.WaitGroup)
  478. controllerWaitGroup.Add(1)
  479. go func() {
  480. defer controllerWaitGroup.Done()
  481. controller.Run(shutdownBroadcast)
  482. }()
  483. defer func() {
  484. // Test: shutdown must complete within 20 seconds
  485. close(shutdownBroadcast)
  486. shutdownTimeout := time.NewTimer(20 * time.Second)
  487. shutdownOk := make(chan struct{}, 1)
  488. go func() {
  489. controllerWaitGroup.Wait()
  490. shutdownOk <- *new(struct{})
  491. }()
  492. select {
  493. case <-shutdownOk:
  494. case <-shutdownTimeout.C:
  495. t.Fatalf("controller shutdown timeout exceeded")
  496. }
  497. }()
  498. if !runConfig.disableEstablishing {
  499. // Test: tunnel must be established within 120 seconds
  500. establishTimeout := time.NewTimer(120 * time.Second)
  501. select {
  502. case <-tunnelEstablished:
  503. case <-establishTimeout.C:
  504. t.Fatalf("tunnel establish timeout exceeded")
  505. }
  506. // Test: if starting with no server entries, a fetch remote
  507. // server list must have succeeded. With disruptNetwork, the
  508. // fetch must have been resumed at least once.
  509. if serverEntryCount == 0 {
  510. select {
  511. case <-remoteServerListDownloaded:
  512. default:
  513. t.Fatalf("expected remote server list downloaded")
  514. }
  515. if runConfig.disruptNetwork {
  516. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  517. if count <= 1 {
  518. t.Fatalf("unexpected remote server list download progress: %d", count)
  519. }
  520. }
  521. }
  522. // Test: fetch website through tunnel
  523. // Allow for known race condition described in NewHttpProxy():
  524. time.Sleep(1 * time.Second)
  525. fetchAndVerifyWebsite(t, httpProxyPort)
  526. // Test: run for duration, periodically using the tunnel to
  527. // ensure failed tunnel detection, and ultimately hitting
  528. // impaired protocol checks.
  529. startTime := time.Now()
  530. for {
  531. time.Sleep(1 * time.Second)
  532. useTunnel(t, httpProxyPort)
  533. if startTime.Add(runConfig.runDuration).Before(time.Now()) {
  534. break
  535. }
  536. }
  537. // Test: with disruptNetwork, impaired protocols should be exercised
  538. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  539. count := atomic.LoadInt32(&impairedProtocolCount)
  540. if count <= 0 {
  541. t.Fatalf("unexpected impaired protocol count: %d", count)
  542. } else {
  543. impairedProtocolClassification.RLock()
  544. t.Logf("impaired protocol classification: %+v",
  545. impairedProtocolClassification.classification)
  546. impairedProtocolClassification.RUnlock()
  547. }
  548. }
  549. }
  550. // Test: upgrade check/download must be downloaded within 180 seconds
  551. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  552. if expectUpgrade {
  553. upgradeTimeout := time.NewTimer(180 * time.Second)
  554. select {
  555. case <-upgradeDownloaded:
  556. // TODO: verify downloaded file
  557. if runConfig.clientIsLatestVersion {
  558. t.Fatalf("upgrade downloaded unexpectedly")
  559. }
  560. // Test: with disruptNetwork, must be multiple download progress notices
  561. if runConfig.disruptNetwork {
  562. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  563. if count <= 1 {
  564. t.Fatalf("unexpected upgrade download progress: %d", count)
  565. }
  566. }
  567. case <-confirmedLatestVersion:
  568. if !runConfig.clientIsLatestVersion {
  569. t.Fatalf("confirmed latest version unexpectedly")
  570. }
  571. case <-upgradeTimeout.C:
  572. t.Fatalf("upgrade download timeout exceeded")
  573. }
  574. }
  575. }
  576. type TestHostNameTransformer struct {
  577. }
  578. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  579. return "example.com", true
  580. }
  581. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  582. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  583. roundTripTimeout := 10 * time.Second
  584. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  585. expectedResponseSize := 35148
  586. checkResponse := func(responseBody string) bool {
  587. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  588. }
  589. // Test: use HTTP proxy
  590. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  591. if err != nil {
  592. t.Fatalf("error initializing proxied HTTP request: %s", err)
  593. }
  594. httpClient := &http.Client{
  595. Transport: &http.Transport{
  596. Proxy: http.ProxyURL(proxyUrl),
  597. },
  598. Timeout: roundTripTimeout,
  599. }
  600. response, err := httpClient.Get(testUrl)
  601. if err != nil {
  602. t.Fatalf("error sending proxied HTTP request: %s", err)
  603. }
  604. body, err := ioutil.ReadAll(response.Body)
  605. if err != nil {
  606. t.Fatalf("error reading proxied HTTP response: %s", err)
  607. }
  608. response.Body.Close()
  609. if !checkResponse(string(body)) {
  610. t.Fatalf("unexpected proxied HTTP response")
  611. }
  612. // Test: use direct URL proxy
  613. httpClient = &http.Client{
  614. Transport: http.DefaultTransport,
  615. Timeout: roundTripTimeout,
  616. }
  617. response, err = httpClient.Get(
  618. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  619. httpProxyPort, url.QueryEscape(testUrl)))
  620. if err != nil {
  621. t.Fatalf("error sending direct URL request: %s", err)
  622. }
  623. body, err = ioutil.ReadAll(response.Body)
  624. if err != nil {
  625. t.Fatalf("error reading direct URL response: %s", err)
  626. }
  627. response.Body.Close()
  628. if !checkResponse(string(body)) {
  629. t.Fatalf("unexpected direct URL response")
  630. }
  631. // Test: use tunneled URL proxy
  632. response, err = httpClient.Get(
  633. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  634. httpProxyPort, url.QueryEscape(testUrl)))
  635. if err != nil {
  636. t.Fatalf("error sending tunneled URL request: %s", err)
  637. }
  638. body, err = ioutil.ReadAll(response.Body)
  639. if err != nil {
  640. t.Fatalf("error reading tunneled URL response: %s", err)
  641. }
  642. response.Body.Close()
  643. if !checkResponse(string(body)) {
  644. t.Fatalf("unexpected tunneled URL response")
  645. }
  646. }
  647. func useTunnel(t *testing.T, httpProxyPort int) {
  648. // No action on errors as the tunnel is expected to fail sometimes
  649. testUrl := "https://psiphon3.com"
  650. roundTripTimeout := 1 * time.Second
  651. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  652. if err != nil {
  653. return
  654. }
  655. httpClient := &http.Client{
  656. Transport: &http.Transport{
  657. Proxy: http.ProxyURL(proxyUrl),
  658. },
  659. Timeout: roundTripTimeout,
  660. }
  661. response, err := httpClient.Get(testUrl)
  662. if err != nil {
  663. return
  664. }
  665. response.Body.Close()
  666. }
  667. const disruptorProxyAddress = "127.0.0.1:2160"
  668. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  669. const disruptorMaxConnectionBytes = 625000
  670. const disruptorMaxConnectionTime = 10 * time.Second
  671. func initDisruptor() {
  672. go func() {
  673. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  674. if err != nil {
  675. fmt.Errorf("disruptor proxy listen error: %s", err)
  676. return
  677. }
  678. for {
  679. localConn, err := listener.AcceptSocks()
  680. if err != nil {
  681. fmt.Errorf("disruptor proxy accept error: %s", err)
  682. return
  683. }
  684. go func() {
  685. defer localConn.Close()
  686. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  687. if err != nil {
  688. fmt.Errorf("disruptor proxy dial error: %s", err)
  689. return
  690. }
  691. defer remoteConn.Close()
  692. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  693. if err != nil {
  694. fmt.Errorf("disruptor proxy grant error: %s", err)
  695. return
  696. }
  697. // Cut connection after disruptorMaxConnectionTime
  698. time.AfterFunc(disruptorMaxConnectionTime, func() {
  699. localConn.Close()
  700. remoteConn.Close()
  701. })
  702. // Relay connection, but only up to disruptorMaxConnectionBytes
  703. waitGroup := new(sync.WaitGroup)
  704. waitGroup.Add(1)
  705. go func() {
  706. defer waitGroup.Done()
  707. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  708. }()
  709. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  710. waitGroup.Wait()
  711. }()
  712. }
  713. }()
  714. }