controller.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Package psiphon implements the core tunnel functionality of a Psiphon client.
  20. // The main function is RunForever, which runs a Controller that obtains lists of
  21. // servers, establishes tunnel connections, and runs local proxies through which
  22. // tunneled traffic may be sent.
  23. package psiphon
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "math/rand"
  29. "net"
  30. "sync"
  31. "time"
  32. "github.com/Psiphon-Labs/goarista/monotime"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  38. )
  39. // Controller is a tunnel lifecycle coordinator. It manages lists of servers to
  40. // connect to; establishes and monitors tunnels; and runs local proxies which
  41. // route traffic through the tunnels.
  42. type Controller struct {
  43. config *Config
  44. sessionId string
  45. runCtx context.Context
  46. stopRunning context.CancelFunc
  47. runWaitGroup *sync.WaitGroup
  48. connectedTunnels chan *Tunnel
  49. failedTunnels chan *Tunnel
  50. tunnelMutex sync.Mutex
  51. establishedOnce bool
  52. tunnels []*Tunnel
  53. nextTunnel int
  54. startedConnectedReporter bool
  55. isEstablishing bool
  56. concurrentEstablishTunnelsMutex sync.Mutex
  57. concurrentEstablishTunnels int
  58. concurrentMeekEstablishTunnels int
  59. peakConcurrentEstablishTunnels int
  60. peakConcurrentMeekEstablishTunnels int
  61. establishCtx context.Context
  62. stopEstablish context.CancelFunc
  63. establishWaitGroup *sync.WaitGroup
  64. candidateServerEntries chan *candidateServerEntry
  65. untunneledDialConfig *DialConfig
  66. splitTunnelClassifier *SplitTunnelClassifier
  67. signalFetchCommonRemoteServerList chan struct{}
  68. signalFetchObfuscatedServerLists chan struct{}
  69. signalDownloadUpgrade chan string
  70. impairedProtocolClassification map[string]int
  71. signalReportConnected chan struct{}
  72. serverAffinityDoneBroadcast chan struct{}
  73. packetTunnelClient *tun.Client
  74. packetTunnelTransport *PacketTunnelTransport
  75. }
  76. type candidateServerEntry struct {
  77. serverEntry *protocol.ServerEntry
  78. isServerAffinityCandidate bool
  79. usePriorityProtocol bool
  80. impairedProtocols []string
  81. adjustedEstablishStartTime monotime.Time
  82. }
  83. // NewController initializes a new controller.
  84. func NewController(config *Config) (controller *Controller, err error) {
  85. if !config.IsCommitted() {
  86. return nil, common.ContextError(errors.New("uncommitted config"))
  87. }
  88. // Needed by regen, at least
  89. rand.Seed(int64(time.Now().Nanosecond()))
  90. // The session ID for the Psiphon server API is used across all
  91. // tunnels established by the controller.
  92. NoticeSessionId(config.SessionID)
  93. untunneledDialConfig := &DialConfig{
  94. UpstreamProxyURL: config.UpstreamProxyURL,
  95. CustomHeaders: config.CustomHeaders,
  96. DeviceBinder: config.deviceBinder,
  97. DnsServerGetter: config.DnsServerGetter,
  98. IPv6Synthesizer: config.IPv6Synthesizer,
  99. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  100. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  101. DeviceRegion: config.DeviceRegion,
  102. }
  103. controller = &Controller{
  104. config: config,
  105. sessionId: config.SessionID,
  106. runWaitGroup: new(sync.WaitGroup),
  107. // connectedTunnels and failedTunnels buffer sizes are large enough to
  108. // receive full pools of tunnels without blocking. Senders should not block.
  109. connectedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  110. failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  111. tunnels: make([]*Tunnel, 0),
  112. establishedOnce: false,
  113. startedConnectedReporter: false,
  114. isEstablishing: false,
  115. untunneledDialConfig: untunneledDialConfig,
  116. impairedProtocolClassification: make(map[string]int),
  117. // TODO: Add a buffer of 1 so we don't miss a signal while receiver is
  118. // starting? Trade-off is potential back-to-back fetch remotes. As-is,
  119. // establish will eventually signal another fetch remote.
  120. signalFetchCommonRemoteServerList: make(chan struct{}),
  121. signalFetchObfuscatedServerLists: make(chan struct{}),
  122. signalDownloadUpgrade: make(chan string),
  123. signalReportConnected: make(chan struct{}),
  124. }
  125. controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
  126. if config.PacketTunnelTunFileDescriptor > 0 {
  127. // Run a packet tunnel client. The lifetime of the tun.Client is the
  128. // lifetime of the Controller, so it exists across tunnel establishments
  129. // and reestablishments. The PacketTunnelTransport provides a layer
  130. // that presents a continuosuly existing transport to the tun.Client;
  131. // it's set to use new SSH channels after new SSH tunnel establishes.
  132. packetTunnelTransport := NewPacketTunnelTransport()
  133. packetTunnelClient, err := tun.NewClient(&tun.ClientConfig{
  134. Logger: NoticeCommonLogger(),
  135. TunFileDescriptor: config.PacketTunnelTunFileDescriptor,
  136. Transport: packetTunnelTransport,
  137. })
  138. if err != nil {
  139. return nil, common.ContextError(err)
  140. }
  141. controller.packetTunnelClient = packetTunnelClient
  142. controller.packetTunnelTransport = packetTunnelTransport
  143. }
  144. return controller, nil
  145. }
  146. // Run executes the controller. Run exits if a controller
  147. // component fails or the parent context is canceled.
  148. func (controller *Controller) Run(ctx context.Context) {
  149. // Ensure fresh repetitive notice state for each run, so the
  150. // client will always get an AvailableEgressRegions notice,
  151. // an initial instance of any repetitive error notice, etc.
  152. ResetRepetitiveNotices()
  153. runCtx, stopRunning := context.WithCancel(ctx)
  154. defer stopRunning()
  155. controller.runCtx = runCtx
  156. controller.stopRunning = stopRunning
  157. // Start components
  158. // TODO: IPv6 support
  159. var listenIP string
  160. if controller.config.ListenInterface == "" {
  161. listenIP = "127.0.0.1"
  162. } else if controller.config.ListenInterface == "any" {
  163. listenIP = "0.0.0.0"
  164. } else {
  165. IPv4Address, _, err := common.GetInterfaceIPAddresses(controller.config.ListenInterface)
  166. if err == nil && IPv4Address == nil {
  167. err = fmt.Errorf("no IPv4 address for interface %s", controller.config.ListenInterface)
  168. }
  169. if err != nil {
  170. NoticeError("error getting listener IP: %s", err)
  171. return
  172. }
  173. listenIP = IPv4Address.String()
  174. }
  175. if !controller.config.DisableLocalSocksProxy {
  176. socksProxy, err := NewSocksProxy(controller.config, controller, listenIP)
  177. if err != nil {
  178. NoticeAlert("error initializing local SOCKS proxy: %s", err)
  179. return
  180. }
  181. defer socksProxy.Close()
  182. }
  183. if !controller.config.DisableLocalHTTPProxy {
  184. httpProxy, err := NewHttpProxy(controller.config, controller, listenIP)
  185. if err != nil {
  186. NoticeAlert("error initializing local HTTP proxy: %s", err)
  187. return
  188. }
  189. defer httpProxy.Close()
  190. }
  191. if !controller.config.DisableRemoteServerListFetcher {
  192. if controller.config.RemoteServerListURLs != nil {
  193. controller.runWaitGroup.Add(1)
  194. go controller.remoteServerListFetcher(
  195. "common",
  196. FetchCommonRemoteServerList,
  197. controller.signalFetchCommonRemoteServerList)
  198. }
  199. if controller.config.ObfuscatedServerListRootURLs != nil {
  200. controller.runWaitGroup.Add(1)
  201. go controller.remoteServerListFetcher(
  202. "obfuscated",
  203. FetchObfuscatedServerLists,
  204. controller.signalFetchObfuscatedServerLists)
  205. }
  206. }
  207. if controller.config.UpgradeDownloadURLs != nil {
  208. controller.runWaitGroup.Add(1)
  209. go controller.upgradeDownloader()
  210. }
  211. /// Note: the connected reporter isn't started until a tunnel is
  212. // established
  213. controller.runWaitGroup.Add(1)
  214. go controller.runTunnels()
  215. controller.runWaitGroup.Add(1)
  216. go controller.establishTunnelWatcher()
  217. if controller.packetTunnelClient != nil {
  218. controller.packetTunnelClient.Start()
  219. }
  220. // Wait while running
  221. <-controller.runCtx.Done()
  222. NoticeInfo("controller stopped")
  223. if controller.packetTunnelClient != nil {
  224. controller.packetTunnelClient.Stop()
  225. }
  226. // All workers -- runTunnels, establishment workers, and auxilliary
  227. // workers such as fetch remote server list and untunneled uprade
  228. // download -- operate with the controller run context and will all
  229. // be interrupted when the run context is done.
  230. controller.runWaitGroup.Wait()
  231. controller.splitTunnelClassifier.Shutdown()
  232. NoticeInfo("exiting controller")
  233. NoticeExiting()
  234. }
  235. // SignalComponentFailure notifies the controller that an associated component has failed.
  236. // This will terminate the controller.
  237. func (controller *Controller) SignalComponentFailure() {
  238. NoticeAlert("controller shutdown due to component failure")
  239. controller.stopRunning()
  240. }
  241. // SetDynamicConfig overrides the sponsor ID and authorizations fields of the
  242. // Controller config with the input values. The new values will be used in the
  243. // next tunnel connection.
  244. func (controller *Controller) SetDynamicConfig(sponsorID string, authorizations []string) {
  245. controller.config.SetDynamicConfig(sponsorID, authorizations)
  246. }
  247. // TerminateNextActiveTunnel terminates the active tunnel, which will initiate
  248. // establishment of a new tunnel.
  249. func (controller *Controller) TerminateNextActiveTunnel() {
  250. tunnel := controller.getNextActiveTunnel()
  251. if tunnel != nil {
  252. controller.SignalTunnelFailure(tunnel)
  253. NoticeInfo("terminated tunnel: %s", tunnel.serverEntry.IpAddress)
  254. }
  255. }
  256. // remoteServerListFetcher fetches an out-of-band list of server entries
  257. // for more tunnel candidates. It fetches when signalled, with retries
  258. // on failure.
  259. func (controller *Controller) remoteServerListFetcher(
  260. name string,
  261. fetcher RemoteServerListFetcher,
  262. signal <-chan struct{}) {
  263. defer controller.runWaitGroup.Done()
  264. var lastFetchTime monotime.Time
  265. fetcherLoop:
  266. for {
  267. // Wait for a signal before fetching
  268. select {
  269. case <-signal:
  270. case <-controller.runCtx.Done():
  271. break fetcherLoop
  272. }
  273. // Skip fetch entirely (i.e., send no request at all, even when ETag would save
  274. // on response size) when a recent fetch was successful
  275. stalePeriod := controller.config.clientParameters.Get().Duration(
  276. parameters.FetchRemoteServerListStalePeriod)
  277. if lastFetchTime != 0 &&
  278. lastFetchTime.Add(stalePeriod).After(monotime.Now()) {
  279. continue
  280. }
  281. retryLoop:
  282. for attempt := 0; ; attempt++ {
  283. // Don't attempt to fetch while there is no network connectivity,
  284. // to avoid alert notice noise.
  285. if !WaitForNetworkConnectivity(
  286. controller.runCtx,
  287. controller.config.NetworkConnectivityChecker) {
  288. break fetcherLoop
  289. }
  290. // Pick any active tunnel and make the next fetch attempt. If there's
  291. // no active tunnel, the untunneledDialConfig will be used.
  292. tunnel := controller.getNextActiveTunnel()
  293. err := fetcher(
  294. controller.runCtx,
  295. controller.config,
  296. attempt,
  297. tunnel,
  298. controller.untunneledDialConfig)
  299. if err == nil {
  300. lastFetchTime = monotime.Now()
  301. break retryLoop
  302. }
  303. NoticeAlert("failed to fetch %s remote server list: %s", name, err)
  304. retryPeriod := controller.config.clientParameters.Get().Duration(
  305. parameters.FetchRemoteServerListRetryPeriod)
  306. timer := time.NewTimer(retryPeriod)
  307. select {
  308. case <-timer.C:
  309. case <-controller.runCtx.Done():
  310. timer.Stop()
  311. break fetcherLoop
  312. }
  313. }
  314. }
  315. NoticeInfo("exiting %s remote server list fetcher", name)
  316. }
  317. // establishTunnelWatcher terminates the controller if a tunnel
  318. // has not been established in the configured time period. This
  319. // is regardless of how many tunnels are presently active -- meaning
  320. // that if an active tunnel was established and lost the controller
  321. // is left running (to re-establish).
  322. func (controller *Controller) establishTunnelWatcher() {
  323. defer controller.runWaitGroup.Done()
  324. timeout := controller.config.clientParameters.Get().Duration(
  325. parameters.EstablishTunnelTimeout)
  326. if timeout > 0 {
  327. timer := time.NewTimer(timeout)
  328. defer timer.Stop()
  329. select {
  330. case <-timer.C:
  331. if !controller.hasEstablishedOnce() {
  332. NoticeAlert("failed to establish tunnel before timeout")
  333. controller.SignalComponentFailure()
  334. }
  335. case <-controller.runCtx.Done():
  336. }
  337. }
  338. NoticeInfo("exiting establish tunnel watcher")
  339. }
  340. // connectedReporter sends periodic "connected" requests to the Psiphon API.
  341. // These requests are for server-side unique user stats calculation. See the
  342. // comment in DoConnectedRequest for a description of the request mechanism.
  343. // To ensure we don't over- or under-count unique users, only one connected
  344. // request is made across all simultaneous multi-tunnels; and the connected
  345. // request is repeated periodically for very long-lived tunnels.
  346. // The signalReportConnected mechanism is used to trigger another connected
  347. // request immediately after a reconnect.
  348. func (controller *Controller) connectedReporter() {
  349. defer controller.runWaitGroup.Done()
  350. loop:
  351. for {
  352. // Pick any active tunnel and make the next connected request. No error
  353. // is logged if there's no active tunnel, as that's not an unexpected condition.
  354. reported := false
  355. tunnel := controller.getNextActiveTunnel()
  356. if tunnel != nil {
  357. err := tunnel.serverContext.DoConnectedRequest()
  358. if err == nil {
  359. reported = true
  360. } else {
  361. NoticeAlert("failed to make connected request: %s", err)
  362. }
  363. }
  364. // Schedule the next connected request and wait.
  365. // Note: this duration is not a dynamic ClientParameter as
  366. // the daily unique user stats logic specifically requires
  367. // a "connected" request no more or less often than every
  368. // 24 hours.
  369. var duration time.Duration
  370. if reported {
  371. duration = 24 * time.Hour
  372. } else {
  373. duration = controller.config.clientParameters.Get().Duration(
  374. parameters.PsiphonAPIConnectedRequestRetryPeriod)
  375. }
  376. timer := time.NewTimer(duration)
  377. doBreak := false
  378. select {
  379. case <-controller.signalReportConnected:
  380. case <-timer.C:
  381. // Make another connected request
  382. case <-controller.runCtx.Done():
  383. doBreak = true
  384. }
  385. timer.Stop()
  386. if doBreak {
  387. break loop
  388. }
  389. }
  390. NoticeInfo("exiting connected reporter")
  391. }
  392. func (controller *Controller) startOrSignalConnectedReporter() {
  393. // session is nil when DisableApi is set
  394. if controller.config.DisableApi {
  395. return
  396. }
  397. // Start the connected reporter after the first tunnel is established.
  398. // Concurrency note: only the runTunnels goroutine may access startedConnectedReporter.
  399. if !controller.startedConnectedReporter {
  400. controller.startedConnectedReporter = true
  401. controller.runWaitGroup.Add(1)
  402. go controller.connectedReporter()
  403. } else {
  404. select {
  405. case controller.signalReportConnected <- *new(struct{}):
  406. default:
  407. }
  408. }
  409. }
  410. // upgradeDownloader makes periodic attempts to complete a client upgrade
  411. // download. DownloadUpgrade() is resumable, so each attempt has potential for
  412. // getting closer to completion, even in conditions where the download or
  413. // tunnel is repeatedly interrupted.
  414. // An upgrade download is triggered by either a handshake response indicating
  415. // that a new version is available; or after failing to connect, in which case
  416. // it's useful to check, out-of-band, for an upgrade with new circumvention
  417. // capabilities.
  418. // Once the download operation completes successfully, the downloader exits
  419. // and is not run again: either there is not a newer version, or the upgrade
  420. // has been downloaded and is ready to be applied.
  421. // We're assuming that the upgrade will be applied and the entire system
  422. // restarted before another upgrade is to be downloaded.
  423. //
  424. // TODO: refactor upgrade downloader and remote server list fetcher to use
  425. // common code (including the resumable download routines).
  426. //
  427. func (controller *Controller) upgradeDownloader() {
  428. defer controller.runWaitGroup.Done()
  429. var lastDownloadTime monotime.Time
  430. downloadLoop:
  431. for {
  432. // Wait for a signal before downloading
  433. var handshakeVersion string
  434. select {
  435. case handshakeVersion = <-controller.signalDownloadUpgrade:
  436. case <-controller.runCtx.Done():
  437. break downloadLoop
  438. }
  439. stalePeriod := controller.config.clientParameters.Get().Duration(
  440. parameters.FetchUpgradeStalePeriod)
  441. // Unless handshake is explicitly advertizing a new version, skip
  442. // checking entirely when a recent download was successful.
  443. if handshakeVersion == "" &&
  444. lastDownloadTime != 0 &&
  445. lastDownloadTime.Add(stalePeriod).After(monotime.Now()) {
  446. continue
  447. }
  448. retryLoop:
  449. for attempt := 0; ; attempt++ {
  450. // Don't attempt to download while there is no network connectivity,
  451. // to avoid alert notice noise.
  452. if !WaitForNetworkConnectivity(
  453. controller.runCtx,
  454. controller.config.NetworkConnectivityChecker) {
  455. break downloadLoop
  456. }
  457. // Pick any active tunnel and make the next download attempt. If there's
  458. // no active tunnel, the untunneledDialConfig will be used.
  459. tunnel := controller.getNextActiveTunnel()
  460. err := DownloadUpgrade(
  461. controller.runCtx,
  462. controller.config,
  463. attempt,
  464. handshakeVersion,
  465. tunnel,
  466. controller.untunneledDialConfig)
  467. if err == nil {
  468. lastDownloadTime = monotime.Now()
  469. break retryLoop
  470. }
  471. NoticeAlert("failed to download upgrade: %s", err)
  472. timeout := controller.config.clientParameters.Get().Duration(
  473. parameters.FetchUpgradeRetryPeriod)
  474. timer := time.NewTimer(timeout)
  475. select {
  476. case <-timer.C:
  477. case <-controller.runCtx.Done():
  478. timer.Stop()
  479. break downloadLoop
  480. }
  481. }
  482. }
  483. NoticeInfo("exiting upgrade downloader")
  484. }
  485. // runTunnels is the controller tunnel management main loop. It starts and stops
  486. // establishing tunnels based on the target tunnel pool size and the current size
  487. // of the pool. Tunnels are established asynchronously using worker goroutines.
  488. //
  489. // When there are no server entries for the target region/protocol, the
  490. // establishCandidateGenerator will yield no candidates and wait before
  491. // trying again. In the meantime, a remote server entry fetch may supply
  492. // valid candidates.
  493. //
  494. // When a tunnel is established, it's added to the active pool. The tunnel's
  495. // operateTunnel goroutine monitors the tunnel.
  496. //
  497. // When a tunnel fails, it's removed from the pool and the establish process is
  498. // restarted to fill the pool.
  499. func (controller *Controller) runTunnels() {
  500. defer controller.runWaitGroup.Done()
  501. // Start running
  502. controller.startEstablishing()
  503. loop:
  504. for {
  505. select {
  506. case failedTunnel := <-controller.failedTunnels:
  507. NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
  508. controller.terminateTunnel(failedTunnel)
  509. controller.classifyImpairedProtocol(failedTunnel)
  510. // Clear the reference to this tunnel before calling startEstablishing,
  511. // which will invoke a garbage collection.
  512. failedTunnel = nil
  513. // Concurrency note: only this goroutine may call startEstablishing/stopEstablishing,
  514. // which reference controller.isEstablishing.
  515. controller.startEstablishing()
  516. case connectedTunnel := <-controller.connectedTunnels:
  517. if controller.isImpairedProtocol(connectedTunnel.protocol) {
  518. // Protocol was classified as impaired while this tunnel established.
  519. // This is most likely to occur with TunnelPoolSize > 0. We log the
  520. // event but take no action. Discarding the tunnel would break the
  521. // impaired logic unless we did that (a) only if there are other
  522. // unimpaired protocols; (b) only during the first iteration of the
  523. // ESTABLISH_TUNNEL_WORK_TIME loop. By not discarding here, a true
  524. // impaired protocol may require an extra reconnect.
  525. NoticeAlert("connected tunnel with impaired protocol: %s", connectedTunnel.protocol)
  526. }
  527. // Tunnel establishment has two phases: connection and activation.
  528. //
  529. // Connection is run concurrently by the establishTunnelWorkers, to minimize
  530. // delay when it's not yet known which server and protocol will be available
  531. // and unblocked.
  532. //
  533. // Activation is run serially, here, to minimize the overhead of making a
  534. // handshake request and starting the operateTunnel management worker for a
  535. // tunnel which may be discarded.
  536. //
  537. // When the active tunnel will complete establishment, establishment is
  538. // stopped before activation. This interrupts all connecting tunnels and
  539. // garbage collects their memory. The purpose is to minimize memory
  540. // pressure when the handshake request is made. In the unlikely case that the
  541. // handshake fails, establishment is restarted.
  542. //
  543. // Any delays in stopEstablishing will delay the handshake for the last
  544. // active tunnel.
  545. //
  546. // In the typical case of TunnelPoolSize of 1, only a single handshake is
  547. // performed and the homepages notices file, when used, will not be modifed
  548. // after the NoticeTunnels(1) [i.e., connected] until NoticeTunnels(0) [i.e.,
  549. // disconnected]. For TunnelPoolSize > 1, serial handshakes only ensures that
  550. // each set of emitted NoticeHomepages is contiguous.
  551. active, outstanding := controller.numTunnels()
  552. // discardTunnel will be true here when already fully established.
  553. discardTunnel := (outstanding <= 0)
  554. isFirstTunnel := (active == 0)
  555. isLastTunnel := (outstanding == 1)
  556. if !discardTunnel {
  557. if isLastTunnel {
  558. controller.stopEstablishing()
  559. }
  560. err := connectedTunnel.Activate(controller.runCtx, controller)
  561. if err != nil {
  562. // Assume the Activate failed due to a broken tunnel connection,
  563. // currently the most likely case, and classify as impaired, as in
  564. // the failed tunnel case above.
  565. // TODO: distinguish between network and other errors
  566. controller.classifyImpairedProtocol(connectedTunnel)
  567. NoticeAlert("failed to activate %s: %s", connectedTunnel.serverEntry.IpAddress, err)
  568. discardTunnel = true
  569. } else {
  570. // It's unlikely that registerTunnel will fail, since only this goroutine
  571. // calls registerTunnel -- and after checking numTunnels; so failure is not
  572. // expected.
  573. if !controller.registerTunnel(connectedTunnel) {
  574. NoticeAlert("failed to register %s: %s", connectedTunnel.serverEntry.IpAddress)
  575. discardTunnel = true
  576. }
  577. }
  578. // May need to replace this tunnel
  579. if isLastTunnel && discardTunnel {
  580. controller.startEstablishing()
  581. }
  582. }
  583. if discardTunnel {
  584. controller.discardTunnel(connectedTunnel)
  585. // Clear the reference to this discarded tunnel and immediately run
  586. // a garbage collection to reclaim its memory.
  587. connectedTunnel = nil
  588. defaultGarbageCollection()
  589. // Skip the rest of this case
  590. break
  591. }
  592. NoticeActiveTunnel(
  593. connectedTunnel.serverEntry.IpAddress,
  594. connectedTunnel.protocol,
  595. connectedTunnel.serverEntry.SupportsSSHAPIRequests())
  596. if isFirstTunnel {
  597. // The split tunnel classifier is started once the first tunnel is
  598. // established. This first tunnel is passed in to be used to make
  599. // the routes data request.
  600. // A long-running controller may run while the host device is present
  601. // in different regions. In this case, we want the split tunnel logic
  602. // to switch to routes for new regions and not classify traffic based
  603. // on routes installed for older regions.
  604. // We assume that when regions change, the host network will also
  605. // change, and so all tunnels will fail and be re-established. Under
  606. // that assumption, the classifier will be re-Start()-ed here when
  607. // the region has changed.
  608. controller.splitTunnelClassifier.Start(connectedTunnel)
  609. // Signal a connected request on each 1st tunnel establishment. For
  610. // multi-tunnels, the session is connected as long as at least one
  611. // tunnel is established.
  612. controller.startOrSignalConnectedReporter()
  613. // If the handshake indicated that a new client version is available,
  614. // trigger an upgrade download.
  615. // Note: serverContext is nil when DisableApi is set
  616. if connectedTunnel.serverContext != nil &&
  617. connectedTunnel.serverContext.clientUpgradeVersion != "" {
  618. handshakeVersion := connectedTunnel.serverContext.clientUpgradeVersion
  619. select {
  620. case controller.signalDownloadUpgrade <- handshakeVersion:
  621. default:
  622. }
  623. }
  624. }
  625. // Set the new tunnel as the transport for the packet tunnel. The packet tunnel
  626. // client remains up when reestablishing, but no packets are relayed while there
  627. // is no connected tunnel. UseTunnel will establish a new packet tunnel SSH
  628. // channel over the new SSH tunnel and configure the packet tunnel client to use
  629. // the new SSH channel as its transport.
  630. //
  631. // Note: as is, this logic is suboptimal for TunnelPoolSize > 1, as this would
  632. // continuously initialize new packet tunnel sessions for each established
  633. // server. For now, config validation requires TunnelPoolSize == 1 when
  634. // the packet tunnel is used.
  635. if controller.packetTunnelTransport != nil {
  636. controller.packetTunnelTransport.UseTunnel(connectedTunnel)
  637. }
  638. // TODO: design issue -- might not be enough server entries with region/caps to ever fill tunnel slots;
  639. // possible solution is establish target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
  640. if controller.isFullyEstablished() {
  641. controller.stopEstablishing()
  642. }
  643. case <-controller.runCtx.Done():
  644. break loop
  645. }
  646. }
  647. // Stop running
  648. controller.stopEstablishing()
  649. controller.terminateAllTunnels()
  650. // Drain tunnel channels
  651. close(controller.connectedTunnels)
  652. for tunnel := range controller.connectedTunnels {
  653. controller.discardTunnel(tunnel)
  654. }
  655. close(controller.failedTunnels)
  656. for tunnel := range controller.failedTunnels {
  657. controller.discardTunnel(tunnel)
  658. }
  659. NoticeInfo("exiting run tunnels")
  660. }
  661. // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
  662. // tunnels. A protocol is classified as impaired if a tunnel using that protocol
  663. // fails, repeatedly, shortly after the start of the connection. During tunnel
  664. // establishment, impaired protocols are briefly skipped.
  665. //
  666. // One purpose of this measure is to defend against an attack where the adversary,
  667. // for example, tags an OSSH TCP connection as an "unidentified" protocol; allows
  668. // it to connect; but then kills the underlying TCP connection after a short time.
  669. // Since OSSH has less latency than other protocols that may bypass an "unidentified"
  670. // filter, these other protocols might never be selected for use.
  671. //
  672. // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol
  673. func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) {
  674. // If the tunnel failed while activating, its establishedTime will be 0.
  675. duration := controller.config.clientParameters.Get().Duration(
  676. parameters.ImpairedProtocolClassificationDuration)
  677. if failedTunnel.establishedTime == 0 ||
  678. failedTunnel.establishedTime.Add(duration).After(monotime.Now()) {
  679. controller.impairedProtocolClassification[failedTunnel.protocol] += 1
  680. } else {
  681. controller.impairedProtocolClassification[failedTunnel.protocol] = 0
  682. }
  683. // Reset classification once all known protocols are classified as impaired, as
  684. // there is now no way to proceed with only unimpaired protocols. The network
  685. // situation (or attack) resulting in classification may not be protocol-specific.
  686. if CountNonImpairedProtocols(
  687. controller.config.EgressRegion,
  688. controller.config.clientParameters.Get().TunnelProtocols(
  689. parameters.LimitTunnelProtocols),
  690. controller.getImpairedProtocols()) == 0 {
  691. controller.impairedProtocolClassification = make(map[string]int)
  692. }
  693. }
  694. // getImpairedProtocols returns a list of protocols that have sufficient
  695. // classifications to be considered impaired protocols.
  696. //
  697. // Concurrency note: only the runTunnels() goroutine may call getImpairedProtocols
  698. func (controller *Controller) getImpairedProtocols() []string {
  699. NoticeImpairedProtocolClassification(controller.impairedProtocolClassification)
  700. threshold := controller.config.clientParameters.Get().Int(
  701. parameters.ImpairedProtocolClassificationThreshold)
  702. impairedProtocols := make([]string, 0)
  703. for protocol, count := range controller.impairedProtocolClassification {
  704. if count >= threshold {
  705. impairedProtocols = append(impairedProtocols, protocol)
  706. }
  707. }
  708. return impairedProtocols
  709. }
  710. // isImpairedProtocol checks if the specified protocol is classified as impaired.
  711. //
  712. // Concurrency note: only the runTunnels() goroutine may call isImpairedProtocol
  713. func (controller *Controller) isImpairedProtocol(protocol string) bool {
  714. threshold := controller.config.clientParameters.Get().Int(
  715. parameters.ImpairedProtocolClassificationThreshold)
  716. count, ok := controller.impairedProtocolClassification[protocol]
  717. return ok && count >= threshold
  718. }
  719. // SignalSeededNewSLOK implements the TunnelOwner interface. This function
  720. // is called by Tunnel.operateTunnel when the tunnel has received a new,
  721. // previously unknown SLOK from the server. The Controller triggers an OSL
  722. // fetch, as the new SLOK may be sufficient to access new OSLs.
  723. func (controller *Controller) SignalSeededNewSLOK() {
  724. select {
  725. case controller.signalFetchObfuscatedServerLists <- *new(struct{}):
  726. default:
  727. }
  728. }
  729. // SignalTunnelFailure implements the TunnelOwner interface. This function
  730. // is called by Tunnel.operateTunnel when the tunnel has detected that it
  731. // has failed. The Controller will signal runTunnels to create a new
  732. // tunnel and/or remove the tunnel from the list of active tunnels.
  733. func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
  734. // Don't block. Assumes the receiver has a buffer large enough for
  735. // the typical number of operated tunnels. In case there's no room,
  736. // terminate the tunnel (runTunnels won't get a signal in this case,
  737. // but the tunnel will be removed from the list of active tunnels).
  738. select {
  739. case controller.failedTunnels <- tunnel:
  740. default:
  741. controller.terminateTunnel(tunnel)
  742. }
  743. }
  744. // discardTunnel disposes of a successful connection that is no longer required.
  745. func (controller *Controller) discardTunnel(tunnel *Tunnel) {
  746. NoticeInfo("discard tunnel: %s", tunnel.serverEntry.IpAddress)
  747. // TODO: not calling PromoteServerEntry, since that would rank the
  748. // discarded tunnel before fully active tunnels. Can a discarded tunnel
  749. // be promoted (since it connects), but with lower rank than all active
  750. // tunnels?
  751. tunnel.Close(true)
  752. }
  753. // registerTunnel adds the connected tunnel to the pool of active tunnels
  754. // which are candidates for port forwarding. Returns true if the pool has an
  755. // empty slot and false if the pool is full (caller should discard the tunnel).
  756. func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
  757. controller.tunnelMutex.Lock()
  758. defer controller.tunnelMutex.Unlock()
  759. if len(controller.tunnels) >= controller.config.TunnelPoolSize {
  760. return false
  761. }
  762. // Perform a final check just in case we've established
  763. // a duplicate connection.
  764. for _, activeTunnel := range controller.tunnels {
  765. if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
  766. NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
  767. return false
  768. }
  769. }
  770. controller.establishedOnce = true
  771. controller.tunnels = append(controller.tunnels, tunnel)
  772. NoticeTunnels(len(controller.tunnels))
  773. // Promote this successful tunnel to first rank so it's one
  774. // of the first candidates next time establish runs.
  775. // Connecting to a TargetServerEntry does not change the
  776. // ranking.
  777. if controller.config.TargetServerEntry == "" {
  778. PromoteServerEntry(controller.config, tunnel.serverEntry.IpAddress)
  779. }
  780. return true
  781. }
  782. // hasEstablishedOnce indicates if at least one active tunnel has
  783. // been established up to this point. This is regardeless of how many
  784. // tunnels are presently active.
  785. func (controller *Controller) hasEstablishedOnce() bool {
  786. controller.tunnelMutex.Lock()
  787. defer controller.tunnelMutex.Unlock()
  788. return controller.establishedOnce
  789. }
  790. // isFullyEstablished indicates if the pool of active tunnels is full.
  791. func (controller *Controller) isFullyEstablished() bool {
  792. controller.tunnelMutex.Lock()
  793. defer controller.tunnelMutex.Unlock()
  794. return len(controller.tunnels) >= controller.config.TunnelPoolSize
  795. }
  796. // numTunnels returns the number of active and outstanding tunnels.
  797. // Oustanding is the number of tunnels required to fill the pool of
  798. // active tunnels.
  799. func (controller *Controller) numTunnels() (int, int) {
  800. controller.tunnelMutex.Lock()
  801. defer controller.tunnelMutex.Unlock()
  802. active := len(controller.tunnels)
  803. outstanding := controller.config.TunnelPoolSize - len(controller.tunnels)
  804. return active, outstanding
  805. }
  806. // terminateTunnel removes a tunnel from the pool of active tunnels
  807. // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
  808. // is adjusted as required.
  809. func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
  810. controller.tunnelMutex.Lock()
  811. defer controller.tunnelMutex.Unlock()
  812. for index, activeTunnel := range controller.tunnels {
  813. if tunnel == activeTunnel {
  814. controller.tunnels = append(
  815. controller.tunnels[:index], controller.tunnels[index+1:]...)
  816. if controller.nextTunnel > index {
  817. controller.nextTunnel--
  818. }
  819. if controller.nextTunnel >= len(controller.tunnels) {
  820. controller.nextTunnel = 0
  821. }
  822. activeTunnel.Close(false)
  823. NoticeTunnels(len(controller.tunnels))
  824. break
  825. }
  826. }
  827. }
  828. // terminateAllTunnels empties the tunnel pool, closing all active tunnels.
  829. // This is used when shutting down the controller.
  830. func (controller *Controller) terminateAllTunnels() {
  831. controller.tunnelMutex.Lock()
  832. defer controller.tunnelMutex.Unlock()
  833. // Closing all tunnels in parallel. In an orderly shutdown, each tunnel
  834. // may take a few seconds to send a final status request. We only want
  835. // to wait as long as the single slowest tunnel.
  836. closeWaitGroup := new(sync.WaitGroup)
  837. closeWaitGroup.Add(len(controller.tunnels))
  838. for _, activeTunnel := range controller.tunnels {
  839. tunnel := activeTunnel
  840. go func() {
  841. defer closeWaitGroup.Done()
  842. tunnel.Close(false)
  843. }()
  844. }
  845. closeWaitGroup.Wait()
  846. controller.tunnels = make([]*Tunnel, 0)
  847. controller.nextTunnel = 0
  848. NoticeTunnels(len(controller.tunnels))
  849. }
  850. // getNextActiveTunnel returns the next tunnel from the pool of active
  851. // tunnels. Currently, tunnel selection order is simple round-robin.
  852. func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
  853. controller.tunnelMutex.Lock()
  854. defer controller.tunnelMutex.Unlock()
  855. for i := len(controller.tunnels); i > 0; i-- {
  856. tunnel = controller.tunnels[controller.nextTunnel]
  857. controller.nextTunnel =
  858. (controller.nextTunnel + 1) % len(controller.tunnels)
  859. return tunnel
  860. }
  861. return nil
  862. }
  863. // isActiveTunnelServerEntry is used to check if there's already
  864. // an existing tunnel to a candidate server.
  865. func (controller *Controller) isActiveTunnelServerEntry(
  866. serverEntry *protocol.ServerEntry) bool {
  867. controller.tunnelMutex.Lock()
  868. defer controller.tunnelMutex.Unlock()
  869. for _, activeTunnel := range controller.tunnels {
  870. if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress {
  871. return true
  872. }
  873. }
  874. return false
  875. }
  876. // Dial selects an active tunnel and establishes a port forward
  877. // connection through the selected tunnel. Failure to connect is considered
  878. // a port forward failure, for the purpose of monitoring tunnel health.
  879. func (controller *Controller) Dial(
  880. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  881. tunnel := controller.getNextActiveTunnel()
  882. if tunnel == nil {
  883. return nil, common.ContextError(errors.New("no active tunnels"))
  884. }
  885. // Perform split tunnel classification when feature is enabled, and if the remote
  886. // address is classified as untunneled, dial directly.
  887. if !alwaysTunnel && controller.config.SplitTunnelDNSServer != "" {
  888. host, _, err := net.SplitHostPort(remoteAddr)
  889. if err != nil {
  890. return nil, common.ContextError(err)
  891. }
  892. // Note: a possible optimization, when split tunnel is active and IsUntunneled performs
  893. // a DNS resolution in order to make its classification, is to reuse that IP address in
  894. // the following Dials so they do not need to make their own resolutions. However, the
  895. // way this is currently implemented ensures that, e.g., DNS geo load balancing occurs
  896. // relative to the outbound network.
  897. if controller.splitTunnelClassifier.IsUntunneled(host) {
  898. return controller.DirectDial(remoteAddr)
  899. }
  900. }
  901. tunneledConn, err := tunnel.Dial(remoteAddr, alwaysTunnel, downstreamConn)
  902. if err != nil {
  903. return nil, common.ContextError(err)
  904. }
  905. return tunneledConn, nil
  906. }
  907. // DirectDial dials an untunneled TCP connection within the controller run context.
  908. func (controller *Controller) DirectDial(remoteAddr string) (conn net.Conn, err error) {
  909. return DialTCP(controller.runCtx, remoteAddr, controller.untunneledDialConfig)
  910. }
  911. // startEstablishing creates a pool of worker goroutines which will
  912. // attempt to establish tunnels to candidate servers. The candidates
  913. // are generated by another goroutine.
  914. func (controller *Controller) startEstablishing() {
  915. if controller.isEstablishing {
  916. return
  917. }
  918. NoticeInfo("start establishing")
  919. controller.concurrentEstablishTunnelsMutex.Lock()
  920. controller.concurrentEstablishTunnels = 0
  921. controller.concurrentMeekEstablishTunnels = 0
  922. controller.peakConcurrentEstablishTunnels = 0
  923. controller.peakConcurrentMeekEstablishTunnels = 0
  924. controller.concurrentEstablishTunnelsMutex.Unlock()
  925. aggressiveGarbageCollection()
  926. emitMemoryMetrics()
  927. // Note: the establish context cancelFunc, controller.stopEstablish,
  928. // is called in controller.stopEstablishing.
  929. controller.isEstablishing = true
  930. controller.establishCtx, controller.stopEstablish = context.WithCancel(controller.runCtx)
  931. controller.establishWaitGroup = new(sync.WaitGroup)
  932. controller.candidateServerEntries = make(chan *candidateServerEntry)
  933. // The server affinity mechanism attempts to favor the previously
  934. // used server when reconnecting. This is beneficial for user
  935. // applications which expect consistency in user IP address (for
  936. // example, a web site which prompts for additional user
  937. // authentication when the IP address changes).
  938. //
  939. // Only the very first server, as determined by
  940. // datastore.PromoteServerEntry(), is the server affinity candidate.
  941. // Concurrent connections attempts to many servers are launched
  942. // without delay, in case the affinity server connection fails.
  943. // While the affinity server connection is outstanding, when any
  944. // other connection is established, there is a short grace period
  945. // delay before delivering the established tunnel; this allows some
  946. // time for the affinity server connection to succeed first.
  947. // When the affinity server connection fails, any other established
  948. // tunnel is registered without delay.
  949. //
  950. // Note: the establishTunnelWorker that receives the affinity
  951. // candidate is solely resonsible for closing
  952. // controller.serverAffinityDoneBroadcast.
  953. controller.serverAffinityDoneBroadcast = make(chan struct{})
  954. controller.establishWaitGroup.Add(1)
  955. go controller.launchEstablishing(controller.getImpairedProtocols())
  956. }
  957. func (controller *Controller) launchEstablishing(impairedProtocols []string) {
  958. defer controller.establishWaitGroup.Done()
  959. // Before starting the establish tunnel workers, get and apply
  960. // tactics, launching a tactics request if required.
  961. //
  962. // Wait only TacticsWaitPeriod for the tactics request to complete (or
  963. // fail) before proceeding with tunnel establishment, in case the tactics
  964. // request is blocked or takes very long to complete.
  965. //
  966. // An in-flight tactics request uses meek in round tripper mode, which
  967. // uses less resources than meek tunnel relay mode. For this reason, the
  968. // tactics request is not counted in concurrentMeekEstablishTunnels.
  969. //
  970. // TODO: HTTP/2 uses significantly more memory, so perhaps
  971. // concurrentMeekEstablishTunnels should be counted in that case.
  972. //
  973. // Any in-flight tactics request or pending retry will be
  974. // canceled when establishment is stopped.
  975. doTactics := !controller.config.DisableTactics &&
  976. controller.config.networkIDGetter != nil
  977. if doTactics {
  978. timeout := controller.config.clientParameters.Get().Duration(
  979. parameters.TacticsWaitPeriod)
  980. tacticsDone := make(chan struct{})
  981. tacticsWaitPeriod := time.NewTimer(timeout)
  982. defer tacticsWaitPeriod.Stop()
  983. controller.establishWaitGroup.Add(1)
  984. go controller.getTactics(tacticsDone)
  985. select {
  986. case <-tacticsDone:
  987. case <-tacticsWaitPeriod.C:
  988. }
  989. tacticsWaitPeriod.Stop()
  990. if controller.isStopEstablishing() {
  991. // This check isn't strictly required by avoids the
  992. // overhead of launching workers if establishment
  993. // stopped while awaiting a tactics request.
  994. return
  995. }
  996. }
  997. // Unconditionally report available egress regions. After a fresh install,
  998. // the outer client may not have a list of regions to display, so we
  999. // always report here. Other events that trigger ReportAvailableRegions,
  1000. // are not guaranteed to occur.
  1001. //
  1002. // This report is delayed until after tactics are likely to be applied, as
  1003. // tactics can impact the list of available regions; this avoids a
  1004. // ReportAvailableRegions reporting too many regions, followed shortly by
  1005. // a ReportAvailableRegions reporting fewer regions. That sequence could
  1006. // cause issues in the outer client UI.
  1007. ReportAvailableRegions(controller.config)
  1008. // The ConnectionWorkerPoolSize may be set by tactics.
  1009. size := controller.config.clientParameters.Get().Int(
  1010. parameters.ConnectionWorkerPoolSize)
  1011. for i := 0; i < size; i++ {
  1012. controller.establishWaitGroup.Add(1)
  1013. go controller.establishTunnelWorker()
  1014. }
  1015. controller.establishWaitGroup.Add(1)
  1016. go controller.establishCandidateGenerator(impairedProtocols)
  1017. }
  1018. // stopEstablishing signals the establish goroutines to stop and waits
  1019. // for the group to halt.
  1020. func (controller *Controller) stopEstablishing() {
  1021. if !controller.isEstablishing {
  1022. return
  1023. }
  1024. NoticeInfo("stop establishing")
  1025. controller.stopEstablish()
  1026. // Note: establishCandidateGenerator closes controller.candidateServerEntries
  1027. // (as it may be sending to that channel).
  1028. controller.establishWaitGroup.Wait()
  1029. NoticeInfo("stopped establishing")
  1030. controller.isEstablishing = false
  1031. controller.establishCtx = nil
  1032. controller.stopEstablish = nil
  1033. controller.establishWaitGroup = nil
  1034. controller.candidateServerEntries = nil
  1035. controller.serverAffinityDoneBroadcast = nil
  1036. controller.concurrentEstablishTunnelsMutex.Lock()
  1037. peakConcurrent := controller.peakConcurrentEstablishTunnels
  1038. peakConcurrentMeek := controller.peakConcurrentMeekEstablishTunnels
  1039. controller.concurrentEstablishTunnels = 0
  1040. controller.concurrentMeekEstablishTunnels = 0
  1041. controller.peakConcurrentEstablishTunnels = 0
  1042. controller.peakConcurrentMeekEstablishTunnels = 0
  1043. controller.concurrentEstablishTunnelsMutex.Unlock()
  1044. NoticeInfo("peak concurrent establish tunnels: %d", peakConcurrent)
  1045. NoticeInfo("peak concurrent meek establish tunnels: %d", peakConcurrentMeek)
  1046. emitMemoryMetrics()
  1047. standardGarbageCollection()
  1048. }
  1049. func (controller *Controller) getTactics(done chan struct{}) {
  1050. defer controller.establishWaitGroup.Done()
  1051. defer close(done)
  1052. tacticsRecord, err := tactics.UseStoredTactics(
  1053. GetTacticsStorer(),
  1054. controller.config.networkIDGetter.GetNetworkID())
  1055. if err != nil {
  1056. NoticeAlert("get stored tactics failed: %s", err)
  1057. // The error will be due to a local datastore problem.
  1058. // While we could proceed with the tactics request, this
  1059. // could result in constant tactics requests. So, abort.
  1060. return
  1061. }
  1062. if tacticsRecord == nil {
  1063. iterator, err := NewTacticsServerEntryIterator(
  1064. controller.config)
  1065. if err != nil {
  1066. NoticeAlert("tactics iterator failed: %s", err)
  1067. return
  1068. }
  1069. defer iterator.Close()
  1070. for iteration := 0; ; iteration++ {
  1071. if !WaitForNetworkConnectivity(
  1072. controller.runCtx,
  1073. controller.config.NetworkConnectivityChecker) {
  1074. return
  1075. }
  1076. serverEntry, err := iterator.Next()
  1077. if err != nil {
  1078. NoticeAlert("tactics iterator failed: %s", err)
  1079. return
  1080. }
  1081. if serverEntry == nil {
  1082. if iteration == 0 {
  1083. NoticeAlert("tactics request skipped: no capable servers")
  1084. return
  1085. }
  1086. iterator.Reset()
  1087. continue
  1088. }
  1089. tacticsRecord, err = controller.doFetchTactics(serverEntry)
  1090. if err == nil {
  1091. break
  1092. }
  1093. NoticeAlert("tactics request failed: %s", err)
  1094. // On error, proceed with a retry, as the error is likely
  1095. // due to a network failure.
  1096. //
  1097. // TODO: distinguish network and local errors and abort
  1098. // on local errors.
  1099. p := controller.config.clientParameters.Get()
  1100. timeout := common.JitterDuration(
  1101. p.Duration(parameters.TacticsRetryPeriod),
  1102. p.Float(parameters.TacticsRetryPeriodJitter))
  1103. p = nil
  1104. tacticsRetryDelay := time.NewTimer(timeout)
  1105. select {
  1106. case <-controller.establishCtx.Done():
  1107. return
  1108. case <-tacticsRetryDelay.C:
  1109. }
  1110. tacticsRetryDelay.Stop()
  1111. }
  1112. }
  1113. if tacticsRecord != nil &&
  1114. common.FlipWeightedCoin(tacticsRecord.Tactics.Probability) {
  1115. err := controller.config.SetClientParameters(
  1116. tacticsRecord.Tag, true, tacticsRecord.Tactics.Parameters)
  1117. if err != nil {
  1118. NoticeAlert("apply tactics failed: %s", err)
  1119. // The error will be due to invalid tactics values from
  1120. // the server. When ApplyClientParameters fails, all
  1121. // previous tactics values are left in place. Abort
  1122. // without retry since the server is highly unlikely
  1123. // to return different values immediately.
  1124. return
  1125. }
  1126. }
  1127. // Reclaim memory from the completed tactics request as we're likely
  1128. // to be proceeding to the memory-intensive tunnel establishment phase.
  1129. aggressiveGarbageCollection()
  1130. emitMemoryMetrics()
  1131. }
  1132. func (controller *Controller) doFetchTactics(
  1133. serverEntry *protocol.ServerEntry) (*tactics.Record, error) {
  1134. tacticsProtocols := serverEntry.GetSupportedTacticsProtocols()
  1135. index, err := common.MakeSecureRandomInt(len(tacticsProtocols))
  1136. if err != nil {
  1137. return nil, common.ContextError(err)
  1138. }
  1139. tacticsProtocol := tacticsProtocols[index]
  1140. meekConfig, err := initMeekConfig(
  1141. controller.config,
  1142. serverEntry,
  1143. tacticsProtocol,
  1144. "")
  1145. if err != nil {
  1146. return nil, common.ContextError(err)
  1147. }
  1148. meekConfig.RoundTripperOnly = true
  1149. dialConfig, dialStats := initDialConfig(controller.config, meekConfig)
  1150. NoticeRequestingTactics(
  1151. serverEntry.IpAddress,
  1152. serverEntry.Region,
  1153. tacticsProtocol,
  1154. dialStats)
  1155. // TacticsTimeout should be a very long timeout, since it's not
  1156. // adjusted by tactics in a new network context, and so clients
  1157. // with very slow connections must be accomodated. This long
  1158. // timeout will not entirely block the beginning of tunnel
  1159. // establishment, which beings after the shorter TacticsWaitPeriod.
  1160. //
  1161. // Using controller.establishCtx will cancel FetchTactics
  1162. // if tunnel establishment completes first.
  1163. timeout := controller.config.clientParameters.Get().Duration(
  1164. parameters.TacticsTimeout)
  1165. ctx, cancelFunc := context.WithTimeout(
  1166. controller.establishCtx,
  1167. timeout)
  1168. defer cancelFunc()
  1169. // DialMeek completes the TCP/TLS handshakes for HTTPS
  1170. // meek protocols but _not_ for HTTP meek protocols.
  1171. //
  1172. // TODO: pre-dial HTTP protocols to conform with speed
  1173. // test RTT spec.
  1174. //
  1175. // TODO: ensure that meek in round trip mode will fail
  1176. // the request when the pre-dial connection is broken,
  1177. // to minimize the possibility of network ID mismatches.
  1178. meekConn, err := DialMeek(ctx, meekConfig, dialConfig)
  1179. if err != nil {
  1180. return nil, common.ContextError(err)
  1181. }
  1182. defer meekConn.Close()
  1183. apiParams := getBaseAPIParameters(
  1184. controller.config,
  1185. controller.sessionId,
  1186. serverEntry,
  1187. tacticsProtocol,
  1188. dialStats)
  1189. tacticsRecord, err := tactics.FetchTactics(
  1190. ctx,
  1191. controller.config.clientParameters,
  1192. GetTacticsStorer(),
  1193. controller.config.networkIDGetter.GetNetworkID,
  1194. apiParams,
  1195. serverEntry.Region,
  1196. tacticsProtocol,
  1197. serverEntry.TacticsRequestPublicKey,
  1198. serverEntry.TacticsRequestObfuscatedKey,
  1199. meekConn.RoundTrip)
  1200. if err != nil {
  1201. return nil, common.ContextError(err)
  1202. }
  1203. NoticeRequestedTactics(
  1204. serverEntry.IpAddress,
  1205. serverEntry.Region,
  1206. tacticsProtocol,
  1207. dialStats)
  1208. return tacticsRecord, nil
  1209. }
  1210. // establishCandidateGenerator populates the candidate queue with server entries
  1211. // from the data store. Server entries are iterated in rank order, so that promoted
  1212. // servers with higher rank are priority candidates.
  1213. func (controller *Controller) establishCandidateGenerator(impairedProtocols []string) {
  1214. defer controller.establishWaitGroup.Done()
  1215. defer close(controller.candidateServerEntries)
  1216. // establishStartTime is used to calculate and report the
  1217. // client's tunnel establishment duration.
  1218. //
  1219. // networkWaitDuration is the elapsed time spent waiting
  1220. // for network connectivity. This duration will be excluded
  1221. // from reported tunnel establishment duration.
  1222. establishStartTime := monotime.Now()
  1223. var networkWaitDuration time.Duration
  1224. applyServerAffinity, iterator, err := NewServerEntryIterator(controller.config)
  1225. if err != nil {
  1226. NoticeAlert("failed to iterate over candidates: %s", err)
  1227. controller.SignalComponentFailure()
  1228. return
  1229. }
  1230. defer iterator.Close()
  1231. // TODO: reconcile server affinity scheme with multi-tunnel mode
  1232. if controller.config.TunnelPoolSize > 1 {
  1233. applyServerAffinity = false
  1234. }
  1235. isServerAffinityCandidate := true
  1236. if !applyServerAffinity {
  1237. isServerAffinityCandidate = false
  1238. close(controller.serverAffinityDoneBroadcast)
  1239. }
  1240. candidateCount := 0
  1241. loop:
  1242. // Repeat until stopped
  1243. for i := 0; ; i++ {
  1244. networkWaitStartTime := monotime.Now()
  1245. if !WaitForNetworkConnectivity(
  1246. controller.establishCtx,
  1247. controller.config.NetworkConnectivityChecker) {
  1248. break loop
  1249. }
  1250. networkWaitDuration += monotime.Since(networkWaitStartTime)
  1251. // Send each iterator server entry to the establish workers
  1252. startTime := monotime.Now()
  1253. for {
  1254. serverEntry, err := iterator.Next()
  1255. if err != nil {
  1256. NoticeAlert("failed to get next candidate: %s", err)
  1257. controller.SignalComponentFailure()
  1258. break loop
  1259. }
  1260. if serverEntry == nil {
  1261. // Completed this iteration
  1262. break
  1263. }
  1264. if controller.config.TargetApiProtocol == protocol.PSIPHON_SSH_API_PROTOCOL &&
  1265. !serverEntry.SupportsSSHAPIRequests() {
  1266. continue
  1267. }
  1268. // Use a prioritized tunnel protocol for the first
  1269. // PrioritizeTunnelProtocolsCandidateCount candidates.
  1270. // This facility can be used to favor otherwise slower
  1271. // protocols.
  1272. prioritizeCandidateCount := controller.config.clientParameters.Get().Int(
  1273. parameters.PrioritizeTunnelProtocolsCandidateCount)
  1274. usePriorityProtocol := candidateCount < prioritizeCandidateCount
  1275. // Disable impaired protocols. This is only done for the
  1276. // first iteration of the EstablishTunnelWorkTime
  1277. // loop since (a) one iteration should be sufficient to
  1278. // evade the attack; (b) there's a good chance of false
  1279. // positives (such as short tunnel durations due to network
  1280. // hopping on a mobile device).
  1281. var candidateImpairedProtocols []string
  1282. if i == 0 {
  1283. candidateImpairedProtocols = impairedProtocols
  1284. }
  1285. // adjustedEstablishStartTime is establishStartTime shifted
  1286. // to exclude time spent waiting for network connectivity.
  1287. adjustedEstablishStartTime := establishStartTime.Add(networkWaitDuration)
  1288. candidate := &candidateServerEntry{
  1289. serverEntry: serverEntry,
  1290. isServerAffinityCandidate: isServerAffinityCandidate,
  1291. usePriorityProtocol: usePriorityProtocol,
  1292. impairedProtocols: candidateImpairedProtocols,
  1293. adjustedEstablishStartTime: adjustedEstablishStartTime,
  1294. }
  1295. wasServerAffinityCandidate := isServerAffinityCandidate
  1296. // Note: there must be only one server affinity candidate, as it
  1297. // closes the serverAffinityDoneBroadcast channel.
  1298. isServerAffinityCandidate = false
  1299. // TODO: here we could generate multiple candidates from the
  1300. // server entry when there are many MeekFrontingAddresses.
  1301. candidateCount++
  1302. select {
  1303. case controller.candidateServerEntries <- candidate:
  1304. case <-controller.establishCtx.Done():
  1305. break loop
  1306. }
  1307. workTime := controller.config.clientParameters.Get().Duration(
  1308. parameters.EstablishTunnelWorkTime)
  1309. if startTime.Add(workTime).Before(monotime.Now()) {
  1310. // Start over, after a brief pause, with a new shuffle of the server
  1311. // entries, and potentially some newly fetched server entries.
  1312. break
  1313. }
  1314. if wasServerAffinityCandidate {
  1315. // Don't start the next candidate until either the server affinity
  1316. // candidate has completed (success or failure) or is still working
  1317. // and the grace period has elapsed.
  1318. gracePeriod := controller.config.clientParameters.Get().Duration(
  1319. parameters.EstablishTunnelServerAffinityGracePeriod)
  1320. if gracePeriod > 0 {
  1321. timer := time.NewTimer(gracePeriod)
  1322. select {
  1323. case <-timer.C:
  1324. case <-controller.serverAffinityDoneBroadcast:
  1325. case <-controller.establishCtx.Done():
  1326. timer.Stop()
  1327. break loop
  1328. }
  1329. timer.Stop()
  1330. }
  1331. } else {
  1332. p := controller.config.clientParameters.Get()
  1333. staggerPeriod := p.Duration(parameters.StaggerConnectionWorkersPeriod)
  1334. staggerJitter := p.Float(parameters.StaggerConnectionWorkersJitter)
  1335. p = nil
  1336. if staggerPeriod != 0 {
  1337. // Stagger concurrent connection workers.
  1338. timeout := common.JitterDuration(staggerPeriod, staggerJitter)
  1339. timer := time.NewTimer(timeout)
  1340. select {
  1341. case <-timer.C:
  1342. case <-controller.establishCtx.Done():
  1343. timer.Stop()
  1344. break loop
  1345. }
  1346. timer.Stop()
  1347. }
  1348. }
  1349. }
  1350. // Free up resources now, but don't reset until after the pause.
  1351. iterator.Close()
  1352. // Trigger a common remote server list fetch, since we may have failed
  1353. // to connect with all known servers. Don't block sending signal, since
  1354. // this signal may have already been sent.
  1355. // Don't wait for fetch remote to succeed, since it may fail and
  1356. // enter a retry loop and we're better off trying more known servers.
  1357. // TODO: synchronize the fetch response, so it can be incorporated
  1358. // into the server entry iterator as soon as available.
  1359. select {
  1360. case controller.signalFetchCommonRemoteServerList <- *new(struct{}):
  1361. default:
  1362. }
  1363. // Trigger an OSL fetch in parallel. Both fetches are run in parallel
  1364. // so that if one out of the common RLS and OSL set is large, it doesn't
  1365. // doesn't entirely block fetching the other.
  1366. select {
  1367. case controller.signalFetchObfuscatedServerLists <- *new(struct{}):
  1368. default:
  1369. }
  1370. // Trigger an out-of-band upgrade availability check and download.
  1371. // Since we may have failed to connect, we may benefit from upgrading
  1372. // to a new client version with new circumvention capabilities.
  1373. select {
  1374. case controller.signalDownloadUpgrade <- "":
  1375. default:
  1376. }
  1377. // After a complete iteration of candidate servers, pause before iterating again.
  1378. // This helps avoid some busy wait loop conditions, and also allows some time for
  1379. // network conditions to change. Also allows for fetch remote to complete,
  1380. // in typical conditions (it isn't strictly necessary to wait for this, there will
  1381. // be more rounds if required).
  1382. p := controller.config.clientParameters.Get()
  1383. timeout := common.JitterDuration(
  1384. p.Duration(parameters.EstablishTunnelPausePeriod),
  1385. p.Float(parameters.EstablishTunnelPausePeriodJitter))
  1386. p = nil
  1387. timer := time.NewTimer(timeout)
  1388. select {
  1389. case <-timer.C:
  1390. // Retry iterating
  1391. case <-controller.establishCtx.Done():
  1392. timer.Stop()
  1393. break loop
  1394. }
  1395. timer.Stop()
  1396. iterator.Reset()
  1397. }
  1398. }
  1399. // establishTunnelWorker pulls candidates from the candidate queue, establishes
  1400. // a connection to the tunnel server, and delivers the connected tunnel to a channel.
  1401. func (controller *Controller) establishTunnelWorker() {
  1402. defer controller.establishWaitGroup.Done()
  1403. loop:
  1404. for candidateServerEntry := range controller.candidateServerEntries {
  1405. // Note: don't receive from candidateServerEntries and isStopEstablishing
  1406. // in the same select, since we want to prioritize receiving the stop signal
  1407. if controller.isStopEstablishing() {
  1408. break loop
  1409. }
  1410. // There may already be a tunnel to this candidate. If so, skip it.
  1411. if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) {
  1412. continue
  1413. }
  1414. // ConnectTunnel will allocate significant memory, so first attempt to
  1415. // reclaim as much as possible.
  1416. defaultGarbageCollection()
  1417. // Select the tunnel protocol. The selection will be made at random from
  1418. // protocols supported by the server entry, optionally limited by
  1419. // LimitTunnelProtocols.
  1420. //
  1421. // When limiting concurrent meek connection workers, and at the limit,
  1422. // do not select meek since otherwise the candidate must be skipped.
  1423. //
  1424. // If at the limit and unabled to select a non-meek protocol, skip the
  1425. // candidate entirely and move on to the next. Since candidates are shuffled
  1426. // it's probable that the next candidate is not meek. In this case, a
  1427. // StaggerConnectionWorkersMilliseconds delay may still be incurred.
  1428. limitMeekConnectionWorkers := controller.config.clientParameters.Get().Int(
  1429. parameters.LimitMeekConnectionWorkers)
  1430. excludeMeek := false
  1431. controller.concurrentEstablishTunnelsMutex.Lock()
  1432. if limitMeekConnectionWorkers > 0 &&
  1433. controller.concurrentMeekEstablishTunnels >=
  1434. limitMeekConnectionWorkers {
  1435. excludeMeek = true
  1436. }
  1437. controller.concurrentEstablishTunnelsMutex.Unlock()
  1438. selectedProtocol, err := selectProtocol(
  1439. controller.config,
  1440. candidateServerEntry.serverEntry,
  1441. candidateServerEntry.impairedProtocols,
  1442. excludeMeek,
  1443. candidateServerEntry.usePriorityProtocol)
  1444. if err == errNoProtocolSupported {
  1445. // selectProtocol returns errNoProtocolSupported when the server
  1446. // does not support any protocol that remains after applying the
  1447. // LimitTunnelProtocols parameter, the impaired protocol filter,
  1448. // and the excludeMeek flag.
  1449. // Skip this candidate.
  1450. // Unblock other candidates immediately when
  1451. // server affinity candidate is skipped.
  1452. if candidateServerEntry.isServerAffinityCandidate {
  1453. close(controller.serverAffinityDoneBroadcast)
  1454. }
  1455. continue
  1456. }
  1457. var tunnel *Tunnel
  1458. if err == nil {
  1459. isMeek := protocol.TunnelProtocolUsesMeek(selectedProtocol)
  1460. controller.concurrentEstablishTunnelsMutex.Lock()
  1461. if isMeek {
  1462. // Recheck the limit now that we know we're selecting meek and
  1463. // adjusting concurrentMeekEstablishTunnels.
  1464. if limitMeekConnectionWorkers > 0 &&
  1465. controller.concurrentMeekEstablishTunnels >=
  1466. limitMeekConnectionWorkers {
  1467. // Skip this candidate.
  1468. controller.concurrentEstablishTunnelsMutex.Unlock()
  1469. continue
  1470. }
  1471. controller.concurrentMeekEstablishTunnels += 1
  1472. if controller.concurrentMeekEstablishTunnels > controller.peakConcurrentMeekEstablishTunnels {
  1473. controller.peakConcurrentMeekEstablishTunnels = controller.concurrentMeekEstablishTunnels
  1474. }
  1475. }
  1476. controller.concurrentEstablishTunnels += 1
  1477. if controller.concurrentEstablishTunnels > controller.peakConcurrentEstablishTunnels {
  1478. controller.peakConcurrentEstablishTunnels = controller.concurrentEstablishTunnels
  1479. }
  1480. controller.concurrentEstablishTunnelsMutex.Unlock()
  1481. tunnel, err = ConnectTunnel(
  1482. controller.establishCtx,
  1483. controller.config,
  1484. controller.sessionId,
  1485. candidateServerEntry.serverEntry,
  1486. selectedProtocol,
  1487. candidateServerEntry.adjustedEstablishStartTime)
  1488. controller.concurrentEstablishTunnelsMutex.Lock()
  1489. if isMeek {
  1490. controller.concurrentMeekEstablishTunnels -= 1
  1491. }
  1492. controller.concurrentEstablishTunnels -= 1
  1493. controller.concurrentEstablishTunnelsMutex.Unlock()
  1494. }
  1495. // Periodically emit memory metrics during the establishment cycle.
  1496. if !controller.isStopEstablishing() {
  1497. emitMemoryMetrics()
  1498. }
  1499. // Immediately reclaim memory allocated by the establishment. In the case
  1500. // of failure, first clear the reference to the tunnel. In the case of
  1501. // success, the garbage collection may still be effective as the initial
  1502. // phases of some protocols involve significant memory allocation that
  1503. // could now be reclaimed.
  1504. if err != nil {
  1505. tunnel = nil
  1506. }
  1507. defaultGarbageCollection()
  1508. if err != nil {
  1509. // Unblock other candidates immediately when
  1510. // server affinity candidate fails.
  1511. if candidateServerEntry.isServerAffinityCandidate {
  1512. close(controller.serverAffinityDoneBroadcast)
  1513. }
  1514. // Before emitting error, check if establish interrupted, in which
  1515. // case the error is noise.
  1516. if controller.isStopEstablishing() {
  1517. break loop
  1518. }
  1519. NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err)
  1520. continue
  1521. }
  1522. // Deliver connected tunnel.
  1523. // Don't block. Assumes the receiver has a buffer large enough for
  1524. // the number of desired tunnels. If there's no room, the tunnel must
  1525. // not be required so it's discarded.
  1526. select {
  1527. case controller.connectedTunnels <- tunnel:
  1528. default:
  1529. controller.discardTunnel(tunnel)
  1530. // Clear the reference to this discarded tunnel and immediately run
  1531. // a garbage collection to reclaim its memory.
  1532. tunnel = nil
  1533. defaultGarbageCollection()
  1534. }
  1535. // Unblock other candidates only after delivering when
  1536. // server affinity candidate succeeds.
  1537. if candidateServerEntry.isServerAffinityCandidate {
  1538. close(controller.serverAffinityDoneBroadcast)
  1539. }
  1540. }
  1541. }
  1542. func (controller *Controller) isStopEstablishing() bool {
  1543. select {
  1544. case <-controller.establishCtx.Done():
  1545. return true
  1546. default:
  1547. }
  1548. return false
  1549. }