controller_test.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "encoding/json"
  22. "flag"
  23. "fmt"
  24. "io"
  25. "io/ioutil"
  26. "log"
  27. "net"
  28. "net/http"
  29. "net/url"
  30. "os"
  31. "path/filepath"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "testing"
  36. "time"
  37. "github.com/Psiphon-Inc/goarista/monotime"
  38. "github.com/Psiphon-Inc/goproxy"
  39. socks "github.com/Psiphon-Inc/goptlib"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  42. )
  43. var testDataDirName string
  44. func TestMain(m *testing.M) {
  45. flag.Parse()
  46. var err error
  47. testDataDirName, err = ioutil.TempDir("", "psiphon-controller-test")
  48. if err != nil {
  49. fmt.Printf("TempDir failed: %s\n", err)
  50. os.Exit(1)
  51. }
  52. defer os.RemoveAll(testDataDirName)
  53. os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
  54. SetEmitDiagnosticNotices(true)
  55. initDisruptor()
  56. initUpstreamProxy()
  57. os.Exit(m.Run())
  58. }
  59. // Test case notes/limitations/dependencies:
  60. //
  61. // * Untunneled upgrade tests must execute before
  62. // the other tests to ensure no tunnel is established.
  63. // We need a way to reset the datastore after it's been
  64. // initialized in order to clear out its data entries
  65. // and be able to arbitrarily order the tests.
  66. //
  67. // * The resumable download tests using disruptNetwork
  68. // depend on the download object being larger than the
  69. // disruptorMax limits so that the disruptor will actually
  70. // interrupt the first download attempt. Specifically, the
  71. // upgrade and remote server list at the URLs specified in
  72. // controller_test.config.enc.
  73. //
  74. // * The protocol tests assume there is at least one server
  75. // supporting each protocol in the server list at the URL
  76. // specified in controller_test.config.enc, and that these
  77. // servers are not overloaded.
  78. //
  79. // * fetchAndVerifyWebsite depends on the target URL being
  80. // available and responding.
  81. //
  82. func TestUntunneledUpgradeDownload(t *testing.T) {
  83. controllerRun(t,
  84. &controllerRunConfig{
  85. expectNoServerEntries: true,
  86. protocol: "",
  87. clientIsLatestVersion: false,
  88. disableUntunneledUpgrade: false,
  89. disableEstablishing: true,
  90. disableApi: false,
  91. tunnelPoolSize: 1,
  92. useUpstreamProxy: false,
  93. disruptNetwork: false,
  94. transformHostNames: false,
  95. runDuration: 0,
  96. })
  97. }
  98. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  99. controllerRun(t,
  100. &controllerRunConfig{
  101. expectNoServerEntries: true,
  102. protocol: "",
  103. clientIsLatestVersion: false,
  104. disableUntunneledUpgrade: false,
  105. disableEstablishing: true,
  106. disableApi: false,
  107. tunnelPoolSize: 1,
  108. useUpstreamProxy: false,
  109. disruptNetwork: true,
  110. transformHostNames: false,
  111. runDuration: 0,
  112. })
  113. }
  114. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  115. controllerRun(t,
  116. &controllerRunConfig{
  117. expectNoServerEntries: true,
  118. protocol: "",
  119. clientIsLatestVersion: true,
  120. disableUntunneledUpgrade: false,
  121. disableEstablishing: true,
  122. disableApi: false,
  123. tunnelPoolSize: 1,
  124. useUpstreamProxy: false,
  125. disruptNetwork: false,
  126. transformHostNames: false,
  127. runDuration: 0,
  128. })
  129. }
  130. func TestUntunneledResumableFetchRemoteServerList(t *testing.T) {
  131. controllerRun(t,
  132. &controllerRunConfig{
  133. expectNoServerEntries: true,
  134. protocol: "",
  135. clientIsLatestVersion: true,
  136. disableUntunneledUpgrade: false,
  137. disableEstablishing: false,
  138. disableApi: false,
  139. tunnelPoolSize: 1,
  140. useUpstreamProxy: false,
  141. disruptNetwork: true,
  142. transformHostNames: false,
  143. runDuration: 0,
  144. })
  145. }
  146. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  147. controllerRun(t,
  148. &controllerRunConfig{
  149. expectNoServerEntries: false,
  150. protocol: "",
  151. clientIsLatestVersion: true,
  152. disableUntunneledUpgrade: true,
  153. disableEstablishing: false,
  154. disableApi: false,
  155. tunnelPoolSize: 1,
  156. useUpstreamProxy: false,
  157. disruptNetwork: false,
  158. transformHostNames: false,
  159. runDuration: 0,
  160. })
  161. }
  162. func TestImpairedProtocols(t *testing.T) {
  163. // This test sets a tunnelPoolSize of 40 and runs
  164. // the session for 1 minute with network disruption
  165. // on. All 40 tunnels being disrupted every 10
  166. // seconds (followed by ssh keep alive probe timeout)
  167. // should be sufficient to trigger at least one
  168. // impaired protocol classification.
  169. controllerRun(t,
  170. &controllerRunConfig{
  171. expectNoServerEntries: false,
  172. protocol: "",
  173. clientIsLatestVersion: true,
  174. disableUntunneledUpgrade: true,
  175. disableEstablishing: false,
  176. disableApi: false,
  177. tunnelPoolSize: 40,
  178. useUpstreamProxy: false,
  179. disruptNetwork: true,
  180. transformHostNames: false,
  181. runDuration: 1 * time.Minute,
  182. })
  183. }
  184. func TestSSH(t *testing.T) {
  185. controllerRun(t,
  186. &controllerRunConfig{
  187. expectNoServerEntries: false,
  188. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  189. clientIsLatestVersion: false,
  190. disableUntunneledUpgrade: true,
  191. disableEstablishing: false,
  192. disableApi: false,
  193. tunnelPoolSize: 1,
  194. useUpstreamProxy: false,
  195. disruptNetwork: false,
  196. transformHostNames: false,
  197. runDuration: 0,
  198. })
  199. }
  200. func TestObfuscatedSSH(t *testing.T) {
  201. controllerRun(t,
  202. &controllerRunConfig{
  203. expectNoServerEntries: false,
  204. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  205. clientIsLatestVersion: false,
  206. disableUntunneledUpgrade: true,
  207. disableEstablishing: false,
  208. disableApi: false,
  209. tunnelPoolSize: 1,
  210. useUpstreamProxy: false,
  211. disruptNetwork: false,
  212. transformHostNames: false,
  213. runDuration: 0,
  214. })
  215. }
  216. func TestUnfrontedMeek(t *testing.T) {
  217. controllerRun(t,
  218. &controllerRunConfig{
  219. expectNoServerEntries: false,
  220. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  221. clientIsLatestVersion: false,
  222. disableUntunneledUpgrade: true,
  223. disableEstablishing: false,
  224. disableApi: false,
  225. tunnelPoolSize: 1,
  226. useUpstreamProxy: false,
  227. disruptNetwork: false,
  228. transformHostNames: false,
  229. runDuration: 0,
  230. })
  231. }
  232. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  233. controllerRun(t,
  234. &controllerRunConfig{
  235. expectNoServerEntries: false,
  236. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  237. clientIsLatestVersion: true,
  238. disableUntunneledUpgrade: true,
  239. disableEstablishing: false,
  240. disableApi: false,
  241. tunnelPoolSize: 1,
  242. useUpstreamProxy: false,
  243. disruptNetwork: false,
  244. transformHostNames: true,
  245. runDuration: 0,
  246. })
  247. }
  248. func TestFrontedMeek(t *testing.T) {
  249. controllerRun(t,
  250. &controllerRunConfig{
  251. expectNoServerEntries: false,
  252. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  253. clientIsLatestVersion: false,
  254. disableUntunneledUpgrade: true,
  255. disableEstablishing: false,
  256. disableApi: false,
  257. tunnelPoolSize: 1,
  258. useUpstreamProxy: false,
  259. disruptNetwork: false,
  260. transformHostNames: false,
  261. runDuration: 0,
  262. })
  263. }
  264. func TestFrontedMeekWithTransformer(t *testing.T) {
  265. controllerRun(t,
  266. &controllerRunConfig{
  267. expectNoServerEntries: false,
  268. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  269. clientIsLatestVersion: true,
  270. disableUntunneledUpgrade: true,
  271. disableEstablishing: false,
  272. disableApi: false,
  273. tunnelPoolSize: 1,
  274. useUpstreamProxy: false,
  275. disruptNetwork: false,
  276. transformHostNames: true,
  277. runDuration: 0,
  278. })
  279. }
  280. func TestFrontedMeekHTTP(t *testing.T) {
  281. controllerRun(t,
  282. &controllerRunConfig{
  283. expectNoServerEntries: false,
  284. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  285. clientIsLatestVersion: true,
  286. disableUntunneledUpgrade: true,
  287. disableEstablishing: false,
  288. disableApi: false,
  289. tunnelPoolSize: 1,
  290. useUpstreamProxy: false,
  291. disruptNetwork: false,
  292. transformHostNames: false,
  293. runDuration: 0,
  294. })
  295. }
  296. func TestUnfrontedMeekHTTPS(t *testing.T) {
  297. controllerRun(t,
  298. &controllerRunConfig{
  299. expectNoServerEntries: false,
  300. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  301. clientIsLatestVersion: false,
  302. disableUntunneledUpgrade: true,
  303. disableEstablishing: false,
  304. disableApi: false,
  305. tunnelPoolSize: 1,
  306. useUpstreamProxy: false,
  307. disruptNetwork: false,
  308. transformHostNames: false,
  309. runDuration: 0,
  310. })
  311. }
  312. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  313. controllerRun(t,
  314. &controllerRunConfig{
  315. expectNoServerEntries: false,
  316. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  317. clientIsLatestVersion: true,
  318. disableUntunneledUpgrade: true,
  319. disableEstablishing: false,
  320. disableApi: false,
  321. tunnelPoolSize: 1,
  322. useUpstreamProxy: false,
  323. disruptNetwork: false,
  324. transformHostNames: true,
  325. runDuration: 0,
  326. })
  327. }
  328. func TestDisabledApi(t *testing.T) {
  329. controllerRun(t,
  330. &controllerRunConfig{
  331. expectNoServerEntries: false,
  332. protocol: "",
  333. clientIsLatestVersion: true,
  334. disableUntunneledUpgrade: true,
  335. disableEstablishing: false,
  336. disableApi: true,
  337. tunnelPoolSize: 1,
  338. useUpstreamProxy: false,
  339. disruptNetwork: false,
  340. transformHostNames: false,
  341. runDuration: 0,
  342. })
  343. }
  344. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  345. controllerRun(t,
  346. &controllerRunConfig{
  347. expectNoServerEntries: false,
  348. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  349. clientIsLatestVersion: false,
  350. disableUntunneledUpgrade: true,
  351. disableEstablishing: false,
  352. disableApi: false,
  353. tunnelPoolSize: 1,
  354. useUpstreamProxy: true,
  355. disruptNetwork: false,
  356. transformHostNames: false,
  357. runDuration: 0,
  358. })
  359. }
  360. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  361. controllerRun(t,
  362. &controllerRunConfig{
  363. expectNoServerEntries: false,
  364. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  365. clientIsLatestVersion: false,
  366. disableUntunneledUpgrade: true,
  367. disableEstablishing: false,
  368. disableApi: false,
  369. tunnelPoolSize: 1,
  370. useUpstreamProxy: true,
  371. disruptNetwork: false,
  372. transformHostNames: false,
  373. runDuration: 0,
  374. })
  375. }
  376. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  377. controllerRun(t,
  378. &controllerRunConfig{
  379. expectNoServerEntries: false,
  380. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  381. clientIsLatestVersion: false,
  382. disableUntunneledUpgrade: true,
  383. disableEstablishing: false,
  384. disableApi: false,
  385. tunnelPoolSize: 1,
  386. useUpstreamProxy: true,
  387. disruptNetwork: false,
  388. transformHostNames: false,
  389. runDuration: 0,
  390. })
  391. }
  392. type controllerRunConfig struct {
  393. expectNoServerEntries bool
  394. protocol string
  395. clientIsLatestVersion bool
  396. disableUntunneledUpgrade bool
  397. disableEstablishing bool
  398. disableApi bool
  399. tunnelPoolSize int
  400. useUpstreamProxy bool
  401. disruptNetwork bool
  402. transformHostNames bool
  403. runDuration time.Duration
  404. }
  405. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  406. configJSON, err := ioutil.ReadFile("controller_test.config")
  407. if err != nil {
  408. // Skip, don't fail, if config file is not present
  409. t.Skipf("error loading configuration file: %s", err)
  410. }
  411. // These fields must be filled in before calling LoadConfig
  412. var modifyConfig map[string]interface{}
  413. json.Unmarshal(configJSON, &modifyConfig)
  414. modifyConfig["DataStoreDirectory"] = testDataDirName
  415. modifyConfig["RemoteServerListDownloadFilename"] = filepath.Join(testDataDirName, "server_list_compressed")
  416. modifyConfig["UpgradeDownloadFilename"] = filepath.Join(testDataDirName, "upgrade")
  417. configJSON, _ = json.Marshal(modifyConfig)
  418. config, err := LoadConfig(configJSON)
  419. if err != nil {
  420. t.Fatalf("error processing configuration file: %s", err)
  421. }
  422. if runConfig.clientIsLatestVersion {
  423. config.ClientVersion = "999999999"
  424. }
  425. if runConfig.disableEstablishing {
  426. // Clear remote server list so tunnel cannot be established.
  427. // TODO: also delete all server entries in the datastore.
  428. config.DisableRemoteServerListFetcher = true
  429. }
  430. if runConfig.disableApi {
  431. config.DisableApi = true
  432. }
  433. config.TunnelPoolSize = runConfig.tunnelPoolSize
  434. if runConfig.disableUntunneledUpgrade {
  435. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  436. config.UpgradeDownloadClientVersionHeader = ""
  437. }
  438. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  439. t.Fatalf("cannot use multiple upstream proxies")
  440. }
  441. if runConfig.disruptNetwork {
  442. config.UpstreamProxyUrl = disruptorProxyURL
  443. } else if runConfig.useUpstreamProxy {
  444. config.UpstreamProxyUrl = upstreamProxyURL
  445. config.CustomHeaders = upstreamProxyCustomHeaders
  446. }
  447. if runConfig.transformHostNames {
  448. config.TransformHostNames = "always"
  449. } else {
  450. config.TransformHostNames = "never"
  451. }
  452. // Override client retry throttle values to speed up automated
  453. // tests and ensure tests complete within fixed deadlines.
  454. fetchRemoteServerListRetryPeriodSeconds := 0
  455. config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
  456. downloadUpgradeRetryPeriodSeconds := 1
  457. config.DownloadUpgradeRetryPeriodSeconds = &downloadUpgradeRetryPeriodSeconds
  458. establishTunnelPausePeriodSeconds := 1
  459. config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
  460. config.TunnelProtocol = runConfig.protocol
  461. os.Remove(config.UpgradeDownloadFilename)
  462. os.Remove(config.RemoteServerListDownloadFilename)
  463. err = InitDataStore(config)
  464. if err != nil {
  465. t.Fatalf("error initializing datastore: %s", err)
  466. }
  467. serverEntryCount := CountServerEntries("", "")
  468. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  469. // TODO: replace expectNoServerEntries with resetServerEntries
  470. // so tests can run in arbitrary order
  471. t.Fatalf("unexpected server entries")
  472. }
  473. controller, err := NewController(config)
  474. if err != nil {
  475. t.Fatalf("error creating controller: %s", err)
  476. }
  477. // Monitor notices for "Tunnels" with count > 1, the
  478. // indication of tunnel establishment success.
  479. // Also record the selected HTTP proxy port to use
  480. // when fetching websites through the tunnel.
  481. httpProxyPort := 0
  482. tunnelEstablished := make(chan struct{}, 1)
  483. upgradeDownloaded := make(chan struct{}, 1)
  484. remoteServerListDownloaded := make(chan struct{}, 1)
  485. confirmedLatestVersion := make(chan struct{}, 1)
  486. var clientUpgradeDownloadedBytesCount int32
  487. var remoteServerListDownloadedBytesCount int32
  488. var impairedProtocolCount int32
  489. var impairedProtocolClassification = struct {
  490. sync.RWMutex
  491. classification map[string]int
  492. }{classification: make(map[string]int)}
  493. SetNoticeOutput(NewNoticeReceiver(
  494. func(notice []byte) {
  495. // TODO: log notices without logging server IPs:
  496. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  497. noticeType, payload, err := GetNotice(notice)
  498. if err != nil {
  499. return
  500. }
  501. switch noticeType {
  502. case "ListeningHttpProxyPort":
  503. httpProxyPort = int(payload["port"].(float64))
  504. case "ConnectingServer":
  505. serverProtocol := payload["protocol"].(string)
  506. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  507. // TODO: wrong goroutine for t.FatalNow()
  508. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  509. }
  510. case "Tunnels":
  511. count := int(payload["count"].(float64))
  512. if count > 0 {
  513. if runConfig.disableEstablishing {
  514. // TODO: wrong goroutine for t.FatalNow()
  515. t.Fatalf("tunnel established unexpectedly")
  516. } else {
  517. select {
  518. case tunnelEstablished <- *new(struct{}):
  519. default:
  520. }
  521. }
  522. }
  523. case "ClientUpgradeDownloadedBytes":
  524. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  525. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  526. case "ClientUpgradeDownloaded":
  527. select {
  528. case upgradeDownloaded <- *new(struct{}):
  529. default:
  530. }
  531. case "ClientIsLatestVersion":
  532. select {
  533. case confirmedLatestVersion <- *new(struct{}):
  534. default:
  535. }
  536. case "RemoteServerListResourceDownloadedBytes":
  537. url := payload["url"].(string)
  538. if url == config.RemoteServerListUrl {
  539. t.Logf("RemoteServerListResourceDownloadedBytes: %d", int(payload["bytes"].(float64)))
  540. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  541. }
  542. case "RemoteServerListResourceDownloaded":
  543. url := payload["url"].(string)
  544. if url == config.RemoteServerListUrl {
  545. t.Logf("RemoteServerListResourceDownloaded")
  546. select {
  547. case remoteServerListDownloaded <- *new(struct{}):
  548. default:
  549. }
  550. }
  551. case "ImpairedProtocolClassification":
  552. classification := payload["classification"].(map[string]interface{})
  553. impairedProtocolClassification.Lock()
  554. impairedProtocolClassification.classification = make(map[string]int)
  555. for k, v := range classification {
  556. count := int(v.(float64))
  557. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  558. atomic.AddInt32(&impairedProtocolCount, 1)
  559. }
  560. impairedProtocolClassification.classification[k] = count
  561. }
  562. impairedProtocolClassification.Unlock()
  563. case "ActiveTunnel":
  564. serverProtocol := payload["protocol"].(string)
  565. classification := make(map[string]int)
  566. impairedProtocolClassification.RLock()
  567. for k, v := range impairedProtocolClassification.classification {
  568. classification[k] = v
  569. }
  570. impairedProtocolClassification.RUnlock()
  571. count, ok := classification[serverProtocol]
  572. if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  573. // TODO: Fix this test case. Use of TunnelPoolSize breaks this
  574. // case, as many tunnel establishments are occurring in parallel,
  575. // and it can happen that a protocol is classified as impaired
  576. // while a tunnel with that protocol is established and set
  577. // active.
  578. // *not* t.Fatalf
  579. t.Logf("unexpected tunnel using impaired protocol: %s, %+v",
  580. serverProtocol, classification)
  581. }
  582. }
  583. }))
  584. // Run controller, which establishes tunnels
  585. shutdownBroadcast := make(chan struct{})
  586. controllerWaitGroup := new(sync.WaitGroup)
  587. controllerWaitGroup.Add(1)
  588. go func() {
  589. defer controllerWaitGroup.Done()
  590. controller.Run(shutdownBroadcast)
  591. }()
  592. defer func() {
  593. // Test: shutdown must complete within 20 seconds
  594. close(shutdownBroadcast)
  595. shutdownTimeout := time.NewTimer(20 * time.Second)
  596. shutdownOk := make(chan struct{}, 1)
  597. go func() {
  598. controllerWaitGroup.Wait()
  599. shutdownOk <- *new(struct{})
  600. }()
  601. select {
  602. case <-shutdownOk:
  603. case <-shutdownTimeout.C:
  604. t.Fatalf("controller shutdown timeout exceeded")
  605. }
  606. }()
  607. if !runConfig.disableEstablishing {
  608. // Test: tunnel must be established within 120 seconds
  609. establishTimeout := time.NewTimer(120 * time.Second)
  610. select {
  611. case <-tunnelEstablished:
  612. case <-establishTimeout.C:
  613. t.Fatalf("tunnel establish timeout exceeded")
  614. }
  615. // Test: if starting with no server entries, a fetch remote
  616. // server list must have succeeded. With disruptNetwork, the
  617. // fetch must have been resumed at least once.
  618. if serverEntryCount == 0 {
  619. select {
  620. case <-remoteServerListDownloaded:
  621. default:
  622. t.Fatalf("expected remote server list downloaded")
  623. }
  624. if runConfig.disruptNetwork {
  625. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  626. if count <= 1 {
  627. t.Fatalf("unexpected remote server list download progress: %d", count)
  628. }
  629. }
  630. }
  631. // Cannot establish port forwards in DisableApi mode
  632. if !runConfig.disableApi {
  633. // Test: fetch website through tunnel
  634. // Allow for known race condition described in NewHttpProxy():
  635. time.Sleep(1 * time.Second)
  636. if !runConfig.disruptNetwork {
  637. fetchAndVerifyWebsite(t, httpProxyPort)
  638. }
  639. // Test: run for duration, periodically using the tunnel to
  640. // ensure failed tunnel detection, and ultimately hitting
  641. // impaired protocol checks.
  642. startTime := monotime.Now()
  643. for {
  644. time.Sleep(1 * time.Second)
  645. useTunnel(t, httpProxyPort)
  646. if startTime.Add(runConfig.runDuration).Before(monotime.Now()) {
  647. break
  648. }
  649. }
  650. // Test: with disruptNetwork, impaired protocols should be exercised
  651. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  652. count := atomic.LoadInt32(&impairedProtocolCount)
  653. if count <= 0 {
  654. t.Fatalf("unexpected impaired protocol count: %d", count)
  655. } else {
  656. impairedProtocolClassification.RLock()
  657. t.Logf("impaired protocol classification: %+v",
  658. impairedProtocolClassification.classification)
  659. impairedProtocolClassification.RUnlock()
  660. }
  661. }
  662. }
  663. }
  664. // Test: upgrade check/download must be downloaded within 180 seconds
  665. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  666. if expectUpgrade {
  667. upgradeTimeout := time.NewTimer(120 * time.Second)
  668. select {
  669. case <-upgradeDownloaded:
  670. // TODO: verify downloaded file
  671. if runConfig.clientIsLatestVersion {
  672. t.Fatalf("upgrade downloaded unexpectedly")
  673. }
  674. // Test: with disruptNetwork, must be multiple download progress notices
  675. if runConfig.disruptNetwork {
  676. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  677. if count <= 1 {
  678. t.Fatalf("unexpected upgrade download progress: %d", count)
  679. }
  680. }
  681. case <-confirmedLatestVersion:
  682. if !runConfig.clientIsLatestVersion {
  683. t.Fatalf("confirmed latest version unexpectedly")
  684. }
  685. case <-upgradeTimeout.C:
  686. t.Fatalf("upgrade download timeout exceeded")
  687. }
  688. }
  689. }
  690. type TestHostNameTransformer struct {
  691. }
  692. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  693. return "example.com", true
  694. }
  695. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) error {
  696. testUrl := "https://psiphon.ca"
  697. roundTripTimeout := 30 * time.Second
  698. expectedResponseContains := "Psiphon"
  699. checkResponse := func(responseBody string) bool {
  700. return strings.Contains(responseBody, expectedResponseContains)
  701. }
  702. // Retries are made to compensate for intermittent failures due
  703. // to external network conditions.
  704. fetchWithRetries := func(fetchName string, fetchFunc func() error) error {
  705. retryCount := 6
  706. retryDelay := 5 * time.Second
  707. var err error
  708. for i := 0; i < retryCount; i++ {
  709. err = fetchFunc()
  710. if err == nil || i == retryCount-1 {
  711. break
  712. }
  713. time.Sleep(retryDelay)
  714. t.Logf("retrying %s...", fetchName)
  715. }
  716. return err
  717. }
  718. // Test: use HTTP proxy
  719. fetchUsingHTTPProxy := func() error {
  720. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  721. if err != nil {
  722. return fmt.Errorf("error initializing proxied HTTP request: %s", err)
  723. }
  724. httpTransport := &http.Transport{
  725. Proxy: http.ProxyURL(proxyUrl),
  726. DisableKeepAlives: true,
  727. }
  728. httpClient := &http.Client{
  729. Transport: httpTransport,
  730. Timeout: roundTripTimeout,
  731. }
  732. request, err := http.NewRequest("GET", testUrl, nil)
  733. if err != nil {
  734. return fmt.Errorf("error preparing proxied HTTP request: %s", err)
  735. }
  736. response, err := httpClient.Do(request)
  737. if err != nil {
  738. return fmt.Errorf("error sending proxied HTTP request: %s", err)
  739. }
  740. defer response.Body.Close()
  741. body, err := ioutil.ReadAll(response.Body)
  742. if err != nil {
  743. return fmt.Errorf("error reading proxied HTTP response: %s", err)
  744. }
  745. if !checkResponse(string(body)) {
  746. return fmt.Errorf("unexpected proxied HTTP response")
  747. }
  748. return nil
  749. }
  750. err := fetchWithRetries("proxied HTTP request", fetchUsingHTTPProxy)
  751. if err != nil {
  752. return err
  753. }
  754. // Delay before requesting from external service again
  755. time.Sleep(1 * time.Second)
  756. // Test: use direct URL proxy
  757. fetchUsingURLProxyDirect := func() error {
  758. httpTransport := &http.Transport{
  759. DisableKeepAlives: true,
  760. }
  761. httpClient := &http.Client{
  762. Transport: httpTransport,
  763. Timeout: roundTripTimeout,
  764. }
  765. request, err := http.NewRequest(
  766. "GET",
  767. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  768. httpProxyPort, url.QueryEscape(testUrl)),
  769. nil)
  770. if err != nil {
  771. return fmt.Errorf("error preparing direct URL request: %s", err)
  772. }
  773. response, err := httpClient.Do(request)
  774. if err != nil {
  775. return fmt.Errorf("error sending direct URL request: %s", err)
  776. }
  777. defer response.Body.Close()
  778. body, err := ioutil.ReadAll(response.Body)
  779. if err != nil {
  780. return fmt.Errorf("error reading direct URL response: %s", err)
  781. }
  782. if !checkResponse(string(body)) {
  783. return fmt.Errorf("unexpected direct URL response")
  784. }
  785. return nil
  786. }
  787. err = fetchWithRetries("direct URL request", fetchUsingURLProxyDirect)
  788. if err != nil {
  789. return err
  790. }
  791. // Delay before requesting from external service again
  792. time.Sleep(1 * time.Second)
  793. // Test: use tunneled URL proxy
  794. fetchUsingURLProxyTunneled := func() error {
  795. httpTransport := &http.Transport{
  796. DisableKeepAlives: true,
  797. }
  798. httpClient := &http.Client{
  799. Transport: httpTransport,
  800. Timeout: roundTripTimeout,
  801. }
  802. request, err := http.NewRequest(
  803. "GET",
  804. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  805. httpProxyPort, url.QueryEscape(testUrl)),
  806. nil)
  807. if err != nil {
  808. return fmt.Errorf("error preparing tunneled URL request: %s", err)
  809. }
  810. response, err := httpClient.Do(request)
  811. if err != nil {
  812. return fmt.Errorf("error sending tunneled URL request: %s", err)
  813. }
  814. defer response.Body.Close()
  815. body, err := ioutil.ReadAll(response.Body)
  816. if err != nil {
  817. return fmt.Errorf("error reading tunneled URL response: %s", err)
  818. }
  819. if !checkResponse(string(body)) {
  820. return fmt.Errorf("unexpected tunneled URL response")
  821. }
  822. return nil
  823. }
  824. err = fetchWithRetries("tunneled URL request", fetchUsingURLProxyTunneled)
  825. if err != nil {
  826. return err
  827. }
  828. return nil
  829. }
  830. func useTunnel(t *testing.T, httpProxyPort int) {
  831. // No action on errors as the tunnel is expected to fail sometimes
  832. testUrl := "https://psiphon3.com"
  833. roundTripTimeout := 1 * time.Second
  834. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  835. if err != nil {
  836. return
  837. }
  838. httpClient := &http.Client{
  839. Transport: &http.Transport{
  840. Proxy: http.ProxyURL(proxyUrl),
  841. },
  842. Timeout: roundTripTimeout,
  843. }
  844. response, err := httpClient.Get(testUrl)
  845. if err != nil {
  846. return
  847. }
  848. response.Body.Close()
  849. }
  850. // Note: Valid values for disruptorMaxConnectionBytes depend on the production
  851. // network; for example, the size of the remote server list resource must exceed
  852. // disruptorMaxConnectionBytes or else TestUntunneledResumableFetchRemoteServerList
  853. // will fail since no retries are required. But if disruptorMaxConnectionBytes is
  854. // too small, the test will take longer to run since more retries are necessary.
  855. //
// Tests such as TestUntunneledResumableFetchRemoteServerList could be rewritten to
// use mock components (for example, see TestObfuscatedRemoteServerLists); however,
// these tests in controller_test serve the dual purpose of ensuring that tunnel
// core works with the production network.
  860. //
  861. // TODO: set disruptorMaxConnectionBytes (and disruptorMaxConnectionTime) dynamically,
  862. // based on current production network configuration?
  863. const disruptorProxyAddress = "127.0.0.1:2160"
  864. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  865. const disruptorMaxConnectionBytes = 250000
  866. const disruptorMaxConnectionTime = 10 * time.Second
  867. func initDisruptor() {
  868. go func() {
  869. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  870. if err != nil {
  871. fmt.Printf("disruptor proxy listen error: %s\n", err)
  872. return
  873. }
  874. for {
  875. localConn, err := listener.AcceptSocks()
  876. if err != nil {
  877. fmt.Printf("disruptor proxy accept error: %s\n", err)
  878. return
  879. }
  880. go func() {
  881. defer localConn.Close()
  882. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  883. if err != nil {
  884. // TODO: log "err" without logging server IPs
  885. fmt.Printf("disruptor proxy dial error\n")
  886. return
  887. }
  888. defer remoteConn.Close()
  889. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  890. if err != nil {
  891. fmt.Printf("disruptor proxy grant error: %s\n", err)
  892. return
  893. }
  894. // Cut connection after disruptorMaxConnectionTime
  895. time.AfterFunc(disruptorMaxConnectionTime, func() {
  896. localConn.Close()
  897. remoteConn.Close()
  898. })
  899. // Relay connection, but only up to disruptorMaxConnectionBytes
  900. waitGroup := new(sync.WaitGroup)
  901. waitGroup.Add(1)
  902. go func() {
  903. defer waitGroup.Done()
  904. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  905. localConn.Close()
  906. remoteConn.Close()
  907. }()
  908. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  909. localConn.Close()
  910. remoteConn.Close()
  911. waitGroup.Wait()
  912. }()
  913. }
  914. }()
  915. }
  916. const upstreamProxyURL = "http://127.0.0.1:2161"
  917. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": {"test-header-value1", "test-header-value2"}}
  918. func hasExpectedCustomHeaders(h http.Header) bool {
  919. for name, values := range upstreamProxyCustomHeaders {
  920. if h[name] == nil {
  921. return false
  922. }
  923. // Order may not be the same
  924. for _, value := range values {
  925. if !common.Contains(h[name], value) {
  926. return false
  927. }
  928. }
  929. }
  930. return true
  931. }
  932. func initUpstreamProxy() {
  933. go func() {
  934. proxy := goproxy.NewProxyHttpServer()
  935. proxy.Logger = log.New(ioutil.Discard, "", 0)
  936. proxy.OnRequest().DoFunc(
  937. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  938. if !hasExpectedCustomHeaders(r.Header) {
  939. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  940. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  941. }
  942. return r, nil
  943. })
  944. proxy.OnRequest().HandleConnectFunc(
  945. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  946. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  947. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  948. return goproxy.RejectConnect, host
  949. }
  950. return goproxy.OkConnect, host
  951. })
  952. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  953. if err != nil {
  954. fmt.Printf("upstream proxy failed: %s\n", err)
  955. }
  956. }()
  957. // TODO: wait until listener is active?
  958. }