controller_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "encoding/json"
  22. "flag"
  23. "fmt"
  24. "io"
  25. "io/ioutil"
  26. "net"
  27. "net/http"
  28. "net/url"
  29. "os"
  30. "path/filepath"
  31. "strings"
  32. "sync"
  33. "sync/atomic"
  34. "testing"
  35. "time"
  36. "github.com/Psiphon-Inc/goarista/monotime"
  37. "github.com/Psiphon-Inc/goproxy"
  38. socks "github.com/Psiphon-Inc/goptlib"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. )
  42. var testDataDirName string
  43. func TestMain(m *testing.M) {
  44. flag.Parse()
  45. var err error
  46. testDataDirName, err = ioutil.TempDir("", "psiphon-controller-test")
  47. if err != nil {
  48. fmt.Printf("TempDir failed: %s\n", err)
  49. os.Exit(1)
  50. }
  51. defer os.RemoveAll(testDataDirName)
  52. os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
  53. SetEmitDiagnosticNotices(true)
  54. initDisruptor()
  55. initUpstreamProxy()
  56. os.Exit(m.Run())
  57. }
  58. // Test case notes/limitations/dependencies:
  59. //
  60. // * Untunneled upgrade tests must execute before
  61. // the other tests to ensure no tunnel is established.
// We need a way to reset the datastore after it's been
// initialized in order to clear out its data entries
// and be able to arbitrarily order the tests.
  65. //
  66. // * The resumable download tests using disruptNetwork
  67. // depend on the download object being larger than the
  68. // disruptorMax limits so that the disruptor will actually
  69. // interrupt the first download attempt. Specifically, the
  70. // upgrade and remote server list at the URLs specified in
  71. // controller_test.config.enc.
  72. //
  73. // * The protocol tests assume there is at least one server
  74. // supporting each protocol in the server list at the URL
  75. // specified in controller_test.config.enc, and that these
  76. // servers are not overloaded.
  77. //
  78. // * fetchAndVerifyWebsite depends on the target URL being
  79. // available and responding.
  80. //
  81. func TestUntunneledUpgradeDownload(t *testing.T) {
  82. controllerRun(t,
  83. &controllerRunConfig{
  84. expectNoServerEntries: true,
  85. protocol: "",
  86. clientIsLatestVersion: false,
  87. disableUntunneledUpgrade: false,
  88. disableEstablishing: true,
  89. disableApi: false,
  90. tunnelPoolSize: 1,
  91. useUpstreamProxy: false,
  92. disruptNetwork: false,
  93. transformHostNames: false,
  94. runDuration: 0,
  95. })
  96. }
  97. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  98. controllerRun(t,
  99. &controllerRunConfig{
  100. expectNoServerEntries: true,
  101. protocol: "",
  102. clientIsLatestVersion: false,
  103. disableUntunneledUpgrade: false,
  104. disableEstablishing: true,
  105. disableApi: false,
  106. tunnelPoolSize: 1,
  107. useUpstreamProxy: false,
  108. disruptNetwork: true,
  109. transformHostNames: false,
  110. runDuration: 0,
  111. })
  112. }
  113. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  114. controllerRun(t,
  115. &controllerRunConfig{
  116. expectNoServerEntries: true,
  117. protocol: "",
  118. clientIsLatestVersion: true,
  119. disableUntunneledUpgrade: false,
  120. disableEstablishing: true,
  121. disableApi: false,
  122. tunnelPoolSize: 1,
  123. useUpstreamProxy: false,
  124. disruptNetwork: false,
  125. transformHostNames: false,
  126. runDuration: 0,
  127. })
  128. }
  129. func TestUntunneledResumableFetchRemoteServerList(t *testing.T) {
  130. controllerRun(t,
  131. &controllerRunConfig{
  132. expectNoServerEntries: true,
  133. protocol: "",
  134. clientIsLatestVersion: true,
  135. disableUntunneledUpgrade: false,
  136. disableEstablishing: false,
  137. disableApi: false,
  138. tunnelPoolSize: 1,
  139. useUpstreamProxy: false,
  140. disruptNetwork: true,
  141. transformHostNames: false,
  142. runDuration: 0,
  143. })
  144. }
  145. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  146. controllerRun(t,
  147. &controllerRunConfig{
  148. expectNoServerEntries: false,
  149. protocol: "",
  150. clientIsLatestVersion: true,
  151. disableUntunneledUpgrade: true,
  152. disableEstablishing: false,
  153. disableApi: false,
  154. tunnelPoolSize: 1,
  155. useUpstreamProxy: false,
  156. disruptNetwork: false,
  157. transformHostNames: false,
  158. runDuration: 0,
  159. })
  160. }
  161. func TestImpairedProtocols(t *testing.T) {
  162. // This test sets a tunnelPoolSize of 40 and runs
  163. // the session for 1 minute with network disruption
  164. // on. All 40 tunnels being disrupted every 10
  165. // seconds (followed by ssh keep alive probe timeout)
  166. // should be sufficient to trigger at least one
  167. // impaired protocol classification.
  168. controllerRun(t,
  169. &controllerRunConfig{
  170. expectNoServerEntries: false,
  171. protocol: "",
  172. clientIsLatestVersion: true,
  173. disableUntunneledUpgrade: true,
  174. disableEstablishing: false,
  175. disableApi: false,
  176. tunnelPoolSize: 40,
  177. useUpstreamProxy: false,
  178. disruptNetwork: true,
  179. transformHostNames: false,
  180. runDuration: 1 * time.Minute,
  181. })
  182. }
  183. func TestSSH(t *testing.T) {
  184. controllerRun(t,
  185. &controllerRunConfig{
  186. expectNoServerEntries: false,
  187. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  188. clientIsLatestVersion: false,
  189. disableUntunneledUpgrade: true,
  190. disableEstablishing: false,
  191. disableApi: false,
  192. tunnelPoolSize: 1,
  193. useUpstreamProxy: false,
  194. disruptNetwork: false,
  195. transformHostNames: false,
  196. runDuration: 0,
  197. })
  198. }
  199. func TestObfuscatedSSH(t *testing.T) {
  200. controllerRun(t,
  201. &controllerRunConfig{
  202. expectNoServerEntries: false,
  203. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  204. clientIsLatestVersion: false,
  205. disableUntunneledUpgrade: true,
  206. disableEstablishing: false,
  207. disableApi: false,
  208. tunnelPoolSize: 1,
  209. useUpstreamProxy: false,
  210. disruptNetwork: false,
  211. transformHostNames: false,
  212. runDuration: 0,
  213. })
  214. }
  215. func TestUnfrontedMeek(t *testing.T) {
  216. controllerRun(t,
  217. &controllerRunConfig{
  218. expectNoServerEntries: false,
  219. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  220. clientIsLatestVersion: false,
  221. disableUntunneledUpgrade: true,
  222. disableEstablishing: false,
  223. disableApi: false,
  224. tunnelPoolSize: 1,
  225. useUpstreamProxy: false,
  226. disruptNetwork: false,
  227. transformHostNames: false,
  228. runDuration: 0,
  229. })
  230. }
  231. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  232. controllerRun(t,
  233. &controllerRunConfig{
  234. expectNoServerEntries: false,
  235. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  236. clientIsLatestVersion: true,
  237. disableUntunneledUpgrade: true,
  238. disableEstablishing: false,
  239. disableApi: false,
  240. tunnelPoolSize: 1,
  241. useUpstreamProxy: false,
  242. disruptNetwork: false,
  243. transformHostNames: true,
  244. runDuration: 0,
  245. })
  246. }
  247. func TestFrontedMeek(t *testing.T) {
  248. controllerRun(t,
  249. &controllerRunConfig{
  250. expectNoServerEntries: false,
  251. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  252. clientIsLatestVersion: false,
  253. disableUntunneledUpgrade: true,
  254. disableEstablishing: false,
  255. disableApi: false,
  256. tunnelPoolSize: 1,
  257. useUpstreamProxy: false,
  258. disruptNetwork: false,
  259. transformHostNames: false,
  260. runDuration: 0,
  261. })
  262. }
  263. func TestFrontedMeekWithTransformer(t *testing.T) {
  264. controllerRun(t,
  265. &controllerRunConfig{
  266. expectNoServerEntries: false,
  267. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  268. clientIsLatestVersion: true,
  269. disableUntunneledUpgrade: true,
  270. disableEstablishing: false,
  271. disableApi: false,
  272. tunnelPoolSize: 1,
  273. useUpstreamProxy: false,
  274. disruptNetwork: false,
  275. transformHostNames: true,
  276. runDuration: 0,
  277. })
  278. }
  279. func TestFrontedMeekHTTP(t *testing.T) {
  280. controllerRun(t,
  281. &controllerRunConfig{
  282. expectNoServerEntries: false,
  283. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  284. clientIsLatestVersion: true,
  285. disableUntunneledUpgrade: true,
  286. disableEstablishing: false,
  287. disableApi: false,
  288. tunnelPoolSize: 1,
  289. useUpstreamProxy: false,
  290. disruptNetwork: false,
  291. transformHostNames: false,
  292. runDuration: 0,
  293. })
  294. }
  295. func TestUnfrontedMeekHTTPS(t *testing.T) {
  296. controllerRun(t,
  297. &controllerRunConfig{
  298. expectNoServerEntries: false,
  299. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  300. clientIsLatestVersion: false,
  301. disableUntunneledUpgrade: true,
  302. disableEstablishing: false,
  303. disableApi: false,
  304. tunnelPoolSize: 1,
  305. useUpstreamProxy: false,
  306. disruptNetwork: false,
  307. transformHostNames: false,
  308. runDuration: 0,
  309. })
  310. }
  311. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  312. controllerRun(t,
  313. &controllerRunConfig{
  314. expectNoServerEntries: false,
  315. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  316. clientIsLatestVersion: true,
  317. disableUntunneledUpgrade: true,
  318. disableEstablishing: false,
  319. disableApi: false,
  320. tunnelPoolSize: 1,
  321. useUpstreamProxy: false,
  322. disruptNetwork: false,
  323. transformHostNames: true,
  324. runDuration: 0,
  325. })
  326. }
  327. func TestDisabledApi(t *testing.T) {
  328. controllerRun(t,
  329. &controllerRunConfig{
  330. expectNoServerEntries: false,
  331. protocol: "",
  332. clientIsLatestVersion: true,
  333. disableUntunneledUpgrade: true,
  334. disableEstablishing: false,
  335. disableApi: true,
  336. tunnelPoolSize: 1,
  337. useUpstreamProxy: false,
  338. disruptNetwork: false,
  339. transformHostNames: false,
  340. runDuration: 0,
  341. })
  342. }
  343. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  344. controllerRun(t,
  345. &controllerRunConfig{
  346. expectNoServerEntries: false,
  347. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  348. clientIsLatestVersion: false,
  349. disableUntunneledUpgrade: true,
  350. disableEstablishing: false,
  351. disableApi: false,
  352. tunnelPoolSize: 1,
  353. useUpstreamProxy: true,
  354. disruptNetwork: false,
  355. transformHostNames: false,
  356. runDuration: 0,
  357. })
  358. }
  359. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  360. controllerRun(t,
  361. &controllerRunConfig{
  362. expectNoServerEntries: false,
  363. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  364. clientIsLatestVersion: false,
  365. disableUntunneledUpgrade: true,
  366. disableEstablishing: false,
  367. disableApi: false,
  368. tunnelPoolSize: 1,
  369. useUpstreamProxy: true,
  370. disruptNetwork: false,
  371. transformHostNames: false,
  372. runDuration: 0,
  373. })
  374. }
  375. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  376. controllerRun(t,
  377. &controllerRunConfig{
  378. expectNoServerEntries: false,
  379. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  380. clientIsLatestVersion: false,
  381. disableUntunneledUpgrade: true,
  382. disableEstablishing: false,
  383. disableApi: false,
  384. tunnelPoolSize: 1,
  385. useUpstreamProxy: true,
  386. disruptNetwork: false,
  387. transformHostNames: false,
  388. runDuration: 0,
  389. })
  390. }
  391. type controllerRunConfig struct {
  392. expectNoServerEntries bool
  393. protocol string
  394. clientIsLatestVersion bool
  395. disableUntunneledUpgrade bool
  396. disableEstablishing bool
  397. disableApi bool
  398. tunnelPoolSize int
  399. useUpstreamProxy bool
  400. disruptNetwork bool
  401. transformHostNames bool
  402. runDuration time.Duration
  403. }
  404. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  405. configJSON, err := ioutil.ReadFile("controller_test.config")
  406. if err != nil {
  407. // Skip, don't fail, if config file is not present
  408. t.Skipf("error loading configuration file: %s", err)
  409. }
  410. // These fields must be filled in before calling LoadConfig
  411. var modifyConfig map[string]interface{}
  412. json.Unmarshal(configJSON, &modifyConfig)
  413. modifyConfig["DataStoreDirectory"] = testDataDirName
  414. modifyConfig["RemoteServerListDownloadFilename"] = filepath.Join(testDataDirName, "server_list_compressed")
  415. modifyConfig["UpgradeDownloadFilename"] = filepath.Join(testDataDirName, "upgrade")
  416. configJSON, _ = json.Marshal(modifyConfig)
  417. config, err := LoadConfig(configJSON)
  418. if err != nil {
  419. t.Fatalf("error processing configuration file: %s", err)
  420. }
  421. if runConfig.clientIsLatestVersion {
  422. config.ClientVersion = "999999999"
  423. }
  424. if runConfig.disableEstablishing {
  425. // Clear remote server list so tunnel cannot be established.
  426. // TODO: also delete all server entries in the datastore.
  427. config.DisableRemoteServerListFetcher = true
  428. }
  429. if runConfig.disableApi {
  430. config.DisableApi = true
  431. }
  432. config.TunnelPoolSize = runConfig.tunnelPoolSize
  433. if runConfig.disableUntunneledUpgrade {
  434. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  435. config.UpgradeDownloadClientVersionHeader = ""
  436. }
  437. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  438. t.Fatalf("cannot use multiple upstream proxies")
  439. }
  440. if runConfig.disruptNetwork {
  441. config.UpstreamProxyUrl = disruptorProxyURL
  442. } else if runConfig.useUpstreamProxy {
  443. config.UpstreamProxyUrl = upstreamProxyURL
  444. config.CustomHeaders = upstreamProxyCustomHeaders
  445. }
  446. if runConfig.transformHostNames {
  447. config.TransformHostNames = "always"
  448. } else {
  449. config.TransformHostNames = "never"
  450. }
  451. // Override client retry throttle values to speed up automated
  452. // tests and ensure tests complete within fixed deadlines.
  453. fetchRemoteServerListRetryPeriodSeconds := 0
  454. config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
  455. downloadUpgradeRetryPeriodSeconds := 1
  456. config.DownloadUpgradeRetryPeriodSeconds = &downloadUpgradeRetryPeriodSeconds
  457. establishTunnelPausePeriodSeconds := 1
  458. config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
  459. config.TunnelProtocol = runConfig.protocol
  460. os.Remove(config.UpgradeDownloadFilename)
  461. os.Remove(config.RemoteServerListDownloadFilename)
  462. err = InitDataStore(config)
  463. if err != nil {
  464. t.Fatalf("error initializing datastore: %s", err)
  465. }
  466. serverEntryCount := CountServerEntries("", "")
  467. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  468. // TODO: replace expectNoServerEntries with resetServerEntries
  469. // so tests can run in arbitrary order
  470. t.Fatalf("unexpected server entries")
  471. }
  472. controller, err := NewController(config)
  473. if err != nil {
  474. t.Fatalf("error creating controller: %s", err)
  475. }
  476. // Monitor notices for "Tunnels" with count > 1, the
  477. // indication of tunnel establishment success.
  478. // Also record the selected HTTP proxy port to use
  479. // when fetching websites through the tunnel.
  480. httpProxyPort := 0
  481. tunnelEstablished := make(chan struct{}, 1)
  482. upgradeDownloaded := make(chan struct{}, 1)
  483. remoteServerListDownloaded := make(chan struct{}, 1)
  484. confirmedLatestVersion := make(chan struct{}, 1)
  485. var clientUpgradeDownloadedBytesCount int32
  486. var remoteServerListDownloadedBytesCount int32
  487. var impairedProtocolCount int32
  488. var impairedProtocolClassification = struct {
  489. sync.RWMutex
  490. classification map[string]int
  491. }{classification: make(map[string]int)}
  492. SetNoticeOutput(NewNoticeReceiver(
  493. func(notice []byte) {
  494. // TODO: log notices without logging server IPs:
  495. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  496. noticeType, payload, err := GetNotice(notice)
  497. if err != nil {
  498. return
  499. }
  500. switch noticeType {
  501. case "ListeningHttpProxyPort":
  502. httpProxyPort = int(payload["port"].(float64))
  503. case "ConnectingServer":
  504. serverProtocol := payload["protocol"].(string)
  505. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  506. // TODO: wrong goroutine for t.FatalNow()
  507. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  508. }
  509. case "Tunnels":
  510. count := int(payload["count"].(float64))
  511. if count > 0 {
  512. if runConfig.disableEstablishing {
  513. // TODO: wrong goroutine for t.FatalNow()
  514. t.Fatalf("tunnel established unexpectedly")
  515. } else {
  516. select {
  517. case tunnelEstablished <- *new(struct{}):
  518. default:
  519. }
  520. }
  521. }
  522. case "ClientUpgradeDownloadedBytes":
  523. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  524. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  525. case "ClientUpgradeDownloaded":
  526. select {
  527. case upgradeDownloaded <- *new(struct{}):
  528. default:
  529. }
  530. case "ClientIsLatestVersion":
  531. select {
  532. case confirmedLatestVersion <- *new(struct{}):
  533. default:
  534. }
  535. case "RemoteServerListResourceDownloadedBytes":
  536. url := payload["url"].(string)
  537. if url == config.RemoteServerListUrl {
  538. t.Logf("RemoteServerListResourceDownloadedBytes: %d", int(payload["bytes"].(float64)))
  539. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  540. }
  541. case "RemoteServerListResourceDownloaded":
  542. url := payload["url"].(string)
  543. if url == config.RemoteServerListUrl {
  544. t.Logf("RemoteServerListResourceDownloaded")
  545. select {
  546. case remoteServerListDownloaded <- *new(struct{}):
  547. default:
  548. }
  549. }
  550. case "ImpairedProtocolClassification":
  551. classification := payload["classification"].(map[string]interface{})
  552. impairedProtocolClassification.Lock()
  553. impairedProtocolClassification.classification = make(map[string]int)
  554. for k, v := range classification {
  555. count := int(v.(float64))
  556. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  557. atomic.AddInt32(&impairedProtocolCount, 1)
  558. }
  559. impairedProtocolClassification.classification[k] = count
  560. }
  561. impairedProtocolClassification.Unlock()
  562. case "ActiveTunnel":
  563. serverProtocol := payload["protocol"].(string)
  564. classification := make(map[string]int)
  565. impairedProtocolClassification.RLock()
  566. for k, v := range impairedProtocolClassification.classification {
  567. classification[k] = v
  568. }
  569. impairedProtocolClassification.RUnlock()
  570. count, ok := classification[serverProtocol]
  571. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  572. // TODO: Fix this test case. Use of TunnelPoolSize breaks this
  573. // case, as many tunnel establishments are occurring in parallel,
  574. // and it can happen that a protocol is classified as impaired
  575. // while a tunnel with that protocol is established and set
  576. // active.
  577. // *not* t.Fatalf
  578. t.Logf("unexpected tunnel using impaired protocol: %s, %+v",
  579. serverProtocol, classification)
  580. }
  581. }
  582. }))
  583. // Run controller, which establishes tunnels
  584. shutdownBroadcast := make(chan struct{})
  585. controllerWaitGroup := new(sync.WaitGroup)
  586. controllerWaitGroup.Add(1)
  587. go func() {
  588. defer controllerWaitGroup.Done()
  589. controller.Run(shutdownBroadcast)
  590. }()
  591. defer func() {
  592. // Test: shutdown must complete within 20 seconds
  593. close(shutdownBroadcast)
  594. shutdownTimeout := time.NewTimer(20 * time.Second)
  595. shutdownOk := make(chan struct{}, 1)
  596. go func() {
  597. controllerWaitGroup.Wait()
  598. shutdownOk <- *new(struct{})
  599. }()
  600. select {
  601. case <-shutdownOk:
  602. case <-shutdownTimeout.C:
  603. t.Fatalf("controller shutdown timeout exceeded")
  604. }
  605. }()
  606. if !runConfig.disableEstablishing {
  607. // Test: tunnel must be established within 120 seconds
  608. establishTimeout := time.NewTimer(120 * time.Second)
  609. select {
  610. case <-tunnelEstablished:
  611. case <-establishTimeout.C:
  612. t.Fatalf("tunnel establish timeout exceeded")
  613. }
  614. // Test: if starting with no server entries, a fetch remote
  615. // server list must have succeeded. With disruptNetwork, the
  616. // fetch must have been resumed at least once.
  617. if serverEntryCount == 0 {
  618. select {
  619. case <-remoteServerListDownloaded:
  620. default:
  621. t.Fatalf("expected remote server list downloaded")
  622. }
  623. if runConfig.disruptNetwork {
  624. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  625. if count <= 1 {
  626. t.Fatalf("unexpected remote server list download progress: %d", count)
  627. }
  628. }
  629. }
  630. // Cannot establish port forwards in DisableApi mode
  631. if !runConfig.disableApi {
  632. // Test: fetch website through tunnel
  633. // Allow for known race condition described in NewHttpProxy():
  634. time.Sleep(1 * time.Second)
  635. fetchAndVerifyWebsite(t, httpProxyPort)
  636. // Test: run for duration, periodically using the tunnel to
  637. // ensure failed tunnel detection, and ultimately hitting
  638. // impaired protocol checks.
  639. startTime := monotime.Now()
  640. for {
  641. time.Sleep(1 * time.Second)
  642. useTunnel(t, httpProxyPort)
  643. if startTime.Add(runConfig.runDuration).Before(monotime.Now()) {
  644. break
  645. }
  646. }
  647. // Test: with disruptNetwork, impaired protocols should be exercised
  648. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  649. count := atomic.LoadInt32(&impairedProtocolCount)
  650. if count <= 0 {
  651. t.Fatalf("unexpected impaired protocol count: %d", count)
  652. } else {
  653. impairedProtocolClassification.RLock()
  654. t.Logf("impaired protocol classification: %+v",
  655. impairedProtocolClassification.classification)
  656. impairedProtocolClassification.RUnlock()
  657. }
  658. }
  659. }
  660. }
  661. // Test: upgrade check/download must be downloaded within 180 seconds
  662. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  663. if expectUpgrade {
  664. upgradeTimeout := time.NewTimer(180 * time.Second)
  665. select {
  666. case <-upgradeDownloaded:
  667. // TODO: verify downloaded file
  668. if runConfig.clientIsLatestVersion {
  669. t.Fatalf("upgrade downloaded unexpectedly")
  670. }
  671. // Test: with disruptNetwork, must be multiple download progress notices
  672. if runConfig.disruptNetwork {
  673. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  674. if count <= 1 {
  675. t.Fatalf("unexpected upgrade download progress: %d", count)
  676. }
  677. }
  678. case <-confirmedLatestVersion:
  679. if !runConfig.clientIsLatestVersion {
  680. t.Fatalf("confirmed latest version unexpectedly")
  681. }
  682. case <-upgradeTimeout.C:
  683. t.Fatalf("upgrade download timeout exceeded")
  684. }
  685. }
  686. }
  687. type TestHostNameTransformer struct {
  688. }
  689. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  690. return "example.com", true
  691. }
  692. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {
  693. testUrl := "https://psiphon.ca"
  694. roundTripTimeout := 30 * time.Second
  695. expectedResponseContains := "Psiphon"
  696. checkResponse := func(responseBody string) bool {
  697. return strings.Contains(responseBody, expectedResponseContains)
  698. }
  699. // Test: use HTTP proxy
  700. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  701. if err != nil {
  702. t.Fatalf("error initializing proxied HTTP request: %s", err)
  703. }
  704. httpTransport := &http.Transport{
  705. Proxy: http.ProxyURL(proxyUrl),
  706. DisableKeepAlives: true,
  707. }
  708. httpClient := &http.Client{
  709. Transport: httpTransport,
  710. Timeout: roundTripTimeout,
  711. }
  712. request, err := http.NewRequest("GET", testUrl, nil)
  713. if err != nil {
  714. t.Fatalf("error preparing proxied HTTP request: %s", err)
  715. }
  716. response, err := httpClient.Do(request)
  717. if err != nil {
  718. t.Fatalf("error sending proxied HTTP request: %s", err)
  719. }
  720. body, err := ioutil.ReadAll(response.Body)
  721. if err != nil {
  722. t.Fatalf("error reading proxied HTTP response: %s", err)
  723. }
  724. response.Body.Close()
  725. if !checkResponse(string(body)) {
  726. t.Fatalf("unexpected proxied HTTP response")
  727. }
  728. // Delay before requesting from external service again
  729. time.Sleep(1 * time.Second)
  730. // Test: use direct URL proxy
  731. httpTransport = &http.Transport{
  732. DisableKeepAlives: true,
  733. }
  734. httpClient = &http.Client{
  735. Transport: httpTransport,
  736. Timeout: roundTripTimeout,
  737. }
  738. request, err = http.NewRequest(
  739. "GET",
  740. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  741. httpProxyPort, url.QueryEscape(testUrl)),
  742. nil)
  743. if err != nil {
  744. t.Fatalf("error preparing direct URL request: %s", err)
  745. }
  746. response, err = httpClient.Do(request)
  747. if err != nil {
  748. t.Fatalf("error sending direct URL request: %s", err)
  749. }
  750. body, err = ioutil.ReadAll(response.Body)
  751. if err != nil {
  752. t.Fatalf("error reading direct URL response: %s", err)
  753. }
  754. response.Body.Close()
  755. if !checkResponse(string(body)) {
  756. t.Fatalf("unexpected direct URL response")
  757. }
  758. // Delay before requesting from external service again
  759. time.Sleep(1 * time.Second)
  760. // Test: use tunneled URL proxy
  761. request, err = http.NewRequest(
  762. "GET",
  763. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  764. httpProxyPort, url.QueryEscape(testUrl)),
  765. nil)
  766. if err != nil {
  767. t.Fatalf("error preparing tunneled URL request: %s", err)
  768. }
  769. response, err = httpClient.Do(request)
  770. if err != nil {
  771. t.Fatalf("error sending tunneled URL request: %s", err)
  772. }
  773. body, err = ioutil.ReadAll(response.Body)
  774. if err != nil {
  775. t.Fatalf("error reading tunneled URL response: %s", err)
  776. }
  777. response.Body.Close()
  778. if !checkResponse(string(body)) {
  779. t.Fatalf("unexpected tunneled URL response")
  780. }
  781. }
  782. func useTunnel(t *testing.T, httpProxyPort int) {
  783. // No action on errors as the tunnel is expected to fail sometimes
  784. testUrl := "https://psiphon3.com"
  785. roundTripTimeout := 1 * time.Second
  786. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  787. if err != nil {
  788. return
  789. }
  790. httpClient := &http.Client{
  791. Transport: &http.Transport{
  792. Proxy: http.ProxyURL(proxyUrl),
  793. },
  794. Timeout: roundTripTimeout,
  795. }
  796. response, err := httpClient.Get(testUrl)
  797. if err != nil {
  798. return
  799. }
  800. response.Body.Close()
  801. }
  802. const disruptorProxyAddress = "127.0.0.1:2160"
  803. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  804. const disruptorMaxConnectionBytes = 500000
  805. const disruptorMaxConnectionTime = 10 * time.Second
  806. func initDisruptor() {
  807. go func() {
  808. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  809. if err != nil {
  810. fmt.Printf("disruptor proxy listen error: %s\n", err)
  811. return
  812. }
  813. for {
  814. localConn, err := listener.AcceptSocks()
  815. if err != nil {
  816. fmt.Printf("disruptor proxy accept error: %s\n", err)
  817. return
  818. }
  819. go func() {
  820. defer localConn.Close()
  821. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  822. if err != nil {
  823. // TODO: log "err" without logging server IPs
  824. fmt.Printf("disruptor proxy dial error\n")
  825. return
  826. }
  827. defer remoteConn.Close()
  828. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  829. if err != nil {
  830. fmt.Printf("disruptor proxy grant error: %s\n", err)
  831. return
  832. }
  833. // Cut connection after disruptorMaxConnectionTime
  834. time.AfterFunc(disruptorMaxConnectionTime, func() {
  835. localConn.Close()
  836. remoteConn.Close()
  837. })
  838. // Relay connection, but only up to disruptorMaxConnectionBytes
  839. waitGroup := new(sync.WaitGroup)
  840. waitGroup.Add(1)
  841. go func() {
  842. defer waitGroup.Done()
  843. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  844. }()
  845. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  846. waitGroup.Wait()
  847. }()
  848. }
  849. }()
  850. }
  851. const upstreamProxyURL = "http://127.0.0.1:2161"
  852. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": []string{"test-header-value1", "test-header-value2"}}
  853. func hasExpectedCustomHeaders(h http.Header) bool {
  854. for name, values := range upstreamProxyCustomHeaders {
  855. if h[name] == nil {
  856. return false
  857. }
  858. // Order may not be the same
  859. for _, value := range values {
  860. if !common.Contains(h[name], value) {
  861. return false
  862. }
  863. }
  864. }
  865. return true
  866. }
  867. func initUpstreamProxy() {
  868. go func() {
  869. proxy := goproxy.NewProxyHttpServer()
  870. proxy.OnRequest().DoFunc(
  871. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  872. if !hasExpectedCustomHeaders(r.Header) {
  873. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  874. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  875. }
  876. return r, nil
  877. })
  878. proxy.OnRequest().HandleConnectFunc(
  879. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  880. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  881. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  882. return goproxy.RejectConnect, host
  883. }
  884. return goproxy.OkConnect, host
  885. })
  886. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  887. if err != nil {
  888. fmt.Printf("upstream proxy failed: %s\n", err)
  889. }
  890. }()
  891. // TODO: wait until listener is active?
  892. }