controller_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "encoding/json"
  23. "flag"
  24. "fmt"
  25. "io"
  26. "io/ioutil"
  27. "log"
  28. "net"
  29. "net/http"
  30. "net/url"
  31. "os"
  32. "path/filepath"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "testing"
  37. "time"
  38. socks "github.com/Psiphon-Labs/goptlib"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/elazarl/goproxy"
  42. )
  43. var testDataDirName string
  44. func TestMain(m *testing.M) {
  45. flag.Parse()
  46. var err error
  47. testDataDirName, err = ioutil.TempDir("", "psiphon-controller-test")
  48. if err != nil {
  49. fmt.Printf("TempDir failed: %s\n", err)
  50. os.Exit(1)
  51. }
  52. defer os.RemoveAll(testDataDirName)
  53. SetEmitDiagnosticNotices(true)
  54. initDisruptor()
  55. initUpstreamProxy()
  56. os.Exit(m.Run())
  57. }
  58. // Test case notes/limitations/dependencies:
  59. //
  60. // * Untunneled upgrade tests must execute before
  61. // the other tests to ensure no tunnel is established.
  62. // We need a way to reset the datastore after it's been
  63. // initialized in order to to clear out its data entries
  64. // and be able to arbitrarily order the tests.
  65. //
  66. // * The resumable download tests using disruptNetwork
  67. // depend on the download object being larger than the
  68. // disruptorMax limits so that the disruptor will actually
  69. // interrupt the first download attempt. Specifically, the
  70. // upgrade and remote server list at the URLs specified in
  71. // controller_test.config.enc.
  72. //
  73. // * The protocol tests assume there is at least one server
  74. // supporting each protocol in the server list at the URL
  75. // specified in controller_test.config.enc, and that these
  76. // servers are not overloaded.
  77. //
  78. // * fetchAndVerifyWebsite depends on the target URL being
  79. // available and responding.
  80. //
  81. func TestUntunneledUpgradeDownload(t *testing.T) {
  82. controllerRun(t,
  83. &controllerRunConfig{
  84. expectNoServerEntries: true,
  85. protocol: "",
  86. clientIsLatestVersion: false,
  87. disableUntunneledUpgrade: false,
  88. disableEstablishing: true,
  89. disableApi: false,
  90. tunnelPoolSize: 1,
  91. useUpstreamProxy: false,
  92. disruptNetwork: false,
  93. transformHostNames: false,
  94. useFragmentor: false,
  95. })
  96. }
  97. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  98. controllerRun(t,
  99. &controllerRunConfig{
  100. expectNoServerEntries: true,
  101. protocol: "",
  102. clientIsLatestVersion: false,
  103. disableUntunneledUpgrade: false,
  104. disableEstablishing: true,
  105. disableApi: false,
  106. tunnelPoolSize: 1,
  107. useUpstreamProxy: false,
  108. disruptNetwork: true,
  109. transformHostNames: false,
  110. useFragmentor: false,
  111. })
  112. }
  113. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  114. controllerRun(t,
  115. &controllerRunConfig{
  116. expectNoServerEntries: true,
  117. protocol: "",
  118. clientIsLatestVersion: true,
  119. disableUntunneledUpgrade: false,
  120. disableEstablishing: true,
  121. disableApi: false,
  122. tunnelPoolSize: 1,
  123. useUpstreamProxy: false,
  124. disruptNetwork: false,
  125. transformHostNames: false,
  126. useFragmentor: false,
  127. })
  128. }
  129. func TestUntunneledResumableFetchRemoteServerList(t *testing.T) {
  130. controllerRun(t,
  131. &controllerRunConfig{
  132. expectNoServerEntries: true,
  133. protocol: "",
  134. clientIsLatestVersion: true,
  135. disableUntunneledUpgrade: false,
  136. disableEstablishing: false,
  137. disableApi: false,
  138. tunnelPoolSize: 1,
  139. useUpstreamProxy: false,
  140. disruptNetwork: true,
  141. transformHostNames: false,
  142. useFragmentor: false,
  143. })
  144. }
  145. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  146. controllerRun(t,
  147. &controllerRunConfig{
  148. expectNoServerEntries: false,
  149. protocol: "",
  150. clientIsLatestVersion: true,
  151. disableUntunneledUpgrade: true,
  152. disableEstablishing: false,
  153. disableApi: false,
  154. tunnelPoolSize: 1,
  155. useUpstreamProxy: false,
  156. disruptNetwork: false,
  157. transformHostNames: false,
  158. useFragmentor: false,
  159. })
  160. }
  161. func TestSSH(t *testing.T) {
  162. controllerRun(t,
  163. &controllerRunConfig{
  164. expectNoServerEntries: false,
  165. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  166. clientIsLatestVersion: false,
  167. disableUntunneledUpgrade: true,
  168. disableEstablishing: false,
  169. disableApi: false,
  170. tunnelPoolSize: 1,
  171. useUpstreamProxy: false,
  172. disruptNetwork: false,
  173. transformHostNames: false,
  174. useFragmentor: false,
  175. })
  176. }
  177. func TestObfuscatedSSH(t *testing.T) {
  178. controllerRun(t,
  179. &controllerRunConfig{
  180. expectNoServerEntries: false,
  181. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  182. clientIsLatestVersion: false,
  183. disableUntunneledUpgrade: true,
  184. disableEstablishing: false,
  185. disableApi: false,
  186. tunnelPoolSize: 1,
  187. useUpstreamProxy: false,
  188. disruptNetwork: false,
  189. transformHostNames: false,
  190. useFragmentor: false,
  191. })
  192. }
  193. func TestUnfrontedMeek(t *testing.T) {
  194. controllerRun(t,
  195. &controllerRunConfig{
  196. expectNoServerEntries: false,
  197. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  198. clientIsLatestVersion: false,
  199. disableUntunneledUpgrade: true,
  200. disableEstablishing: false,
  201. disableApi: false,
  202. tunnelPoolSize: 1,
  203. useUpstreamProxy: false,
  204. disruptNetwork: false,
  205. transformHostNames: false,
  206. useFragmentor: false,
  207. })
  208. }
  209. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  210. controllerRun(t,
  211. &controllerRunConfig{
  212. expectNoServerEntries: false,
  213. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  214. clientIsLatestVersion: true,
  215. disableUntunneledUpgrade: true,
  216. disableEstablishing: false,
  217. disableApi: false,
  218. tunnelPoolSize: 1,
  219. useUpstreamProxy: false,
  220. disruptNetwork: false,
  221. transformHostNames: true,
  222. useFragmentor: false,
  223. })
  224. }
  225. func TestFrontedMeek(t *testing.T) {
  226. controllerRun(t,
  227. &controllerRunConfig{
  228. expectNoServerEntries: false,
  229. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  230. clientIsLatestVersion: false,
  231. disableUntunneledUpgrade: true,
  232. disableEstablishing: false,
  233. disableApi: false,
  234. tunnelPoolSize: 1,
  235. useUpstreamProxy: false,
  236. disruptNetwork: false,
  237. transformHostNames: false,
  238. useFragmentor: false,
  239. })
  240. }
  241. func TestFrontedMeekWithTransformer(t *testing.T) {
  242. controllerRun(t,
  243. &controllerRunConfig{
  244. expectNoServerEntries: false,
  245. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  246. clientIsLatestVersion: true,
  247. disableUntunneledUpgrade: true,
  248. disableEstablishing: false,
  249. disableApi: false,
  250. tunnelPoolSize: 1,
  251. useUpstreamProxy: false,
  252. disruptNetwork: false,
  253. transformHostNames: true,
  254. useFragmentor: false,
  255. })
  256. }
  257. func TestFrontedMeekHTTP(t *testing.T) {
  258. controllerRun(t,
  259. &controllerRunConfig{
  260. expectNoServerEntries: false,
  261. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  262. clientIsLatestVersion: true,
  263. disableUntunneledUpgrade: true,
  264. disableEstablishing: false,
  265. disableApi: false,
  266. tunnelPoolSize: 1,
  267. useUpstreamProxy: false,
  268. disruptNetwork: false,
  269. transformHostNames: false,
  270. useFragmentor: false,
  271. })
  272. }
  273. func TestUnfrontedMeekHTTPS(t *testing.T) {
  274. controllerRun(t,
  275. &controllerRunConfig{
  276. expectNoServerEntries: false,
  277. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  278. clientIsLatestVersion: false,
  279. disableUntunneledUpgrade: true,
  280. disableEstablishing: false,
  281. disableApi: false,
  282. tunnelPoolSize: 1,
  283. useUpstreamProxy: false,
  284. disruptNetwork: false,
  285. transformHostNames: false,
  286. useFragmentor: false,
  287. })
  288. }
  289. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  290. controllerRun(t,
  291. &controllerRunConfig{
  292. expectNoServerEntries: false,
  293. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  294. clientIsLatestVersion: true,
  295. disableUntunneledUpgrade: true,
  296. disableEstablishing: false,
  297. disableApi: false,
  298. tunnelPoolSize: 1,
  299. useUpstreamProxy: false,
  300. disruptNetwork: false,
  301. transformHostNames: true,
  302. useFragmentor: false,
  303. })
  304. }
  305. func TestDisabledApi(t *testing.T) {
  306. controllerRun(t,
  307. &controllerRunConfig{
  308. expectNoServerEntries: false,
  309. protocol: "",
  310. clientIsLatestVersion: true,
  311. disableUntunneledUpgrade: true,
  312. disableEstablishing: false,
  313. disableApi: true,
  314. tunnelPoolSize: 1,
  315. useUpstreamProxy: false,
  316. disruptNetwork: false,
  317. transformHostNames: false,
  318. useFragmentor: false,
  319. })
  320. }
  321. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  322. controllerRun(t,
  323. &controllerRunConfig{
  324. expectNoServerEntries: false,
  325. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  326. clientIsLatestVersion: false,
  327. disableUntunneledUpgrade: true,
  328. disableEstablishing: false,
  329. disableApi: false,
  330. tunnelPoolSize: 1,
  331. useUpstreamProxy: true,
  332. disruptNetwork: false,
  333. transformHostNames: false,
  334. useFragmentor: false,
  335. })
  336. }
  337. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  338. controllerRun(t,
  339. &controllerRunConfig{
  340. expectNoServerEntries: false,
  341. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  342. clientIsLatestVersion: false,
  343. disableUntunneledUpgrade: true,
  344. disableEstablishing: false,
  345. disableApi: false,
  346. tunnelPoolSize: 1,
  347. useUpstreamProxy: true,
  348. disruptNetwork: false,
  349. transformHostNames: false,
  350. useFragmentor: false,
  351. })
  352. }
  353. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  354. controllerRun(t,
  355. &controllerRunConfig{
  356. expectNoServerEntries: false,
  357. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  358. clientIsLatestVersion: false,
  359. disableUntunneledUpgrade: true,
  360. disableEstablishing: false,
  361. disableApi: false,
  362. tunnelPoolSize: 1,
  363. useUpstreamProxy: true,
  364. disruptNetwork: false,
  365. transformHostNames: false,
  366. useFragmentor: false,
  367. })
  368. }
  369. func TestObfuscatedSSHFragmentor(t *testing.T) {
  370. controllerRun(t,
  371. &controllerRunConfig{
  372. expectNoServerEntries: false,
  373. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  374. clientIsLatestVersion: false,
  375. disableUntunneledUpgrade: true,
  376. disableEstablishing: false,
  377. disableApi: false,
  378. tunnelPoolSize: 1,
  379. useUpstreamProxy: false,
  380. disruptNetwork: false,
  381. transformHostNames: false,
  382. useFragmentor: true,
  383. })
  384. }
  385. func TestFrontedMeekFragmentor(t *testing.T) {
  386. controllerRun(t,
  387. &controllerRunConfig{
  388. expectNoServerEntries: false,
  389. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  390. clientIsLatestVersion: false,
  391. disableUntunneledUpgrade: true,
  392. disableEstablishing: false,
  393. disableApi: false,
  394. tunnelPoolSize: 1,
  395. useUpstreamProxy: false,
  396. disruptNetwork: false,
  397. transformHostNames: false,
  398. useFragmentor: true,
  399. })
  400. }
  401. func TestQUIC(t *testing.T) {
  402. controllerRun(t,
  403. &controllerRunConfig{
  404. expectNoServerEntries: false,
  405. protocol: protocol.TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH,
  406. clientIsLatestVersion: false,
  407. disableUntunneledUpgrade: true,
  408. disableEstablishing: false,
  409. disableApi: false,
  410. tunnelPoolSize: 1,
  411. useUpstreamProxy: false,
  412. disruptNetwork: false,
  413. transformHostNames: false,
  414. useFragmentor: false,
  415. })
  416. }
  417. type controllerRunConfig struct {
  418. expectNoServerEntries bool
  419. protocol string
  420. clientIsLatestVersion bool
  421. disableUntunneledUpgrade bool
  422. disableEstablishing bool
  423. disableApi bool
  424. tunnelPoolSize int
  425. useUpstreamProxy bool
  426. disruptNetwork bool
  427. transformHostNames bool
  428. useFragmentor bool
  429. }
  430. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  431. configJSON, err := ioutil.ReadFile("controller_test.config")
  432. if err != nil {
  433. // Skip, don't fail, if config file is not present
  434. t.Skipf("error loading configuration file: %s", err)
  435. }
  436. // Note: a successful tactics request may modify config parameters.
  437. var modifyConfig map[string]interface{}
  438. json.Unmarshal(configJSON, &modifyConfig)
  439. modifyConfig["DataStoreDirectory"] = testDataDirName
  440. modifyConfig["RemoteServerListDownloadFilename"] = filepath.Join(testDataDirName, "server_list_compressed")
  441. modifyConfig["UpgradeDownloadFilename"] = filepath.Join(testDataDirName, "upgrade")
  442. if runConfig.protocol != "" {
  443. modifyConfig["LimitTunnelProtocols"] = protocol.TunnelProtocols{runConfig.protocol}
  444. }
  445. // Override client retry throttle values to speed up automated
  446. // tests and ensure tests complete within fixed deadlines.
  447. modifyConfig["FetchRemoteServerListRetryPeriodMilliseconds"] = 250
  448. modifyConfig["FetchUpgradeRetryPeriodMilliseconds"] = 250
  449. modifyConfig["EstablishTunnelPausePeriodSeconds"] = 1
  450. if runConfig.disableUntunneledUpgrade {
  451. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  452. modifyConfig["UpgradeDownloadClientVersionHeader"] = "invalid-value"
  453. }
  454. if runConfig.transformHostNames {
  455. modifyConfig["TransformHostNames"] = "always"
  456. } else {
  457. modifyConfig["TransformHostNames"] = "never"
  458. }
  459. if runConfig.useFragmentor {
  460. modifyConfig["UseFragmentor"] = "always"
  461. modifyConfig["FragmentorLimitProtocols"] = protocol.TunnelProtocols{runConfig.protocol}
  462. modifyConfig["FragmentorMinTotalBytes"] = 1000
  463. modifyConfig["FragmentorMaxTotalBytes"] = 2000
  464. modifyConfig["FragmentorMinWriteBytes"] = 1
  465. modifyConfig["FragmentorMaxWriteBytes"] = 100
  466. modifyConfig["FragmentorMinDelayMicroseconds"] = 1000
  467. modifyConfig["FragmentorMaxDelayMicroseconds"] = 10000
  468. modifyConfig["ObfuscatedSSHMinPadding"] = 4096
  469. modifyConfig["ObfuscatedSSHMaxPadding"] = 8192
  470. }
  471. configJSON, _ = json.Marshal(modifyConfig)
  472. config, err := LoadConfig(configJSON)
  473. if err != nil {
  474. t.Fatalf("error processing configuration file: %s", err)
  475. }
  476. if runConfig.clientIsLatestVersion {
  477. config.ClientVersion = "999999999"
  478. }
  479. if runConfig.disableEstablishing {
  480. // Clear remote server list so tunnel cannot be established.
  481. // TODO: also delete all server entries in the datastore.
  482. config.DisableRemoteServerListFetcher = true
  483. }
  484. if runConfig.disableApi {
  485. config.DisableApi = true
  486. }
  487. config.TunnelPoolSize = runConfig.tunnelPoolSize
  488. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  489. t.Fatalf("cannot use multiple upstream proxies")
  490. }
  491. if runConfig.disruptNetwork {
  492. config.UpstreamProxyURL = disruptorProxyURL
  493. } else if runConfig.useUpstreamProxy {
  494. config.UpstreamProxyURL = upstreamProxyURL
  495. config.CustomHeaders = upstreamProxyCustomHeaders
  496. }
  497. // All config fields should be set before calling Commit.
  498. err = config.Commit()
  499. if err != nil {
  500. t.Fatalf("error committing configuration file: %s", err)
  501. }
  502. // Enable tactics requests. This will passively exercise the code
  503. // paths. server_test runs a more comprehensive test that checks
  504. // that the tactics request succeeds.
  505. config.NetworkID = "NETWORK1"
  506. os.Remove(config.UpgradeDownloadFilename)
  507. os.Remove(config.RemoteServerListDownloadFilename)
  508. err = OpenDataStore(config)
  509. if err != nil {
  510. t.Fatalf("error initializing datastore: %s", err)
  511. }
  512. defer CloseDataStore()
  513. serverEntryCount := CountServerEntries()
  514. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  515. // TODO: replace expectNoServerEntries with resetServerEntries
  516. // so tests can run in arbitrary order
  517. t.Fatalf("unexpected server entries")
  518. }
  519. controller, err := NewController(config)
  520. if err != nil {
  521. t.Fatalf("error creating controller: %s", err)
  522. }
  523. // Monitor notices for "Tunnels" with count > 1, the
  524. // indication of tunnel establishment success.
  525. // Also record the selected HTTP proxy port to use
  526. // when fetching websites through the tunnel.
  527. httpProxyPort := 0
  528. tunnelEstablished := make(chan struct{}, 1)
  529. upgradeDownloaded := make(chan struct{}, 1)
  530. remoteServerListDownloaded := make(chan struct{}, 1)
  531. confirmedLatestVersion := make(chan struct{}, 1)
  532. var clientUpgradeDownloadedBytesCount int32
  533. var remoteServerListDownloadedBytesCount int32
  534. SetNoticeWriter(NewNoticeReceiver(
  535. func(notice []byte) {
  536. // TODO: log notices without logging server IPs:
  537. //fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  538. noticeType, payload, err := GetNotice(notice)
  539. if err != nil {
  540. return
  541. }
  542. switch noticeType {
  543. case "ListeningHttpProxyPort":
  544. httpProxyPort = int(payload["port"].(float64))
  545. case "ConnectingServer":
  546. serverProtocol := payload["protocol"].(string)
  547. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  548. // TODO: wrong goroutine for t.FatalNow()
  549. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  550. }
  551. case "Tunnels":
  552. count := int(payload["count"].(float64))
  553. if count > 0 {
  554. if runConfig.disableEstablishing {
  555. // TODO: wrong goroutine for t.FatalNow()
  556. t.Fatalf("tunnel established unexpectedly")
  557. } else {
  558. select {
  559. case tunnelEstablished <- *new(struct{}):
  560. default:
  561. }
  562. }
  563. }
  564. case "ClientUpgradeDownloadedBytes":
  565. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  566. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  567. case "ClientUpgradeDownloaded":
  568. select {
  569. case upgradeDownloaded <- *new(struct{}):
  570. default:
  571. }
  572. case "ClientIsLatestVersion":
  573. select {
  574. case confirmedLatestVersion <- *new(struct{}):
  575. default:
  576. }
  577. case "RemoteServerListResourceDownloadedBytes":
  578. url := payload["url"].(string)
  579. if url == config.RemoteServerListUrl {
  580. t.Logf("RemoteServerListResourceDownloadedBytes: %d", int(payload["bytes"].(float64)))
  581. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  582. }
  583. case "RemoteServerListResourceDownloaded":
  584. url := payload["url"].(string)
  585. if url == config.RemoteServerListUrl {
  586. t.Logf("RemoteServerListResourceDownloaded")
  587. select {
  588. case remoteServerListDownloaded <- *new(struct{}):
  589. default:
  590. }
  591. }
  592. }
  593. }))
  594. // Run controller, which establishes tunnels
  595. ctx, cancelFunc := context.WithCancel(context.Background())
  596. controllerWaitGroup := new(sync.WaitGroup)
  597. controllerWaitGroup.Add(1)
  598. go func() {
  599. defer controllerWaitGroup.Done()
  600. controller.Run(ctx)
  601. }()
  602. defer func() {
  603. // Test: shutdown must complete within 20 seconds
  604. cancelFunc()
  605. shutdownTimeout := time.NewTimer(20 * time.Second)
  606. shutdownOk := make(chan struct{}, 1)
  607. go func() {
  608. controllerWaitGroup.Wait()
  609. shutdownOk <- *new(struct{})
  610. }()
  611. select {
  612. case <-shutdownOk:
  613. case <-shutdownTimeout.C:
  614. t.Fatalf("controller shutdown timeout exceeded")
  615. }
  616. }()
  617. if !runConfig.disableEstablishing {
  618. // Test: tunnel must be established within 120 seconds
  619. establishTimeout := time.NewTimer(120 * time.Second)
  620. select {
  621. case <-tunnelEstablished:
  622. case <-establishTimeout.C:
  623. t.Fatalf("tunnel establish timeout exceeded")
  624. }
  625. // Test: if starting with no server entries, a fetch remote
  626. // server list must have succeeded. With disruptNetwork, the
  627. // fetch must have been resumed at least once.
  628. if serverEntryCount == 0 {
  629. select {
  630. case <-remoteServerListDownloaded:
  631. default:
  632. t.Fatalf("expected remote server list downloaded")
  633. }
  634. if runConfig.disruptNetwork {
  635. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  636. if count <= 1 {
  637. t.Fatalf("unexpected remote server list download progress: %d", count)
  638. }
  639. }
  640. }
  641. // Cannot establish port forwards in DisableApi mode
  642. if !runConfig.disableApi {
  643. // Test: fetch website through tunnel
  644. // Allow for known race condition described in NewHttpProxy():
  645. time.Sleep(1 * time.Second)
  646. if !runConfig.disruptNetwork {
  647. fetchAndVerifyWebsite(t, httpProxyPort)
  648. }
  649. }
  650. }
  651. // Test: upgrade check/download must be downloaded within 180 seconds
  652. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  653. if expectUpgrade {
  654. upgradeTimeout := time.NewTimer(120 * time.Second)
  655. select {
  656. case <-upgradeDownloaded:
  657. // TODO: verify downloaded file
  658. if runConfig.clientIsLatestVersion {
  659. t.Fatalf("upgrade downloaded unexpectedly")
  660. }
  661. // Test: with disruptNetwork, must be multiple download progress notices
  662. if runConfig.disruptNetwork {
  663. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  664. if count <= 1 {
  665. t.Fatalf("unexpected upgrade download progress: %d", count)
  666. }
  667. }
  668. case <-confirmedLatestVersion:
  669. if !runConfig.clientIsLatestVersion {
  670. t.Fatalf("confirmed latest version unexpectedly")
  671. }
  672. case <-upgradeTimeout.C:
  673. t.Fatalf("upgrade download timeout exceeded")
  674. }
  675. }
  676. }
  677. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) error {
  678. testUrl := "https://psiphon.ca"
  679. roundTripTimeout := 30 * time.Second
  680. expectedResponseContains := "Psiphon"
  681. checkResponse := func(responseBody string) bool {
  682. return strings.Contains(responseBody, expectedResponseContains)
  683. }
  684. // Retries are made to compensate for intermittent failures due
  685. // to external network conditions.
  686. fetchWithRetries := func(fetchName string, fetchFunc func() error) error {
  687. retryCount := 6
  688. retryDelay := 5 * time.Second
  689. var err error
  690. for i := 0; i < retryCount; i++ {
  691. err = fetchFunc()
  692. if err == nil || i == retryCount-1 {
  693. break
  694. }
  695. time.Sleep(retryDelay)
  696. t.Logf("retrying %s...", fetchName)
  697. }
  698. return err
  699. }
  700. // Test: use HTTP proxy
  701. fetchUsingHTTPProxy := func() error {
  702. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  703. if err != nil {
  704. return fmt.Errorf("error initializing proxied HTTP request: %s", err)
  705. }
  706. httpTransport := &http.Transport{
  707. Proxy: http.ProxyURL(proxyUrl),
  708. DisableKeepAlives: true,
  709. }
  710. httpClient := &http.Client{
  711. Transport: httpTransport,
  712. Timeout: roundTripTimeout,
  713. }
  714. request, err := http.NewRequest("GET", testUrl, nil)
  715. if err != nil {
  716. return fmt.Errorf("error preparing proxied HTTP request: %s", err)
  717. }
  718. response, err := httpClient.Do(request)
  719. if err != nil {
  720. return fmt.Errorf("error sending proxied HTTP request: %s", err)
  721. }
  722. defer response.Body.Close()
  723. body, err := ioutil.ReadAll(response.Body)
  724. if err != nil {
  725. return fmt.Errorf("error reading proxied HTTP response: %s", err)
  726. }
  727. if !checkResponse(string(body)) {
  728. return fmt.Errorf("unexpected proxied HTTP response")
  729. }
  730. return nil
  731. }
  732. err := fetchWithRetries("proxied HTTP request", fetchUsingHTTPProxy)
  733. if err != nil {
  734. return err
  735. }
  736. // Delay before requesting from external service again
  737. time.Sleep(1 * time.Second)
  738. // Test: use direct URL proxy
  739. fetchUsingURLProxyDirect := func() error {
  740. httpTransport := &http.Transport{
  741. DisableKeepAlives: true,
  742. }
  743. httpClient := &http.Client{
  744. Transport: httpTransport,
  745. Timeout: roundTripTimeout,
  746. }
  747. request, err := http.NewRequest(
  748. "GET",
  749. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  750. httpProxyPort, url.QueryEscape(testUrl)),
  751. nil)
  752. if err != nil {
  753. return fmt.Errorf("error preparing direct URL request: %s", err)
  754. }
  755. response, err := httpClient.Do(request)
  756. if err != nil {
  757. return fmt.Errorf("error sending direct URL request: %s", err)
  758. }
  759. defer response.Body.Close()
  760. body, err := ioutil.ReadAll(response.Body)
  761. if err != nil {
  762. return fmt.Errorf("error reading direct URL response: %s", err)
  763. }
  764. if !checkResponse(string(body)) {
  765. return fmt.Errorf("unexpected direct URL response")
  766. }
  767. return nil
  768. }
  769. err = fetchWithRetries("direct URL request", fetchUsingURLProxyDirect)
  770. if err != nil {
  771. return err
  772. }
  773. // Delay before requesting from external service again
  774. time.Sleep(1 * time.Second)
  775. // Test: use tunneled URL proxy
  776. fetchUsingURLProxyTunneled := func() error {
  777. httpTransport := &http.Transport{
  778. DisableKeepAlives: true,
  779. }
  780. httpClient := &http.Client{
  781. Transport: httpTransport,
  782. Timeout: roundTripTimeout,
  783. }
  784. request, err := http.NewRequest(
  785. "GET",
  786. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  787. httpProxyPort, url.QueryEscape(testUrl)),
  788. nil)
  789. if err != nil {
  790. return fmt.Errorf("error preparing tunneled URL request: %s", err)
  791. }
  792. response, err := httpClient.Do(request)
  793. if err != nil {
  794. return fmt.Errorf("error sending tunneled URL request: %s", err)
  795. }
  796. defer response.Body.Close()
  797. body, err := ioutil.ReadAll(response.Body)
  798. if err != nil {
  799. return fmt.Errorf("error reading tunneled URL response: %s", err)
  800. }
  801. if !checkResponse(string(body)) {
  802. return fmt.Errorf("unexpected tunneled URL response")
  803. }
  804. return nil
  805. }
  806. err = fetchWithRetries("tunneled URL request", fetchUsingURLProxyTunneled)
  807. if err != nil {
  808. return err
  809. }
  810. return nil
  811. }
  812. // Note: Valid values for disruptorMaxConnectionBytes depend on the production
  813. // network; for example, the size of the remote server list resource must exceed
  814. // disruptorMaxConnectionBytes or else TestUntunneledResumableFetchRemoteServerList
  815. // will fail since no retries are required. But if disruptorMaxConnectionBytes is
  816. // too small, the test will take longer to run since more retries are necessary.
  817. //
  818. // Tests such as TestUntunneledResumableFetchRemoteServerList could be rewritten to
  819. // use mock components (for example, see TestObfuscatedRemoteServerLists); however
  820. // these test in controller_test serve the dual purpose of ensuring that tunnel
  821. // core works with the production network.
  822. //
  823. // TODO: set disruptorMaxConnectionBytes (and disruptorMaxConnectionTime) dynamically,
  824. // based on current production network configuration?
  825. const disruptorProxyAddress = "127.0.0.1:2160"
  826. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  827. const disruptorMaxConnectionBytes = 250000
  828. const disruptorMaxConnectionTime = 10 * time.Second
  829. func initDisruptor() {
  830. go func() {
  831. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  832. if err != nil {
  833. fmt.Printf("disruptor proxy listen error: %s\n", err)
  834. return
  835. }
  836. for {
  837. localConn, err := listener.AcceptSocks()
  838. if err != nil {
  839. if e, ok := err.(net.Error); ok && e.Temporary() {
  840. fmt.Printf("disruptor proxy temporary accept error: %s\n", err)
  841. continue
  842. }
  843. fmt.Printf("disruptor proxy accept error: %s\n", err)
  844. return
  845. }
  846. go func() {
  847. defer localConn.Close()
  848. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  849. if err != nil {
  850. // TODO: log "err" without logging server IPs
  851. fmt.Printf("disruptor proxy dial error\n")
  852. return
  853. }
  854. defer remoteConn.Close()
  855. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  856. if err != nil {
  857. fmt.Printf("disruptor proxy grant error: %s\n", err)
  858. return
  859. }
  860. // Cut connection after disruptorMaxConnectionTime
  861. time.AfterFunc(disruptorMaxConnectionTime, func() {
  862. localConn.Close()
  863. remoteConn.Close()
  864. })
  865. // Relay connection, but only up to disruptorMaxConnectionBytes
  866. waitGroup := new(sync.WaitGroup)
  867. waitGroup.Add(1)
  868. go func() {
  869. defer waitGroup.Done()
  870. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  871. localConn.Close()
  872. remoteConn.Close()
  873. }()
  874. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  875. localConn.Close()
  876. remoteConn.Close()
  877. waitGroup.Wait()
  878. }()
  879. }
  880. }()
  881. }
  882. const upstreamProxyURL = "http://127.0.0.1:2161"
  883. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": {"test-header-value1", "test-header-value2"}}
  884. func hasExpectedCustomHeaders(h http.Header) bool {
  885. for name, values := range upstreamProxyCustomHeaders {
  886. if h[name] == nil {
  887. return false
  888. }
  889. // Order may not be the same
  890. for _, value := range values {
  891. if !common.Contains(h[name], value) {
  892. return false
  893. }
  894. }
  895. }
  896. return true
  897. }
  898. func initUpstreamProxy() {
  899. go func() {
  900. proxy := goproxy.NewProxyHttpServer()
  901. proxy.Logger = log.New(ioutil.Discard, "", 0)
  902. proxy.OnRequest().DoFunc(
  903. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  904. if !hasExpectedCustomHeaders(r.Header) {
  905. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  906. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  907. }
  908. return r, nil
  909. })
  910. proxy.OnRequest().HandleConnectFunc(
  911. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  912. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  913. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  914. return goproxy.RejectConnect, host
  915. }
  916. return goproxy.OkConnect, host
  917. })
  918. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  919. if err != nil {
  920. fmt.Printf("upstream proxy failed: %s\n", err)
  921. }
  922. }()
  923. // TODO: wait until listener is active?
  924. }