controller.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Package psiphon implements the core tunnel functionality of a Psiphon client.
  20. // The main function is RunForever, which runs a Controller that obtains lists of
  21. // servers, establishes tunnel connections, and runs local proxies through which
  22. // tunneled traffic may be sent.
  23. package psiphon
  24. import (
  25. "errors"
  26. "net"
  27. "sync"
  28. "time"
  29. )
// Controller is a tunnel lifecycle coordinator. It manages lists of servers to
// connect to; establishes and monitors tunnels; and runs local proxies which
// route traffic through the tunnels.
type Controller struct {
	// config is the client configuration supplied to NewController.
	config *Config
	// sessionId is generated once in NewController and passed to every
	// tunnel established by this controller.
	sessionId string
	// componentFailureSignal is a 1-buffered channel written by
	// SignalComponentFailure; Run shuts the controller down on receipt.
	componentFailureSignal chan struct{}
	// shutdownBroadcast is closed by Run to tell the controller's goroutines
	// to exit.
	shutdownBroadcast chan struct{}
	// runWaitGroup tracks the long-running goroutines (remote server list
	// fetcher, connected reporter, runTunnels) that Run waits for on shutdown.
	runWaitGroup *sync.WaitGroup
	// establishedTunnels carries newly connected tunnels from the establish
	// workers to runTunnels.
	establishedTunnels chan *Tunnel
	// failedTunnels carries tunnels reported via SignalTunnelFailure to
	// runTunnels.
	failedTunnels chan *Tunnel
	// tunnelMutex guards tunnels and nextTunnel.
	tunnelMutex sync.Mutex
	// tunnels is the pool of active tunnels.
	tunnels []*Tunnel
	// nextTunnel is the round-robin cursor used by getNextActiveTunnel.
	nextTunnel int
	// startedConnectedReporter records that the connected reporter goroutine
	// has been launched. Only the runTunnels goroutine accesses it.
	startedConnectedReporter bool
	// isEstablishing is true between startEstablishing and stopEstablishing.
	// Only the runTunnels goroutine accesses it.
	isEstablishing bool
	// establishWaitGroup tracks the establish workers and the candidate
	// generator of the current establish cycle; nil when not establishing.
	establishWaitGroup *sync.WaitGroup
	// stopEstablishingBroadcast is closed by stopEstablishing to halt the
	// current establish cycle; nil when not establishing.
	stopEstablishingBroadcast chan struct{}
	// candidateServerEntries feeds server entries from the candidate
	// generator to the establish workers; nil when not establishing.
	candidateServerEntries chan *ServerEntry
	// establishPendingConns is handed to EstablishTunnel; CloseAll on it is
	// used to interrupt workers blocked while connecting.
	establishPendingConns *Conns
	// fetchRemotePendingConns is handed to FetchRemoteServerList and is
	// closed by Run on shutdown.
	fetchRemotePendingConns *Conns
}
  52. // NewController initializes a new controller.
  53. func NewController(config *Config) (controller *Controller, err error) {
  54. // Generate a session ID for the Psiphon server API. This session ID is
  55. // used across all tunnels established by the controller.
  56. sessionId, err := MakeSessionId()
  57. if err != nil {
  58. return nil, ContextError(err)
  59. }
  60. return &Controller{
  61. config: config,
  62. sessionId: sessionId,
  63. // componentFailureSignal receives a signal from a component (including socks and
  64. // http local proxies) if they unexpectedly fail. Senders should not block.
  65. // A buffer allows at least one stop signal to be sent before there is a receiver.
  66. componentFailureSignal: make(chan struct{}, 1),
  67. shutdownBroadcast: make(chan struct{}),
  68. runWaitGroup: new(sync.WaitGroup),
  69. // establishedTunnels and failedTunnels buffer sizes are large enough to
  70. // receive full pools of tunnels without blocking. Senders should not block.
  71. establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  72. failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
  73. tunnels: make([]*Tunnel, 0),
  74. startedConnectedReporter: false,
  75. isEstablishing: false,
  76. establishPendingConns: new(Conns),
  77. fetchRemotePendingConns: new(Conns),
  78. }, nil
  79. }
  80. // Run executes the controller. It launches components and then monitors
  81. // for a shutdown signal; after receiving the signal it shuts down the
  82. // controller.
  83. // The components include:
  84. // - the periodic remote server list fetcher
  85. // - the connected reporter
  86. // - the tunnel manager
  87. // - a local SOCKS proxy that port forwards through the pool of tunnels
  88. // - a local HTTP proxy that port forwards through the pool of tunnels
  89. func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
  90. NoticeCoreVersion(VERSION)
  91. // Start components
  92. socksProxy, err := NewSocksProxy(controller.config, controller)
  93. if err != nil {
  94. NoticeAlert("error initializing local SOCKS proxy: %s", err)
  95. return
  96. }
  97. defer socksProxy.Close()
  98. httpProxy, err := NewHttpProxy(controller.config, controller)
  99. if err != nil {
  100. NoticeAlert("error initializing local HTTP proxy: %s", err)
  101. return
  102. }
  103. defer httpProxy.Close()
  104. // Note: unlike legacy Psiphon clients, this code always makes the
  105. // fetch remote server list request
  106. if !controller.config.DisableRemoteServerListFetcher {
  107. controller.runWaitGroup.Add(1)
  108. go controller.remoteServerListFetcher()
  109. }
  110. /// Note: the connected reporter isn't started until a tunnel is
  111. // established
  112. controller.runWaitGroup.Add(1)
  113. go controller.runTunnels()
  114. // Wait while running
  115. select {
  116. case <-shutdownBroadcast:
  117. NoticeInfo("controller shutdown by request")
  118. case <-controller.componentFailureSignal:
  119. NoticeAlert("controller shutdown due to component failure")
  120. }
  121. close(controller.shutdownBroadcast)
  122. controller.establishPendingConns.CloseAll()
  123. controller.fetchRemotePendingConns.CloseAll()
  124. controller.runWaitGroup.Wait()
  125. NoticeInfo("exiting controller")
  126. }
  127. // SignalComponentFailure notifies the controller that an associated component has failed.
  128. // This will terminate the controller.
  129. func (controller *Controller) SignalComponentFailure() {
  130. select {
  131. case controller.componentFailureSignal <- *new(struct{}):
  132. default:
  133. }
  134. }
  135. // remoteServerListFetcher fetches an out-of-band list of server entries
  136. // for more tunnel candidates. It fetches immediately, retries after failure
  137. // with a wait period, and refetches after success with a longer wait period.
  138. func (controller *Controller) remoteServerListFetcher() {
  139. defer controller.runWaitGroup.Done()
  140. loop:
  141. for {
  142. err := FetchRemoteServerList(
  143. controller.config, controller.fetchRemotePendingConns)
  144. var duration time.Duration
  145. if err != nil {
  146. NoticeAlert("failed to fetch remote server list: %s", err)
  147. duration = FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD
  148. } else {
  149. duration = FETCH_REMOTE_SERVER_LIST_STALE_PERIOD
  150. }
  151. timeout := time.After(duration)
  152. select {
  153. case <-timeout:
  154. // Fetch again
  155. case <-controller.shutdownBroadcast:
  156. break loop
  157. }
  158. }
  159. NoticeInfo("exiting remote server list fetcher")
  160. }
  161. // connectedReporter sends periodic "connected" requests to the Psiphon API.
  162. // These requests are for server-side unique user stats calculation. See the
  163. // comment in DoConnectedRequest for a description of the request mechanism.
  164. // To ensure we don't over- or under-count unique users, only one connected
  165. // request is made across all simultaneous multi-tunnels; and the connected
  166. // request is repeated periodically.
  167. func (controller *Controller) connectedReporter() {
  168. defer controller.runWaitGroup.Done()
  169. loop:
  170. for {
  171. // Pick any active tunnel and make the next connected request. No error
  172. // is logged if there's no active tunnel, as that's not an unexpected condition.
  173. reported := false
  174. tunnel := controller.getNextActiveTunnel()
  175. if tunnel != nil {
  176. err := tunnel.session.DoConnectedRequest()
  177. if err == nil {
  178. reported = true
  179. } else {
  180. NoticeAlert("failed to make connected request: %s", err)
  181. }
  182. }
  183. // Schedule the next connected request and wait.
  184. var duration time.Duration
  185. if reported {
  186. duration = PSIPHON_API_CONNECTED_REQUEST_PERIOD
  187. } else {
  188. duration = PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD
  189. }
  190. timeout := time.After(duration)
  191. select {
  192. case <-timeout:
  193. // Make another connected request
  194. case <-controller.shutdownBroadcast:
  195. break loop
  196. }
  197. }
  198. NoticeInfo("exiting connected reporter")
  199. }
  200. func (controller *Controller) startConnectedReporter() {
  201. if controller.config.DisableApi {
  202. return
  203. }
  204. // Start the connected reporter after the first tunnel is established.
  205. // Concurrency note: only the runTunnels goroutine may access startedConnectedReporter.
  206. if !controller.startedConnectedReporter {
  207. controller.startedConnectedReporter = true
  208. controller.runWaitGroup.Add(1)
  209. go controller.connectedReporter()
  210. }
  211. }
// runTunnels is the controller tunnel management main loop. It starts and stops
// establishing tunnels based on the target tunnel pool size and the current size
// of the pool. Tunnels are established asynchronously using worker goroutines.
//
// When there are no server entries for the target region/protocol, the
// establishCandidateGenerator will yield no candidates and wait before
// trying again. In the meantime, a remote server entry fetch may supply
// valid candidates.
//
// When a tunnel is established, it's added to the active pool. The tunnel's
// operateTunnel goroutine monitors the tunnel.
//
// When a tunnel fails, it's removed from the pool and the establish process is
// restarted to fill the pool.
//
// The loop exits when shutdownBroadcast is closed; establishing is then
// stopped, all active tunnels are closed, and any tunnels still queued in the
// established/failed channels are discarded.
func (controller *Controller) runTunnels() {
	defer controller.runWaitGroup.Done()

	// Start running
	controller.startEstablishing()
loop:
	for {
		select {
		case failedTunnel := <-controller.failedTunnels:
			NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
			controller.terminateTunnel(failedTunnel)
			// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing
			// and access isEstablishing.
			if !controller.isEstablishing {
				controller.startEstablishing()
			}
			// TODO: design issue: there might not be enough server entries with the
			// required region/capabilities to ever fill the tunnel slots.
			// Possible solution: target MIN(CountServerEntries(region, protocol), TunnelPoolSize).
		case establishedTunnel := <-controller.establishedTunnels:
			// registerTunnel refuses the tunnel when the pool is full or the
			// server already has an active tunnel; such tunnels are closed.
			if controller.registerTunnel(establishedTunnel) {
				NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress)
			} else {
				controller.discardTunnel(establishedTunnel)
			}
			if controller.isFullyEstablished() {
				controller.stopEstablishing()
			}
			// No-op after the first call (or when the API is disabled).
			controller.startConnectedReporter()
		case <-controller.shutdownBroadcast:
			break loop
		}
	}

	// Stop running
	controller.stopEstablishing()
	controller.terminateAllTunnels()

	// Drain tunnel channels
	// NOTE(review): closing these channels assumes no goroutine can still send
	// on them (a send on a closed channel panics) — confirm that every
	// establish worker and tunnel operator has stopped by this point.
	close(controller.establishedTunnels)
	for tunnel := range controller.establishedTunnels {
		controller.discardTunnel(tunnel)
	}
	close(controller.failedTunnels)
	for tunnel := range controller.failedTunnels {
		controller.discardTunnel(tunnel)
	}

	NoticeInfo("exiting run tunnels")
}
// SignalTunnelFailure implements the TunnelOwner interface. This function
// is called by Tunnel.operateTunnel when the tunnel has detected that it
// has failed. The failed tunnel is queued for runTunnels, which removes it
// from the list of active tunnels and restarts establishing if necessary.
func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
	// Don't block. Assumes the receiver has a buffer large enough for
	// the typical number of operated tunnels. In case there's no room,
	// terminate the tunnel directly: runTunnels won't get a signal in this
	// case, but the tunnel is still closed and removed from the list of
	// active tunnels.
	select {
	case controller.failedTunnels <- tunnel:
	default:
		controller.terminateTunnel(tunnel)
	}
}
// discardTunnel disposes of a successful connection that is no longer
// required: it logs the server's IP address and closes the tunnel. The
// tunnel is not expected to be in the active pool; this function does not
// touch the pool.
func (controller *Controller) discardTunnel(tunnel *Tunnel) {
	NoticeInfo("discard tunnel: %s", tunnel.serverEntry.IpAddress)
	// TODO: not calling PromoteServerEntry, since that would rank the
	// discarded tunnel before fully active tunnels. Can a discarded tunnel
	// be promoted (since it connects), but with lower rank than all active
	// tunnels?
	tunnel.Close()
}
  295. // registerTunnel adds the connected tunnel to the pool of active tunnels
  296. // which are candidates for port forwarding. Returns true if the pool has an
  297. // empty slot and false if the pool is full (caller should discard the tunnel).
  298. func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
  299. controller.tunnelMutex.Lock()
  300. defer controller.tunnelMutex.Unlock()
  301. if len(controller.tunnels) >= controller.config.TunnelPoolSize {
  302. return false
  303. }
  304. // Perform a final check just in case we've established
  305. // a duplicate connection.
  306. for _, activeTunnel := range controller.tunnels {
  307. if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
  308. NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
  309. return false
  310. }
  311. }
  312. controller.tunnels = append(controller.tunnels, tunnel)
  313. NoticeTunnels(len(controller.tunnels))
  314. return true
  315. }
  316. // isFullyEstablished indicates if the pool of active tunnels is full.
  317. func (controller *Controller) isFullyEstablished() bool {
  318. controller.tunnelMutex.Lock()
  319. defer controller.tunnelMutex.Unlock()
  320. return len(controller.tunnels) >= controller.config.TunnelPoolSize
  321. }
  322. // terminateTunnel removes a tunnel from the pool of active tunnels
  323. // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
  324. // is adjusted as required.
  325. func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
  326. controller.tunnelMutex.Lock()
  327. defer controller.tunnelMutex.Unlock()
  328. for index, activeTunnel := range controller.tunnels {
  329. if tunnel == activeTunnel {
  330. controller.tunnels = append(
  331. controller.tunnels[:index], controller.tunnels[index+1:]...)
  332. if controller.nextTunnel > index {
  333. controller.nextTunnel--
  334. }
  335. if controller.nextTunnel >= len(controller.tunnels) {
  336. controller.nextTunnel = 0
  337. }
  338. activeTunnel.Close()
  339. NoticeTunnels(len(controller.tunnels))
  340. break
  341. }
  342. }
  343. }
  344. // terminateAllTunnels empties the tunnel pool, closing all active tunnels.
  345. // This is used when shutting down the controller.
  346. func (controller *Controller) terminateAllTunnels() {
  347. controller.tunnelMutex.Lock()
  348. defer controller.tunnelMutex.Unlock()
  349. for _, activeTunnel := range controller.tunnels {
  350. activeTunnel.Close()
  351. }
  352. controller.tunnels = make([]*Tunnel, 0)
  353. controller.nextTunnel = 0
  354. NoticeTunnels(len(controller.tunnels))
  355. }
  356. // getNextActiveTunnel returns the next tunnel from the pool of active
  357. // tunnels. Currently, tunnel selection order is simple round-robin.
  358. func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
  359. controller.tunnelMutex.Lock()
  360. defer controller.tunnelMutex.Unlock()
  361. for i := len(controller.tunnels); i > 0; i-- {
  362. tunnel = controller.tunnels[controller.nextTunnel]
  363. controller.nextTunnel =
  364. (controller.nextTunnel + 1) % len(controller.tunnels)
  365. return tunnel
  366. }
  367. return nil
  368. }
  369. // isActiveTunnelServerEntries is used to check if there's already
  370. // an existing tunnel to a candidate server.
  371. func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry) bool {
  372. controller.tunnelMutex.Lock()
  373. defer controller.tunnelMutex.Unlock()
  374. for _, activeTunnel := range controller.tunnels {
  375. if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress {
  376. return true
  377. }
  378. }
  379. return false
  380. }
  381. // Dial selects an active tunnel and establishes a port forward
  382. // connection through the selected tunnel. Failure to connect is considered
  383. // a port foward failure, for the purpose of monitoring tunnel health.
  384. func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error) {
  385. tunnel := controller.getNextActiveTunnel()
  386. if tunnel == nil {
  387. return nil, ContextError(errors.New("no active tunnels"))
  388. }
  389. tunneledConn, err := tunnel.Dial(remoteAddr)
  390. if err != nil {
  391. return nil, ContextError(err)
  392. }
  393. return tunneledConn, nil
  394. }
  395. // startEstablishing creates a pool of worker goroutines which will
  396. // attempt to establish tunnels to candidate servers. The candidates
  397. // are generated by another goroutine.
  398. func (controller *Controller) startEstablishing() {
  399. if controller.isEstablishing {
  400. return
  401. }
  402. NoticeInfo("start establishing")
  403. controller.isEstablishing = true
  404. controller.establishWaitGroup = new(sync.WaitGroup)
  405. controller.stopEstablishingBroadcast = make(chan struct{})
  406. controller.candidateServerEntries = make(chan *ServerEntry)
  407. controller.establishPendingConns.Reset()
  408. for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
  409. controller.establishWaitGroup.Add(1)
  410. go controller.establishTunnelWorker()
  411. }
  412. controller.establishWaitGroup.Add(1)
  413. go controller.establishCandidateGenerator()
  414. }
  415. // stopEstablishing signals the establish goroutines to stop and waits
  416. // for the group to halt. pendingConns is used to interrupt any worker
  417. // blocked on a socket connect.
  418. func (controller *Controller) stopEstablishing() {
  419. if !controller.isEstablishing {
  420. return
  421. }
  422. NoticeInfo("stop establishing")
  423. close(controller.stopEstablishingBroadcast)
  424. // Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
  425. // and may leave goroutines running for a time after the Wait call.
  426. controller.establishPendingConns.CloseAll()
  427. // Note: establishCandidateGenerator closes controller.candidateServerEntries
  428. // (as it may be sending to that channel).
  429. controller.establishWaitGroup.Wait()
  430. controller.isEstablishing = false
  431. controller.establishWaitGroup = nil
  432. controller.stopEstablishingBroadcast = nil
  433. controller.candidateServerEntries = nil
  434. }
  435. // establishCandidateGenerator populates the candidate queue with server entries
  436. // from the data store. Server entries are iterated in rank order, so that promoted
  437. // servers with higher rank are priority candidates.
  438. func (controller *Controller) establishCandidateGenerator() {
  439. defer controller.establishWaitGroup.Done()
  440. iterator, err := NewServerEntryIterator(controller.config)
  441. if err != nil {
  442. NoticeAlert("failed to iterate over candidates: %s", err)
  443. controller.SignalComponentFailure()
  444. return
  445. }
  446. defer iterator.Close()
  447. loop:
  448. // Repeat until stopped
  449. for {
  450. // Yield each server entry returned by the iterator
  451. for {
  452. serverEntry, err := iterator.Next()
  453. if err != nil {
  454. NoticeAlert("failed to get next candidate: %s", err)
  455. controller.SignalComponentFailure()
  456. break loop
  457. }
  458. if serverEntry == nil {
  459. // Completed this iteration
  460. break
  461. }
  462. select {
  463. case controller.candidateServerEntries <- serverEntry:
  464. case <-controller.stopEstablishingBroadcast:
  465. break loop
  466. case <-controller.shutdownBroadcast:
  467. break loop
  468. }
  469. }
  470. iterator.Reset()
  471. // After a complete iteration of candidate servers, pause before iterating again.
  472. // This helps avoid some busy wait loop conditions, and also allows some time for
  473. // network conditions to change.
  474. timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
  475. select {
  476. case <-timeout:
  477. // Retry iterating
  478. case <-controller.stopEstablishingBroadcast:
  479. break loop
  480. case <-controller.shutdownBroadcast:
  481. break loop
  482. }
  483. }
  484. close(controller.candidateServerEntries)
  485. NoticeInfo("stopped candidate generator")
  486. }
  487. // establishTunnelWorker pulls candidates from the candidate queue, establishes
  488. // a connection to the tunnel server, and delivers the established tunnel to a channel.
  489. func (controller *Controller) establishTunnelWorker() {
  490. defer controller.establishWaitGroup.Done()
  491. loop:
  492. for serverEntry := range controller.candidateServerEntries {
  493. // Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
  494. // in the same select, since we want to prioritize receiving the stop signal
  495. if controller.isStopEstablishingBroadcast() {
  496. break loop
  497. }
  498. // There may already be a tunnel to this candidate. If so, skip it.
  499. if controller.isActiveTunnelServerEntry(serverEntry) {
  500. continue
  501. }
  502. tunnel, err := EstablishTunnel(
  503. controller.config,
  504. controller.sessionId,
  505. controller.establishPendingConns,
  506. serverEntry,
  507. controller) // TunnelOwner
  508. if err != nil {
  509. // Before emitting error, check if establish interrupted, in which
  510. // case the error is noise.
  511. if controller.isStopEstablishingBroadcast() {
  512. break loop
  513. }
  514. NoticeInfo("failed to connect to %s: %s", serverEntry.IpAddress, err)
  515. continue
  516. }
  517. // Deliver established tunnel.
  518. // Don't block. Assumes the receiver has a buffer large enough for
  519. // the number of desired tunnels. If there's no room, the tunnel must
  520. // not be required so it's discarded.
  521. select {
  522. case controller.establishedTunnels <- tunnel:
  523. default:
  524. controller.discardTunnel(tunnel)
  525. }
  526. }
  527. NoticeInfo("stopped establish worker")
  528. }
  529. func (controller *Controller) isStopEstablishingBroadcast() bool {
  530. select {
  531. case <-controller.stopEstablishingBroadcast:
  532. return true
  533. default:
  534. }
  535. return false
  536. }