controller_test.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "encoding/json"
  23. "flag"
  24. "fmt"
  25. "io"
  26. "io/ioutil"
  27. "log"
  28. "net"
  29. "net/http"
  30. "net/url"
  31. "os"
  32. "path/filepath"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. "github.com/Psiphon-Inc/goarista/monotime"
  39. socks "github.com/Psiphon-Inc/goptlib"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  43. "github.com/elazarl/goproxy"
  44. )
  45. var testDataDirName string
  46. func TestMain(m *testing.M) {
  47. flag.Parse()
  48. var err error
  49. testDataDirName, err = ioutil.TempDir("", "psiphon-controller-test")
  50. if err != nil {
  51. fmt.Printf("TempDir failed: %s\n", err)
  52. os.Exit(1)
  53. }
  54. defer os.RemoveAll(testDataDirName)
  55. os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
  56. SetEmitDiagnosticNotices(true)
  57. initDisruptor()
  58. initUpstreamProxy()
  59. os.Exit(m.Run())
  60. }
  61. // Test case notes/limitations/dependencies:
  62. //
  63. // * Untunneled upgrade tests must execute before
  64. // the other tests to ensure no tunnel is established.
  65. // We need a way to reset the datastore after it's been
  66. // initialized in order to to clear out its data entries
  67. // and be able to arbitrarily order the tests.
  68. //
  69. // * The resumable download tests using disruptNetwork
  70. // depend on the download object being larger than the
  71. // disruptorMax limits so that the disruptor will actually
  72. // interrupt the first download attempt. Specifically, the
  73. // upgrade and remote server list at the URLs specified in
  74. // controller_test.config.enc.
  75. //
  76. // * The protocol tests assume there is at least one server
  77. // supporting each protocol in the server list at the URL
  78. // specified in controller_test.config.enc, and that these
  79. // servers are not overloaded.
  80. //
  81. // * fetchAndVerifyWebsite depends on the target URL being
  82. // available and responding.
  83. //
  84. func TestUntunneledUpgradeDownload(t *testing.T) {
  85. controllerRun(t,
  86. &controllerRunConfig{
  87. expectNoServerEntries: true,
  88. protocol: "",
  89. clientIsLatestVersion: false,
  90. disableUntunneledUpgrade: false,
  91. disableEstablishing: true,
  92. disableApi: false,
  93. tunnelPoolSize: 1,
  94. useUpstreamProxy: false,
  95. disruptNetwork: false,
  96. transformHostNames: false,
  97. runDuration: 0,
  98. })
  99. }
  100. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  101. controllerRun(t,
  102. &controllerRunConfig{
  103. expectNoServerEntries: true,
  104. protocol: "",
  105. clientIsLatestVersion: false,
  106. disableUntunneledUpgrade: false,
  107. disableEstablishing: true,
  108. disableApi: false,
  109. tunnelPoolSize: 1,
  110. useUpstreamProxy: false,
  111. disruptNetwork: true,
  112. transformHostNames: false,
  113. runDuration: 0,
  114. })
  115. }
  116. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  117. controllerRun(t,
  118. &controllerRunConfig{
  119. expectNoServerEntries: true,
  120. protocol: "",
  121. clientIsLatestVersion: true,
  122. disableUntunneledUpgrade: false,
  123. disableEstablishing: true,
  124. disableApi: false,
  125. tunnelPoolSize: 1,
  126. useUpstreamProxy: false,
  127. disruptNetwork: false,
  128. transformHostNames: false,
  129. runDuration: 0,
  130. })
  131. }
  132. func TestUntunneledResumableFetchRemoteServerList(t *testing.T) {
  133. controllerRun(t,
  134. &controllerRunConfig{
  135. expectNoServerEntries: true,
  136. protocol: "",
  137. clientIsLatestVersion: true,
  138. disableUntunneledUpgrade: false,
  139. disableEstablishing: false,
  140. disableApi: false,
  141. tunnelPoolSize: 1,
  142. useUpstreamProxy: false,
  143. disruptNetwork: true,
  144. transformHostNames: false,
  145. runDuration: 0,
  146. })
  147. }
  148. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  149. controllerRun(t,
  150. &controllerRunConfig{
  151. expectNoServerEntries: false,
  152. protocol: "",
  153. clientIsLatestVersion: true,
  154. disableUntunneledUpgrade: true,
  155. disableEstablishing: false,
  156. disableApi: false,
  157. tunnelPoolSize: 1,
  158. useUpstreamProxy: false,
  159. disruptNetwork: false,
  160. transformHostNames: false,
  161. runDuration: 0,
  162. })
  163. }
  164. func TestImpairedProtocols(t *testing.T) {
  165. // This test sets a tunnelPoolSize of 40 and runs
  166. // the session for 1 minute with network disruption
  167. // on. All 40 tunnels being disrupted every 10
  168. // seconds (followed by ssh keep alive probe timeout)
  169. // should be sufficient to trigger at least one
  170. // impaired protocol classification.
  171. controllerRun(t,
  172. &controllerRunConfig{
  173. expectNoServerEntries: false,
  174. protocol: "",
  175. clientIsLatestVersion: true,
  176. disableUntunneledUpgrade: true,
  177. disableEstablishing: false,
  178. disableApi: false,
  179. tunnelPoolSize: 40,
  180. useUpstreamProxy: false,
  181. disruptNetwork: true,
  182. transformHostNames: false,
  183. runDuration: 1 * time.Minute,
  184. })
  185. }
  186. func TestSSH(t *testing.T) {
  187. controllerRun(t,
  188. &controllerRunConfig{
  189. expectNoServerEntries: false,
  190. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  191. clientIsLatestVersion: false,
  192. disableUntunneledUpgrade: true,
  193. disableEstablishing: false,
  194. disableApi: false,
  195. tunnelPoolSize: 1,
  196. useUpstreamProxy: false,
  197. disruptNetwork: false,
  198. transformHostNames: false,
  199. runDuration: 0,
  200. })
  201. }
  202. func TestObfuscatedSSH(t *testing.T) {
  203. controllerRun(t,
  204. &controllerRunConfig{
  205. expectNoServerEntries: false,
  206. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  207. clientIsLatestVersion: false,
  208. disableUntunneledUpgrade: true,
  209. disableEstablishing: false,
  210. disableApi: false,
  211. tunnelPoolSize: 1,
  212. useUpstreamProxy: false,
  213. disruptNetwork: false,
  214. transformHostNames: false,
  215. runDuration: 0,
  216. })
  217. }
  218. func TestUnfrontedMeek(t *testing.T) {
  219. controllerRun(t,
  220. &controllerRunConfig{
  221. expectNoServerEntries: false,
  222. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  223. clientIsLatestVersion: false,
  224. disableUntunneledUpgrade: true,
  225. disableEstablishing: false,
  226. disableApi: false,
  227. tunnelPoolSize: 1,
  228. useUpstreamProxy: false,
  229. disruptNetwork: false,
  230. transformHostNames: false,
  231. runDuration: 0,
  232. })
  233. }
  234. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  235. controllerRun(t,
  236. &controllerRunConfig{
  237. expectNoServerEntries: false,
  238. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  239. clientIsLatestVersion: true,
  240. disableUntunneledUpgrade: true,
  241. disableEstablishing: false,
  242. disableApi: false,
  243. tunnelPoolSize: 1,
  244. useUpstreamProxy: false,
  245. disruptNetwork: false,
  246. transformHostNames: true,
  247. runDuration: 0,
  248. })
  249. }
  250. func TestFrontedMeek(t *testing.T) {
  251. controllerRun(t,
  252. &controllerRunConfig{
  253. expectNoServerEntries: false,
  254. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  255. clientIsLatestVersion: false,
  256. disableUntunneledUpgrade: true,
  257. disableEstablishing: false,
  258. disableApi: false,
  259. tunnelPoolSize: 1,
  260. useUpstreamProxy: false,
  261. disruptNetwork: false,
  262. transformHostNames: false,
  263. runDuration: 0,
  264. })
  265. }
  266. func TestFrontedMeekWithTransformer(t *testing.T) {
  267. controllerRun(t,
  268. &controllerRunConfig{
  269. expectNoServerEntries: false,
  270. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  271. clientIsLatestVersion: true,
  272. disableUntunneledUpgrade: true,
  273. disableEstablishing: false,
  274. disableApi: false,
  275. tunnelPoolSize: 1,
  276. useUpstreamProxy: false,
  277. disruptNetwork: false,
  278. transformHostNames: true,
  279. runDuration: 0,
  280. })
  281. }
  282. func TestFrontedMeekHTTP(t *testing.T) {
  283. controllerRun(t,
  284. &controllerRunConfig{
  285. expectNoServerEntries: false,
  286. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  287. clientIsLatestVersion: true,
  288. disableUntunneledUpgrade: true,
  289. disableEstablishing: false,
  290. disableApi: false,
  291. tunnelPoolSize: 1,
  292. useUpstreamProxy: false,
  293. disruptNetwork: false,
  294. transformHostNames: false,
  295. runDuration: 0,
  296. })
  297. }
  298. func TestUnfrontedMeekHTTPS(t *testing.T) {
  299. controllerRun(t,
  300. &controllerRunConfig{
  301. expectNoServerEntries: false,
  302. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  303. clientIsLatestVersion: false,
  304. disableUntunneledUpgrade: true,
  305. disableEstablishing: false,
  306. disableApi: false,
  307. tunnelPoolSize: 1,
  308. useUpstreamProxy: false,
  309. disruptNetwork: false,
  310. transformHostNames: false,
  311. runDuration: 0,
  312. })
  313. }
  314. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  315. controllerRun(t,
  316. &controllerRunConfig{
  317. expectNoServerEntries: false,
  318. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  319. clientIsLatestVersion: true,
  320. disableUntunneledUpgrade: true,
  321. disableEstablishing: false,
  322. disableApi: false,
  323. tunnelPoolSize: 1,
  324. useUpstreamProxy: false,
  325. disruptNetwork: false,
  326. transformHostNames: true,
  327. runDuration: 0,
  328. })
  329. }
  330. func TestDisabledApi(t *testing.T) {
  331. controllerRun(t,
  332. &controllerRunConfig{
  333. expectNoServerEntries: false,
  334. protocol: "",
  335. clientIsLatestVersion: true,
  336. disableUntunneledUpgrade: true,
  337. disableEstablishing: false,
  338. disableApi: true,
  339. tunnelPoolSize: 1,
  340. useUpstreamProxy: false,
  341. disruptNetwork: false,
  342. transformHostNames: false,
  343. runDuration: 0,
  344. })
  345. }
  346. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  347. controllerRun(t,
  348. &controllerRunConfig{
  349. expectNoServerEntries: false,
  350. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  351. clientIsLatestVersion: false,
  352. disableUntunneledUpgrade: true,
  353. disableEstablishing: false,
  354. disableApi: false,
  355. tunnelPoolSize: 1,
  356. useUpstreamProxy: true,
  357. disruptNetwork: false,
  358. transformHostNames: false,
  359. runDuration: 0,
  360. })
  361. }
  362. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  363. controllerRun(t,
  364. &controllerRunConfig{
  365. expectNoServerEntries: false,
  366. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  367. clientIsLatestVersion: false,
  368. disableUntunneledUpgrade: true,
  369. disableEstablishing: false,
  370. disableApi: false,
  371. tunnelPoolSize: 1,
  372. useUpstreamProxy: true,
  373. disruptNetwork: false,
  374. transformHostNames: false,
  375. runDuration: 0,
  376. })
  377. }
  378. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  379. controllerRun(t,
  380. &controllerRunConfig{
  381. expectNoServerEntries: false,
  382. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  383. clientIsLatestVersion: false,
  384. disableUntunneledUpgrade: true,
  385. disableEstablishing: false,
  386. disableApi: false,
  387. tunnelPoolSize: 1,
  388. useUpstreamProxy: true,
  389. disruptNetwork: false,
  390. transformHostNames: false,
  391. runDuration: 0,
  392. })
  393. }
  394. type controllerRunConfig struct {
  395. expectNoServerEntries bool
  396. protocol string
  397. clientIsLatestVersion bool
  398. disableUntunneledUpgrade bool
  399. disableEstablishing bool
  400. disableApi bool
  401. tunnelPoolSize int
  402. useUpstreamProxy bool
  403. disruptNetwork bool
  404. transformHostNames bool
  405. runDuration time.Duration
  406. }
  407. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  408. configJSON, err := ioutil.ReadFile("controller_test.config")
  409. if err != nil {
  410. // Skip, don't fail, if config file is not present
  411. t.Skipf("error loading configuration file: %s", err)
  412. }
  413. // These fields must be filled in before calling LoadConfig
  414. var modifyConfig map[string]interface{}
  415. json.Unmarshal(configJSON, &modifyConfig)
  416. modifyConfig["DataStoreDirectory"] = testDataDirName
  417. modifyConfig["RemoteServerListDownloadFilename"] = filepath.Join(testDataDirName, "server_list_compressed")
  418. modifyConfig["UpgradeDownloadFilename"] = filepath.Join(testDataDirName, "upgrade")
  419. if runConfig.protocol != "" {
  420. modifyConfig["TunnelProtocols"] = protocol.TunnelProtocols{runConfig.protocol}
  421. }
  422. // Override client retry throttle values to speed up automated
  423. // tests and ensure tests complete within fixed deadlines.
  424. modifyConfig["FetchRemoteServerListRetryPeriodMilliseconds"] = 250
  425. modifyConfig["FetchUpgradeRetryPeriodMilliseconds"] = 250
  426. modifyConfig["EstablishTunnelPausePeriodSeconds"] = 1
  427. if runConfig.disableUntunneledUpgrade {
  428. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  429. modifyConfig["UpgradeDownloadClientVersionHeader"] = "invalid-value"
  430. }
  431. configJSON, _ = json.Marshal(modifyConfig)
  432. config, err := LoadConfig(configJSON)
  433. if err != nil {
  434. t.Fatalf("error processing configuration file: %s", err)
  435. }
  436. // The following config values are not client parameters can be set directly.
  437. if runConfig.clientIsLatestVersion {
  438. config.ClientVersion = "999999999"
  439. }
  440. if runConfig.disableEstablishing {
  441. // Clear remote server list so tunnel cannot be established.
  442. // TODO: also delete all server entries in the datastore.
  443. config.DisableRemoteServerListFetcher = true
  444. }
  445. if runConfig.disableApi {
  446. config.DisableApi = true
  447. }
  448. config.TunnelPoolSize = runConfig.tunnelPoolSize
  449. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  450. t.Fatalf("cannot use multiple upstream proxies")
  451. }
  452. if runConfig.disruptNetwork {
  453. config.UpstreamProxyURL = disruptorProxyURL
  454. } else if runConfig.useUpstreamProxy {
  455. config.UpstreamProxyURL = upstreamProxyURL
  456. config.CustomHeaders = upstreamProxyCustomHeaders
  457. }
  458. // Enable tactics requests. This will passively exercise the code
  459. // paths. server_test runs a more comprehensive test that checks
  460. // that the tactics request succeeds.
  461. config.NetworkIDGetter = &testNetworkGetter{}
  462. // The following values can only be applied through client parameters.
  463. // TODO: a successful tactics request can reset these parameters.
  464. applyParameters := make(map[string]interface{})
  465. if runConfig.transformHostNames {
  466. applyParameters[parameters.TransformHostNameProbability] = 1.0
  467. } else {
  468. applyParameters[parameters.TransformHostNameProbability] = 0.0
  469. }
  470. err = config.SetClientParameters("", true, applyParameters)
  471. if err != nil {
  472. t.Fatalf("SetClientParameters failed: %s", err)
  473. }
  474. os.Remove(config.UpgradeDownloadFilename)
  475. os.Remove(config.RemoteServerListDownloadFilename)
  476. err = InitDataStore(config)
  477. if err != nil {
  478. t.Fatalf("error initializing datastore: %s", err)
  479. }
  480. serverEntryCount := CountServerEntries("", nil)
  481. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  482. // TODO: replace expectNoServerEntries with resetServerEntries
  483. // so tests can run in arbitrary order
  484. t.Fatalf("unexpected server entries")
  485. }
  486. controller, err := NewController(config)
  487. if err != nil {
  488. t.Fatalf("error creating controller: %s", err)
  489. }
  490. // Monitor notices for "Tunnels" with count > 1, the
  491. // indication of tunnel establishment success.
  492. // Also record the selected HTTP proxy port to use
  493. // when fetching websites through the tunnel.
  494. httpProxyPort := 0
  495. tunnelEstablished := make(chan struct{}, 1)
  496. upgradeDownloaded := make(chan struct{}, 1)
  497. remoteServerListDownloaded := make(chan struct{}, 1)
  498. confirmedLatestVersion := make(chan struct{}, 1)
  499. var clientUpgradeDownloadedBytesCount int32
  500. var remoteServerListDownloadedBytesCount int32
  501. var impairedProtocolCount int32
  502. var impairedProtocolClassification = struct {
  503. sync.RWMutex
  504. classification map[string]int
  505. }{classification: make(map[string]int)}
  506. SetNoticeWriter(NewNoticeReceiver(
  507. func(notice []byte) {
  508. // TODO: log notices without logging server IPs:
  509. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  510. noticeType, payload, err := GetNotice(notice)
  511. if err != nil {
  512. return
  513. }
  514. switch noticeType {
  515. case "ListeningHttpProxyPort":
  516. httpProxyPort = int(payload["port"].(float64))
  517. case "ConnectingServer":
  518. serverProtocol := payload["protocol"].(string)
  519. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  520. // TODO: wrong goroutine for t.FatalNow()
  521. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  522. }
  523. case "Tunnels":
  524. count := int(payload["count"].(float64))
  525. if count > 0 {
  526. if runConfig.disableEstablishing {
  527. // TODO: wrong goroutine for t.FatalNow()
  528. t.Fatalf("tunnel established unexpectedly")
  529. } else {
  530. select {
  531. case tunnelEstablished <- *new(struct{}):
  532. default:
  533. }
  534. }
  535. }
  536. case "ClientUpgradeDownloadedBytes":
  537. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  538. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  539. case "ClientUpgradeDownloaded":
  540. select {
  541. case upgradeDownloaded <- *new(struct{}):
  542. default:
  543. }
  544. case "ClientIsLatestVersion":
  545. select {
  546. case confirmedLatestVersion <- *new(struct{}):
  547. default:
  548. }
  549. case "RemoteServerListResourceDownloadedBytes":
  550. url := payload["url"].(string)
  551. if url == config.RemoteServerListUrl {
  552. t.Logf("RemoteServerListResourceDownloadedBytes: %d", int(payload["bytes"].(float64)))
  553. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  554. }
  555. case "RemoteServerListResourceDownloaded":
  556. url := payload["url"].(string)
  557. if url == config.RemoteServerListUrl {
  558. t.Logf("RemoteServerListResourceDownloaded")
  559. select {
  560. case remoteServerListDownloaded <- *new(struct{}):
  561. default:
  562. }
  563. }
  564. case "ImpairedProtocolClassification":
  565. classification := payload["classification"].(map[string]interface{})
  566. impairedProtocolClassification.Lock()
  567. impairedProtocolClassification.classification = make(map[string]int)
  568. for k, v := range classification {
  569. count := int(v.(float64))
  570. if count >= config.clientParameters.Get().Int(parameters.ImpairedProtocolClassificationThreshold) {
  571. atomic.AddInt32(&impairedProtocolCount, 1)
  572. }
  573. impairedProtocolClassification.classification[k] = count
  574. }
  575. impairedProtocolClassification.Unlock()
  576. case "ActiveTunnel":
  577. serverProtocol := payload["protocol"].(string)
  578. classification := make(map[string]int)
  579. impairedProtocolClassification.RLock()
  580. for k, v := range impairedProtocolClassification.classification {
  581. classification[k] = v
  582. }
  583. impairedProtocolClassification.RUnlock()
  584. count, ok := classification[serverProtocol]
  585. if ok && count >= config.clientParameters.Get().Int(parameters.ImpairedProtocolClassificationThreshold) {
  586. // TODO: Fix this test case. Use of TunnelPoolSize breaks this
  587. // case, as many tunnel establishments are occurring in parallel,
  588. // and it can happen that a protocol is classified as impaired
  589. // while a tunnel with that protocol is established and set
  590. // active.
  591. // *not* t.Fatalf
  592. t.Logf("unexpected tunnel using impaired protocol: %s, %+v",
  593. serverProtocol, classification)
  594. }
  595. }
  596. }))
  597. // Run controller, which establishes tunnels
  598. ctx, cancelFunc := context.WithCancel(context.Background())
  599. controllerWaitGroup := new(sync.WaitGroup)
  600. controllerWaitGroup.Add(1)
  601. go func() {
  602. defer controllerWaitGroup.Done()
  603. controller.Run(ctx)
  604. }()
  605. defer func() {
  606. // Test: shutdown must complete within 20 seconds
  607. cancelFunc()
  608. shutdownTimeout := time.NewTimer(20 * time.Second)
  609. shutdownOk := make(chan struct{}, 1)
  610. go func() {
  611. controllerWaitGroup.Wait()
  612. shutdownOk <- *new(struct{})
  613. }()
  614. select {
  615. case <-shutdownOk:
  616. case <-shutdownTimeout.C:
  617. t.Fatalf("controller shutdown timeout exceeded")
  618. }
  619. }()
  620. if !runConfig.disableEstablishing {
  621. // Test: tunnel must be established within 120 seconds
  622. establishTimeout := time.NewTimer(120 * time.Second)
  623. select {
  624. case <-tunnelEstablished:
  625. case <-establishTimeout.C:
  626. t.Fatalf("tunnel establish timeout exceeded")
  627. }
  628. // Test: if starting with no server entries, a fetch remote
  629. // server list must have succeeded. With disruptNetwork, the
  630. // fetch must have been resumed at least once.
  631. if serverEntryCount == 0 {
  632. select {
  633. case <-remoteServerListDownloaded:
  634. default:
  635. t.Fatalf("expected remote server list downloaded")
  636. }
  637. if runConfig.disruptNetwork {
  638. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  639. if count <= 1 {
  640. t.Fatalf("unexpected remote server list download progress: %d", count)
  641. }
  642. }
  643. }
  644. // Cannot establish port forwards in DisableApi mode
  645. if !runConfig.disableApi {
  646. // Test: fetch website through tunnel
  647. // Allow for known race condition described in NewHttpProxy():
  648. time.Sleep(1 * time.Second)
  649. if !runConfig.disruptNetwork {
  650. fetchAndVerifyWebsite(t, httpProxyPort)
  651. }
  652. // Test: run for duration, periodically using the tunnel to
  653. // ensure failed tunnel detection, and ultimately hitting
  654. // impaired protocol checks.
  655. startTime := monotime.Now()
  656. for {
  657. time.Sleep(1 * time.Second)
  658. useTunnel(t, httpProxyPort)
  659. if startTime.Add(runConfig.runDuration).Before(monotime.Now()) {
  660. break
  661. }
  662. }
  663. // Test: with disruptNetwork, impaired protocols should be exercised
  664. if runConfig.runDuration > 0 && runConfig.disruptNetwork {
  665. count := atomic.LoadInt32(&impairedProtocolCount)
  666. if count <= 0 {
  667. t.Fatalf("unexpected impaired protocol count: %d", count)
  668. } else {
  669. impairedProtocolClassification.RLock()
  670. t.Logf("impaired protocol classification: %+v",
  671. impairedProtocolClassification.classification)
  672. impairedProtocolClassification.RUnlock()
  673. }
  674. }
  675. }
  676. }
  677. // Test: upgrade check/download must be downloaded within 180 seconds
  678. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  679. if expectUpgrade {
  680. upgradeTimeout := time.NewTimer(120 * time.Second)
  681. select {
  682. case <-upgradeDownloaded:
  683. // TODO: verify downloaded file
  684. if runConfig.clientIsLatestVersion {
  685. t.Fatalf("upgrade downloaded unexpectedly")
  686. }
  687. // Test: with disruptNetwork, must be multiple download progress notices
  688. if runConfig.disruptNetwork {
  689. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  690. if count <= 1 {
  691. t.Fatalf("unexpected upgrade download progress: %d", count)
  692. }
  693. }
  694. case <-confirmedLatestVersion:
  695. if !runConfig.clientIsLatestVersion {
  696. t.Fatalf("confirmed latest version unexpectedly")
  697. }
  698. case <-upgradeTimeout.C:
  699. t.Fatalf("upgrade download timeout exceeded")
  700. }
  701. }
  702. }
  703. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) error {
  704. testUrl := "https://psiphon.ca"
  705. roundTripTimeout := 30 * time.Second
  706. expectedResponseContains := "Psiphon"
  707. checkResponse := func(responseBody string) bool {
  708. return strings.Contains(responseBody, expectedResponseContains)
  709. }
  710. // Retries are made to compensate for intermittent failures due
  711. // to external network conditions.
  712. fetchWithRetries := func(fetchName string, fetchFunc func() error) error {
  713. retryCount := 6
  714. retryDelay := 5 * time.Second
  715. var err error
  716. for i := 0; i < retryCount; i++ {
  717. err = fetchFunc()
  718. if err == nil || i == retryCount-1 {
  719. break
  720. }
  721. time.Sleep(retryDelay)
  722. t.Logf("retrying %s...", fetchName)
  723. }
  724. return err
  725. }
  726. // Test: use HTTP proxy
  727. fetchUsingHTTPProxy := func() error {
  728. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  729. if err != nil {
  730. return fmt.Errorf("error initializing proxied HTTP request: %s", err)
  731. }
  732. httpTransport := &http.Transport{
  733. Proxy: http.ProxyURL(proxyUrl),
  734. DisableKeepAlives: true,
  735. }
  736. httpClient := &http.Client{
  737. Transport: httpTransport,
  738. Timeout: roundTripTimeout,
  739. }
  740. request, err := http.NewRequest("GET", testUrl, nil)
  741. if err != nil {
  742. return fmt.Errorf("error preparing proxied HTTP request: %s", err)
  743. }
  744. response, err := httpClient.Do(request)
  745. if err != nil {
  746. return fmt.Errorf("error sending proxied HTTP request: %s", err)
  747. }
  748. defer response.Body.Close()
  749. body, err := ioutil.ReadAll(response.Body)
  750. if err != nil {
  751. return fmt.Errorf("error reading proxied HTTP response: %s", err)
  752. }
  753. if !checkResponse(string(body)) {
  754. return fmt.Errorf("unexpected proxied HTTP response")
  755. }
  756. return nil
  757. }
  758. err := fetchWithRetries("proxied HTTP request", fetchUsingHTTPProxy)
  759. if err != nil {
  760. return err
  761. }
  762. // Delay before requesting from external service again
  763. time.Sleep(1 * time.Second)
  764. // Test: use direct URL proxy
  765. fetchUsingURLProxyDirect := func() error {
  766. httpTransport := &http.Transport{
  767. DisableKeepAlives: true,
  768. }
  769. httpClient := &http.Client{
  770. Transport: httpTransport,
  771. Timeout: roundTripTimeout,
  772. }
  773. request, err := http.NewRequest(
  774. "GET",
  775. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  776. httpProxyPort, url.QueryEscape(testUrl)),
  777. nil)
  778. if err != nil {
  779. return fmt.Errorf("error preparing direct URL request: %s", err)
  780. }
  781. response, err := httpClient.Do(request)
  782. if err != nil {
  783. return fmt.Errorf("error sending direct URL request: %s", err)
  784. }
  785. defer response.Body.Close()
  786. body, err := ioutil.ReadAll(response.Body)
  787. if err != nil {
  788. return fmt.Errorf("error reading direct URL response: %s", err)
  789. }
  790. if !checkResponse(string(body)) {
  791. return fmt.Errorf("unexpected direct URL response")
  792. }
  793. return nil
  794. }
  795. err = fetchWithRetries("direct URL request", fetchUsingURLProxyDirect)
  796. if err != nil {
  797. return err
  798. }
  799. // Delay before requesting from external service again
  800. time.Sleep(1 * time.Second)
  801. // Test: use tunneled URL proxy
  802. fetchUsingURLProxyTunneled := func() error {
  803. httpTransport := &http.Transport{
  804. DisableKeepAlives: true,
  805. }
  806. httpClient := &http.Client{
  807. Transport: httpTransport,
  808. Timeout: roundTripTimeout,
  809. }
  810. request, err := http.NewRequest(
  811. "GET",
  812. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  813. httpProxyPort, url.QueryEscape(testUrl)),
  814. nil)
  815. if err != nil {
  816. return fmt.Errorf("error preparing tunneled URL request: %s", err)
  817. }
  818. response, err := httpClient.Do(request)
  819. if err != nil {
  820. return fmt.Errorf("error sending tunneled URL request: %s", err)
  821. }
  822. defer response.Body.Close()
  823. body, err := ioutil.ReadAll(response.Body)
  824. if err != nil {
  825. return fmt.Errorf("error reading tunneled URL response: %s", err)
  826. }
  827. if !checkResponse(string(body)) {
  828. return fmt.Errorf("unexpected tunneled URL response")
  829. }
  830. return nil
  831. }
  832. err = fetchWithRetries("tunneled URL request", fetchUsingURLProxyTunneled)
  833. if err != nil {
  834. return err
  835. }
  836. return nil
  837. }
  838. func useTunnel(t *testing.T, httpProxyPort int) {
  839. // No action on errors as the tunnel is expected to fail sometimes
  840. testUrl := "https://psiphon3.com"
  841. roundTripTimeout := 1 * time.Second
  842. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  843. if err != nil {
  844. return
  845. }
  846. httpClient := &http.Client{
  847. Transport: &http.Transport{
  848. Proxy: http.ProxyURL(proxyUrl),
  849. },
  850. Timeout: roundTripTimeout,
  851. }
  852. response, err := httpClient.Get(testUrl)
  853. if err != nil {
  854. return
  855. }
  856. response.Body.Close()
  857. }
  858. // Note: Valid values for disruptorMaxConnectionBytes depend on the production
  859. // network; for example, the size of the remote server list resource must exceed
  860. // disruptorMaxConnectionBytes or else TestUntunneledResumableFetchRemoteServerList
  861. // will fail since no retries are required. But if disruptorMaxConnectionBytes is
  862. // too small, the test will take longer to run since more retries are necessary.
  863. //
  864. // Tests such as TestUntunneledResumableFetchRemoteServerList could be rewritten to
  865. // use mock components (for example, see TestObfuscatedRemoteServerLists); however
  866. // these test in controller_test serve the dual purpose of ensuring that tunnel
  867. // core works with the production network.
  868. //
  869. // TODO: set disruptorMaxConnectionBytes (and disruptorMaxConnectionTime) dynamically,
  870. // based on current production network configuration?
  871. const disruptorProxyAddress = "127.0.0.1:2160"
  872. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  873. const disruptorMaxConnectionBytes = 250000
  874. const disruptorMaxConnectionTime = 10 * time.Second
  875. func initDisruptor() {
  876. go func() {
  877. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  878. if err != nil {
  879. fmt.Printf("disruptor proxy listen error: %s\n", err)
  880. return
  881. }
  882. for {
  883. localConn, err := listener.AcceptSocks()
  884. if err != nil {
  885. if e, ok := err.(net.Error); ok && e.Temporary() {
  886. fmt.Printf("disruptor proxy temporary accept error: %s\n", err)
  887. continue
  888. }
  889. fmt.Printf("disruptor proxy accept error: %s\n", err)
  890. return
  891. }
  892. go func() {
  893. defer localConn.Close()
  894. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  895. if err != nil {
  896. // TODO: log "err" without logging server IPs
  897. fmt.Printf("disruptor proxy dial error\n")
  898. return
  899. }
  900. defer remoteConn.Close()
  901. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  902. if err != nil {
  903. fmt.Printf("disruptor proxy grant error: %s\n", err)
  904. return
  905. }
  906. // Cut connection after disruptorMaxConnectionTime
  907. time.AfterFunc(disruptorMaxConnectionTime, func() {
  908. localConn.Close()
  909. remoteConn.Close()
  910. })
  911. // Relay connection, but only up to disruptorMaxConnectionBytes
  912. waitGroup := new(sync.WaitGroup)
  913. waitGroup.Add(1)
  914. go func() {
  915. defer waitGroup.Done()
  916. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  917. localConn.Close()
  918. remoteConn.Close()
  919. }()
  920. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  921. localConn.Close()
  922. remoteConn.Close()
  923. waitGroup.Wait()
  924. }()
  925. }
  926. }()
  927. }
  928. const upstreamProxyURL = "http://127.0.0.1:2161"
  929. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": {"test-header-value1", "test-header-value2"}}
  930. func hasExpectedCustomHeaders(h http.Header) bool {
  931. for name, values := range upstreamProxyCustomHeaders {
  932. if h[name] == nil {
  933. return false
  934. }
  935. // Order may not be the same
  936. for _, value := range values {
  937. if !common.Contains(h[name], value) {
  938. return false
  939. }
  940. }
  941. }
  942. return true
  943. }
  944. func initUpstreamProxy() {
  945. go func() {
  946. proxy := goproxy.NewProxyHttpServer()
  947. proxy.Logger = log.New(ioutil.Discard, "", 0)
  948. proxy.OnRequest().DoFunc(
  949. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  950. if !hasExpectedCustomHeaders(r.Header) {
  951. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  952. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  953. }
  954. return r, nil
  955. })
  956. proxy.OnRequest().HandleConnectFunc(
  957. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  958. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  959. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  960. return goproxy.RejectConnect, host
  961. }
  962. return goproxy.OkConnect, host
  963. })
  964. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  965. if err != nil {
  966. fmt.Printf("upstream proxy failed: %s\n", err)
  967. }
  968. }()
  969. // TODO: wait until listener is active?
  970. }
  971. type testNetworkGetter struct {
  972. }
  973. func (testNetworkGetter) GetNetworkID() string {
  974. return "NETWORK1"
  975. }