controller_test.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "encoding/json"
  23. "flag"
  24. "fmt"
  25. "io"
  26. "io/ioutil"
  27. "log"
  28. "net"
  29. "net/http"
  30. "net/url"
  31. "os"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "testing"
  36. "time"
  37. socks "github.com/Psiphon-Labs/goptlib"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  42. "github.com/elazarl/goproxy"
  43. "github.com/elazarl/goproxy/ext/auth"
  44. )
  45. const testClientPlatform = "test_github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  46. func TestMain(m *testing.M) {
  47. flag.Parse()
  48. SetEmitDiagnosticNotices(true, true)
  49. initDisruptor()
  50. initUpstreamProxy()
  51. os.Exit(m.Run())
  52. }
  53. // Test case notes/limitations/dependencies:
  54. //
  55. // * Untunneled upgrade tests must execute before
  56. // the other tests to ensure no tunnel is established.
  57. // We need a way to reset the datastore after it's been
  58. // initialized in order to clear out its data entries
  59. // and be able to arbitrarily order the tests.
  60. //
  61. // * The resumable download tests using disruptNetwork
  62. // depend on the download object being larger than the
  63. // disruptorMax limits so that the disruptor will actually
  64. // interrupt the first download attempt. Specifically, the
  65. // upgrade and remote server list at the URLs specified in
  66. // controller_test.config.enc.
  67. //
  68. // * The protocol tests assume there is at least one server
  69. // supporting each protocol in the server list at the URL
  70. // specified in controller_test.config.enc, and that these
  71. // servers are not overloaded.
  72. //
  73. // * fetchAndVerifyWebsite depends on the target URL being
  74. // available and responding.
  75. //
  76. func TestUntunneledUpgradeDownload(t *testing.T) {
  77. controllerRun(t,
  78. &controllerRunConfig{
  79. expectNoServerEntries: true,
  80. protocol: "",
  81. disableEstablishing: true,
  82. })
  83. }
  84. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  85. controllerRun(t,
  86. &controllerRunConfig{
  87. expectNoServerEntries: true,
  88. protocol: "",
  89. disableEstablishing: true,
  90. disruptNetwork: true,
  91. })
  92. }
  93. func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
  94. controllerRun(t,
  95. &controllerRunConfig{
  96. expectNoServerEntries: true,
  97. protocol: "",
  98. clientIsLatestVersion: true,
  99. disableEstablishing: true,
  100. })
  101. }
  102. func TestUntunneledResumableFetchRemoteServerList(t *testing.T) {
  103. controllerRun(t,
  104. &controllerRunConfig{
  105. expectNoServerEntries: true,
  106. protocol: "",
  107. clientIsLatestVersion: true,
  108. disruptNetwork: true,
  109. })
  110. }
  111. func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
  112. controllerRun(t,
  113. &controllerRunConfig{
  114. protocol: "",
  115. clientIsLatestVersion: true,
  116. disableUntunneledUpgrade: true,
  117. })
  118. }
  119. func TestSSH(t *testing.T) {
  120. controllerRun(t,
  121. &controllerRunConfig{
  122. protocol: protocol.TUNNEL_PROTOCOL_SSH,
  123. disableUntunneledUpgrade: true,
  124. })
  125. }
  126. func TestObfuscatedSSH(t *testing.T) {
  127. controllerRun(t,
  128. &controllerRunConfig{
  129. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  130. disableUntunneledUpgrade: true,
  131. })
  132. }
  133. func TestTLSOSSH(t *testing.T) {
  134. controllerRun(t,
  135. &controllerRunConfig{
  136. protocol: protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH,
  137. disableUntunneledUpgrade: true,
  138. })
  139. }
  140. func TestShadowsocks(t *testing.T) {
  141. t.Skipf("temporarily disabled")
  142. controllerRun(t,
  143. &controllerRunConfig{
  144. protocol: protocol.TUNNEL_PROTOCOL_SHADOWSOCKS_OSSH,
  145. disableUntunneledUpgrade: true,
  146. })
  147. }
  148. func TestUnfrontedMeek(t *testing.T) {
  149. controllerRun(t,
  150. &controllerRunConfig{
  151. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  152. disableUntunneledUpgrade: true,
  153. })
  154. }
  155. func TestUnfrontedMeekWithTransformer(t *testing.T) {
  156. controllerRun(t,
  157. &controllerRunConfig{
  158. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  159. disableUntunneledUpgrade: true,
  160. transformHostNames: true,
  161. })
  162. }
  163. func TestFrontedMeek(t *testing.T) {
  164. controllerRun(t,
  165. &controllerRunConfig{
  166. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  167. disableUntunneledUpgrade: true,
  168. })
  169. }
  170. func TestFrontedMeekWithTransformer(t *testing.T) {
  171. controllerRun(t,
  172. &controllerRunConfig{
  173. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  174. disableUntunneledUpgrade: true,
  175. transformHostNames: true,
  176. })
  177. }
  178. func TestFrontedMeekHTTP(t *testing.T) {
  179. controllerRun(t,
  180. &controllerRunConfig{
  181. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
  182. disableUntunneledUpgrade: true,
  183. })
  184. }
  185. func TestUnfrontedMeekHTTPS(t *testing.T) {
  186. controllerRun(t,
  187. &controllerRunConfig{
  188. expectNoServerEntries: false,
  189. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  190. disableUntunneledUpgrade: true,
  191. })
  192. }
  193. func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
  194. controllerRun(t,
  195. &controllerRunConfig{
  196. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  197. clientIsLatestVersion: true,
  198. transformHostNames: true,
  199. })
  200. }
  201. func TestDisabledApi(t *testing.T) {
  202. controllerRun(t,
  203. &controllerRunConfig{
  204. protocol: "",
  205. clientIsLatestVersion: true,
  206. disableUntunneledUpgrade: true,
  207. disableApi: true,
  208. tunnelPoolSize: 1,
  209. })
  210. }
  211. func TestObfuscatedSSHWithUpstreamProxy(t *testing.T) {
  212. controllerRun(t,
  213. &controllerRunConfig{
  214. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  215. disableUntunneledUpgrade: true,
  216. useUpstreamProxy: true,
  217. })
  218. }
  219. func TestUnfrontedMeekWithUpstreamProxy(t *testing.T) {
  220. controllerRun(t,
  221. &controllerRunConfig{
  222. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  223. disableUntunneledUpgrade: true,
  224. useUpstreamProxy: true,
  225. })
  226. }
  227. func TestUnfrontedMeekHTTPSWithUpstreamProxy(t *testing.T) {
  228. controllerRun(t,
  229. &controllerRunConfig{
  230. protocol: protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  231. disableUntunneledUpgrade: true,
  232. useUpstreamProxy: true,
  233. })
  234. }
  235. func TestObfuscatedSSHFragmentor(t *testing.T) {
  236. controllerRun(t,
  237. &controllerRunConfig{
  238. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  239. disableUntunneledUpgrade: true,
  240. useFragmentor: true,
  241. })
  242. }
  243. func TestFrontedMeekFragmentor(t *testing.T) {
  244. controllerRun(t,
  245. &controllerRunConfig{
  246. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK,
  247. disableUntunneledUpgrade: true,
  248. useFragmentor: true,
  249. })
  250. }
  251. func TestGQUIC(t *testing.T) {
  252. if !quic.Enabled() {
  253. t.Skip("QUIC is not enabled")
  254. }
  255. if !quic.GQUICEnabled() {
  256. t.Skip("gQUIC is not enabled")
  257. }
  258. controllerRun(t,
  259. &controllerRunConfig{
  260. protocol: protocol.TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH,
  261. disableUntunneledUpgrade: true,
  262. quicVersions: protocol.SupportedGQUICVersions,
  263. })
  264. }
  265. func TestIETFQUIC(t *testing.T) {
  266. if !quic.Enabled() {
  267. t.Skip("QUIC is not enabled")
  268. }
  269. controllerRun(t,
  270. &controllerRunConfig{
  271. protocol: protocol.TUNNEL_PROTOCOL_QUIC_OBFUSCATED_SSH,
  272. disableUntunneledUpgrade: true,
  273. quicVersions: protocol.SupportedQUICv1Versions,
  274. })
  275. }
  276. func TestFrontedQUIC(t *testing.T) {
  277. if !quic.Enabled() {
  278. t.Skip("QUIC is not enabled")
  279. }
  280. controllerRun(t,
  281. &controllerRunConfig{
  282. protocol: protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_QUIC_OBFUSCATED_SSH,
  283. disableUntunneledUpgrade: true,
  284. })
  285. }
  286. func TestInproxyOSSH(t *testing.T) {
  287. if !inproxy.Enabled() {
  288. t.Skip("In-proxy is not enabled")
  289. }
  290. controllerRun(t,
  291. &controllerRunConfig{
  292. protocol: "INPROXY-WEBRTC-OSSH",
  293. disableUntunneledUpgrade: true,
  294. useInproxyDialRateLimit: true,
  295. })
  296. }
  297. func TestInproxyQUICOSSH(t *testing.T) {
  298. if !inproxy.Enabled() {
  299. t.Skip("In-proxy is not enabled")
  300. }
  301. controllerRun(t,
  302. &controllerRunConfig{
  303. protocol: "INPROXY-WEBRTC-QUIC-OSSH",
  304. disableUntunneledUpgrade: true,
  305. useInproxyDialRateLimit: true,
  306. })
  307. }
  308. func TestInproxyUnfrontedMeekHTTPS(t *testing.T) {
  309. if !inproxy.Enabled() {
  310. t.Skip("In-proxy is not enabled")
  311. }
  312. controllerRun(t,
  313. &controllerRunConfig{
  314. protocol: "INPROXY-WEBRTC-UNFRONTED-MEEK-HTTPS-OSSH",
  315. disableUntunneledUpgrade: true,
  316. })
  317. }
  318. func TestInproxyTLSOSSH(t *testing.T) {
  319. if !inproxy.Enabled() {
  320. t.Skip("In-proxy is not enabled")
  321. }
  322. controllerRun(t,
  323. &controllerRunConfig{
  324. protocol: "INPROXY-WEBRTC-TLS-OSSH",
  325. disableUntunneledUpgrade: true,
  326. })
  327. }
  328. func TestTunnelPool(t *testing.T) {
  329. controllerRun(t,
  330. &controllerRunConfig{
  331. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  332. disableUntunneledUpgrade: true,
  333. tunnelPoolSize: 2,
  334. })
  335. }
  336. func TestLegacyAPIEncoding(t *testing.T) {
  337. controllerRun(t,
  338. &controllerRunConfig{
  339. protocol: protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH,
  340. useLegacyAPIEncoding: true,
  341. })
  342. }
  343. type controllerRunConfig struct {
  344. expectNoServerEntries bool
  345. protocol string
  346. clientIsLatestVersion bool
  347. disableUntunneledUpgrade bool
  348. disableEstablishing bool
  349. disableApi bool
  350. tunnelPoolSize int
  351. useUpstreamProxy bool
  352. disruptNetwork bool
  353. transformHostNames bool
  354. useFragmentor bool
  355. useLegacyAPIEncoding bool
  356. useInproxyDialRateLimit bool
  357. quicVersions protocol.QUICVersions
  358. }
  359. func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
  360. testDataDirName, err := ioutil.TempDir("", "psiphon-controller-test")
  361. if err != nil {
  362. t.Fatalf("TempDir failed: %s\n", err)
  363. }
  364. defer os.RemoveAll(testDataDirName)
  365. configJSON, err := ioutil.ReadFile("controller_test.config")
  366. if err != nil {
  367. // Skip, don't fail, if config file is not present
  368. t.Skipf("error loading configuration file: %s", err)
  369. }
  370. // Note: a successful tactics request may modify config parameters.
  371. var modifyConfig map[string]interface{}
  372. err = json.Unmarshal(configJSON, &modifyConfig)
  373. if err != nil {
  374. t.Fatalf("json.Unmarshal failed: %v", err)
  375. }
  376. modifyConfig["DataRootDirectory"] = testDataDirName
  377. if runConfig.protocol != "" {
  378. modifyConfig["LimitTunnelProtocols"] = protocol.TunnelProtocols{runConfig.protocol}
  379. }
  380. modifyConfig["EnableUpgradeDownload"] = true
  381. modifyConfig["EnableFeedbackUpload"] = false
  382. // Override client retry throttle values to speed up automated
  383. // tests and ensure tests complete within fixed deadlines.
  384. modifyConfig["FetchRemoteServerListRetryPeriodMilliseconds"] = 250
  385. modifyConfig["FetchUpgradeRetryPeriodMilliseconds"] = 250
  386. modifyConfig["EstablishTunnelPausePeriodSeconds"] = 1
  387. if runConfig.disableUntunneledUpgrade {
  388. // Break untunneled upgrade downloader to ensure tunneled case is tested
  389. modifyConfig["UpgradeDownloadClientVersionHeader"] = "invalid-value"
  390. }
  391. if runConfig.transformHostNames {
  392. modifyConfig["TransformHostNames"] = "always"
  393. } else {
  394. modifyConfig["TransformHostNames"] = "never"
  395. }
  396. if runConfig.useFragmentor {
  397. modifyConfig["UseFragmentor"] = "always"
  398. modifyConfig["FragmentorLimitProtocols"] = protocol.TunnelProtocols{runConfig.protocol}
  399. modifyConfig["FragmentorMinTotalBytes"] = 1000
  400. modifyConfig["FragmentorMaxTotalBytes"] = 2000
  401. modifyConfig["FragmentorMinWriteBytes"] = 1
  402. modifyConfig["FragmentorMaxWriteBytes"] = 100
  403. modifyConfig["FragmentorMinDelayMicroseconds"] = 1000
  404. modifyConfig["FragmentorMaxDelayMicroseconds"] = 10000
  405. modifyConfig["ObfuscatedSSHMinPadding"] = 4096
  406. modifyConfig["ObfuscatedSSHMaxPadding"] = 8192
  407. }
  408. if runConfig.useLegacyAPIEncoding {
  409. modifyConfig["TargetAPIEncoding"] = protocol.PSIPHON_API_ENCODING_JSON
  410. }
  411. if runConfig.useInproxyDialRateLimit {
  412. modifyConfig["InproxyClientDialRateLimitQuantity"] = 2
  413. modifyConfig["InproxyClientDialRateLimitIntervalMilliseconds"] = 1000
  414. }
  415. modifyConfig["LimitQUICVersions"] = runConfig.quicVersions
  416. // TODO: vary this option
  417. modifyConfig["CompressTactics"] = false
  418. configJSON, _ = json.Marshal(modifyConfig)
  419. // Don't print initial config setup notices
  420. err = SetNoticeWriter(io.Discard)
  421. if err != nil {
  422. t.Fatalf("error setting notice writer: %s", err)
  423. }
  424. defer ResetNoticeWriter()
  425. config, err := LoadConfig(configJSON)
  426. if err != nil {
  427. t.Fatalf("error processing configuration file: %s", err)
  428. }
  429. if config.ClientPlatform == "" {
  430. config.ClientPlatform = testClientPlatform
  431. }
  432. if runConfig.clientIsLatestVersion {
  433. config.ClientVersion = "999999999"
  434. }
  435. if runConfig.disableEstablishing {
  436. // Clear remote server list so tunnel cannot be established.
  437. // TODO: also delete all server entries in the datastore.
  438. config.DisableRemoteServerListFetcher = true
  439. }
  440. if runConfig.disableApi {
  441. config.DisableApi = true
  442. }
  443. config.TunnelPoolSize = runConfig.tunnelPoolSize
  444. if runConfig.useUpstreamProxy && runConfig.disruptNetwork {
  445. t.Fatalf("cannot use multiple upstream proxies")
  446. }
  447. if runConfig.disruptNetwork {
  448. config.UpstreamProxyURL = disruptorProxyURL
  449. } else if runConfig.useUpstreamProxy {
  450. config.UpstreamProxyURL = upstreamProxyURL
  451. config.CustomHeaders = upstreamProxyCustomHeaders
  452. }
  453. // All config fields should be set before calling Commit.
  454. err = config.Commit(false)
  455. if err != nil {
  456. t.Fatalf("error committing configuration file: %s", err)
  457. }
  458. err = OpenDataStore(config)
  459. if err != nil {
  460. t.Fatalf("error initializing datastore: %s", err)
  461. }
  462. defer CloseDataStore()
  463. serverEntryCount := CountServerEntries()
  464. if runConfig.expectNoServerEntries && serverEntryCount > 0 {
  465. // TODO: replace expectNoServerEntries with resetServerEntries
  466. // so tests can run in arbitrary order
  467. t.Fatalf("unexpected server entries")
  468. }
  469. controller, err := NewController(config)
  470. if err != nil {
  471. t.Fatalf("error creating controller: %s", err)
  472. }
  473. // Monitor notices for "Tunnels" with count > 1, the
  474. // indication of tunnel establishment success.
  475. // Also record the selected HTTP proxy port to use
  476. // when fetching websites through the tunnel.
  477. httpProxyPort := 0
  478. tunnelEstablished := make(chan struct{}, 1)
  479. upgradeDownloaded := make(chan struct{}, 1)
  480. remoteServerListDownloaded := make(chan struct{}, 1)
  481. confirmedLatestVersion := make(chan struct{}, 1)
  482. candidateServers := make(chan struct{}, 1)
  483. availableEgressRegions := make(chan struct{}, 1)
  484. var clientUpgradeDownloadedBytesCount int32
  485. var remoteServerListDownloadedBytesCount int32
  486. ResetNoticeWriter()
  487. err = SetNoticeWriter(NewNoticeReceiver(
  488. func(notice []byte) {
  489. // TODO: log notices without logging server IPs:
  490. //fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  491. noticeType, payload, err := GetNotice(notice)
  492. if err != nil {
  493. return
  494. }
  495. switch noticeType {
  496. case "ListeningHttpProxyPort":
  497. httpProxyPort = int(payload["port"].(float64))
  498. case "ConnectingServer":
  499. serverProtocol := payload["protocol"].(string)
  500. if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
  501. // TODO: wrong goroutine for t.FatalNow()
  502. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  503. }
  504. case "Tunnels":
  505. count := int(payload["count"].(float64))
  506. if count > 0 {
  507. if runConfig.disableEstablishing {
  508. // TODO: wrong goroutine for t.FatalNow()
  509. t.Fatalf("tunnel established unexpectedly")
  510. } else {
  511. select {
  512. case tunnelEstablished <- struct{}{}:
  513. default:
  514. }
  515. }
  516. }
  517. case "ClientUpgradeDownloadedBytes":
  518. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  519. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  520. case "ClientUpgradeDownloaded":
  521. select {
  522. case upgradeDownloaded <- struct{}{}:
  523. default:
  524. }
  525. case "ClientIsLatestVersion":
  526. select {
  527. case confirmedLatestVersion <- struct{}{}:
  528. default:
  529. }
  530. case "RemoteServerListResourceDownloadedBytes":
  531. url := payload["url"].(string)
  532. if url == config.RemoteServerListUrl {
  533. t.Logf("RemoteServerListResourceDownloadedBytes: %d", int(payload["bytes"].(float64)))
  534. atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
  535. }
  536. case "RemoteServerListResourceDownloaded":
  537. url := payload["url"].(string)
  538. if url == config.RemoteServerListUrl {
  539. t.Logf("RemoteServerListResourceDownloaded")
  540. select {
  541. case remoteServerListDownloaded <- struct{}{}:
  542. default:
  543. }
  544. }
  545. case "CandidateServers":
  546. select {
  547. case candidateServers <- struct{}{}:
  548. default:
  549. }
  550. case "AvailableEgressRegions":
  551. select {
  552. case availableEgressRegions <- struct{}{}:
  553. default:
  554. }
  555. }
  556. }))
  557. if err != nil {
  558. t.Fatalf("error setting notice writer: %s", err)
  559. }
  560. defer ResetNoticeWriter()
  561. // Run controller, which establishes tunnels
  562. ctx, cancelFunc := context.WithCancel(context.Background())
  563. controllerWaitGroup := new(sync.WaitGroup)
  564. controllerWaitGroup.Add(1)
  565. go func() {
  566. defer controllerWaitGroup.Done()
  567. controller.Run(ctx)
  568. }()
  569. defer func() {
  570. // Test: shutdown must complete within 20 seconds
  571. cancelFunc()
  572. shutdownTimeout := time.NewTimer(20 * time.Second)
  573. shutdownOk := make(chan struct{}, 1)
  574. go func() {
  575. controllerWaitGroup.Wait()
  576. shutdownOk <- struct{}{}
  577. }()
  578. select {
  579. case <-shutdownOk:
  580. case <-shutdownTimeout.C:
  581. t.Fatalf("controller shutdown timeout exceeded")
  582. }
  583. }()
  584. if !runConfig.disableEstablishing {
  585. // Test: tunnel must be established within 120 seconds
  586. establishTimeout := time.NewTimer(120 * time.Second)
  587. select {
  588. case <-tunnelEstablished:
  589. case <-establishTimeout.C:
  590. t.Fatalf("tunnel establish timeout exceeded")
  591. }
  592. // Test: asynchronous server entry scans must complete
  593. select {
  594. case <-candidateServers:
  595. case <-establishTimeout.C:
  596. t.Fatalf("missing candidate servers notice")
  597. }
  598. select {
  599. case <-availableEgressRegions:
  600. case <-establishTimeout.C:
  601. t.Fatalf("missing available egress regions notice")
  602. }
  603. // Test: if starting with no server entries, a fetch remote
  604. // server list must have succeeded. With disruptNetwork, the
  605. // fetch must have been resumed at least once.
  606. if serverEntryCount == 0 {
  607. select {
  608. case <-remoteServerListDownloaded:
  609. default:
  610. t.Fatalf("expected remote server list downloaded")
  611. }
  612. if runConfig.disruptNetwork {
  613. count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
  614. if count <= 1 {
  615. t.Fatalf("unexpected remote server list download progress: %d", count)
  616. }
  617. }
  618. }
  619. // Cannot establish port forwards in DisableApi mode
  620. if !runConfig.disableApi {
  621. // Test: fetch website through tunnel
  622. // Allow for known race condition described in NewHttpProxy():
  623. time.Sleep(1 * time.Second)
  624. if !runConfig.disruptNetwork {
  625. fetchAndVerifyWebsite(t, httpProxyPort)
  626. }
  627. }
  628. }
  629. // Test: upgrade check/download must be downloaded within 240 seconds
  630. expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
  631. if expectUpgrade {
  632. upgradeTimeout := time.NewTimer(240 * time.Second)
  633. select {
  634. case <-upgradeDownloaded:
  635. // TODO: verify downloaded file
  636. if runConfig.clientIsLatestVersion {
  637. t.Fatalf("upgrade downloaded unexpectedly")
  638. }
  639. // Test: with disruptNetwork, must be multiple download progress notices
  640. if runConfig.disruptNetwork {
  641. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  642. if count <= 1 {
  643. t.Fatalf("unexpected upgrade download progress: %d", count)
  644. }
  645. }
  646. case <-confirmedLatestVersion:
  647. if !runConfig.clientIsLatestVersion {
  648. t.Fatalf("confirmed latest version unexpectedly")
  649. }
  650. case <-upgradeTimeout.C:
  651. t.Fatalf("upgrade download timeout exceeded")
  652. }
  653. }
  654. }
  655. func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) error {
  656. testUrl := "https://psiphon.ca"
  657. roundTripTimeout := 30 * time.Second
  658. expectedResponseContains := "Psiphon"
  659. checkResponse := func(responseBody string) bool {
  660. return strings.Contains(responseBody, expectedResponseContains)
  661. }
  662. // Retries are made to compensate for intermittent failures due
  663. // to external network conditions.
  664. fetchWithRetries := func(fetchName string, fetchFunc func() error) error {
  665. retryCount := 6
  666. retryDelay := 5 * time.Second
  667. var err error
  668. for i := 0; i < retryCount; i++ {
  669. err = fetchFunc()
  670. if err == nil || i == retryCount-1 {
  671. break
  672. }
  673. time.Sleep(retryDelay)
  674. t.Logf("retrying %s...", fetchName)
  675. }
  676. return err
  677. }
  678. // Test: use HTTP proxy
  679. fetchUsingHTTPProxy := func() error {
  680. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  681. if err != nil {
  682. return fmt.Errorf("error initializing proxied HTTP request: %s", err)
  683. }
  684. httpTransport := &http.Transport{
  685. Proxy: http.ProxyURL(proxyUrl),
  686. DisableKeepAlives: true,
  687. }
  688. httpClient := &http.Client{
  689. Transport: httpTransport,
  690. Timeout: roundTripTimeout,
  691. }
  692. request, err := http.NewRequest("GET", testUrl, nil)
  693. if err != nil {
  694. return fmt.Errorf("error preparing proxied HTTP request: %s", err)
  695. }
  696. response, err := httpClient.Do(request)
  697. if err != nil {
  698. return fmt.Errorf("error sending proxied HTTP request: %s", err)
  699. }
  700. defer response.Body.Close()
  701. body, err := ioutil.ReadAll(response.Body)
  702. if err != nil {
  703. return fmt.Errorf("error reading proxied HTTP response: %s", err)
  704. }
  705. if !checkResponse(string(body)) {
  706. return fmt.Errorf("unexpected proxied HTTP response")
  707. }
  708. return nil
  709. }
  710. err := fetchWithRetries("proxied HTTP request", fetchUsingHTTPProxy)
  711. if err != nil {
  712. return err
  713. }
  714. // Delay before requesting from external service again
  715. time.Sleep(1 * time.Second)
  716. // Test: use direct URL proxy
  717. fetchUsingURLProxyDirect := func() error {
  718. httpTransport := &http.Transport{
  719. DisableKeepAlives: true,
  720. }
  721. httpClient := &http.Client{
  722. Transport: httpTransport,
  723. Timeout: roundTripTimeout,
  724. }
  725. request, err := http.NewRequest(
  726. "GET",
  727. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  728. httpProxyPort, url.QueryEscape(testUrl)),
  729. nil)
  730. if err != nil {
  731. return fmt.Errorf("error preparing direct URL request: %s", err)
  732. }
  733. response, err := httpClient.Do(request)
  734. if err != nil {
  735. return fmt.Errorf("error sending direct URL request: %s", err)
  736. }
  737. defer response.Body.Close()
  738. body, err := ioutil.ReadAll(response.Body)
  739. if err != nil {
  740. return fmt.Errorf("error reading direct URL response: %s", err)
  741. }
  742. if !checkResponse(string(body)) {
  743. return fmt.Errorf("unexpected direct URL response")
  744. }
  745. return nil
  746. }
  747. err = fetchWithRetries("direct URL request", fetchUsingURLProxyDirect)
  748. if err != nil {
  749. return err
  750. }
  751. // Delay before requesting from external service again
  752. time.Sleep(1 * time.Second)
  753. // Test: use tunneled URL proxy
  754. fetchUsingURLProxyTunneled := func() error {
  755. httpTransport := &http.Transport{
  756. DisableKeepAlives: true,
  757. }
  758. httpClient := &http.Client{
  759. Transport: httpTransport,
  760. Timeout: roundTripTimeout,
  761. }
  762. request, err := http.NewRequest(
  763. "GET",
  764. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  765. httpProxyPort, url.QueryEscape(testUrl)),
  766. nil)
  767. if err != nil {
  768. return fmt.Errorf("error preparing tunneled URL request: %s", err)
  769. }
  770. response, err := httpClient.Do(request)
  771. if err != nil {
  772. return fmt.Errorf("error sending tunneled URL request: %s", err)
  773. }
  774. defer response.Body.Close()
  775. body, err := ioutil.ReadAll(response.Body)
  776. if err != nil {
  777. return fmt.Errorf("error reading tunneled URL response: %s", err)
  778. }
  779. if !checkResponse(string(body)) {
  780. return fmt.Errorf("unexpected tunneled URL response")
  781. }
  782. return nil
  783. }
  784. err = fetchWithRetries("tunneled URL request", fetchUsingURLProxyTunneled)
  785. if err != nil {
  786. return err
  787. }
  788. return nil
  789. }
  790. // Note: Valid values for disruptorMaxConnectionBytes depend on the production
  791. // network; for example, the size of the remote server list resource must exceed
  792. // disruptorMaxConnectionBytes or else TestUntunneledResumableFetchRemoteServerList
  793. // will fail since no retries are required. But if disruptorMaxConnectionBytes is
  794. // too small, the test will take longer to run since more retries are necessary.
  795. //
  796. // Tests such as TestUntunneledResumableFetchRemoteServerList could be rewritten to
  797. // use mock components (for example, see TestObfuscatedRemoteServerLists); however
  798. // these tests in controller_test serve the dual purpose of ensuring that tunnel
  799. // core works with the production network.
  800. //
  801. // TODO: set disruptorMaxConnectionBytes (and disruptorMaxConnectionTime) dynamically,
  802. // based on current production network configuration?
  803. const disruptorProxyAddress = "127.0.0.1:2160"
  804. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  805. const disruptorMaxConnectionBytes = 150000
  806. const disruptorMaxConnectionTime = 10 * time.Second
  807. func initDisruptor() {
  808. go func() {
  809. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  810. if err != nil {
  811. fmt.Printf("disruptor proxy listen error: %s\n", err)
  812. return
  813. }
  814. for {
  815. localConn, err := listener.AcceptSocks()
  816. if err != nil {
  817. if e, ok := err.(net.Error); ok && e.Temporary() {
  818. fmt.Printf("disruptor proxy temporary accept error: %s\n", err)
  819. continue
  820. }
  821. fmt.Printf("disruptor proxy accept error: %s\n", err)
  822. return
  823. }
  824. go func() {
  825. defer localConn.Close()
  826. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  827. if err != nil {
  828. // TODO: log "err" without logging server IPs
  829. fmt.Printf("disruptor proxy dial error\n")
  830. return
  831. }
  832. defer remoteConn.Close()
  833. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  834. if err != nil {
  835. fmt.Printf("disruptor proxy grant error: %s\n", err)
  836. return
  837. }
  838. // Cut connection after disruptorMaxConnectionTime
  839. time.AfterFunc(disruptorMaxConnectionTime, func() {
  840. localConn.Close()
  841. remoteConn.Close()
  842. })
  843. // Relay connection, but only up to disruptorMaxConnectionBytes
  844. waitGroup := new(sync.WaitGroup)
  845. waitGroup.Add(1)
  846. go func() {
  847. defer waitGroup.Done()
  848. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  849. localConn.Close()
  850. remoteConn.Close()
  851. }()
  852. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  853. localConn.Close()
  854. remoteConn.Close()
  855. waitGroup.Wait()
  856. }()
  857. }
  858. }()
  859. }
  860. const upstreamProxyURL = "http://testUser:[email protected]:2161"
  861. var upstreamProxyCustomHeaders = map[string][]string{"X-Test-Header-Name": {"test-header-value1", "test-header-value2"}}
  862. func hasExpectedCustomHeaders(h http.Header) bool {
  863. for name, values := range upstreamProxyCustomHeaders {
  864. if h[name] == nil {
  865. return false
  866. }
  867. // Order may not be the same
  868. for _, value := range values {
  869. if !common.Contains(h[name], value) {
  870. return false
  871. }
  872. }
  873. }
  874. return true
  875. }
  876. func initUpstreamProxy() {
  877. go func() {
  878. proxy := goproxy.NewProxyHttpServer()
  879. proxy.Logger = log.New(ioutil.Discard, "", 0)
  880. auth.ProxyBasic(
  881. proxy,
  882. "testRealm",
  883. func(user, passwd string) bool { return user == "testUser" && passwd == "testPassword" })
  884. proxy.OnRequest().DoFunc(
  885. func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
  886. if !hasExpectedCustomHeaders(r.Header) {
  887. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  888. return nil, goproxy.NewResponse(r, goproxy.ContentTypeText, http.StatusUnauthorized, "")
  889. }
  890. return r, nil
  891. })
  892. proxy.OnRequest().HandleConnectFunc(
  893. func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) {
  894. if !hasExpectedCustomHeaders(ctx.Req.Header) {
  895. fmt.Printf("missing expected headers: %+v\n", ctx.Req.Header)
  896. return goproxy.RejectConnect, host
  897. }
  898. return goproxy.OkConnect, host
  899. })
  900. err := http.ListenAndServe("127.0.0.1:2161", proxy)
  901. if err != nil {
  902. fmt.Printf("upstream proxy failed: %s\n", err)
  903. }
  904. }()
  905. // TODO: wait until listener is active?
  906. }