controller.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Package psiphon implements the core tunnel functionality of a Psiphon client.
  20. // The main function is RunForever, which runs a Controller that obtains lists of
  21. // servers, establishes tunnel connections, and runs local proxies through which
  22. // tunneled traffic may be sent.
  23. package psiphon
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "math/rand"
  29. "net"
  30. "sync"
  31. "time"
  32. "github.com/Psiphon-Inc/goarista/monotime"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  36. )
  37. // Controller is a tunnel lifecycle coordinator. It manages lists of servers to
  38. // connect to; establishes and monitors tunnels; and runs local proxies which
  39. // route traffic through the tunnels.
  40. type Controller struct {
  41. config *Config
  42. sessionId string
  43. runCtx context.Context
  44. stopRunning context.CancelFunc
  45. runWaitGroup *sync.WaitGroup
  46. connectedTunnels chan *Tunnel
  47. failedTunnels chan *Tunnel
  48. tunnelMutex sync.Mutex
  49. establishedOnce bool
  50. tunnels []*Tunnel
  51. nextTunnel int
  52. startedConnectedReporter bool
  53. isEstablishing bool
  54. concurrentEstablishTunnelsMutex sync.Mutex
  55. concurrentEstablishTunnels int
  56. concurrentMeekEstablishTunnels int
  57. peakConcurrentEstablishTunnels int
  58. peakConcurrentMeekEstablishTunnels int
  59. establishCtx context.Context
  60. stopEstablish context.CancelFunc
  61. establishWaitGroup *sync.WaitGroup
  62. candidateServerEntries chan *candidateServerEntry
  63. untunneledDialConfig *DialConfig
  64. splitTunnelClassifier *SplitTunnelClassifier
  65. signalFetchCommonRemoteServerList chan struct{}
  66. signalFetchObfuscatedServerLists chan struct{}
  67. signalDownloadUpgrade chan string
  68. impairedProtocolClassification map[string]int
  69. signalReportConnected chan struct{}
  70. serverAffinityDoneBroadcast chan struct{}
  71. newClientVerificationPayload chan string
  72. packetTunnelClient *tun.Client
  73. packetTunnelTransport *PacketTunnelTransport
  74. }
  75. type candidateServerEntry struct {
  76. serverEntry *protocol.ServerEntry
  77. isServerAffinityCandidate bool
  78. adjustedEstablishStartTime monotime.Time
  79. }
  80. // NewController initializes a new controller.
  81. func NewController(config *Config) (controller *Controller, err error) {
  82. // Needed by regen, at least
  83. rand.Seed(int64(time.Now().Nanosecond()))
  84. // The session ID for the Psiphon server API is used across all
  85. // tunnels established by the controller.
  86. NoticeSessionId(config.SessionID)
  87. untunneledDialConfig := &DialConfig{
  88. UpstreamProxyUrl: config.UpstreamProxyUrl,
  89. CustomHeaders: config.CustomHeaders,
  90. DeviceBinder: config.DeviceBinder,
  91. DnsServerGetter: config.DnsServerGetter,
  92. IPv6Synthesizer: config.IPv6Synthesizer,
  93. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  94. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  95. DeviceRegion: config.DeviceRegion,
  96. }
  97. controller = &Controller{
  98. config: config,
  99. sessionId: config.SessionID,
  100. runWaitGroup: new(sync.WaitGroup),
  101. // connectedTunnels and failedTunnels buffer sizes are large enough to
  102. // receive full pools of tunnels without blocking. Senders should not block.
  103. connectedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  104. failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  105. tunnels: make([]*Tunnel, 0),
  106. establishedOnce: false,
  107. startedConnectedReporter: false,
  108. isEstablishing: false,
  109. untunneledDialConfig: untunneledDialConfig,
  110. impairedProtocolClassification: make(map[string]int),
  111. // TODO: Add a buffer of 1 so we don't miss a signal while receiver is
  112. // starting? Trade-off is potential back-to-back fetch remotes. As-is,
  113. // establish will eventually signal another fetch remote.
  114. signalFetchCommonRemoteServerList: make(chan struct{}),
  115. signalFetchObfuscatedServerLists: make(chan struct{}),
  116. signalDownloadUpgrade: make(chan string),
  117. signalReportConnected: make(chan struct{}),
  118. // Buffer allows SetClientVerificationPayloadForActiveTunnels to submit one
  119. // new payload without blocking or dropping it.
  120. newClientVerificationPayload: make(chan string, 1),
  121. }
  122. controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
  123. if config.PacketTunnelTunFileDescriptor > 0 {
  124. // Run a packet tunnel client. The lifetime of the tun.Client is the
  125. // lifetime of the Controller, so it exists across tunnel establishments
  126. // and reestablishments. The PacketTunnelTransport provides a layer
  127. // that presents a continuosuly existing transport to the tun.Client;
  128. // it's set to use new SSH channels after new SSH tunnel establishes.
  129. packetTunnelTransport := NewPacketTunnelTransport()
  130. packetTunnelClient, err := tun.NewClient(&tun.ClientConfig{
  131. Logger: NoticeCommonLogger(),
  132. TunFileDescriptor: config.PacketTunnelTunFileDescriptor,
  133. Transport: packetTunnelTransport,
  134. })
  135. if err != nil {
  136. return nil, common.ContextError(err)
  137. }
  138. controller.packetTunnelClient = packetTunnelClient
  139. controller.packetTunnelTransport = packetTunnelTransport
  140. }
  141. return controller, nil
  142. }
  143. // Run executes the controller. Run exits if a controller
  144. // component fails or the parent context is canceled.
  145. func (controller *Controller) Run(ctx context.Context) {
  146. ReportAvailableRegions()
  147. runCtx, stopRunning := context.WithCancel(ctx)
  148. defer stopRunning()
  149. controller.runCtx = runCtx
  150. controller.stopRunning = stopRunning
  151. // Start components
  152. // TODO: IPv6 support
  153. var listenIP string
  154. if controller.config.ListenInterface == "" {
  155. listenIP = "127.0.0.1"
  156. } else if controller.config.ListenInterface == "any" {
  157. listenIP = "0.0.0.0"
  158. } else {
  159. IPv4Address, _, err := common.GetInterfaceIPAddresses(controller.config.ListenInterface)
  160. if err == nil && IPv4Address == nil {
  161. err = fmt.Errorf("no IPv4 address for interface %s", controller.config.ListenInterface)
  162. }
  163. if err != nil {
  164. NoticeError("error getting listener IP: %s", err)
  165. return
  166. }
  167. listenIP = IPv4Address.String()
  168. }
  169. if !controller.config.DisableLocalSocksProxy {
  170. socksProxy, err := NewSocksProxy(controller.config, controller, listenIP)
  171. if err != nil {
  172. NoticeAlert("error initializing local SOCKS proxy: %s", err)
  173. return
  174. }
  175. defer socksProxy.Close()
  176. }
  177. if !controller.config.DisableLocalHTTPProxy {
  178. httpProxy, err := NewHttpProxy(controller.config, controller, listenIP)
  179. if err != nil {
  180. NoticeAlert("error initializing local HTTP proxy: %s", err)
  181. return
  182. }
  183. defer httpProxy.Close()
  184. }
  185. if !controller.config.DisableRemoteServerListFetcher {
  186. retryPeriod := time.Duration(
  187. *controller.config.FetchRemoteServerListRetryPeriodSeconds) * time.Second
  188. if controller.config.RemoteServerListURLs != nil {
  189. controller.runWaitGroup.Add(1)
  190. go controller.remoteServerListFetcher(
  191. "common",
  192. FetchCommonRemoteServerList,
  193. controller.signalFetchCommonRemoteServerList,
  194. retryPeriod,
  195. FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)
  196. }
  197. if controller.config.ObfuscatedServerListRootURLs != nil {
  198. controller.runWaitGroup.Add(1)
  199. go controller.remoteServerListFetcher(
  200. "obfuscated",
  201. FetchObfuscatedServerLists,
  202. controller.signalFetchObfuscatedServerLists,
  203. retryPeriod,
  204. FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)
  205. }
  206. }
  207. if controller.config.UpgradeDownloadURLs != nil {
  208. controller.runWaitGroup.Add(1)
  209. go controller.upgradeDownloader()
  210. }
  211. /// Note: the connected reporter isn't started until a tunnel is
  212. // established
  213. controller.runWaitGroup.Add(1)
  214. go controller.runTunnels()
  215. if *controller.config.EstablishTunnelTimeoutSeconds != 0 {
  216. controller.runWaitGroup.Add(1)
  217. go controller.establishTunnelWatcher()
  218. }
  219. if controller.packetTunnelClient != nil {
  220. controller.packetTunnelClient.Start()
  221. }
  222. // Wait while running
  223. <-controller.runCtx.Done()
  224. NoticeInfo("controller stopped")
  225. if controller.packetTunnelClient != nil {
  226. controller.packetTunnelClient.Stop()
  227. }
  228. // All workers -- runTunnels, establishment workers, and auxilliary
  229. // workers such as fetch remote server list and untunneled uprade
  230. // download -- operate with the controller run context and will all
  231. // be interrupted when the run context is done.
  232. controller.runWaitGroup.Wait()
  233. controller.splitTunnelClassifier.Shutdown()
  234. NoticeInfo("exiting controller")
  235. NoticeExiting()
  236. }
  237. // SignalComponentFailure notifies the controller that an associated component has failed.
  238. // This will terminate the controller.
  239. func (controller *Controller) SignalComponentFailure() {
  240. NoticeAlert("controller shutdown due to component failure")
  241. controller.stopRunning()
  242. }
  243. // SetClientVerificationPayloadForActiveTunnels sets the client verification
  244. // payload that is to be sent in client verification requests to all established
  245. // tunnels.
  246. //
  247. // Client verification is used to verify that the client is a
  248. // valid Psiphon client, which will determine how the server treats
  249. // the client traffic. The proof-of-validity is platform-specific
  250. // and the payload is opaque to this function but assumed to be JSON.
  251. //
  252. // Since, in some cases, verification payload cannot be determined until
  253. // after tunnel-core starts, the payload cannot be simply specified in
  254. // the Config.
  255. //
  256. // SetClientVerificationPayloadForActiveTunnels will not block enqueuing a new verification
  257. // payload. One new payload can be enqueued, after which additional payloads
  258. // will be dropped if a payload is still enqueued.
  259. func (controller *Controller) SetClientVerificationPayloadForActiveTunnels(clientVerificationPayload string) {
  260. select {
  261. case controller.newClientVerificationPayload <- clientVerificationPayload:
  262. default:
  263. }
  264. }
  265. // remoteServerListFetcher fetches an out-of-band list of server entries
  266. // for more tunnel candidates. It fetches when signalled, with retries
  267. // on failure.
  268. func (controller *Controller) remoteServerListFetcher(
  269. name string,
  270. fetcher RemoteServerListFetcher,
  271. signal <-chan struct{},
  272. retryPeriod, stalePeriod time.Duration) {
  273. defer controller.runWaitGroup.Done()
  274. var lastFetchTime monotime.Time
  275. fetcherLoop:
  276. for {
  277. // Wait for a signal before fetching
  278. select {
  279. case <-signal:
  280. case <-controller.runCtx.Done():
  281. break fetcherLoop
  282. }
  283. // Skip fetch entirely (i.e., send no request at all, even when ETag would save
  284. // on response size) when a recent fetch was successful
  285. if lastFetchTime != 0 &&
  286. lastFetchTime.Add(stalePeriod).After(monotime.Now()) {
  287. continue
  288. }
  289. retryLoop:
  290. for attempt := 0; ; attempt++ {
  291. // Don't attempt to fetch while there is no network connectivity,
  292. // to avoid alert notice noise.
  293. if !WaitForNetworkConnectivity(
  294. controller.runCtx,
  295. controller.config.NetworkConnectivityChecker) {
  296. break fetcherLoop
  297. }
  298. // Pick any active tunnel and make the next fetch attempt. If there's
  299. // no active tunnel, the untunneledDialConfig will be used.
  300. tunnel := controller.getNextActiveTunnel()
  301. err := fetcher(
  302. controller.runCtx,
  303. controller.config,
  304. attempt,
  305. tunnel,
  306. controller.untunneledDialConfig)
  307. if err == nil {
  308. lastFetchTime = monotime.Now()
  309. break retryLoop
  310. }
  311. NoticeAlert("failed to fetch %s remote server list: %s", name, err)
  312. timer := time.NewTimer(retryPeriod)
  313. select {
  314. case <-timer.C:
  315. case <-controller.runCtx.Done():
  316. timer.Stop()
  317. break fetcherLoop
  318. }
  319. }
  320. }
  321. NoticeInfo("exiting %s remote server list fetcher", name)
  322. }
  323. // establishTunnelWatcher terminates the controller if a tunnel
  324. // has not been established in the configured time period. This
  325. // is regardless of how many tunnels are presently active -- meaning
  326. // that if an active tunnel was established and lost the controller
  327. // is left running (to re-establish).
  328. func (controller *Controller) establishTunnelWatcher() {
  329. defer controller.runWaitGroup.Done()
  330. timer := time.NewTimer(
  331. time.Duration(*controller.config.EstablishTunnelTimeoutSeconds) * time.Second)
  332. defer timer.Stop()
  333. select {
  334. case <-timer.C:
  335. if !controller.hasEstablishedOnce() {
  336. NoticeAlert("failed to establish tunnel before timeout")
  337. controller.SignalComponentFailure()
  338. }
  339. case <-controller.runCtx.Done():
  340. }
  341. NoticeInfo("exiting establish tunnel watcher")
  342. }
  343. // connectedReporter sends periodic "connected" requests to the Psiphon API.
  344. // These requests are for server-side unique user stats calculation. See the
  345. // comment in DoConnectedRequest for a description of the request mechanism.
  346. // To ensure we don't over- or under-count unique users, only one connected
  347. // request is made across all simultaneous multi-tunnels; and the connected
  348. // request is repeated periodically for very long-lived tunnels.
  349. // The signalReportConnected mechanism is used to trigger another connected
  350. // request immediately after a reconnect.
  351. func (controller *Controller) connectedReporter() {
  352. defer controller.runWaitGroup.Done()
  353. loop:
  354. for {
  355. // Pick any active tunnel and make the next connected request. No error
  356. // is logged if there's no active tunnel, as that's not an unexpected condition.
  357. reported := false
  358. tunnel := controller.getNextActiveTunnel()
  359. if tunnel != nil {
  360. err := tunnel.serverContext.DoConnectedRequest()
  361. if err == nil {
  362. reported = true
  363. } else {
  364. NoticeAlert("failed to make connected request: %s", err)
  365. }
  366. }
  367. // Schedule the next connected request and wait.
  368. var duration time.Duration
  369. if reported {
  370. duration = PSIPHON_API_CONNECTED_REQUEST_PERIOD
  371. } else {
  372. duration = PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD
  373. }
  374. timer := time.NewTimer(duration)
  375. doBreak := false
  376. select {
  377. case <-controller.signalReportConnected:
  378. case <-timer.C:
  379. // Make another connected request
  380. case <-controller.runCtx.Done():
  381. doBreak = true
  382. }
  383. timer.Stop()
  384. if doBreak {
  385. break loop
  386. }
  387. }
  388. NoticeInfo("exiting connected reporter")
  389. }
  390. func (controller *Controller) startOrSignalConnectedReporter() {
  391. // session is nil when DisableApi is set
  392. if controller.config.DisableApi {
  393. return
  394. }
  395. // Start the connected reporter after the first tunnel is established.
  396. // Concurrency note: only the runTunnels goroutine may access startedConnectedReporter.
  397. if !controller.startedConnectedReporter {
  398. controller.startedConnectedReporter = true
  399. controller.runWaitGroup.Add(1)
  400. go controller.connectedReporter()
  401. } else {
  402. select {
  403. case controller.signalReportConnected <- *new(struct{}):
  404. default:
  405. }
  406. }
  407. }
  408. // upgradeDownloader makes periodic attempts to complete a client upgrade
  409. // download. DownloadUpgrade() is resumable, so each attempt has potential for
  410. // getting closer to completion, even in conditions where the download or
  411. // tunnel is repeatedly interrupted.
  412. // An upgrade download is triggered by either a handshake response indicating
  413. // that a new version is available; or after failing to connect, in which case
  414. // it's useful to check, out-of-band, for an upgrade with new circumvention
  415. // capabilities.
  416. // Once the download operation completes successfully, the downloader exits
  417. // and is not run again: either there is not a newer version, or the upgrade
  418. // has been downloaded and is ready to be applied.
  419. // We're assuming that the upgrade will be applied and the entire system
  420. // restarted before another upgrade is to be downloaded.
  421. //
  422. // TODO: refactor upgrade downloader and remote server list fetcher to use
  423. // common code (including the resumable download routines).
  424. //
  425. func (controller *Controller) upgradeDownloader() {
  426. defer controller.runWaitGroup.Done()
  427. var lastDownloadTime monotime.Time
  428. downloadLoop:
  429. for {
  430. // Wait for a signal before downloading
  431. var handshakeVersion string
  432. select {
  433. case handshakeVersion = <-controller.signalDownloadUpgrade:
  434. case <-controller.runCtx.Done():
  435. break downloadLoop
  436. }
  437. // Unless handshake is explicitly advertizing a new version, skip
  438. // checking entirely when a recent download was successful.
  439. if handshakeVersion == "" &&
  440. lastDownloadTime != 0 &&
  441. lastDownloadTime.Add(DOWNLOAD_UPGRADE_STALE_PERIOD).After(monotime.Now()) {
  442. continue
  443. }
  444. retryLoop:
  445. for attempt := 0; ; attempt++ {
  446. // Don't attempt to download while there is no network connectivity,
  447. // to avoid alert notice noise.
  448. if !WaitForNetworkConnectivity(
  449. controller.runCtx,
  450. controller.config.NetworkConnectivityChecker) {
  451. break downloadLoop
  452. }
  453. // Pick any active tunnel and make the next download attempt. If there's
  454. // no active tunnel, the untunneledDialConfig will be used.
  455. tunnel := controller.getNextActiveTunnel()
  456. err := DownloadUpgrade(
  457. controller.runCtx,
  458. controller.config,
  459. attempt,
  460. handshakeVersion,
  461. tunnel,
  462. controller.untunneledDialConfig)
  463. if err == nil {
  464. lastDownloadTime = monotime.Now()
  465. break retryLoop
  466. }
  467. NoticeAlert("failed to download upgrade: %s", err)
  468. timer := time.NewTimer(
  469. time.Duration(*controller.config.DownloadUpgradeRetryPeriodSeconds) * time.Second)
  470. select {
  471. case <-timer.C:
  472. case <-controller.runCtx.Done():
  473. timer.Stop()
  474. break downloadLoop
  475. }
  476. }
  477. }
  478. NoticeInfo("exiting upgrade downloader")
  479. }
  480. // runTunnels is the controller tunnel management main loop. It starts and stops
  481. // establishing tunnels based on the target tunnel pool size and the current size
  482. // of the pool. Tunnels are established asynchronously using worker goroutines.
  483. //
  484. // When there are no server entries for the target region/protocol, the
  485. // establishCandidateGenerator will yield no candidates and wait before
  486. // trying again. In the meantime, a remote server entry fetch may supply
  487. // valid candidates.
  488. //
  489. // When a tunnel is established, it's added to the active pool. The tunnel's
  490. // operateTunnel goroutine monitors the tunnel.
  491. //
  492. // When a tunnel fails, it's removed from the pool and the establish process is
  493. // restarted to fill the pool.
  494. func (controller *Controller) runTunnels() {
  495. defer controller.runWaitGroup.Done()
  496. var clientVerificationPayload string
  497. // Start running
  498. controller.startEstablishing()
  499. loop:
  500. for {
  501. select {
  502. case failedTunnel := <-controller.failedTunnels:
  503. NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
  504. controller.terminateTunnel(failedTunnel)
  505. controller.classifyImpairedProtocol(failedTunnel)
  506. // Clear the reference to this tunnel before calling startEstablishing,
  507. // which will invoke a garbage collection.
  508. failedTunnel = nil
  509. // Concurrency note: only this goroutine may call startEstablishing/stopEstablishing,
  510. // which reference controller.isEstablishing.
  511. controller.startEstablishing()
  512. case connectedTunnel := <-controller.connectedTunnels:
  513. if controller.isImpairedProtocol(connectedTunnel.protocol) {
  514. // Protocol was classified as impaired while this tunnel established.
  515. // This is most likely to occur with TunnelPoolSize > 0. We log the
  516. // event but take no action. Discarding the tunnel would break the
  517. // impaired logic unless we did that (a) only if there are other
  518. // unimpaired protocols; (b) only during the first iteration of the
  519. // ESTABLISH_TUNNEL_WORK_TIME loop. By not discarding here, a true
  520. // impaired protocol may require an extra reconnect.
  521. NoticeAlert("connected tunnel with impaired protocol: %s", connectedTunnel.protocol)
  522. }
  523. // Tunnel establishment has two phases: connection and activation.
  524. //
  525. // Connection is run concurrently by the establishTunnelWorkers, to minimize
  526. // delay when it's not yet known which server and protocol will be available
  527. // and unblocked.
  528. //
  529. // Activation is run serially, here, to minimize the overhead of making a
  530. // handshake request and starting the operateTunnel management worker for a
  531. // tunnel which may be discarded.
  532. //
  533. // When the active tunnel will complete establishment, establishment is
  534. // stopped before activation. This interrupts all connecting tunnels and
  535. // garbage collects their memory. The purpose is to minimize memory
  536. // pressure when the handshake request is made. In the unlikely case that the
  537. // handshake fails, establishment is restarted.
  538. //
  539. // Any delays in stopEstablishing will delay the handshake for the last
  540. // active tunnel.
  541. //
  542. // In the typical case of TunnelPoolSize of 1, only a single handshake is
  543. // performed and the homepages notices file, when used, will not be modified
  544. // after the NoticeTunnels(1) [i.e., connected] until NoticeTunnels(0) [i.e.,
  545. // disconnected]. For TunnelPoolSize > 1, serial handshakes only ensures that
  546. // each set of emitted NoticeHomepages is contiguous.
  547. active, outstanding := controller.numTunnels()
  548. // discardTunnel will be true here when already fully established.
  549. discardTunnel := (outstanding <= 0)
  550. isFirstTunnel := (active == 0)
  551. isLastTunnel := (outstanding == 1)
  552. if !discardTunnel {
  553. if isLastTunnel {
  554. controller.stopEstablishing()
  555. }
  556. err := connectedTunnel.Activate(controller.runCtx, controller)
  557. if err != nil {
  558. // Assume the Activate failed due to a broken tunnel connection,
  559. // currently the most likely case, and classify as impaired, as in
  560. // the failed tunnel case above.
  561. // TODO: distinguish between network and other errors
  562. controller.classifyImpairedProtocol(connectedTunnel)
  563. NoticeAlert("failed to activate %s: %s", connectedTunnel.serverEntry.IpAddress, err)
  564. discardTunnel = true
  565. } else {
  566. // It's unlikely that registerTunnel will fail, since only this goroutine
  567. // calls registerTunnel -- and after checking numTunnels; so failure is not
  568. // expected.
  569. if !controller.registerTunnel(connectedTunnel) {
  570. NoticeAlert("failed to register %s: %s", connectedTunnel.serverEntry.IpAddress)
  571. discardTunnel = true
  572. }
  573. }
  574. // May need to replace this tunnel
  575. if isLastTunnel && discardTunnel {
  576. controller.startEstablishing()
  577. }
  578. }
  579. if discardTunnel {
  580. controller.discardTunnel(connectedTunnel)
  581. // Clear the reference to this discarded tunnel and immediately run
  582. // a garbage collection to reclaim its memory.
  583. connectedTunnel = nil
  584. aggressiveGarbageCollection()
  585. // Skip the rest of this case
  586. break
  587. }
  588. NoticeActiveTunnel(
  589. connectedTunnel.serverEntry.IpAddress,
  590. connectedTunnel.protocol,
  591. connectedTunnel.serverEntry.SupportsSSHAPIRequests())
  592. if isFirstTunnel {
  593. // The split tunnel classifier is started once the first tunnel is
  594. // established. This first tunnel is passed in to be used to make
  595. // the routes data request.
  596. // A long-running controller may run while the host device is present
  597. // in different regions. In this case, we want the split tunnel logic
  598. // to switch to routes for new regions and not classify traffic based
  599. // on routes installed for older regions.
  600. // We assume that when regions change, the host network will also
  601. // change, and so all tunnels will fail and be re-established. Under
  602. // that assumption, the classifier will be re-Start()-ed here when
  603. // the region has changed.
  604. controller.splitTunnelClassifier.Start(connectedTunnel)
  605. // Signal a connected request on each 1st tunnel establishment. For
  606. // multi-tunnels, the session is connected as long as at least one
  607. // tunnel is established.
  608. controller.startOrSignalConnectedReporter()
  609. // If the handshake indicated that a new client version is available,
  610. // trigger an upgrade download.
  611. // Note: serverContext is nil when DisableApi is set
  612. if connectedTunnel.serverContext != nil &&
  613. connectedTunnel.serverContext.clientUpgradeVersion != "" {
  614. handshakeVersion := connectedTunnel.serverContext.clientUpgradeVersion
  615. select {
  616. case controller.signalDownloadUpgrade <- handshakeVersion:
  617. default:
  618. }
  619. }
  620. }
  621. // Set the new tunnel as the transport for the packet tunnel. The packet tunnel
  622. // client remains up when reestablishing, but no packets are relayed while there
  623. // is no connected tunnel. UseTunnel will establish a new packet tunnel SSH
  624. // channel over the new SSH tunnel and configure the packet tunnel client to use
  625. // the new SSH channel as its transport.
  626. //
  627. // Note: as is, this logic is suboptimal for TunnelPoolSize > 1, as this would
  628. // continuously initialize new packet tunnel sessions for each established
  629. // server. For now, config validation requires TunnelPoolSize == 1 when
  630. // the packet tunnel is used.
  631. if controller.packetTunnelTransport != nil {
  632. controller.packetTunnelTransport.UseTunnel(connectedTunnel)
  633. }
  634. // TODO: design issue -- might not be enough server entries with region/caps to ever fill tunnel slots;
  635. // possible solution is establish target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
  636. if controller.isFullyEstablished() {
  637. controller.stopEstablishing()
  638. }
  639. case clientVerificationPayload = <-controller.newClientVerificationPayload:
  640. controller.setClientVerificationPayloadForActiveTunnels(clientVerificationPayload)
  641. case <-controller.runCtx.Done():
  642. break loop
  643. }
  644. }
  645. // Stop running
  646. controller.stopEstablishing()
  647. controller.terminateAllTunnels()
  648. // Drain tunnel channels
  649. close(controller.connectedTunnels)
  650. for tunnel := range controller.connectedTunnels {
  651. controller.discardTunnel(tunnel)
  652. }
  653. close(controller.failedTunnels)
  654. for tunnel := range controller.failedTunnels {
  655. controller.discardTunnel(tunnel)
  656. }
  657. NoticeInfo("exiting run tunnels")
  658. }
  659. // TerminateNextActiveTunnel is a support routine for
  660. // test code that must terminate the active tunnel and
  661. // restart establishing. This function is not guaranteed
  662. // to be safe for use in other cases.
  663. func (controller *Controller) TerminateNextActiveTunnel() {
  664. tunnel := controller.getNextActiveTunnel()
  665. if tunnel != nil {
  666. controller.SignalTunnelFailure(tunnel)
  667. NoticeInfo("terminated tunnel: %s", tunnel.serverEntry.IpAddress)
  668. }
  669. }
  670. // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
  671. // tunnels. A protocol is classified as impaired if a tunnel using that protocol
  672. // fails, repeatedly, shortly after the start of the connection. During tunnel
  673. // establishment, impaired protocols are briefly skipped.
  674. //
  675. // One purpose of this measure is to defend against an attack where the adversary,
  676. // for example, tags an OSSH TCP connection as an "unidentified" protocol; allows
  677. // it to connect; but then kills the underlying TCP connection after a short time.
  678. // Since OSSH has less latency than other protocols that may bypass an "unidentified"
  679. // filter, these other protocols might never be selected for use.
  680. //
  681. // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
  682. func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
  683. // If the tunnel failed while activating, its establishedTime will be 0.
  684. if failedTunnel.establishedTime == 0 ||
  685. failedTunnel.establishedTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(monotime.Now()) {
  686. controller.impairedProtocolClassification[failedTunnel.protocol] += 1
  687. } else {
  688. controller.impairedProtocolClassification[failedTunnel.protocol] = 0
  689. }
  690. // Reset classification once all known protocols are classified as impaired, as
  691. // there is now no way to proceed with only unimpaired protocols. The network
  692. // situation (or attack) resulting in classification may not be protocol-specific.
  693. //
  694. // Note: with controller.config.TunnelProtocol set, this will always reset once
  695. // that protocol has reached IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD.
  696. if CountNonImpairedProtocols(
  697. controller.config.EgressRegion,
  698. controller.config.TunnelProtocol,
  699. controller.getImpairedProtocols()) == 0 {
  700. controller.impairedProtocolClassification = make(map[string]int)
  701. }
  702. }
  703. // getImpairedProtocols returns a list of protocols that have sufficient
  704. // classifications to be considered impaired protocols.
  705. //
  706. // Concurrency note: only the runTunnels() goroutine may call getImpairedProtocols
  707. func (controller *Controller) getImpairedProtocols() []string {
  708. NoticeImpairedProtocolClassification(controller.impairedProtocolClassification)
  709. impairedProtocols := make([]string, 0)
  710. for protocol, count := range controller.impairedProtocolClassification {
  711. if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
  712. impairedProtocols = append(impairedProtocols, protocol)
  713. }
  714. }
  715. return impairedProtocols
  716. }
  717. // isImpairedProtocol checks if the specified protocol is classified as impaired.
  718. //
  719. // Concurrency note: only the runTunnels() goroutine may call isImpairedProtocol
  720. func (controller *Controller) isImpairedProtocol(protocol string) bool {
  721. count, ok := controller.impairedProtocolClassification[protocol]
  722. return ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD
  723. }
  724. // SignalSeededNewSLOK implements the TunnelOwner interface. This function
  725. // is called by Tunnel.operateTunnel when the tunnel has received a new,
  726. // previously unknown SLOK from the server. The Controller triggers an OSL
  727. // fetch, as the new SLOK may be sufficient to access new OSLs.
  728. func (controller *Controller) SignalSeededNewSLOK() {
  729. select {
  730. case controller.signalFetchObfuscatedServerLists <- *new(struct{}):
  731. default:
  732. }
  733. }
  734. // SignalTunnelFailure implements the TunnelOwner interface. This function
  735. // is called by Tunnel.operateTunnel when the tunnel has detected that it
  736. // has failed. The Controller will signal runTunnels to create a new
  737. // tunnel and/or remove the tunnel from the list of active tunnels.
  738. func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
  739. // Don't block. Assumes the receiver has a buffer large enough for
  740. // the typical number of operated tunnels. In case there's no room,
  741. // terminate the tunnel (runTunnels won't get a signal in this case,
  742. // but the tunnel will be removed from the list of active tunnels).
  743. select {
  744. case controller.failedTunnels <- tunnel:
  745. default:
  746. controller.terminateTunnel(tunnel)
  747. }
  748. }
  749. // discardTunnel disposes of a successful connection that is no longer required.
  750. func (controller *Controller) discardTunnel(tunnel *Tunnel) {
  751. NoticeInfo("discard tunnel: %s", tunnel.serverEntry.IpAddress)
  752. // TODO: not calling PromoteServerEntry, since that would rank the
  753. // discarded tunnel before fully active tunnels. Can a discarded tunnel
  754. // be promoted (since it connects), but with lower rank than all active
  755. // tunnels?
  756. tunnel.Close(true)
  757. }
  758. // registerTunnel adds the connected tunnel to the pool of active tunnels
  759. // which are candidates for port forwarding. Returns true if the pool has an
  760. // empty slot and false if the pool is full (caller should discard the tunnel).
  761. func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
  762. controller.tunnelMutex.Lock()
  763. defer controller.tunnelMutex.Unlock()
  764. if len(controller.tunnels) >= controller.config.TunnelPoolSize {
  765. return false
  766. }
  767. // Perform a final check just in case we've established
  768. // a duplicate connection.
  769. for _, activeTunnel := range controller.tunnels {
  770. if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
  771. NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
  772. return false
  773. }
  774. }
  775. controller.establishedOnce = true
  776. controller.tunnels = append(controller.tunnels, tunnel)
  777. NoticeTunnels(len(controller.tunnels))
  778. // Promote this successful tunnel to first rank so it's one
  779. // of the first candidates next time establish runs.
  780. // Connecting to a TargetServerEntry does not change the
  781. // ranking.
  782. if controller.config.TargetServerEntry == "" {
  783. PromoteServerEntry(tunnel.serverEntry.IpAddress)
  784. }
  785. return true
  786. }
  787. // hasEstablishedOnce indicates if at least one active tunnel has
  788. // been established up to this point. This is regardeless of how many
  789. // tunnels are presently active.
  790. func (controller *Controller) hasEstablishedOnce() bool {
  791. controller.tunnelMutex.Lock()
  792. defer controller.tunnelMutex.Unlock()
  793. return controller.establishedOnce
  794. }
  795. // isFullyEstablished indicates if the pool of active tunnels is full.
  796. func (controller *Controller) isFullyEstablished() bool {
  797. controller.tunnelMutex.Lock()
  798. defer controller.tunnelMutex.Unlock()
  799. return len(controller.tunnels) >= controller.config.TunnelPoolSize
  800. }
  801. // numTunnels returns the number of active and outstanding tunnels.
  802. // Oustanding is the number of tunnels required to fill the pool of
  803. // active tunnels.
  804. func (controller *Controller) numTunnels() (int, int) {
  805. controller.tunnelMutex.Lock()
  806. defer controller.tunnelMutex.Unlock()
  807. active := len(controller.tunnels)
  808. outstanding := controller.config.TunnelPoolSize - len(controller.tunnels)
  809. return active, outstanding
  810. }
  811. // terminateTunnel removes a tunnel from the pool of active tunnels
  812. // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
  813. // is adjusted as required.
  814. func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
  815. controller.tunnelMutex.Lock()
  816. defer controller.tunnelMutex.Unlock()
  817. for index, activeTunnel := range controller.tunnels {
  818. if tunnel == activeTunnel {
  819. controller.tunnels = append(
  820. controller.tunnels[:index], controller.tunnels[index+1:]...)
  821. if controller.nextTunnel > index {
  822. controller.nextTunnel--
  823. }
  824. if controller.nextTunnel >= len(controller.tunnels) {
  825. controller.nextTunnel = 0
  826. }
  827. activeTunnel.Close(false)
  828. NoticeTunnels(len(controller.tunnels))
  829. break
  830. }
  831. }
  832. }
  833. // terminateAllTunnels empties the tunnel pool, closing all active tunnels.
  834. // This is used when shutting down the controller.
  835. func (controller *Controller) terminateAllTunnels() {
  836. controller.tunnelMutex.Lock()
  837. defer controller.tunnelMutex.Unlock()
  838. // Closing all tunnels in parallel. In an orderly shutdown, each tunnel
  839. // may take a few seconds to send a final status request. We only want
  840. // to wait as long as the single slowest tunnel.
  841. closeWaitGroup := new(sync.WaitGroup)
  842. closeWaitGroup.Add(len(controller.tunnels))
  843. for _, activeTunnel := range controller.tunnels {
  844. tunnel := activeTunnel
  845. go func() {
  846. defer closeWaitGroup.Done()
  847. tunnel.Close(false)
  848. }()
  849. }
  850. closeWaitGroup.Wait()
  851. controller.tunnels = make([]*Tunnel, 0)
  852. controller.nextTunnel = 0
  853. NoticeTunnels(len(controller.tunnels))
  854. }
  855. // getNextActiveTunnel returns the next tunnel from the pool of active
  856. // tunnels. Currently, tunnel selection order is simple round-robin.
  857. func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
  858. controller.tunnelMutex.Lock()
  859. defer controller.tunnelMutex.Unlock()
  860. for i := len(controller.tunnels); i > 0; i-- {
  861. tunnel = controller.tunnels[controller.nextTunnel]
  862. controller.nextTunnel =
  863. (controller.nextTunnel + 1) % len(controller.tunnels)
  864. return tunnel
  865. }
  866. return nil
  867. }
  868. // isActiveTunnelServerEntry is used to check if there's already
  869. // an existing tunnel to a candidate server.
  870. func (controller *Controller) isActiveTunnelServerEntry(
  871. serverEntry *protocol.ServerEntry) bool {
  872. controller.tunnelMutex.Lock()
  873. defer controller.tunnelMutex.Unlock()
  874. for _, activeTunnel := range controller.tunnels {
  875. if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress {
  876. return true
  877. }
  878. }
  879. return false
  880. }
  881. // setClientVerificationPayloadForActiveTunnels triggers the client verification
  882. // request for all active tunnels.
  883. func (controller *Controller) setClientVerificationPayloadForActiveTunnels(
  884. clientVerificationPayload string) {
  885. controller.tunnelMutex.Lock()
  886. defer controller.tunnelMutex.Unlock()
  887. for _, activeTunnel := range controller.tunnels {
  888. activeTunnel.SetClientVerificationPayload(clientVerificationPayload)
  889. }
  890. }
  891. // Dial selects an active tunnel and establishes a port forward
  892. // connection through the selected tunnel. Failure to connect is considered
  893. // a port forward failure, for the purpose of monitoring tunnel health.
  894. func (controller *Controller) Dial(
  895. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  896. tunnel := controller.getNextActiveTunnel()
  897. if tunnel == nil {
  898. return nil, common.ContextError(errors.New("no active tunnels"))
  899. }
  900. // Perform split tunnel classification when feature is enabled, and if the remote
  901. // address is classified as untunneled, dial directly.
  902. if !alwaysTunnel && controller.config.SplitTunnelDnsServer != "" {
  903. host, _, err := net.SplitHostPort(remoteAddr)
  904. if err != nil {
  905. return nil, common.ContextError(err)
  906. }
  907. // Note: a possible optimization, when split tunnel is active and IsUntunneled performs
  908. // a DNS resolution in order to make its classification, is to reuse that IP address in
  909. // the following Dials so they do not need to make their own resolutions. However, the
  910. // way this is currently implemented ensures that, e.g., DNS geo load balancing occurs
  911. // relative to the outbound network.
  912. if controller.splitTunnelClassifier.IsUntunneled(host) {
  913. return controller.DirectDial(remoteAddr)
  914. }
  915. }
  916. tunneledConn, err := tunnel.Dial(remoteAddr, alwaysTunnel, downstreamConn)
  917. if err != nil {
  918. return nil, common.ContextError(err)
  919. }
  920. return tunneledConn, nil
  921. }
  922. // DirectDial dials an untunneled TCP connection within the controller run context.
  923. func (controller *Controller) DirectDial(remoteAddr string) (conn net.Conn, err error) {
  924. return DialTCP(controller.runCtx, remoteAddr, controller.untunneledDialConfig)
  925. }
  926. // startEstablishing creates a pool of worker goroutines which will
  927. // attempt to establish tunnels to candidate servers. The candidates
  928. // are generated by another goroutine.
  929. func (controller *Controller) startEstablishing() {
  930. if controller.isEstablishing {
  931. return
  932. }
  933. NoticeInfo("start establishing")
  934. controller.concurrentEstablishTunnelsMutex.Lock()
  935. controller.concurrentEstablishTunnels = 0
  936. controller.concurrentMeekEstablishTunnels = 0
  937. controller.peakConcurrentEstablishTunnels = 0
  938. controller.peakConcurrentMeekEstablishTunnels = 0
  939. controller.concurrentEstablishTunnelsMutex.Unlock()
  940. aggressiveGarbageCollection()
  941. emitMemoryMetrics()
  942. // Note: the establish context cancelFunc, controller.stopEstablish,
  943. // is called in controller.stopEstablishing.
  944. controller.isEstablishing = true
  945. controller.establishCtx, controller.stopEstablish = context.WithCancel(controller.runCtx)
  946. controller.establishWaitGroup = new(sync.WaitGroup)
  947. controller.candidateServerEntries = make(chan *candidateServerEntry)
  948. // The server affinity mechanism attempts to favor the previously
  949. // used server when reconnecting. This is beneficial for user
  950. // applications which expect consistency in user IP address (for
  951. // example, a web site which prompts for additional user
  952. // authentication when the IP address changes).
  953. //
  954. // Only the very first server, as determined by
  955. // datastore.PromoteServerEntry(), is the server affinity candidate.
  956. // Concurrent connections attempts to many servers are launched
  957. // without delay, in case the affinity server connection fails.
  958. // While the affinity server connection is outstanding, when any
  959. // other connection is established, there is a short grace period
  960. // delay before delivering the established tunnel; this allows some
  961. // time for the affinity server connection to succeed first.
  962. // When the affinity server connection fails, any other established
  963. // tunnel is registered without delay.
  964. //
  965. // Note: the establishTunnelWorker that receives the affinity
  966. // candidate is solely resonsible for closing
  967. // controller.serverAffinityDoneBroadcast.
  968. //
  969. // Note: if config.EgressRegion or config.TunnelProtocol has changed
  970. // since the top server was promoted, the first server may not actually
  971. // be the last connected server.
  972. // TODO: should not favor the first server in this case
  973. controller.serverAffinityDoneBroadcast = make(chan struct{})
  974. for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
  975. controller.establishWaitGroup.Add(1)
  976. go controller.establishTunnelWorker()
  977. }
  978. controller.establishWaitGroup.Add(1)
  979. go controller.establishCandidateGenerator(
  980. controller.getImpairedProtocols())
  981. }
  982. // stopEstablishing signals the establish goroutines to stop and waits
  983. // for the group to halt.
  984. func (controller *Controller) stopEstablishing() {
  985. if !controller.isEstablishing {
  986. return
  987. }
  988. NoticeInfo("stop establishing")
  989. controller.stopEstablish()
  990. // Note: establishCandidateGenerator closes controller.candidateServerEntries
  991. // (as it may be sending to that channel).
  992. controller.establishWaitGroup.Wait()
  993. NoticeInfo("stopped establishing")
  994. controller.isEstablishing = false
  995. controller.establishCtx = nil
  996. controller.stopEstablish = nil
  997. controller.establishWaitGroup = nil
  998. controller.candidateServerEntries = nil
  999. controller.serverAffinityDoneBroadcast = nil
  1000. controller.concurrentEstablishTunnelsMutex.Lock()
  1001. peakConcurrent := controller.peakConcurrentEstablishTunnels
  1002. peakConcurrentMeek := controller.peakConcurrentMeekEstablishTunnels
  1003. controller.concurrentEstablishTunnels = 0
  1004. controller.concurrentMeekEstablishTunnels = 0
  1005. controller.peakConcurrentEstablishTunnels = 0
  1006. controller.peakConcurrentMeekEstablishTunnels = 0
  1007. controller.concurrentEstablishTunnelsMutex.Unlock()
  1008. NoticeInfo("peak concurrent establish tunnels: %d", peakConcurrent)
  1009. NoticeInfo("peak concurrent meek establish tunnels: %d", peakConcurrentMeek)
  1010. emitMemoryMetrics()
  1011. standardGarbageCollection()
  1012. }
  1013. // establishCandidateGenerator populates the candidate queue with server entries
  1014. // from the data store. Server entries are iterated in rank order, so that promoted
  1015. // servers with higher rank are priority candidates.
  1016. func (controller *Controller) establishCandidateGenerator(impairedProtocols []string) {
  1017. defer controller.establishWaitGroup.Done()
  1018. defer close(controller.candidateServerEntries)
  1019. // establishStartTime is used to calculate and report the
  1020. // client's tunnel establishment duration.
  1021. //
  1022. // networkWaitDuration is the elapsed time spent waiting
  1023. // for network connectivity. This duration will be excluded
  1024. // from reported tunnel establishment duration.
  1025. establishStartTime := monotime.Now()
  1026. var networkWaitDuration time.Duration
  1027. iterator, err := NewServerEntryIterator(controller.config)
  1028. if err != nil {
  1029. NoticeAlert("failed to iterate over candidates: %s", err)
  1030. controller.SignalComponentFailure()
  1031. return
  1032. }
  1033. defer iterator.Close()
  1034. isServerAffinityCandidate := true
  1035. // TODO: reconcile server affinity scheme with multi-tunnel mode
  1036. if controller.config.TunnelPoolSize > 1 {
  1037. isServerAffinityCandidate = false
  1038. close(controller.serverAffinityDoneBroadcast)
  1039. }
  1040. loop:
  1041. // Repeat until stopped
  1042. for i := 0; ; i++ {
  1043. networkWaitStartTime := monotime.Now()
  1044. if !WaitForNetworkConnectivity(
  1045. controller.establishCtx,
  1046. controller.config.NetworkConnectivityChecker) {
  1047. break loop
  1048. }
  1049. networkWaitDuration += monotime.Since(networkWaitStartTime)
  1050. // Send each iterator server entry to the establish workers
  1051. startTime := monotime.Now()
  1052. for {
  1053. serverEntry, err := iterator.Next()
  1054. if err != nil {
  1055. NoticeAlert("failed to get next candidate: %s", err)
  1056. controller.SignalComponentFailure()
  1057. break loop
  1058. }
  1059. if serverEntry == nil {
  1060. // Completed this iteration
  1061. break
  1062. }
  1063. if controller.config.TargetApiProtocol == protocol.PSIPHON_SSH_API_PROTOCOL &&
  1064. !serverEntry.SupportsSSHAPIRequests() {
  1065. continue
  1066. }
  1067. // Disable impaired protocols. This is only done for the
  1068. // first iteration of the ESTABLISH_TUNNEL_WORK_TIME
  1069. // loop since (a) one iteration should be sufficient to
  1070. // evade the attack; (b) there's a good chance of false
  1071. // positives (such as short tunnel durations due to network
  1072. // hopping on a mobile device).
  1073. // The edited serverEntry is temporary copy which is not
  1074. // stored or reused.
  1075. if i == 0 {
  1076. serverEntry.DisableImpairedProtocols(impairedProtocols)
  1077. if len(serverEntry.GetSupportedProtocols(false)) == 0 {
  1078. // Skip this server entry, as it has no supported
  1079. // protocols after disabling the impaired ones
  1080. // TODO: modify ServerEntryIterator to skip these?
  1081. continue
  1082. }
  1083. }
  1084. // adjustedEstablishStartTime is establishStartTime shifted
  1085. // to exclude time spent waiting for network connectivity.
  1086. candidate := &candidateServerEntry{
  1087. serverEntry: serverEntry,
  1088. isServerAffinityCandidate: isServerAffinityCandidate,
  1089. adjustedEstablishStartTime: establishStartTime.Add(networkWaitDuration),
  1090. }
  1091. wasServerAffinityCandidate := isServerAffinityCandidate
  1092. // Note: there must be only one server affinity candidate, as it
  1093. // closes the serverAffinityDoneBroadcast channel.
  1094. isServerAffinityCandidate = false
  1095. // TODO: here we could generate multiple candidates from the
  1096. // server entry when there are many MeekFrontingAddresses.
  1097. select {
  1098. case controller.candidateServerEntries <- candidate:
  1099. case <-controller.establishCtx.Done():
  1100. break loop
  1101. }
  1102. if startTime.Add(ESTABLISH_TUNNEL_WORK_TIME).Before(monotime.Now()) {
  1103. // Start over, after a brief pause, with a new shuffle of the server
  1104. // entries, and potentially some newly fetched server entries.
  1105. break
  1106. }
  1107. if wasServerAffinityCandidate {
  1108. // Don't start the next candidate until either the server affinity
  1109. // candidate has completed (success or failure) or is still working
  1110. // and the grace period has elapsed.
  1111. timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD)
  1112. select {
  1113. case <-timer.C:
  1114. case <-controller.serverAffinityDoneBroadcast:
  1115. case <-controller.establishCtx.Done():
  1116. timer.Stop()
  1117. break loop
  1118. }
  1119. timer.Stop()
  1120. } else if controller.config.StaggerConnectionWorkersMilliseconds != 0 {
  1121. // Stagger concurrent connection workers.
  1122. timer := time.NewTimer(time.Millisecond * time.Duration(
  1123. controller.config.StaggerConnectionWorkersMilliseconds))
  1124. select {
  1125. case <-timer.C:
  1126. case <-controller.establishCtx.Done():
  1127. timer.Stop()
  1128. break loop
  1129. }
  1130. timer.Stop()
  1131. }
  1132. }
  1133. // Free up resources now, but don't reset until after the pause.
  1134. iterator.Close()
  1135. // Trigger a common remote server list fetch, since we may have failed
  1136. // to connect with all known servers. Don't block sending signal, since
  1137. // this signal may have already been sent.
  1138. // Don't wait for fetch remote to succeed, since it may fail and
  1139. // enter a retry loop and we're better off trying more known servers.
  1140. // TODO: synchronize the fetch response, so it can be incorporated
  1141. // into the server entry iterator as soon as available.
  1142. select {
  1143. case controller.signalFetchCommonRemoteServerList <- *new(struct{}):
  1144. default:
  1145. }
  1146. // Trigger an OSL fetch in parallel. Both fetches are run in parallel
  1147. // so that if one out of the common RLS and OSL set is large, it doesn't
  1148. // doesn't entirely block fetching the other.
  1149. select {
  1150. case controller.signalFetchObfuscatedServerLists <- *new(struct{}):
  1151. default:
  1152. }
  1153. // Trigger an out-of-band upgrade availability check and download.
  1154. // Since we may have failed to connect, we may benefit from upgrading
  1155. // to a new client version with new circumvention capabilities.
  1156. select {
  1157. case controller.signalDownloadUpgrade <- "":
  1158. default:
  1159. }
  1160. // After a complete iteration of candidate servers, pause before iterating again.
  1161. // This helps avoid some busy wait loop conditions, and also allows some time for
  1162. // network conditions to change. Also allows for fetch remote to complete,
  1163. // in typical conditions (it isn't strictly necessary to wait for this, there will
  1164. // be more rounds if required).
  1165. timer := time.NewTimer(
  1166. time.Duration(*controller.config.EstablishTunnelPausePeriodSeconds) * time.Second)
  1167. select {
  1168. case <-timer.C:
  1169. // Retry iterating
  1170. case <-controller.establishCtx.Done():
  1171. timer.Stop()
  1172. break loop
  1173. }
  1174. timer.Stop()
  1175. iterator.Reset()
  1176. }
  1177. }
  1178. // establishTunnelWorker pulls candidates from the candidate queue, establishes
  1179. // a connection to the tunnel server, and delivers the connected tunnel to a channel.
  1180. func (controller *Controller) establishTunnelWorker() {
  1181. defer controller.establishWaitGroup.Done()
  1182. loop:
  1183. for candidateServerEntry := range controller.candidateServerEntries {
  1184. // Note: don't receive from candidateServerEntries and isStopEstablishing
  1185. // in the same select, since we want to prioritize receiving the stop signal
  1186. if controller.isStopEstablishing() {
  1187. break loop
  1188. }
  1189. // There may already be a tunnel to this candidate. If so, skip it.
  1190. if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
  1191. continue
  1192. }
  1193. // ConnectTunnel will allocate significant memory, so first attempt to
  1194. // reclaim as much as possible.
  1195. aggressiveGarbageCollection()
  1196. // Select the tunnel protocol. Unless config.TunnelProtocol is set, the
  1197. // selection will be made at random from protocols supported by the
  1198. // server entry.
  1199. //
  1200. // When limiting concurrent meek connection workers, and at the limit,
  1201. // do not select meek since otherwise the candidate must be skipped.
  1202. //
  1203. // If at the limit and unabled to select a non-meek protocol, skip the
  1204. // candidate entirely and move on to the next. Since candidates are shuffled
  1205. // it's probable that the next candidate is not meek. In this case, a
  1206. // StaggerConnectionWorkersMilliseconds delay may still be incurred.
  1207. excludeMeek := false
  1208. controller.concurrentEstablishTunnelsMutex.Lock()
  1209. if controller.config.LimitMeekConnectionWorkers > 0 &&
  1210. controller.concurrentMeekEstablishTunnels >=
  1211. controller.config.LimitMeekConnectionWorkers {
  1212. excludeMeek = true
  1213. }
  1214. controller.concurrentEstablishTunnelsMutex.Unlock()
  1215. selectedProtocol, err := selectProtocol(
  1216. controller.config, candidateServerEntry.serverEntry, excludeMeek)
  1217. if err == errProtocolNotSupported {
  1218. // selectProtocol returns errProtocolNotSupported when excludeMeek
  1219. // is set and the server entry only supports meek protocols.
  1220. // Skip this candidate.
  1221. continue
  1222. }
  1223. var tunnel *Tunnel
  1224. if err == nil {
  1225. isMeek := protocol.TunnelProtocolUsesMeek(selectedProtocol) ||
  1226. protocol.TunnelProtocolUsesMeek(selectedProtocol)
  1227. controller.concurrentEstablishTunnelsMutex.Lock()
  1228. if isMeek {
  1229. // Recheck the limit now that we know we're selecting meek and
  1230. // adjusting concurrentMeekEstablishTunnels.
  1231. if controller.config.LimitMeekConnectionWorkers > 0 &&
  1232. controller.concurrentMeekEstablishTunnels >=
  1233. controller.config.LimitMeekConnectionWorkers {
  1234. // Skip this candidate.
  1235. controller.concurrentEstablishTunnelsMutex.Unlock()
  1236. continue
  1237. }
  1238. controller.concurrentMeekEstablishTunnels += 1
  1239. if controller.concurrentMeekEstablishTunnels > controller.peakConcurrentMeekEstablishTunnels {
  1240. controller.peakConcurrentMeekEstablishTunnels = controller.concurrentMeekEstablishTunnels
  1241. }
  1242. }
  1243. controller.concurrentEstablishTunnels += 1
  1244. if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
  1245. controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
  1246. }
  1247. controller.concurrentEstablishTunnelsMutex.Unlock()
  1248. tunnel, err = ConnectTunnel(
  1249. controller.establishCtx,
  1250. controller.config,
  1251. controller.sessionId,
  1252. candidateServerEntry.serverEntry,
  1253. selectedProtocol,
  1254. candidateServerEntry.adjustedEstablishStartTime)
  1255. controller.concurrentEstablishTunnelsMutex.Lock()
  1256. if isMeek {
  1257. controller.concurrentMeekEstablishTunnels -= 1
  1258. }
  1259. controller.concurrentEstablishTunnels -= 1
  1260. controller.concurrentEstablishTunnelsMutex.Unlock()
  1261. }
  1262. // Periodically emit memory metrics during the establishment cycle.
  1263. if !controller.isStopEstablishing() {
  1264. emitMemoryMetrics()
  1265. }
  1266. // Immediately reclaim memory allocated by the establishment. In the case
  1267. // of failure, first clear the reference to the tunnel. In the case of
  1268. // success, the garbage collection may still be effective as the initial
  1269. // phases of some protocols involve significant memory allocation that
  1270. // could now be reclaimed.
  1271. if err != nil {
  1272. tunnel = nil
  1273. }
  1274. aggressiveGarbageCollection()
  1275. if err != nil {
  1276. // Unblock other candidates immediately when
  1277. // server affinity candidate fails.
  1278. if candidateServerEntry.isServerAffinityCandidate {
  1279. close(controller.serverAffinityDoneBroadcast)
  1280. }
  1281. // Before emitting error, check if establish interrupted, in which
  1282. // case the error is noise.
  1283. if controller.isStopEstablishing() {
  1284. break loop
  1285. }
  1286. NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
  1287. continue
  1288. }
  1289. // Deliver connected tunnel.
  1290. // Don't block. Assumes the receiver has a buffer large enough for
  1291. // the number of desired tunnels. If there's no room, the tunnel must
  1292. // not be required so it's discarded.
  1293. select {
  1294. case controller.connectedTunnels <- tunnel:
  1295. default:
  1296. controller.discardTunnel(tunnel)
  1297. // Clear the reference to this discarded tunnel and immediately run
  1298. // a garbage collection to reclaim its memory.
  1299. tunnel = nil
  1300. aggressiveGarbageCollection()
  1301. }
  1302. // Unblock other candidates only after delivering when
  1303. // server affinity candidate succeeds.
  1304. if candidateServerEntry.isServerAffinityCandidate {
  1305. close(controller.serverAffinityDoneBroadcast)
  1306. }
  1307. }
  1308. }
  1309. func (controller *Controller) isStopEstablishing() bool {
  1310. select {
  1311. case <-controller.establishCtx.Done():
  1312. return true
  1313. default:
  1314. }
  1315. return false
  1316. }