controller.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Package psiphon implements the core tunnel functionality of a Psiphon client.
  20. // The main function is RunForever, which runs a Controller that obtains lists of
  21. // servers, establishes tunnel connections, and runs local proxies through which
  22. // tunneled traffic may be sent.
  23. package psiphon
  24. import (
  25. "errors"
  26. "math/rand"
  27. "net"
  28. "sync"
  29. "time"
  30. )
// Controller is a tunnel lifecycle coordinator. It manages lists of servers to
// connect to; establishes and monitors tunnels; and runs local proxies which
// route traffic through the tunnels.
type Controller struct {
	// config is the client configuration passed to NewController.
	config *Config
	// sessionId is generated once in NewController and is used across all
	// tunnels established by the controller.
	sessionId string
	// componentFailureSignal receives a non-blocking signal (see
	// SignalComponentFailure) when a component fails unexpectedly; Run shuts
	// the controller down when it fires. Buffered with capacity 1.
	componentFailureSignal chan struct{}
	// shutdownBroadcast is closed by Run to tell every worker goroutine to stop.
	shutdownBroadcast chan struct{}
	// runWaitGroup tracks the worker goroutines; Run waits on it before
	// returning.
	runWaitGroup *sync.WaitGroup
	// establishedTunnels delivers newly connected tunnels to runTunnels.
	// Buffered to TunnelPoolSize.
	establishedTunnels chan *Tunnel
	// failedTunnels delivers failed tunnels (see SignalTunnelFailure) to
	// runTunnels. Buffered to TunnelPoolSize.
	failedTunnels chan *Tunnel
	// tunnelMutex guards establishedOnce, tunnels and nextTunnel.
	tunnelMutex sync.Mutex
	// establishedOnce is set when the first tunnel is registered and is never
	// cleared in the code visible here.
	establishedOnce bool
	// tunnels is the pool of active tunnels.
	tunnels []*Tunnel
	// nextTunnel is the round-robin index used by getNextActiveTunnel.
	nextTunnel int
	// startedConnectedReporter is accessed only by the runTunnels goroutine.
	startedConnectedReporter bool
	// isEstablishing is accessed only by the runTunnels goroutine.
	isEstablishing bool
	// NOTE(review): establishWaitGroup, stopEstablishingBroadcast and
	// candidateServerEntries are not initialized in NewController and are not
	// used in the portion of the file reviewed here; presumably they are
	// managed by startEstablishing/stopEstablishing -- confirm.
	establishWaitGroup *sync.WaitGroup
	stopEstablishingBroadcast chan struct{}
	candidateServerEntries chan *candidateServerEntry
	// establishPendingConns is closed (CloseAll) by Run to interrupt establish
	// workers blocked on tunnel establishment network operations.
	establishPendingConns *Conns
	// untunneledPendingConns is closed (CloseAll) by Run to interrupt untunneled
	// network operations such as the remote server list fetch and the
	// untunneled upgrade download.
	untunneledPendingConns *Conns
	// untunneledDialConfig is the dial configuration for untunneled requests;
	// its PendingConns is untunneledPendingConns.
	untunneledDialConfig *DialConfig
	// splitTunnelClassifier is created in NewController, started by runTunnels
	// when the first tunnel is registered, and shut down by Run.
	splitTunnelClassifier *SplitTunnelClassifier
	// signalFetchRemoteServerList wakes remoteServerListFetcher. Unbuffered.
	signalFetchRemoteServerList chan struct{}
	// signalDownloadUpgrade wakes upgradeDownloader; the value is the client
	// version advertised by the handshake. Unbuffered.
	signalDownloadUpgrade chan string
	// impairedProtocolClassification counts, per protocol, tunnel failures that
	// occurred shortly after connecting. Accessed only by the runTunnels
	// goroutine.
	impairedProtocolClassification map[string]int
	// signalReportConnected wakes connectedReporter to make another connected
	// request. Unbuffered.
	signalReportConnected chan struct{}
	// NOTE(review): serverAffinityDoneBroadcast is not initialized in
	// NewController and is not used in the portion of the file reviewed here --
	// confirm its lifecycle in the establish code.
	serverAffinityDoneBroadcast chan struct{}
}
// candidateServerEntry pairs a server entry with a flag indicating whether it
// is a server affinity candidate.
// NOTE(review): values of this type travel over
// Controller.candidateServerEntries, whose producer and consumers are outside
// the portion of the file reviewed here -- confirm the exact semantics there.
type candidateServerEntry struct {
	serverEntry               *ServerEntry
	isServerAffinityCandidate bool
}
  65. // NewController initializes a new controller.
  66. func NewController(config *Config) (controller *Controller, err error) {
  67. // Needed by regen, at least
  68. rand.Seed(int64(time.Now().Nanosecond()))
  69. // Supply a default HostNameTransformer
  70. if config.HostNameTransformer == nil {
  71. config.HostNameTransformer = &IdentityHostNameTransformer{}
  72. }
  73. // Generate a session ID for the Psiphon server API. This session ID is
  74. // used across all tunnels established by the controller.
  75. sessionId, err := MakeSessionId()
  76. if err != nil {
  77. return nil, ContextError(err)
  78. }
  79. // untunneledPendingConns may be used to interrupt the fetch remote server list
  80. // request and other untunneled connection establishments. BindToDevice may be
  81. // used to exclude these requests and connection from VPN routing.
  82. // TODO: fetch remote server list and untunneled upgrade download should remove
  83. // their completed conns from untunneledPendingConns.
  84. untunneledPendingConns := new(Conns)
  85. untunneledDialConfig := &DialConfig{
  86. UpstreamProxyUrl: config.UpstreamProxyUrl,
  87. PendingConns: untunneledPendingConns,
  88. DeviceBinder: config.DeviceBinder,
  89. DnsServerGetter: config.DnsServerGetter,
  90. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  91. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  92. DeviceRegion: config.DeviceRegion,
  93. }
  94. controller = &Controller{
  95. config: config,
  96. sessionId: sessionId,
  97. // componentFailureSignal receives a signal from a component (including socks and
  98. // http local proxies) if they unexpectedly fail. Senders should not block.
  99. // A buffer allows at least one stop signal to be sent before there is a receiver.
  100. componentFailureSignal: make(chan struct{}, 1),
  101. shutdownBroadcast: make(chan struct{}),
  102. runWaitGroup: new(sync.WaitGroup),
  103. // establishedTunnels and failedTunnels buffer sizes are large enough to
  104. // receive full pools of tunnels without blocking. Senders should not block.
  105. establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  106. failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  107. tunnels: make([]*Tunnel, 0),
  108. establishedOnce: false,
  109. startedConnectedReporter: false,
  110. isEstablishing: false,
  111. establishPendingConns: new(Conns),
  112. untunneledPendingConns: untunneledPendingConns,
  113. untunneledDialConfig: untunneledDialConfig,
  114. impairedProtocolClassification: make(map[string]int),
  115. // TODO: Add a buffer of 1 so we don't miss a signal while receiver is
  116. // starting? Trade-off is potential back-to-back fetch remotes. As-is,
  117. // establish will eventually signal another fetch remote.
  118. signalFetchRemoteServerList: make(chan struct{}),
  119. signalDownloadUpgrade: make(chan string),
  120. signalReportConnected: make(chan struct{}),
  121. }
  122. controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
  123. return controller, nil
  124. }
  125. // Run executes the controller. It launches components and then monitors
  126. // for a shutdown signal; after receiving the signal it shuts down the
  127. // controller.
  128. // The components include:
  129. // - the periodic remote server list fetcher
  130. // - the connected reporter
  131. // - the tunnel manager
  132. // - a local SOCKS proxy that port forwards through the pool of tunnels
  133. // - a local HTTP proxy that port forwards through the pool of tunnels
  134. func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
  135. ReportAvailableRegions()
  136. // Start components
  137. listenIP, err := GetInterfaceIPAddress(controller.config.ListenInterface)
  138. if err != nil {
  139. NoticeError("error getting listener IP: %s", err)
  140. return
  141. }
  142. socksProxy, err := NewSocksProxy(controller.config, controller, listenIP)
  143. if err != nil {
  144. NoticeAlert("error initializing local SOCKS proxy: %s", err)
  145. return
  146. }
  147. defer socksProxy.Close()
  148. httpProxy, err := NewHttpProxy(
  149. controller.config, controller.untunneledDialConfig, controller, listenIP)
  150. if err != nil {
  151. NoticeAlert("error initializing local HTTP proxy: %s", err)
  152. return
  153. }
  154. defer httpProxy.Close()
  155. if !controller.config.DisableRemoteServerListFetcher {
  156. controller.runWaitGroup.Add(1)
  157. go controller.remoteServerListFetcher()
  158. }
  159. if controller.config.UpgradeDownloadUrl != "" &&
  160. controller.config.UpgradeDownloadFilename != "" {
  161. controller.runWaitGroup.Add(1)
  162. go controller.upgradeDownloader()
  163. }
  164. /// Note: the connected reporter isn't started until a tunnel is
  165. // established
  166. controller.runWaitGroup.Add(1)
  167. go controller.runTunnels()
  168. if *controller.config.EstablishTunnelTimeoutSeconds != 0 {
  169. controller.runWaitGroup.Add(1)
  170. go controller.establishTunnelWatcher()
  171. }
  172. // Wait while running
  173. select {
  174. case <-shutdownBroadcast:
  175. NoticeInfo("controller shutdown by request")
  176. case <-controller.componentFailureSignal:
  177. NoticeAlert("controller shutdown due to component failure")
  178. }
  179. close(controller.shutdownBroadcast)
  180. // Interrupts and stops establish workers blocking on
  181. // tunnel establishment network operations.
  182. controller.establishPendingConns.CloseAll()
  183. // Interrupts and stops workers blocking on untunneled
  184. // network operations. This includes fetch remote server
  185. // list and untunneled uprade download.
  186. // Note: this doesn't interrupt the final, untunneled status
  187. // requests started in operateTunnel after shutdownBroadcast.
  188. // This is by design -- we want to give these requests a short
  189. // timer period to succeed and deliver stats. These particular
  190. // requests opt out of untunneledPendingConns and use the
  191. // PSIPHON_API_SHUTDOWN_SERVER_TIMEOUT timeout (see
  192. // doUntunneledStatusRequest).
  193. controller.untunneledPendingConns.CloseAll()
  194. // Now with all workers signaled to stop and with all
  195. // blocking network operations interrupted, wait for
  196. // all workers to terminate.
  197. controller.runWaitGroup.Wait()
  198. controller.splitTunnelClassifier.Shutdown()
  199. NoticeInfo("exiting controller")
  200. }
  201. // SignalComponentFailure notifies the controller that an associated component has failed.
  202. // This will terminate the controller.
  203. func (controller *Controller) SignalComponentFailure() {
  204. select {
  205. case controller.componentFailureSignal <- *new(struct{}):
  206. default:
  207. }
  208. }
  209. // remoteServerListFetcher fetches an out-of-band list of server entries
  210. // for more tunnel candidates. It fetches when signalled, with retries
  211. // on failure.
  212. func (controller *Controller) remoteServerListFetcher() {
  213. defer controller.runWaitGroup.Done()
  214. var lastFetchTime time.Time
  215. fetcherLoop:
  216. for {
  217. // Wait for a signal before fetching
  218. select {
  219. case <-controller.signalFetchRemoteServerList:
  220. case <-controller.shutdownBroadcast:
  221. break fetcherLoop
  222. }
  223. // Skip fetch entirely (i.e., send no request at all, even when ETag would save
  224. // on response size) when a recent fetch was successful
  225. if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) {
  226. continue
  227. }
  228. retryLoop:
  229. for {
  230. // Don't attempt to fetch while there is no network connectivity,
  231. // to avoid alert notice noise.
  232. if !WaitForNetworkConnectivity(
  233. controller.config.NetworkConnectivityChecker,
  234. controller.shutdownBroadcast) {
  235. break fetcherLoop
  236. }
  237. err := FetchRemoteServerList(
  238. controller.config, controller.untunneledDialConfig)
  239. if err == nil {
  240. lastFetchTime = time.Now()
  241. break retryLoop
  242. }
  243. NoticeAlert("failed to fetch remote server list: %s", err)
  244. timeout := time.After(FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD)
  245. select {
  246. case <-timeout:
  247. case <-controller.shutdownBroadcast:
  248. break fetcherLoop
  249. }
  250. }
  251. }
  252. NoticeInfo("exiting remote server list fetcher")
  253. }
  254. // establishTunnelWatcher terminates the controller if a tunnel
  255. // has not been established in the configured time period. This
  256. // is regardless of how many tunnels are presently active -- meaning
  257. // that if an active tunnel was established and lost the controller
  258. // is left running (to re-establish).
  259. func (controller *Controller) establishTunnelWatcher() {
  260. defer controller.runWaitGroup.Done()
  261. timeout := time.After(
  262. time.Duration(*controller.config.EstablishTunnelTimeoutSeconds) * time.Second)
  263. select {
  264. case <-timeout:
  265. if !controller.hasEstablishedOnce() {
  266. NoticeAlert("failed to establish tunnel before timeout")
  267. controller.SignalComponentFailure()
  268. }
  269. case <-controller.shutdownBroadcast:
  270. }
  271. NoticeInfo("exiting establish tunnel watcher")
  272. }
  273. // connectedReporter sends periodic "connected" requests to the Psiphon API.
  274. // These requests are for server-side unique user stats calculation. See the
  275. // comment in DoConnectedRequest for a description of the request mechanism.
  276. // To ensure we don't over- or under-count unique users, only one connected
  277. // request is made across all simultaneous multi-tunnels; and the connected
  278. // request is repeated periodically for very long-lived tunnels.
  279. // The signalReportConnected mechanism is used to trigger another connected
  280. // request immediately after a reconnect.
  281. func (controller *Controller) connectedReporter() {
  282. defer controller.runWaitGroup.Done()
  283. loop:
  284. for {
  285. // Pick any active tunnel and make the next connected request. No error
  286. // is logged if there's no active tunnel, as that's not an unexpected condition.
  287. reported := false
  288. tunnel := controller.getNextActiveTunnel()
  289. if tunnel != nil {
  290. err := tunnel.serverContext.DoConnectedRequest()
  291. if err == nil {
  292. reported = true
  293. } else {
  294. NoticeAlert("failed to make connected request: %s", err)
  295. }
  296. }
  297. // Schedule the next connected request and wait.
  298. var duration time.Duration
  299. if reported {
  300. duration = PSIPHON_API_CONNECTED_REQUEST_PERIOD
  301. } else {
  302. duration = PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD
  303. }
  304. timeout := time.After(duration)
  305. select {
  306. case <-controller.signalReportConnected:
  307. case <-timeout:
  308. // Make another connected request
  309. case <-controller.shutdownBroadcast:
  310. break loop
  311. }
  312. }
  313. NoticeInfo("exiting connected reporter")
  314. }
  315. func (controller *Controller) startOrSignalConnectedReporter() {
  316. // session is nil when DisableApi is set
  317. if controller.config.DisableApi {
  318. return
  319. }
  320. // Start the connected reporter after the first tunnel is established.
  321. // Concurrency note: only the runTunnels goroutine may access startedConnectedReporter.
  322. if !controller.startedConnectedReporter {
  323. controller.startedConnectedReporter = true
  324. controller.runWaitGroup.Add(1)
  325. go controller.connectedReporter()
  326. } else {
  327. select {
  328. case controller.signalReportConnected <- *new(struct{}):
  329. default:
  330. }
  331. }
  332. }
  333. // upgradeDownloader makes periodic attemps to complete a client upgrade
  334. // download. DownloadUpgrade() is resumable, so each attempt has potential for
  335. // getting closer to completion, even in conditions where the download or
  336. // tunnel is repeatedly interrupted.
  337. // An upgrade download is triggered by either a handshake response indicating
  338. // that a new version is available; or after failing to connect, in which case
  339. // it's useful to check, out-of-band, for an upgrade with new circumvention
  340. // capabilities.
  341. // Once the download operation completes successfully, the downloader exits
  342. // and is not run again: either there is not a newer version, or the upgrade
  343. // has been downloaded and is ready to be applied.
  344. // We're assuming that the upgrade will be applied and the entire system
  345. // restarted before another upgrade is to be downloaded.
  346. //
  347. // TODO: refactor upgrade downloader and remote server list fetcher to use
  348. // common code (including the resumable download routines).
  349. //
  350. func (controller *Controller) upgradeDownloader() {
  351. defer controller.runWaitGroup.Done()
  352. var lastDownloadTime time.Time
  353. downloadLoop:
  354. for {
  355. // Wait for a signal before downloading
  356. var handshakeVersion string
  357. select {
  358. case handshakeVersion = <-controller.signalDownloadUpgrade:
  359. case <-controller.shutdownBroadcast:
  360. break downloadLoop
  361. }
  362. // Unless handshake is explicitly advertizing a new version, skip
  363. // checking entirely when a recent download was successful.
  364. if handshakeVersion == "" &&
  365. time.Now().Before(lastDownloadTime.Add(DOWNLOAD_UPGRADE_STALE_PERIOD)) {
  366. continue
  367. }
  368. retryLoop:
  369. for {
  370. // Don't attempt to download while there is no network connectivity,
  371. // to avoid alert notice noise.
  372. if !WaitForNetworkConnectivity(
  373. controller.config.NetworkConnectivityChecker,
  374. controller.shutdownBroadcast) {
  375. break downloadLoop
  376. }
  377. // Pick any active tunnel and make the next download attempt. If there's
  378. // no active tunnel, the untunneledDialConfig will be used.
  379. tunnel := controller.getNextActiveTunnel()
  380. err := DownloadUpgrade(
  381. controller.config,
  382. handshakeVersion,
  383. tunnel,
  384. controller.untunneledDialConfig)
  385. if err == nil {
  386. lastDownloadTime = time.Now()
  387. break retryLoop
  388. }
  389. NoticeAlert("failed to download upgrade: %s", err)
  390. timeout := time.After(DOWNLOAD_UPGRADE_RETRY_PERIOD)
  391. select {
  392. case <-timeout:
  393. case <-controller.shutdownBroadcast:
  394. break downloadLoop
  395. }
  396. }
  397. }
  398. NoticeInfo("exiting upgrade downloader")
  399. }
// runTunnels is the controller tunnel management main loop. It starts and stops
// establishing tunnels based on the target tunnel pool size and the current size
// of the pool. Tunnels are established asynchronously using worker goroutines.
//
// When there are no server entries for the target region/protocol, the
// establishCandidateGenerator will yield no candidates and wait before
// trying again. In the meantime, a remote server entry fetch may supply
// valid candidates.
//
// When a tunnel is established, it's added to the active pool. The tunnel's
// operateTunnel goroutine monitors the tunnel.
//
// When a tunnel fails, it's removed from the pool and the establish process is
// restarted to fill the pool.
//
// The loop exits when shutdownBroadcast is closed; establishment is then
// stopped, all active tunnels are terminated, and any tunnels still buffered
// in the establishedTunnels and failedTunnels channels are discarded.
func (controller *Controller) runTunnels() {
	defer controller.runWaitGroup.Done()

	// Start running
	controller.startEstablishing()

loop:
	for {
		select {
		case failedTunnel := <-controller.failedTunnels:
			NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
			controller.terminateTunnel(failedTunnel)

			// Note: we make this extra check to ensure the shutdown signal takes priority
			// and that we do not start establishing. Critically, startEstablishing() calls
			// establishPendingConns.Reset() which clears the closed flag in
			// establishPendingConns; this causes the pendingConns.Add() within
			// interruptibleTCPDial to succeed instead of aborting, and the result
			// is that it's possible for establish goroutines to run all the way through
			// NewServerContext before being discarded... delaying shutdown.
			select {
			case <-controller.shutdownBroadcast:
				break loop
			default:
			}

			controller.classifyImpairedProtocol(failedTunnel)

			// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing
			// and access isEstablishing.
			if !controller.isEstablishing {
				controller.startEstablishing()
			}

		// TODO: design issue: there might not be enough server entries with the
		// required region/caps to ever fill the tunnel slots.
		// Possible solution: target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
		case establishedTunnel := <-controller.establishedTunnels:
			// registerTunnel reports the resulting pool size and whether the
			// tunnel was accepted; a rejected tunnel (pool full, or duplicate
			// server) is discarded below.
			tunnelCount, registered := controller.registerTunnel(establishedTunnel)
			if registered {
				NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress, establishedTunnel.protocol)

				if tunnelCount == 1 {

					// The split tunnel classifier is started once the first tunnel is
					// established. This first tunnel is passed in to be used to make
					// the routes data request.
					// A long-running controller may run while the host device is present
					// in different regions. In this case, we want the split tunnel logic
					// to switch to routes for new regions and not classify traffic based
					// on routes installed for older regions.
					// We assume that when regions change, the host network will also
					// change, and so all tunnels will fail and be re-established. Under
					// that assumption, the classifier will be re-Start()-ed here when
					// the region has changed.
					controller.splitTunnelClassifier.Start(establishedTunnel)

					// Signal a connected request on each 1st tunnel establishment. For
					// multi-tunnels, the session is connected as long as at least one
					// tunnel is established.
					controller.startOrSignalConnectedReporter()

					// If the handshake indicated that a new client version is available,
					// trigger an upgrade download. The send is non-blocking, so the
					// signal is dropped when no receiver is waiting on
					// signalDownloadUpgrade.
					// Note: serverContext is nil when DisableApi is set
					if establishedTunnel.serverContext != nil &&
						establishedTunnel.serverContext.clientUpgradeVersion != "" {

						handshakeVersion := establishedTunnel.serverContext.clientUpgradeVersion
						select {
						case controller.signalDownloadUpgrade <- handshakeVersion:
						default:
						}
					}
				}

			} else {
				controller.discardTunnel(establishedTunnel)
			}

			// Once the pool is full there is nothing left to establish.
			if controller.isFullyEstablished() {
				controller.stopEstablishing()
			}

		case <-controller.shutdownBroadcast:
			break loop
		}
	}

	// Stop running
	controller.stopEstablishing()
	controller.terminateAllTunnels()

	// Drain tunnel channels. Closing each channel lets its range loop end once
	// the buffered tunnels have been consumed.
	// NOTE(review): a send on either channel after these close calls would
	// panic; this relies on all senders (e.g. SignalTunnelFailure) having
	// stopped by this point -- confirm.
	close(controller.establishedTunnels)
	for tunnel := range controller.establishedTunnels {
		controller.discardTunnel(tunnel)
	}
	close(controller.failedTunnels)
	for tunnel := range controller.failedTunnels {
		controller.discardTunnel(tunnel)
	}

	NoticeInfo("exiting run tunnels")
}
  501. // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
  502. // tunnels. A protocol is classified as impaired if a tunnel using that protocol
  503. // fails, repeatedly, shortly after the start of the connection. During tunnel
  504. // establishment, impaired protocols are briefly skipped.
  505. //
  506. // One purpose of this measure is to defend against an attack where the adversary,
  507. // for example, tags an OSSH TCP connection as an "unidentified" protocol; allows
  508. // it to connect; but then kills the underlying TCP connection after a short time.
  509. // Since OSSH has less latency than other protocols that may bypass an "unidentified"
  510. // filter, these other protocols might never be selected for use.
  511. //
  512. // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
  513. func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
  514. if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) {
  515. controller.impairedProtocolClassification[failedTunnel.protocol] += 1
  516. } else {
  517. controller.impairedProtocolClassification[failedTunnel.protocol] = 0
  518. }
  519. if len(controller.getImpairedProtocols()) == len(SupportedTunnelProtocols) {
  520. // Reset classification if all protocols are classified as impaired as
  521. // the network situation (or attack) may not be protocol-specific.
  522. // TODO: compare against count of distinct supported protocols for
  523. // current known server entries.
  524. controller.impairedProtocolClassification = make(map[string]int)
  525. }
  526. }
  527. // getImpairedProtocols returns a list of protocols that have sufficient
  528. // classifications to be considered impaired protocols.
  529. //
  530. // Concurrency note: only the runTunnels() goroutine may call getImpairedProtocols
  531. func (controller *Controller) getImpairedProtocols() []string {
  532. if len(controller.impairedProtocolClassification) > 0 {
  533. NoticeInfo("impaired protocols: %+v", controller.impairedProtocolClassification)
  534. }
  535. impairedProtocols := make([]string, 0)
  536. for protocol, count := range controller.impairedProtocolClassification {
  537. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  538. impairedProtocols = append(impairedProtocols, protocol)
  539. }
  540. }
  541. return impairedProtocols
  542. }
  543. // SignalTunnelFailure implements the TunnelOwner interface. This function
  544. // is called by Tunnel.operateTunnel when the tunnel has detected that it
  545. // has failed. The Controller will signal runTunnels to create a new
  546. // tunnel and/or remove the tunnel from the list of active tunnels.
  547. func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
  548. // Don't block. Assumes the receiver has a buffer large enough for
  549. // the typical number of operated tunnels. In case there's no room,
  550. // terminate the tunnel (runTunnels won't get a signal in this case,
  551. // but the tunnel will be removed from the list of active tunnels).
  552. select {
  553. case controller.failedTunnels <- tunnel:
  554. default:
  555. controller.terminateTunnel(tunnel)
  556. }
  557. }
  558. // discardTunnel disposes of a successful connection that is no longer required.
  559. func (controller *Controller) discardTunnel(tunnel *Tunnel) {
  560. NoticeInfo("discard tunnel: %s", tunnel.serverEntry.IpAddress)
  561. // TODO: not calling PromoteServerEntry, since that would rank the
  562. // discarded tunnel before fully active tunnels. Can a discarded tunnel
  563. // be promoted (since it connects), but with lower rank than all active
  564. // tunnels?
  565. tunnel.Close(true)
  566. }
  567. // registerTunnel adds the connected tunnel to the pool of active tunnels
  568. // which are candidates for port forwarding. Returns true if the pool has an
  569. // empty slot and false if the pool is full (caller should discard the tunnel).
  570. func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
  571. controller.tunnelMutex.Lock()
  572. defer controller.tunnelMutex.Unlock()
  573. if len(controller.tunnels) >= controller.config.TunnelPoolSize {
  574. return len(controller.tunnels), false
  575. }
  576. // Perform a final check just in case we've established
  577. // a duplicate connection.
  578. for _, activeTunnel := range controller.tunnels {
  579. if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
  580. NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
  581. return len(controller.tunnels), false
  582. }
  583. }
  584. controller.establishedOnce = true
  585. controller.tunnels = append(controller.tunnels, tunnel)
  586. NoticeTunnels(len(controller.tunnels))
  587. // Promote this successful tunnel to first rank so it's one
  588. // of the first candidates next time establish runs.
  589. // Connecting to a TargetServerEntry does not change the
  590. // ranking.
  591. if controller.config.TargetServerEntry == "" {
  592. PromoteServerEntry(tunnel.serverEntry.IpAddress)
  593. }
  594. return len(controller.tunnels), true
  595. }
  596. // hasEstablishedOnce indicates if at least one active tunnel has
  597. // been established up to this point. This is regardeless of how many
  598. // tunnels are presently active.
  599. func (controller *Controller) hasEstablishedOnce() bool {
  600. controller.tunnelMutex.Lock()
  601. defer controller.tunnelMutex.Unlock()
  602. return controller.establishedOnce
  603. }
  604. // isFullyEstablished indicates if the pool of active tunnels is full.
  605. func (controller *Controller) isFullyEstablished() bool {
  606. controller.tunnelMutex.Lock()
  607. defer controller.tunnelMutex.Unlock()
  608. return len(controller.tunnels) >= controller.config.TunnelPoolSize
  609. }
  610. // terminateTunnel removes a tunnel from the pool of active tunnels
  611. // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
  612. // is adjusted as required.
  613. func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
  614. controller.tunnelMutex.Lock()
  615. defer controller.tunnelMutex.Unlock()
  616. for index, activeTunnel := range controller.tunnels {
  617. if tunnel == activeTunnel {
  618. controller.tunnels = append(
  619. controller.tunnels[:index], controller.tunnels[index+1:]...)
  620. if controller.nextTunnel > index {
  621. controller.nextTunnel--
  622. }
  623. if controller.nextTunnel >= len(controller.tunnels) {
  624. controller.nextTunnel = 0
  625. }
  626. activeTunnel.Close(false)
  627. NoticeTunnels(len(controller.tunnels))
  628. break
  629. }
  630. }
  631. }
  632. // terminateAllTunnels empties the tunnel pool, closing all active tunnels.
  633. // This is used when shutting down the controller.
  634. func (controller *Controller) terminateAllTunnels() {
  635. controller.tunnelMutex.Lock()
  636. defer controller.tunnelMutex.Unlock()
  637. // Closing all tunnels in parallel. In an orderly shutdown, each tunnel
  638. // may take a few seconds to send a final status request. We only want
  639. // to wait as long as the single slowest tunnel.
  640. closeWaitGroup := new(sync.WaitGroup)
  641. closeWaitGroup.Add(len(controller.tunnels))
  642. for _, activeTunnel := range controller.tunnels {
  643. tunnel := activeTunnel
  644. go func() {
  645. defer closeWaitGroup.Done()
  646. tunnel.Close(false)
  647. }()
  648. }
  649. closeWaitGroup.Wait()
  650. controller.tunnels = make([]*Tunnel, 0)
  651. controller.nextTunnel = 0
  652. NoticeTunnels(len(controller.tunnels))
  653. }
  654. // getNextActiveTunnel returns the next tunnel from the pool of active
  655. // tunnels. Currently, tunnel selection order is simple round-robin.
  656. func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
  657. controller.tunnelMutex.Lock()
  658. defer controller.tunnelMutex.Unlock()
  659. for i := len(controller.tunnels); i > 0; i-- {
  660. tunnel = controller.tunnels[controller.nextTunnel]
  661. controller.nextTunnel =
  662. (controller.nextTunnel + 1) % len(controller.tunnels)
  663. return tunnel
  664. }
  665. return nil
  666. }
  667. // isActiveTunnelServerEntry is used to check if there's already
  668. // an existing tunnel to a candidate server.
  669. func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry) bool {
  670. controller.tunnelMutex.Lock()
  671. defer controller.tunnelMutex.Unlock()
  672. for _, activeTunnel := range controller.tunnels {
  673. if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress {
  674. return true
  675. }
  676. }
  677. return false
  678. }
  679. // Dial selects an active tunnel and establishes a port forward
  680. // connection through the selected tunnel. Failure to connect is considered
  681. // a port foward failure, for the purpose of monitoring tunnel health.
  682. func (controller *Controller) Dial(
  683. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  684. tunnel := controller.getNextActiveTunnel()
  685. if tunnel == nil {
  686. return nil, ContextError(errors.New("no active tunnels"))
  687. }
  688. // Perform split tunnel classification when feature is enabled, and if the remote
  689. // address is classified as untunneled, dial directly.
  690. if !alwaysTunnel && controller.config.SplitTunnelDnsServer != "" {
  691. host, _, err := net.SplitHostPort(remoteAddr)
  692. if err != nil {
  693. return nil, ContextError(err)
  694. }
  695. // Note: a possible optimization, when split tunnel is active and IsUntunneled performs
  696. // a DNS resolution in order to make its classification, is to reuse that IP address in
  697. // the following Dials so they do not need to make their own resolutions. However, the
  698. // way this is currently implemented ensures that, e.g., DNS geo load balancing occurs
  699. // relative to the outbound network.
  700. if controller.splitTunnelClassifier.IsUntunneled(host) {
  701. // !TODO! track downstreamConn and close it when the DialTCP conn closes, as with tunnel.Dial conns?
  702. return DialTCP(remoteAddr, controller.untunneledDialConfig)
  703. }
  704. }
  705. tunneledConn, err := tunnel.Dial(remoteAddr, alwaysTunnel, downstreamConn)
  706. if err != nil {
  707. return nil, ContextError(err)
  708. }
  709. return tunneledConn, nil
  710. }
  711. // startEstablishing creates a pool of worker goroutines which will
  712. // attempt to establish tunnels to candidate servers. The candidates
  713. // are generated by another goroutine.
  714. func (controller *Controller) startEstablishing() {
  715. if controller.isEstablishing {
  716. return
  717. }
  718. NoticeInfo("start establishing")
  719. controller.isEstablishing = true
  720. controller.establishWaitGroup = new(sync.WaitGroup)
  721. controller.stopEstablishingBroadcast = make(chan struct{})
  722. controller.candidateServerEntries = make(chan *candidateServerEntry)
  723. controller.establishPendingConns.Reset()
  724. // The server affinity mechanism attempts to favor the previously
  725. // used server when reconnecting. This is beneficial for user
  726. // applications which expect consistency in user IP address (for
  727. // example, a web site which prompts for additional user
  728. // authentication when the IP address changes).
  729. //
  730. // Only the very first server, as determined by
  731. // datastore.PromoteServerEntry(), is the server affinity candidate.
  732. // Concurrent connections attempts to many servers are launched
  733. // without delay, in case the affinity server connection fails.
  734. // While the affinity server connection is outstanding, when any
  735. // other connection is established, there is a short grace period
  736. // delay before delivering the established tunnel; this allows some
  737. // time for the affinity server connection to succeed first.
  738. // When the affinity server connection fails, any other established
  739. // tunnel is registered without delay.
  740. //
  741. // Note: the establishTunnelWorker that receives the affinity
  742. // candidate is solely resonsible for closing
  743. // controller.serverAffinityDoneBroadcast.
  744. //
  745. // Note: if config.EgressRegion or config.TunnelProtocol has changed
  746. // since the top server was promoted, the first server may not actually
  747. // be the last connected server.
  748. // TODO: should not favor the first server in this case
  749. controller.serverAffinityDoneBroadcast = make(chan struct{})
  750. for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
  751. controller.establishWaitGroup.Add(1)
  752. go controller.establishTunnelWorker()
  753. }
  754. controller.establishWaitGroup.Add(1)
  755. go controller.establishCandidateGenerator(
  756. controller.getImpairedProtocols())
  757. }
  758. // stopEstablishing signals the establish goroutines to stop and waits
  759. // for the group to halt. pendingConns is used to interrupt any worker
  760. // blocked on a socket connect.
  761. func (controller *Controller) stopEstablishing() {
  762. if !controller.isEstablishing {
  763. return
  764. }
  765. NoticeInfo("stop establishing")
  766. close(controller.stopEstablishingBroadcast)
  767. // Note: interruptibleTCPClose doesn't really interrupt socket connects
  768. // and may leave goroutines running for a time after the Wait call.
  769. controller.establishPendingConns.CloseAll()
  770. // Note: establishCandidateGenerator closes controller.candidateServerEntries
  771. // (as it may be sending to that channel).
  772. controller.establishWaitGroup.Wait()
  773. controller.isEstablishing = false
  774. controller.establishWaitGroup = nil
  775. controller.stopEstablishingBroadcast = nil
  776. controller.candidateServerEntries = nil
  777. controller.serverAffinityDoneBroadcast = nil
  778. }
  779. // establishCandidateGenerator populates the candidate queue with server entries
  780. // from the data store. Server entries are iterated in rank order, so that promoted
  781. // servers with higher rank are priority candidates.
  782. func (controller *Controller) establishCandidateGenerator(impairedProtocols []string) {
  783. defer controller.establishWaitGroup.Done()
  784. defer close(controller.candidateServerEntries)
  785. iterator, err := NewServerEntryIterator(controller.config)
  786. if err != nil {
  787. NoticeAlert("failed to iterate over candidates: %s", err)
  788. controller.SignalComponentFailure()
  789. return
  790. }
  791. defer iterator.Close()
  792. isServerAffinityCandidate := true
  793. // TODO: reconcile server affinity scheme with multi-tunnel mode
  794. if controller.config.TunnelPoolSize > 1 {
  795. isServerAffinityCandidate = false
  796. close(controller.serverAffinityDoneBroadcast)
  797. }
  798. loop:
  799. // Repeat until stopped
  800. for i := 0; ; i++ {
  801. if !WaitForNetworkConnectivity(
  802. controller.config.NetworkConnectivityChecker,
  803. controller.stopEstablishingBroadcast,
  804. controller.shutdownBroadcast) {
  805. break loop
  806. }
  807. // Send each iterator server entry to the establish workers
  808. startTime := time.Now()
  809. for {
  810. serverEntry, err := iterator.Next()
  811. if err != nil {
  812. NoticeAlert("failed to get next candidate: %s", err)
  813. controller.SignalComponentFailure()
  814. break loop
  815. }
  816. if serverEntry == nil {
  817. // Completed this iteration
  818. break
  819. }
  820. // Disable impaired protocols. This is only done for the
  821. // first iteration of the ESTABLISH_TUNNEL_WORK_TIME
  822. // loop since (a) one iteration should be sufficient to
  823. // evade the attack; (b) there's a good chance of false
  824. // positives (such as short tunnel durations due to network
  825. // hopping on a mobile device).
  826. // Impaired protocols logic is not applied when
  827. // config.TunnelProtocol is specified.
  828. // The edited serverEntry is temporary copy which is not
  829. // stored or reused.
  830. if i == 0 && controller.config.TunnelProtocol == "" {
  831. serverEntry.DisableImpairedProtocols(impairedProtocols)
  832. if len(serverEntry.GetSupportedProtocols()) == 0 {
  833. // Skip this server entry, as it has no supported
  834. // protocols after disabling the impaired ones
  835. // TODO: modify ServerEntryIterator to skip these?
  836. continue
  837. }
  838. }
  839. // Note: there must be only one server affinity candidate, as it
  840. // closes the serverAffinityDoneBroadcast channel.
  841. candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate}
  842. isServerAffinityCandidate = false
  843. // TODO: here we could generate multiple candidates from the
  844. // server entry when there are many MeekFrontingAddresses.
  845. select {
  846. case controller.candidateServerEntries <- candidate:
  847. case <-controller.stopEstablishingBroadcast:
  848. break loop
  849. case <-controller.shutdownBroadcast:
  850. break loop
  851. }
  852. if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME)) {
  853. // Start over, after a brief pause, with a new shuffle of the server
  854. // entries, and potentially some newly fetched server entries.
  855. break
  856. }
  857. }
  858. // Free up resources now, but don't reset until after the pause.
  859. iterator.Close()
  860. // Trigger a fetch remote server list, since we may have failed to
  861. // connect with all known servers. Don't block sending signal, since
  862. // this signal may have already been sent.
  863. // Don't wait for fetch remote to succeed, since it may fail and
  864. // enter a retry loop and we're better off trying more known servers.
  865. // TODO: synchronize the fetch response, so it can be incorporated
  866. // into the server entry iterator as soon as available.
  867. select {
  868. case controller.signalFetchRemoteServerList <- *new(struct{}):
  869. default:
  870. }
  871. // Trigger an out-of-band upgrade availability check and download.
  872. // Since we may have failed to connect, we may benefit from upgrading
  873. // to a new client version with new circumvention capabilities.
  874. select {
  875. case controller.signalDownloadUpgrade <- "":
  876. default:
  877. }
  878. // After a complete iteration of candidate servers, pause before iterating again.
  879. // This helps avoid some busy wait loop conditions, and also allows some time for
  880. // network conditions to change. Also allows for fetch remote to complete,
  881. // in typical conditions (it isn't strictly necessary to wait for this, there will
  882. // be more rounds if required).
  883. timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
  884. select {
  885. case <-timeout:
  886. // Retry iterating
  887. case <-controller.stopEstablishingBroadcast:
  888. break loop
  889. case <-controller.shutdownBroadcast:
  890. break loop
  891. }
  892. iterator.Reset()
  893. }
  894. NoticeInfo("stopped candidate generator")
  895. }
  896. // establishTunnelWorker pulls candidates from the candidate queue, establishes
  897. // a connection to the tunnel server, and delivers the established tunnel to a channel.
  898. func (controller *Controller) establishTunnelWorker() {
  899. defer controller.establishWaitGroup.Done()
  900. loop:
  901. for candidateServerEntry := range controller.candidateServerEntries {
  902. // Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
  903. // in the same select, since we want to prioritize receiving the stop signal
  904. if controller.isStopEstablishingBroadcast() {
  905. break loop
  906. }
  907. // There may already be a tunnel to this candidate. If so, skip it.
  908. if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
  909. continue
  910. }
  911. tunnel, err := EstablishTunnel(
  912. controller.config,
  913. controller.untunneledDialConfig,
  914. controller.sessionId,
  915. controller.establishPendingConns,
  916. candidateServerEntry.serverEntry,
  917. controller) // TunnelOwner
  918. if err != nil {
  919. // Unblock other candidates immediately when
  920. // server affinity candidate fails.
  921. if candidateServerEntry.isServerAffinityCandidate {
  922. close(controller.serverAffinityDoneBroadcast)
  923. }
  924. // Before emitting error, check if establish interrupted, in which
  925. // case the error is noise.
  926. if controller.isStopEstablishingBroadcast() {
  927. break loop
  928. }
  929. NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
  930. continue
  931. }
  932. // Block for server affinity grace period before delivering.
  933. if !candidateServerEntry.isServerAffinityCandidate {
  934. timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
  935. select {
  936. case <-timer.C:
  937. case <-controller.serverAffinityDoneBroadcast:
  938. case <-controller.stopEstablishingBroadcast:
  939. }
  940. }
  941. // Deliver established tunnel.
  942. // Don't block. Assumes the receiver has a buffer large enough for
  943. // the number of desired tunnels. If there's no room, the tunnel must
  944. // not be required so it's discarded.
  945. select {
  946. case controller.establishedTunnels <- tunnel:
  947. default:
  948. controller.discardTunnel(tunnel)
  949. }
  950. // Unblock other candidates only after delivering when
  951. // server affinity candidate succeeds.
  952. if candidateServerEntry.isServerAffinityCandidate {
  953. close(controller.serverAffinityDoneBroadcast)
  954. }
  955. }
  956. NoticeInfo("stopped establish worker")
  957. }
  958. func (controller *Controller) isStopEstablishingBroadcast() bool {
  959. select {
  960. case <-controller.stopEstablishingBroadcast:
  961. return true
  962. default:
  963. }
  964. return false
  965. }