controller_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "flag"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "net"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "testing"
  33. "time"
  34. socks "github.com/Psiphon-Inc/goptlib"
  35. )
  36. func TestMain(m *testing.M) {
  37. flag.Parse()
  38. os.Remove(DATA_STORE_FILENAME)
  39. initDisruptor()
  40. setEmitDiagnosticNotices(true)
  41. os.Exit(m.Run())
  42. }
  43. // Note: untunneled upgrade tests must execute before
  44. // the "Run" tests to ensure no tunnel is established.
  45. // We need a way to reset the datastore after it's been
// initialized in order to clear out its data entries
  47. // and be able to arbitrarily order the tests.
  48. /*
  49. TODO: temporarily disabled pending automation
  50. support for untunneled upgrade downloads.
  51. func TestUntunneledUpgradeDownload(t *testing.T) {
  52. doUntunnledUpgradeDownload(t, false)
  53. }
  54. func TestUntunneledResumableUpgradeDownload(t *testing.T) {
  55. doUntunnledUpgradeDownload(t, true)
  56. }
  57. */
  58. func TestControllerRunSSH(t *testing.T) {
  59. controllerRun(t, TUNNEL_PROTOCOL_SSH, false)
  60. }
  61. func TestControllerRunObfuscatedSSH(t *testing.T) {
  62. controllerRun(t, TUNNEL_PROTOCOL_OBFUSCATED_SSH, false)
  63. }
  64. func TestControllerRunUnfrontedMeek(t *testing.T) {
  65. controllerRun(t, TUNNEL_PROTOCOL_UNFRONTED_MEEK, true)
  66. }
  67. func TestControllerRunFrontedMeek(t *testing.T) {
  68. controllerRun(t, TUNNEL_PROTOCOL_FRONTED_MEEK, true)
  69. }
  70. func TestControllerRunFrontedMeekHTTP(t *testing.T) {
  71. controllerRun(t, TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP, false)
  72. }
  73. func TestControllerRunUnfrontedMeekHTTPS(t *testing.T) {
  74. controllerRun(t, TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS, true)
  75. }
  76. func doUntunnledUpgradeDownload(t *testing.T, disrupt bool) {
  77. configFileContents, err := ioutil.ReadFile("controller_test.config")
  78. if err != nil {
  79. // Skip, don't fail, if config file is not present
  80. t.Skipf("error loading configuration file: %s", err)
  81. }
  82. config, err := LoadConfig(configFileContents)
  83. if err != nil {
  84. t.Fatalf("error processing configuration file: %s", err)
  85. }
  86. if disrupt {
  87. config.UpstreamProxyUrl = disruptorProxyURL
  88. }
  89. // Clear remote server list so tunnel cannot be established and
  90. // untunneled upgrade download case is tested.
  91. config.RemoteServerListUrl = ""
  92. os.Remove(config.UpgradeDownloadFilename)
  93. err = InitDataStore(config)
  94. if err != nil {
  95. t.Fatalf("error initializing datastore: %s", err)
  96. }
  97. controller, err := NewController(config)
  98. if err != nil {
  99. t.Fatalf("error creating controller: %s", err)
  100. }
  101. upgradeDownloaded := make(chan struct{}, 1)
  102. var clientUpgradeDownloadedBytesCount int32
  103. SetNoticeOutput(NewNoticeReceiver(
  104. func(notice []byte) {
  105. // TODO: log notices without logging server IPs:
  106. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  107. noticeType, payload, err := GetNotice(notice)
  108. if err != nil {
  109. return
  110. }
  111. switch noticeType {
  112. case "Tunnels":
  113. count := int(payload["count"].(float64))
  114. if count > 0 {
  115. // TODO: wrong goroutine for t.FatalNow()
  116. t.Fatalf("tunnel established unexpectedly")
  117. }
  118. case "ClientUpgradeDownloadedBytes":
  119. atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
  120. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  121. case "ClientUpgradeDownloaded":
  122. select {
  123. case upgradeDownloaded <- *new(struct{}):
  124. default:
  125. }
  126. }
  127. }))
  128. // Run controller
  129. shutdownBroadcast := make(chan struct{})
  130. controllerWaitGroup := new(sync.WaitGroup)
  131. controllerWaitGroup.Add(1)
  132. go func() {
  133. defer controllerWaitGroup.Done()
  134. controller.Run(shutdownBroadcast)
  135. }()
  136. defer func() {
  137. // Test: shutdown must complete within 10 seconds
  138. close(shutdownBroadcast)
  139. shutdownTimeout := time.NewTimer(10 * time.Second)
  140. shutdownOk := make(chan struct{}, 1)
  141. go func() {
  142. controllerWaitGroup.Wait()
  143. shutdownOk <- *new(struct{})
  144. }()
  145. select {
  146. case <-shutdownOk:
  147. case <-shutdownTimeout.C:
  148. t.Fatalf("controller shutdown timeout exceeded")
  149. }
  150. }()
  151. // Test: upgrade must be downloaded within 120 seconds
  152. downloadTimeout := time.NewTimer(120 * time.Second)
  153. select {
  154. case <-upgradeDownloaded:
  155. // TODO: verify downloaded file
  156. case <-downloadTimeout.C:
  157. t.Fatalf("upgrade download timeout exceeded")
  158. }
  159. // Test: with disrupt, must be multiple download progress notices
  160. if disrupt {
  161. count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
  162. if count <= 1 {
  163. t.Fatalf("unexpected upgrade download progress: %d", count)
  164. }
  165. }
  166. }
  167. type TestHostNameTransformer struct {
  168. }
  169. func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
  170. return "example.com", true
  171. }
  172. func controllerRun(t *testing.T, protocol string, protocolUsesHostNameTransformer bool) {
  173. doControllerRun(t, protocol, nil)
  174. if protocolUsesHostNameTransformer {
  175. t.Log("running with testHostNameTransformer")
  176. doControllerRun(t, protocol, &TestHostNameTransformer{})
  177. }
  178. }
  179. func doControllerRun(t *testing.T, protocol string, hostNameTransformer HostNameTransformer) {
  180. configFileContents, err := ioutil.ReadFile("controller_test.config")
  181. if err != nil {
  182. // Skip, don't fail, if config file is not present
  183. t.Skipf("error loading configuration file: %s", err)
  184. }
  185. config, err := LoadConfig(configFileContents)
  186. if err != nil {
  187. t.Fatalf("error processing configuration file: %s", err)
  188. }
  189. // Disable untunneled upgrade downloader to ensure tunneled case is tested
  190. config.UpgradeDownloadClientVersionHeader = ""
  191. os.Remove(config.UpgradeDownloadFilename)
  192. config.TunnelProtocol = protocol
  193. config.HostNameTransformer = hostNameTransformer
  194. err = InitDataStore(config)
  195. if err != nil {
  196. t.Fatalf("error initializing datastore: %s", err)
  197. }
  198. controller, err := NewController(config)
  199. if err != nil {
  200. t.Fatalf("error creating controller: %s", err)
  201. }
  202. // Monitor notices for "Tunnels" with count > 1, the
  203. // indication of tunnel establishment success.
  204. // Also record the selected HTTP proxy port to use
  205. // when fetching websites through the tunnel.
  206. httpProxyPort := 0
  207. tunnelEstablished := make(chan struct{}, 1)
  208. upgradeDownloaded := make(chan struct{}, 1)
  209. SetNoticeOutput(NewNoticeReceiver(
  210. func(notice []byte) {
  211. // TODO: log notices without logging server IPs:
  212. // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
  213. noticeType, payload, err := GetNotice(notice)
  214. if err != nil {
  215. return
  216. }
  217. switch noticeType {
  218. case "Tunnels":
  219. count := int(payload["count"].(float64))
  220. if count > 0 {
  221. select {
  222. case tunnelEstablished <- *new(struct{}):
  223. default:
  224. }
  225. }
  226. case "ClientUpgradeDownloadedBytes":
  227. t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
  228. case "ClientUpgradeDownloaded":
  229. select {
  230. case upgradeDownloaded <- *new(struct{}):
  231. default:
  232. }
  233. case "ListeningHttpProxyPort":
  234. httpProxyPort = int(payload["port"].(float64))
  235. case "ConnectingServer":
  236. serverProtocol := payload["protocol"]
  237. if serverProtocol != protocol {
  238. // TODO: wrong goroutine for t.FatalNow()
  239. t.Fatalf("wrong protocol selected: %s", serverProtocol)
  240. }
  241. }
  242. }))
  243. // Run controller, which establishes tunnels
  244. shutdownBroadcast := make(chan struct{})
  245. controllerWaitGroup := new(sync.WaitGroup)
  246. controllerWaitGroup.Add(1)
  247. go func() {
  248. defer controllerWaitGroup.Done()
  249. controller.Run(shutdownBroadcast)
  250. }()
  251. defer func() {
  252. // Test: shutdown must complete within 10 seconds
  253. close(shutdownBroadcast)
  254. shutdownTimeout := time.NewTimer(10 * time.Second)
  255. shutdownOk := make(chan struct{}, 1)
  256. go func() {
  257. controllerWaitGroup.Wait()
  258. shutdownOk <- *new(struct{})
  259. }()
  260. select {
  261. case <-shutdownOk:
  262. case <-shutdownTimeout.C:
  263. t.Fatalf("controller shutdown timeout exceeded")
  264. }
  265. }()
  266. // Test: tunnel must be established within 60 seconds
  267. establishTimeout := time.NewTimer(60 * time.Second)
  268. select {
  269. case <-tunnelEstablished:
  270. case <-establishTimeout.C:
  271. t.Fatalf("tunnel establish timeout exceeded")
  272. }
  273. // Allow for known race condition described in NewHttpProxy():
  274. time.Sleep(1 * time.Second)
  275. // Test: fetch website through tunnel
  276. fetchWebsite(t, httpProxyPort)
  277. // Test: upgrade must be downloaded within 60 seconds
  278. downloadTimeout := time.NewTimer(60 * time.Second)
  279. select {
  280. case <-upgradeDownloaded:
  281. // TODO: verify downloaded file
  282. case <-downloadTimeout.C:
  283. t.Fatalf("upgrade download timeout exceeded")
  284. }
  285. }
  286. func fetchWebsite(t *testing.T, httpProxyPort int) {
  287. testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
  288. roundTripTimeout := 10 * time.Second
  289. expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
  290. expectedResponseSize := 35148
  291. checkResponse := func(responseBody string) bool {
  292. return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
  293. }
  294. // Test: use HTTP proxy
  295. proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
  296. if err != nil {
  297. t.Fatalf("error initializing proxied HTTP request: %s", err)
  298. }
  299. httpClient := &http.Client{
  300. Transport: &http.Transport{
  301. Proxy: http.ProxyURL(proxyUrl),
  302. },
  303. Timeout: roundTripTimeout,
  304. }
  305. response, err := httpClient.Get(testUrl)
  306. if err != nil {
  307. t.Fatalf("error sending proxied HTTP request: %s", err)
  308. }
  309. body, err := ioutil.ReadAll(response.Body)
  310. if err != nil {
  311. t.Fatalf("error reading proxied HTTP response: %s", err)
  312. }
  313. response.Body.Close()
  314. if !checkResponse(string(body)) {
  315. t.Fatalf("unexpected proxied HTTP response")
  316. }
  317. // Test: use direct URL proxy
  318. httpClient = &http.Client{
  319. Transport: http.DefaultTransport,
  320. Timeout: roundTripTimeout,
  321. }
  322. response, err = httpClient.Get(
  323. fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
  324. httpProxyPort, url.QueryEscape(testUrl)))
  325. if err != nil {
  326. t.Fatalf("error sending direct URL request: %s", err)
  327. }
  328. body, err = ioutil.ReadAll(response.Body)
  329. if err != nil {
  330. t.Fatalf("error reading direct URL response: %s", err)
  331. }
  332. response.Body.Close()
  333. if !checkResponse(string(body)) {
  334. t.Fatalf("unexpected direct URL response")
  335. }
  336. // Test: use tunneled URL proxy
  337. response, err = httpClient.Get(
  338. fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
  339. httpProxyPort, url.QueryEscape(testUrl)))
  340. if err != nil {
  341. t.Fatalf("error sending tunneled URL request: %s", err)
  342. }
  343. body, err = ioutil.ReadAll(response.Body)
  344. if err != nil {
  345. t.Fatalf("error reading tunneled URL response: %s", err)
  346. }
  347. response.Body.Close()
  348. if !checkResponse(string(body)) {
  349. t.Fatalf("unexpected tunneled URL response")
  350. }
  351. }
  352. const disruptorProxyAddress = "127.0.0.1:2160"
  353. const disruptorProxyURL = "socks4a://" + disruptorProxyAddress
  354. const disruptorMaxConnectionBytes = 2000000
  355. const disruptorMaxConnectionTime = 15 * time.Second
  356. func initDisruptor() {
  357. go func() {
  358. listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
  359. if err != nil {
  360. fmt.Errorf("disruptor proxy listen error: %s", err)
  361. return
  362. }
  363. for {
  364. localConn, err := listener.AcceptSocks()
  365. if err != nil {
  366. fmt.Errorf("disruptor proxy accept error: %s", err)
  367. return
  368. }
  369. go func() {
  370. defer localConn.Close()
  371. remoteConn, err := net.Dial("tcp", localConn.Req.Target)
  372. if err != nil {
  373. fmt.Errorf("disruptor proxy dial error: %s", err)
  374. return
  375. }
  376. defer remoteConn.Close()
  377. err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
  378. if err != nil {
  379. fmt.Errorf("disruptor proxy grant error: %s", err)
  380. return
  381. }
  382. // Cut connection after disruptorMaxConnectionTime
  383. time.AfterFunc(disruptorMaxConnectionTime, func() {
  384. localConn.Close()
  385. remoteConn.Close()
  386. })
  387. // Relay connection, but only up to disruptorMaxConnectionBytes
  388. waitGroup := new(sync.WaitGroup)
  389. waitGroup.Add(1)
  390. go func() {
  391. defer waitGroup.Done()
  392. io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
  393. }()
  394. io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
  395. waitGroup.Wait()
  396. }()
  397. }
  398. }()
  399. }