controller_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. "github.com/Psiphon-Inc/goarista/monotime"
  35. socks "github.com/Psiphon-Inc/goptlib"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  38. "github.com/elazarl/goproxy"
  39. )
  40. func TestMain(m *testing.M) {
  41. flag.Parse()
  42. os.Remove(DATA_STORE_FILENAME)
  43. initDisruptor()
  44. initUpstreamProxy()
  45. SetEmitDiagnosticNotices(true)
  46. os.Exit(m.Run())
  47. }
  48. // Test case notes/limitations/dependencies:
  49. //
  50. // * Untunneled upgrade tests must execute before
  51. // the other tests to ensure no tunnel is established.
  52. // We need a way to reset the datastore after it's been
  53. // initialized in order to to clear out its data entries
  54. // and be able to arbitrarily order the tests.
  55. //
  56. // * The resumable download tests using disruptNetwork
  57. // depend on the download object being larger than the
  58. // disruptorMax limits so that the disruptor will actually
  59. // interrupt the first download attempt. Specifically, the
  60. // upgrade and remote server list at the URLs specified in
  61. // controller_test.config.enc.
  62. //
  63. // * The protocol tests assume there is at least one server
  64. // supporting each protocol in the server list at the URL
  65. // specified in controller_test.config.enc, and that these
  66. // servers are not overloaded.
  67. //
  68. // * fetchAndVerifyWebsite depends on the target URL being
  69. // available and responding.
  70. //
  71. func TestUntunneledUpgradeDownload(t *testing.T) {
  72. controllerRun(t,
  73. &controllerRunConfig{
  74. expectNoServerEntries: true,
  75. protocol: "",
  76. clientIsLatestVersion: false,
  77. disableUntunneledUpgrade: false,
  78. disableEstablishing: true,
  79. disableApi: false,
  80. tunnelPoolSize: 1,
  81. useUpstreamProxy: false,
  82. disruptNetwork: false,
  83. useHostNameTransformer: false,
  84. runDuration: 0,
  85. })
  86. }
  87. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  88. controllerRun(t,
  89. &controllerRunConfig{
  90. expectNoServerEntries: true,
  91. protocol: "",
  92. clientIsLatestVersion: false,
  93. disableUntunneledUpgrade: false,
  94. disableEstablishing: true,
  95. disableApi: false,
  96. tunnelPoolSize: 1,
  97. useUpstreamProxy: false,
  98. disruptNetwork: true,
  99. useHostNameTransformer: false,
  100. runDuration: 0,
  101. })
  102. }
  103. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  104. controllerRun(t,
  105. &controllerRunConfig{
  106. expectNoServerEntries: true,
  107. protocol: "",
  108. clientIsLatestVersion: true,
  109. disableUntunneledUpgrade: false,
  110. disableEstablishing: true,
  111. disableApi: false,
  112. tunnelPoolSize: 1,
  113. useUpstreamProxy: false,
  114. disruptNetwork: false,
  115. useHostNameTransformer: false,
  116. runDuration: 0,
  117. })
  118. }
  119. func TestUntunneledResumableFetchRemoveServerList(t *testing.T) {
  120. controllerRun(t,
  121. &controllerRunConfig{
  122. expectNoServerEntries: true,
  123. protocol: "",
  124. clientIsLatestVersion: true,
  125. disableUntunneledUpgrade: false,
  126. disableEstablishing: false,
  127. disableApi: false,
  128. tunnelPoolSize: 1,
  129. useUpstreamProxy: false,
  130. disruptNetwork: true,
  131. useHostNameTransformer: false,
  132. runDuration: 0,
  133. })
  134. }
  135. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  136. controllerRun(t,
  137. &controllerRunConfig{
  138. expectNoServerEntries: false,
  139. protocol: "",
  140. clientIsLatestVersion: true,
  141. disableUntunneledUpgrade: true,
  142. disableEstablishing: false,
  143. disableApi: false,
  144. tunnelPoolSize: 1,
  145. useUpstreamProxy: false,
  146. disruptNetwork: false,
  147. useHostNameTransformer: false,
  148. runDuration: 0,
  149. })
  150. }
  151. func TestImpairedProtocols(t *testing.T) {
  152. // This test sets a tunnelPoolSize of 40 and runs
  153. // the session for 1 minute with network disruption
  154. // on. All 40 tunnels being disrupted every 10
  155. // seconds (followed by ssh keep alive probe timeout)
  156. // should be sufficient to trigger at least one
  157. // impaired protocol classification.
  158. controllerRun(t,
  159. &controllerRunConfig{
  160. expectNoServerEntries: false,
  161. protocol: "",
  162. clientIsLatestVersion: true,
  163. disableUntunneledUpgrade: true,
  164. disableEstablishing: false,
  165. disableApi: false,
  166. tunnelPoolSize: 40,
  167. useUpstreamProxy: false,
  168. disruptNetwork: true,
  169. useHostNameTransformer: false,
  170. runDuration: 1 * time.Minute,
  171. })
  172. }
  173. func TestSSH(t *testing.T) {
  174. controllerRun(t,
  175. &controllerRunConfig{
  176. expectNoServerEntries: false,
  177. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  178. clientIsLatestVersion: false,
  179. disableUntunneledUpgrade: true,
  180. disableEstablishing: false,
  181. disableApi: false,
  182. tunnelPoolSize: 1,
  183. useUpstreamProxy: false,
  184. disruptNetwork: false,
  185. useHostNameTransformer: false,
  186. runDuration: 0,
  187. })
  188. }
  189. func TestObfuscatedSSH(t *testing.T) {
  190. controllerRun(t,
  191. &controllerRunConfig{
  192. expectNoServerEntries: false,
  193. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  194. clientIsLatestVersion: false,
  195. disableUntunneledUpgrade: true,
  196. disableEstablishing: false,
  197. disableApi: false,
  198. tunnelPoolSize: 1,
  199. useUpstreamProxy: false,
  200. disruptNetwork: false,
  201. useHostNameTransformer: false,
  202. runDuration: 0,
  203. })
  204. }
  205. func TestUnfrontedMeek(t *testing.T) {
  206. controllerRun(t,
  207. &controllerRunConfig{
  208. expectNoServerEntries: false,
  209. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  210. clientIsLatestVersion: false,
  211. disableUntunneledUpgrade: true,
  212. disableEstablishing: false,
  213. disableApi: false,
  214. tunnelPoolSize: 1,
  215. useUpstreamProxy: false,
  216. disruptNetwork: false,
  217. useHostNameTransformer: false,
  218. runDuration: 0,
  219. })
  220. }
  221. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  222. controllerRun(t,
  223. &controllerRunConfig{
  224. expectNoServerEntries: false,
  225. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  226. clientIsLatestVersion: true,
  227. disableUntunneledUpgrade: true,
  228. disableEstablishing: false,
  229. disableApi: false,
  230. tunnelPoolSize: 1,
  231. useUpstreamProxy: false,
  232. disruptNetwork: false,
  233. useHostNameTransformer: true,
  234. runDuration: 0,
  235. })
  236. }
  237. func TestFrontedMeek(t *testing.T) {
  238. controllerRun(t,
  239. &controllerRunConfig{
  240. expectNoServerEntries: false,
  241. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  242. clientIsLatestVersion: false,
  243. disableUntunneledUpgrade: true,
  244. disableEstablishing: false,
  245. disableApi: false,
  246. tunnelPoolSize: 1,
  247. useUpstreamProxy: false,
  248. disruptNetwork: false,
  249. useHostNameTransformer: false,
  250. runDuration: 0,
  251. })
  252. }
  253. func TestFrontedMeekWithTransformer(t *testing.T) {
  254. controllerRun(t,
  255. &controllerRunConfig{
  256. expectNoServerEntries: false,
  257. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  258. clientIsLatestVersion: true,
  259. disableUntunneledUpgrade: true,
  260. disableEstablishing: false,
  261. disableApi: false,
  262. tunnelPoolSize: 1,
  263. useUpstreamProxy: false,
  264. disruptNetwork: false,
  265. useHostNameTransformer: true,
  266. runDuration: 0,
  267. })
  268. }
  269. func TestFrontedMeekHTTP(t *testing.T) {
  270. controllerRun(t,
  271. &controllerRunConfig{
  272. expectNoServerEntries: false,
  273. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  274. clientIsLatestVersion: true,
  275. disableUntunneledUpgrade: true,
  276. disableEstablishing: false,
  277. disableApi: false,
  278. tunnelPoolSize: 1,
  279. useUpstreamProxy: false,
  280. disruptNetwork: false,
  281. useHostNameTransformer: false,
  282. runDuration: 0,
  283. })
  284. }
  285. func TestUnfrontedMeekHTTPS(t *testing.T) {
  286. controllerRun(t,
  287. &controllerRunConfig{
  288. expectNoServerEntries: false,
  289. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  290. clientIsLatestVersion: false,
  291. disableUntunneledUpgrade: true,
  292. disableEstablishing: false,
  293. disableApi: false,
  294. tunnelPoolSize: 1,
  295. useUpstreamProxy: false,
  296. disruptNetwork: false,
  297. useHostNameTransformer: false,
  298. runDuration: 0,
  299. })
  300. }
  301. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  302. controllerRun(t,
  303. &controllerRunConfig{
  304. expectNoServerEntries: false,
  305. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  306. clientIsLatestVersion: true,
  307. disableUntunneledUpgrade: true,
  308. disableEstablishing: false,
  309. disableApi: false,
  310. tunnelPoolSize: 1,
  311. useUpstreamProxy: false,
  312. disruptNetwork: false,
  313. useHostNameTransformer: true,
  314. runDuration: 0,
  315. })
  316. }
  317. func TestDisabledApi(t *testing.T) {
  318. controllerRun(t,
  319. &controllerRunConfig{
  320. expectNoServerEntries: false,
  321. protocol: "",
  322. clientIsLatestVersion: true,
  323. disableUntunneledUpgrade: true,
  324. disableEstablishing: false,
  325. disableApi: true,
  326. tunnelPoolSize: 1,
  327. useUpstreamProxy: false,
  328. disruptNetwork: false,
  329. useHostNameTransformer: false,
  330. runDuration: 0,
  331. })
  332. }
  333. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  334. controllerRun(t,
  335. &controllerRunConfig{
  336. expectNoServerEntries: false,
  337. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  338. clientIsLatestVersion: false,
  339. disableUntunneledUpgrade: true,
  340. disableEstablishing: false,
  341. disableApi: false,
  342. tunnelPoolSize: 1,
  343. useUpstreamProxy: true,
  344. disruptNetwork: false,
  345. useHostNameTransformer: false,
  346. runDuration: 0,
  347. })
  348. }
  349. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  350. controllerRun(t,
  351. &controllerRunConfig{
  352. expectNoServerEntries: false,
  353. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  354. clientIsLatestVersion: false,
  355. disableUntunneledUpgrade: true,
  356. disableEstablishing: false,
  357. disableApi: false,
  358. tunnelPoolSize: 1,
  359. useUpstreamProxy: true,
  360. disruptNetwork: false,
  361. useHostNameTransformer: false,
  362. runDuration: 0,
  363. })
  364. }
  365. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  366. controllerRun(t,
  367. &controllerRunConfig{
  368. expectNoServerEntries: false,
  369. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  370. clientIsLatestVersion: false,
  371. disableUntunneledUpgrade: true,
  372. disableEstablishing: false,
  373. disableApi: false,
  374. tunnelPoolSize: 1,
  375. useUpstreamProxy: true,
  376. disruptNetwork: false,
  377. useHostNameTransformer: false,
  378. runDuration: 0,
  379. })
  380. }
  381. type controllerRunConfig struct {
  382. expectNoServerEntries bool
  383. protocol string
  384. clientIsLatestVersion bool
  385. disableUntunneledUpgrade bool
  386. disableEstablishing bool
  387. disableApi bool
  388. tunnelPoolSize int
  389. useUpstreamProxy bool
  390. disruptNetwork bool
  391. useHostNameTransformer bool
  392. runDuration time.Duration
  393. }
  394. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  395. configFileContents, err := ioutil.ReadFile("controller_test.config")
  396. if err != nil {
  397. // Skip, don't fail, if config file is not present
  398. t.Skipf("error loading configuration file: %s", err)
  399. }
  400. config, err := LoadConfig(configFileContents)
  401. if err != nil {
  402. t.Fatalf("error processing configuration file: %s", err)
  403. }
  404. if runConfig.clientIsLatestVersion {
  405. config.ClientVersion = "999999999"
  406. }
  407. if runConfig.disableEstablishing {
  408. // Clear remote server list so tunnel cannot be established.
  409. // TODO: also delete all server entries in the datastore.
  410. config.RemoteServerListUrl = ""
  411. }
  412. if runConfig.disableApi {
  413. config.DisableApi = true
  414. }
  415. config.TunnelPoolSize = runConfig.tunnelPoolSize
  416. if runConfig.disableUntunneledUpgrade {
  417. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  418. config.UpgradeDownloadClientVersionHeader = ""
  419. }
  420. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  421. t.Fatalf("cannot use multiple upstream proxies")
  422. }
  423. if runConfig.disruptNetwork {
  424. config.UpstreamProxyUrl = disruptorProxyURL
  425. } else if runConfig.useUpstreamProxy {
  426. config.UpstreamProxyUrl = upstreamProxyURL
  427. config.UpstreamProxyCustomHeaders = upstreamProxyCustomHeaders
  428. }
  429. if runConfig.useHostNameTransformer {
  430. config.HostNameTransformer = &TestHostNameTransformer{}
  431. }
  432. // Override client retry throttle values to speed up automated
  433. // tests and ensure tests complete within fixed deadlines.
  434. fetchRemoteServerListRetryPeriodSeconds := 0
  435. config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
  436. downloadUpgradeRetryPeriodSeconds := 0
  437. config.DownloadUpgradeRetryPeriodSeconds = &downloadUpgradeRetryPeriodSeconds
  438. establishTunnelPausePeriodSeconds := 1
  439. config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
  440. os.Remove(config.UpgradeDownloadFilename)
  441. config.TunnelProtocol = runConfig.protocol
  442. err = InitDataStore(config)
  443. if err != nil {
  444. t.Fatalf("error initializing datastore: %s", err)
  445. }
  446. serverEntryCount := CountServerEntries("", "")
  447. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  448. // TODO: replace expectNoServerEntries with resetServerEntries
  449. // so tests can run in arbitrary order
  450. t.Fatalf("unexpected server entries")
  451. }
  452. controller, err := NewController(config)
  453. if err != nil {
  454. t.Fatalf("error creating controller: %s", err)
  455. }
  456. // Monitor notices for "Tunnels" with count > 1, the
  457. // indication of tunnel establishment success.
  458. // Also record the selected HTTP proxy port to use
  459. // when fetching websites through the tunnel.
  460. httpProxyPort := 0
  461. tunnelEstablished := make(chan struct{}, 1)
  462. upgradeDownloaded := make(chan struct{}, 1)
  463. remoteServerListDownloaded := make(chan struct{}, 1)
  464. confirmedLatestVersion := make(chan struct{}, 1)
  465. var clientUpgradeDownloadedBytesCount int32
  466. var remoteServerListDownloadedBytesCount int32
  467. var impairedProtocolCount int32
  468. var impairedProtocolClassification = struct {
  469. sync.RWMutex
  470. classification map[string]int
  471. }{classification: make(map[string]int)}
  472. SetNoticeOutput(NewNoticeReceiver(
  473. func(notice []byte) {
  474. // TODO: log notices without logging server IPs:
  475. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  476. noticeType, payload, err := GetNotice(notice)
  477. if err != nil {
  478. return
  479. }
  480. switch noticeType {
  481. case "ListeningHttpProxyPort":
  482. httpProxyPort = int(payload["port"].(float64))
  483. case "ConnectingServer":
  484. serverProtocol := payload["protocol"].(string)
  485. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  486. // TODO: wrong goroutine for t.FatalNow()
  487. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  488. }
  489. case "Tunnels":
  490. count := int(payload["count"].(float64))
  491. if count > 0 {
  492. if runConfig.disableEstablishing {
  493. // TODO: wrong goroutine for t.FatalNow()
  494. t.Fatalf("tunnel established unexpectedly")
  495. } else {
  496. select {
  497. case tunnelEstablished <- *new(struct{}):
  498. default:
  499. }
  500. }
  501. }
  502. case "ClientUpgradeDownloadedBytes":
  503. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  504. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  505. case "ClientUpgradeDownloaded":
  506. select {
  507. case upgradeDownloaded <- *new(struct{}):
  508. default:
  509. }
  510. case "ClientIsLatestVersion":
  511. select {
  512. case confirmedLatestVersion <- *new(struct{}):
  513. default:
  514. }
  515. case "RemoteServerListDownloadedBytes":
  516. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  517. t.Logf("RemoteServerListDownloadedBytes: %d", int(payload["bytes"].(float64)))
  518. case "RemoteServerListDownloaded":
  519. select {
  520. case remoteServerListDownloaded <- *new(struct{}):
  521. default:
  522. }
  523. case "ImpairedProtocolClassification":
  524. classification := payload["classification"].(map[string]interface{})
  525. impairedProtocolClassification.Lock()
  526. impairedProtocolClassification.classification = make(map[string]int)
  527. for k, v := range classification {
  528. count := int(v.(float64))
  529. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  530. atomic.AddInt32(&impairedProtocolCount, 1)
  531. }
  532. impairedProtocolClassification.classification[k] = count
  533. }
  534. impairedProtocolClassification.Unlock()
  535. case "ActiveTunnel":
  536. serverProtocol := payload["protocol"].(string)
  537. classification := make(map[string]int)
  538. impairedProtocolClassification.RLock()
  539. for k, v := range impairedProtocolClassification.classification {
  540. classification[k] = v
  541. }
  542. impairedProtocolClassification.RUnlock()
  543. count, ok := classification[serverProtocol]
  544. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  545. // TODO: wrong goroutine for t.FatalNow()
  546. t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
  547. serverProtocol, classification)
  548. }
  549. }
  550. }))
  551. // Run controller, which establishes tunnels
  552. shutdownBroadcast := make(chan struct{})
  553. controllerWaitGroup := new(sync.WaitGroup)
  554. controllerWaitGroup.Add(1)
  555. go func() {
  556. defer controllerWaitGroup.Done()
  557. controller.Run(shutdownBroadcast)
  558. }()
  559. defer func() {
  560. // Test: shutdown must complete within 20 seconds
  561. close(shutdownBroadcast)
  562. shutdownTimeout := time.NewTimer(20 * time.Second)
  563. shutdownOk := make(chan struct{}, 1)
  564. go func() {
  565. controllerWaitGroup.Wait()
  566. shutdownOk <- *new(struct{})
  567. }()
  568. select {
  569. case <-shutdownOk:
  570. case <-shutdownTimeout.C:
  571. t.Fatalf("controller shutdown timeout exceeded")
  572. }
  573. }()
  574. if !runConfig.disableEstablishing {
  575. // Test: tunnel must be established within 120 seconds
  576. establishTimeout := time.NewTimer(120 * time.Second)
  577. select {
  578. case <-tunnelEstablished:
  579. case <-establishTimeout.C:
  580. t.Fatalf("tunnel establish timeout exceeded")
  581. }
  582. // Test: if starting with no server entries, a fetch remote
  583. // server list must have succeeded. With disruptNetwork, the
  584. // fetch must have been resumed at least once.
  585. if serverEntryCount == 0 {
  586. select {
  587. case <-remoteServerListDownloaded:
  588. default:
  589. t.Fatalf("expected remote server list downloaded")
  590. }
  591. if runConfig.disruptNetwork {
  592. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  593. if count <= 1 {
  594. t.Fatalf("unexpected remote server list download progress: %d", count)
  595. }
  596. }
  597. }
  598. // Test: fetch website through tunnel
  599. // Allow for known race condition described in NewHttpProxy():
  600. time.Sleep(1 * time.Second)
  601. fetchAndVerifyWebsite(t, httpProxyPort)
  602. // Test: run for duration, periodically using the tunnel to
  603. // ensure failed tunnel detection, and ultimately hitting
  604. // impaired protocol checks.
  605. startTime := monotime.Now()
  606. for {
  607. time.Sleep(1 * time.Second)
  608. useTunnel(t, httpProxyPort)
  609. if startTime.Add(runConfig.runDuration).Before(monotime.Now()) {
  610. break
  611. }
  612. }
  613. // Test: with disruptNetwork, impaired protocols should be exercised
  614. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  615. count := atomic.LoadInt32(&impairedProtocolCount)
  616. if count <= 0 {
  617. t.Fatalf("unexpected impaired protocol count: %d", count)
  618. } else {
  619. impairedProtocolClassification.RLock()
  620. t.Logf("impaired protocol classification: %+v",
  621. impairedProtocolClassification.classification)
  622. impairedProtocolClassification.RUnlock()
  623. }
  624. }
  625. }
  626. // Test: upgrade check/download must be downloaded within 180 seconds
  627. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  628. if expectUpgrade {
  629. upgradeTimeout := time.NewTimer(180 * time.Second)
  630. select {
  631. case <-upgradeDownloaded:
  632. // TODO: verify downloaded file
  633. if runConfig.clientIsLatestVersion {
  634. t.Fatalf("upgrade downloaded unexpectedly")
  635. }
  636. // Test: with disruptNetwork, must be multiple download progress notices
  637. if runConfig.disruptNetwork {
  638. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  639. if count <= 1 {
  640. t.Fatalf("unexpected upgrade download progress: %d", count)
  641. }
  642. }
  643. case <-confirmedLatestVersion:
  644. if !runConfig.clientIsLatestVersion {
  645. t.Fatalf("confirmed latest version unexpectedly")
  646. }
  647. case <-upgradeTimeout.C:
  648. t.Fatalf("upgrade download timeout exceeded")
  649. }
  650. }
  651. }
  652. type TestHostNameTransformer struct {
  653. }
  654. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  655. return "example.com", true
  656. }
  657. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  658. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  659. roundTripTimeout := 30 * time.Second
  660. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  661. expectedResponseSize := 35148
  662. checkResponse := func(responseBody string) bool {
  663. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  664. }
  665. // Test: use HTTP proxy
  666. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  667. if err != nil {
  668. t.Fatalf("error initializing proxied HTTP request: %s", err)
  669. }
  670. httpClient := &http.Client{
  671. Transport: &http.Transport{
  672. Proxy: http.ProxyURL(proxyUrl),
  673. },
  674. Timeout: roundTripTimeout,
  675. }
  676. response, err := httpClient.Get(testUrl)
  677. if err != nil {
  678. t.Fatalf("error sending proxied HTTP request: %s", err)
  679. }
  680. body, err := ioutil.ReadAll(response.Body)
  681. if err != nil {
  682. t.Fatalf("error reading proxied HTTP response: %s", err)
  683. }
  684. response.Body.Close()
  685. if !checkResponse(string(body)) {
  686. t.Fatalf("unexpected proxied HTTP response")
  687. }
  688. // Test: use direct URL proxy
  689. httpClient = &http.Client{
  690. Transport: http.DefaultTransport,
  691. Timeout: roundTripTimeout,
  692. }
  693. response, err = httpClient.Get(
  694. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  695. httpProxyPort, url.QueryEscape(testUrl)))
  696. if err != nil {
  697. t.Fatalf("error sending direct URL request: %s", err)
  698. }
  699. body, err = ioutil.ReadAll(response.Body)
  700. if err != nil {
  701. t.Fatalf("error reading direct URL response: %s", err)
  702. }
  703. response.Body.Close()
  704. if !checkResponse(string(body)) {
  705. t.Fatalf("unexpected direct URL response")
  706. }
  707. // Test: use tunneled URL proxy
  708. response, err = httpClient.Get(
  709. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  710. httpProxyPort, url.QueryEscape(testUrl)))
  711. if err != nil {
  712. t.Fatalf("error sending tunneled URL request: %s", err)
  713. }
  714. body, err = ioutil.ReadAll(response.Body)
  715. if err != nil {
  716. t.Fatalf("error reading tunneled URL response: %s", err)
  717. }
  718. response.Body.Close()
  719. if !checkResponse(string(body)) {
  720. t.Fatalf("unexpected tunneled URL response")
  721. }
  722. }
  723. func useTunnel(t *testing.T, httpProxyPort int) {
  724. // No action on errors as the tunnel is expected to fail sometimes
  725. testUrl := "https://psiphon3.com"
  726. roundTripTimeout := 1 * time.Second
  727. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  728. if err != nil {
  729. return
  730. }
  731. httpClient := &http.Client{
  732. Transport: &http.Transport{
  733. Proxy: http.ProxyURL(proxyUrl),
  734. },
  735. Timeout: roundTripTimeout,
  736. }
  737. response, err := httpClient.Get(testUrl)
  738. if err != nil {
  739. return
  740. }
  741. response.Body.Close()
  742. }
  743. const disruptorProxyAddress = "127.0.0.1:2160"
  744. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  745. const disruptorMaxConnectionBytes = 500000
  746. const disruptorMaxConnectionTime = 10 * time.Second
  747. func initDisruptor() {
  748. go func() {
  749. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  750. if err != nil {
  751. fmt.Errorf("disruptor proxy listen error: %s", err)
  752. return
  753. }
  754. for {
  755. localConn, err := listener.AcceptSocks()
  756. if err != nil {
  757. fmt.Errorf("disruptor proxy accept error: %s", err)
  758. return
  759. }
  760. go func() {
  761. defer localConn.Close()
  762. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  763. if err != nil {
  764. fmt.Errorf("disruptor proxy dial error: %s", err)
  765. return
  766. }
  767. defer remoteConn.Close()
  768. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  769. if err != nil {
  770. fmt.Errorf("disruptor proxy grant error: %s", err)
  771. return
  772. }
  773. // Cut connection after disruptorMaxConnectionTime
  774. time.AfterFunc(disruptorMaxConnectionTime, func() {
  775. localConn.Close()
  776. remoteConn.Close()
  777. })
  778. // Relay connection, but only up to disruptorMaxConnectionBytes
  779. waitGroup := new(sync.WaitGroup)
  780. waitGroup.Add(1)
  781. go func() {
  782. defer waitGroup.Done()
  783. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  784. }()
  785. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  786. waitGroup.Wait()
  787. }()
  788. }
  789. }()
  790. }
  791. const upstreamProxyURL = "http://127.0.0.1:2161"
  792. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": []string{"test-header-value1", "test-header-value2"}}
  793. func hasExpectedCustomHeaders(h http.Header) bool {
  794. for name, values := range upstreamProxyCustomHeaders {
  795. if h[name] == nil {
  796. return false
  797. }
  798. // Order may not be the same
  799. for _, value := range values {
  800. if !common.Contains(h[name], value) {
  801. return false
  802. }
  803. }
  804. }
  805. return true
  806. }
  807. func initUpstreamProxy() {
  808. go func() {
  809. proxy := goproxy.NewProxyHttpServer()
  810. proxy.OnRequest().DoFunc(
  811. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  812. if !hasExpectedCustomHeaders(r.Header) {
  813. ctx.Logf("missing expected headers: %+v", ctx.Req.Header)
  814. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  815. }
  816. return r, nil
  817. })
  818. proxy.OnRequest().HandleConnectFunc(
  819. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  820. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  821. ctx.Logf("missing expected headers: %+v", ctx.Req.Header)
  822. return goproxy.RejectConnect, host
  823. }
  824. return goproxy.OkConnect, host
  825. })
  826. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  827. if err != nil {
  828. fmt.Printf("upstream proxy failed: %s", err)
  829. }
  830. }()
  831. // TODO: wait until listener is active?
  832. }