/* * Copyright (c) 2015, Psiphon Inc. * All rights reserved. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ // Package psiphon implements the core tunnel functionality of a Psiphon client. // The main function is RunForever, which runs a Controller that obtains lists of // servers, establishes tunnel connections, and runs local proxies through which // tunneled traffic may be sent. package psiphon import ( "errors" "math/rand" "net" "sync" "time" ) // Controller is a tunnel lifecycle coordinator. It manages lists of servers to // connect to; establishes and monitors tunnels; and runs local proxies which // route traffic through the tunnels. type Controller struct { config *Config sessionId string componentFailureSignal chan struct{} shutdownBroadcast chan struct{} runWaitGroup *sync.WaitGroup establishedTunnels chan *Tunnel failedTunnels chan *Tunnel tunnelMutex sync.Mutex establishedOnce bool tunnels []*Tunnel nextTunnel int startedConnectedReporter bool startedUpgradeDownloader bool isEstablishing bool establishWaitGroup *sync.WaitGroup stopEstablishingBroadcast chan struct{} candidateServerEntries chan *candidateServerEntry establishPendingConns *Conns untunneledPendingConns *Conns untunneledDialConfig *DialConfig splitTunnelClassifier *SplitTunnelClassifier signalFetchRemoteServerList chan struct{} impairedProtocolClassification map[string]int signalReportConnected chan struct{} serverAffinityDoneBroadcast chan struct{} } type candidateServerEntry struct { serverEntry *ServerEntry isServerAffinityCandidate bool } // NewController initializes a new controller. func NewController(config *Config) (controller *Controller, err error) { // Needed by regen, at least rand.Seed(int64(time.Now().Nanosecond())) // Generate a session ID for the Psiphon server API. This session ID is // used across all tunnels established by the controller. sessionId, err := MakeSessionId() if err != nil { return nil, ContextError(err) } // untunneledPendingConns may be used to interrupt the fetch remote server list // request and other untunneled connection establishments. BindToDevice may be // used to exclude these requests and connection from VPN routing. untunneledPendingConns := new(Conns) untunneledDialConfig := &DialConfig{ UpstreamProxyUrl: config.UpstreamProxyUrl, PendingConns: untunneledPendingConns, DeviceBinder: config.DeviceBinder, DnsServerGetter: config.DnsServerGetter, UseIndistinguishableTLS: config.UseIndistinguishableTLS, TrustedCACertificatesFilename: config.TrustedCACertificatesFilename, } controller = &Controller{ config: config, sessionId: sessionId, // componentFailureSignal receives a signal from a component (including socks and // http local proxies) if they unexpectedly fail. Senders should not block. // A buffer allows at least one stop signal to be sent before there is a receiver. componentFailureSignal: make(chan struct{}, 1), shutdownBroadcast: make(chan struct{}), runWaitGroup: new(sync.WaitGroup), // establishedTunnels and failedTunnels buffer sizes are large enough to // receive full pools of tunnels without blocking. Senders should not block. establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize), failedTunnels: make(chan *Tunnel, config.TunnelPoolSize), tunnels: make([]*Tunnel, 0), establishedOnce: false, startedConnectedReporter: false, startedUpgradeDownloader: false, isEstablishing: false, establishPendingConns: new(Conns), untunneledPendingConns: untunneledPendingConns, untunneledDialConfig: untunneledDialConfig, impairedProtocolClassification: make(map[string]int), // TODO: Add a buffer of 1 so we don't miss a signal while receiver is // starting? Trade-off is potential back-to-back fetch remotes. As-is, // establish will eventually signal another fetch remote. signalFetchRemoteServerList: make(chan struct{}), signalReportConnected: make(chan struct{}), } controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller) return controller, nil } // Run executes the controller. It launches components and then monitors // for a shutdown signal; after receiving the signal it shuts down the // controller. // The components include: // - the periodic remote server list fetcher // - the connected reporter // - the tunnel manager // - a local SOCKS proxy that port forwards through the pool of tunnels // - a local HTTP proxy that port forwards through the pool of tunnels func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) { NoticeBuildInfo() ReportAvailableRegions() // Start components listenIP, err := GetInterfaceIPAddress(controller.config.ListenInterface) if err != nil { NoticeError("error getting listener IP: %s", err) return } socksProxy, err := NewSocksProxy(controller.config, controller, listenIP) if err != nil { NoticeAlert("error initializing local SOCKS proxy: %s", err) return } defer socksProxy.Close() httpProxy, err := NewHttpProxy( controller.config, controller.untunneledDialConfig, controller, listenIP) if err != nil { NoticeAlert("error initializing local HTTP proxy: %s", err) return } defer httpProxy.Close() if !controller.config.DisableRemoteServerListFetcher { controller.runWaitGroup.Add(1) go controller.remoteServerListFetcher() } /// Note: the connected reporter isn't started until a tunnel is // established controller.runWaitGroup.Add(1) go controller.runTunnels() if *controller.config.EstablishTunnelTimeoutSeconds != 0 { controller.runWaitGroup.Add(1) go controller.establishTunnelWatcher() } // Wait while running select { case <-shutdownBroadcast: NoticeInfo("controller shutdown by request") case <-controller.componentFailureSignal: NoticeAlert("controller shutdown due to component failure") } close(controller.shutdownBroadcast) controller.establishPendingConns.CloseAll() controller.runWaitGroup.Wait() // Stops untunneled connections, including fetch remote server list, // split tunnel port forwards and also untunneled final stats requests. // Note: there's a circular dependency with runWaitGroup.Wait() and // untunneledPendingConns.CloseAll(): runWaitGroup depends on tunnels // stopping which depends, in orderly shutdown, on final status requests // completing. So this pending conns cancel comes too late to interrupt // final status requests in the orderly shutdown case -- which is desired // since we give those a short timeout and would prefer to not interrupt // them. controller.untunneledPendingConns.CloseAll() controller.splitTunnelClassifier.Shutdown() NoticeInfo("exiting controller") } // SignalComponentFailure notifies the controller that an associated component has failed. // This will terminate the controller. func (controller *Controller) SignalComponentFailure() { select { case controller.componentFailureSignal <- *new(struct{}): default: } } // remoteServerListFetcher fetches an out-of-band list of server entries // for more tunnel candidates. It fetches when signalled, with retries // on failure. func (controller *Controller) remoteServerListFetcher() { defer controller.runWaitGroup.Done() var lastFetchTime time.Time fetcherLoop: for { // Wait for a signal before fetching select { case <-controller.signalFetchRemoteServerList: case <-controller.shutdownBroadcast: break fetcherLoop } // Skip fetch entirely (i.e., send no request at all, even when ETag would save // on response size) when a recent fetch was successful if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) { continue } retryLoop: for { // Don't attempt to fetch while there is no network connectivity, // to avoid alert notice noise. if !WaitForNetworkConnectivity( controller.config.NetworkConnectivityChecker, controller.shutdownBroadcast) { break fetcherLoop } err := FetchRemoteServerList( controller.config, controller.untunneledDialConfig) if err == nil { lastFetchTime = time.Now() break retryLoop } NoticeAlert("failed to fetch remote server list: %s", err) timeout := time.After(FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD) select { case <-timeout: case <-controller.shutdownBroadcast: break fetcherLoop } } } NoticeInfo("exiting remote server list fetcher") } // establishTunnelWatcher terminates the controller if a tunnel // has not been established in the configured time period. This // is regardless of how many tunnels are presently active -- meaning // that if an active tunnel was established and lost the controller // is left running (to re-establish). func (controller *Controller) establishTunnelWatcher() { defer controller.runWaitGroup.Done() timeout := time.After( time.Duration(*controller.config.EstablishTunnelTimeoutSeconds) * time.Second) select { case <-timeout: if !controller.hasEstablishedOnce() { NoticeAlert("failed to establish tunnel before timeout") controller.SignalComponentFailure() } case <-controller.shutdownBroadcast: } NoticeInfo("exiting establish tunnel watcher") } // connectedReporter sends periodic "connected" requests to the Psiphon API. // These requests are for server-side unique user stats calculation. See the // comment in DoConnectedRequest for a description of the request mechanism. // To ensure we don't over- or under-count unique users, only one connected // request is made across all simultaneous multi-tunnels; and the connected // request is repeated periodically for very long-lived tunnels. // The signalReportConnected mechanism is used to trigger another connected // request immediately after a reconnect. func (controller *Controller) connectedReporter() { defer controller.runWaitGroup.Done() loop: for { // Pick any active tunnel and make the next connected request. No error // is logged if there's no active tunnel, as that's not an unexpected condition. reported := false tunnel := controller.getNextActiveTunnel() if tunnel != nil { err := tunnel.serverContext.DoConnectedRequest() if err == nil { reported = true } else { NoticeAlert("failed to make connected request: %s", err) } } // Schedule the next connected request and wait. var duration time.Duration if reported { duration = PSIPHON_API_CONNECTED_REQUEST_PERIOD } else { duration = PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD } timeout := time.After(duration) select { case <-controller.signalReportConnected: case <-timeout: // Make another connected request case <-controller.shutdownBroadcast: break loop } } NoticeInfo("exiting connected reporter") } func (controller *Controller) startOrSignalConnectedReporter() { // session is nil when DisableApi is set if controller.config.DisableApi { return } // Start the connected reporter after the first tunnel is established. // Concurrency note: only the runTunnels goroutine may access startedConnectedReporter. if !controller.startedConnectedReporter { controller.startedConnectedReporter = true controller.runWaitGroup.Add(1) go controller.connectedReporter() } else { select { case controller.signalReportConnected <- *new(struct{}): default: } } } // upgradeDownloader makes periodic attemps to complete a client upgrade // download. DownloadUpgrade() is resumable, so each attempt has potential for // getting closer to completion, even in conditions where the download or // tunnel is repeatedly interrupted. // Once the download is complete, the downloader exits and is not run again: // We're assuming that the upgrade will be applied and the entire system // restarted before another upgrade is to be downloaded. func (controller *Controller) upgradeDownloader(clientUpgradeVersion string) { defer controller.runWaitGroup.Done() loop: for { // Pick any active tunnel and make the next download attempt. No error // is logged if there's no active tunnel, as that's not an unexpected condition. tunnel := controller.getNextActiveTunnel() if tunnel != nil { err := DownloadUpgrade(controller.config, clientUpgradeVersion, tunnel) if err == nil { break loop } NoticeAlert("upgrade download failed: %s", err) } timeout := time.After(DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD) select { case <-timeout: // Make another download attempt case <-controller.shutdownBroadcast: break loop } } NoticeInfo("exiting upgrade downloader") } func (controller *Controller) startClientUpgradeDownloader( serverContext *ServerContext) { // serverContext is nil when DisableApi is set if controller.config.DisableApi { return } if controller.config.UpgradeDownloadUrl == "" || controller.config.UpgradeDownloadFilename == "" { // No upgrade is desired return } if serverContext.clientUpgradeVersion == "" { // No upgrade is offered return } // Start the client upgrade downloaded after the first tunnel is established. // Concurrency note: only the runTunnels goroutine may access startClientUpgradeDownloader. if !controller.startedUpgradeDownloader { controller.startedUpgradeDownloader = true controller.runWaitGroup.Add(1) go controller.upgradeDownloader(serverContext.clientUpgradeVersion) } } // runTunnels is the controller tunnel management main loop. It starts and stops // establishing tunnels based on the target tunnel pool size and the current size // of the pool. Tunnels are established asynchronously using worker goroutines. // // When there are no server entries for the target region/protocol, the // establishCandidateGenerator will yield no candidates and wait before // trying again. In the meantime, a remote server entry fetch may supply // valid candidates. // // When a tunnel is established, it's added to the active pool. The tunnel's // operateTunnel goroutine monitors the tunnel. // // When a tunnel fails, it's removed from the pool and the establish process is // restarted to fill the pool. func (controller *Controller) runTunnels() { defer controller.runWaitGroup.Done() // Start running controller.startEstablishing() loop: for { select { case failedTunnel := <-controller.failedTunnels: NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress) controller.terminateTunnel(failedTunnel) // Note: we make this extra check to ensure the shutdown signal takes priority // and that we do not start establishing. Critically, startEstablishing() calls // establishPendingConns.Reset() which clears the closed flag in // establishPendingConns; this causes the pendingConns.Add() within // interruptibleTCPDial to succeed instead of aborting, and the result // is that it's possible for establish goroutines to run all the way through // NewServerContext before being discarded... delaying shutdown. select { case <-controller.shutdownBroadcast: break loop default: } controller.classifyImpairedProtocol(failedTunnel) // Concurrency note: only this goroutine may call startEstablishing/stopEstablishing // and access isEstablishing. if !controller.isEstablishing { controller.startEstablishing() } // !TODO! design issue: might not be enough server entries with region/caps to ever fill tunnel slots // solution(?) target MIN(CountServerEntries(region, protocol), TunnelPoolSize) case establishedTunnel := <-controller.establishedTunnels: tunnelCount, registered := controller.registerTunnel(establishedTunnel) if registered { NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress, establishedTunnel.protocol) if tunnelCount == 1 { // The split tunnel classifier is started once the first tunnel is // established. This first tunnel is passed in to be used to make // the routes data request. // A long-running controller may run while the host device is present // in different regions. In this case, we want the split tunnel logic // to switch to routes for new regions and not classify traffic based // on routes installed for older regions. // We assume that when regions change, the host network will also // change, and so all tunnels will fail and be re-established. Under // that assumption, the classifier will be re-Start()-ed here when // the region has changed. controller.splitTunnelClassifier.Start(establishedTunnel) // Signal a connected request on each 1st tunnel establishment. For // multi-tunnels, the session is connected as long as at least one // tunnel is established. controller.startOrSignalConnectedReporter() controller.startClientUpgradeDownloader( establishedTunnel.serverContext) } } else { controller.discardTunnel(establishedTunnel) } if controller.isFullyEstablished() { controller.stopEstablishing() } case <-controller.shutdownBroadcast: break loop } } // Stop running controller.stopEstablishing() controller.terminateAllTunnels() // Drain tunnel channels close(controller.establishedTunnels) for tunnel := range controller.establishedTunnels { controller.discardTunnel(tunnel) } close(controller.failedTunnels) for tunnel := range controller.failedTunnels { controller.discardTunnel(tunnel) } NoticeInfo("exiting run tunnels") } // classifyImpairedProtocol tracks "impaired" protocol classifications for failed // tunnels. A protocol is classified as impaired if a tunnel using that protocol // fails, repeatedly, shortly after the start of the connection. During tunnel // establishment, impaired protocols are briefly skipped. // // One purpose of this measure is to defend against an attack where the adversary, // for example, tags an OSSH TCP connection as an "unidentified" protocol; allows // it to connect; but then kills the underlying TCP connection after a short time. // Since OSSH has less latency than other protocols that may bypass an "unidentified" // filter, these other protocols might never be selected for use. // // Concurrency note: only the runTunnels() goroutine may call classifyImpairedProtocol func (controller *Controller) classifyImpairedProtocol(failedTunnel *Tunnel) { if failedTunnel.startTime.Add(IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION).After(time.Now()) { controller.impairedProtocolClassification[failedTunnel.protocol] += 1 } else { controller.impairedProtocolClassification[failedTunnel.protocol] = 0 } if len(controller.getImpairedProtocols()) == len(SupportedTunnelProtocols) { // Reset classification if all protocols are classified as impaired as // the network situation (or attack) may not be protocol-specific. // TODO: compare against count of distinct supported protocols for // current known server entries. controller.impairedProtocolClassification = make(map[string]int) } } // getImpairedProtocols returns a list of protocols that have sufficient // classifications to be considered impaired protocols. // // Concurrency note: only the runTunnels() goroutine may call getImpairedProtocols func (controller *Controller) getImpairedProtocols() []string { if len(controller.impairedProtocolClassification) > 0 { NoticeInfo("impaired protocols: %+v", controller.impairedProtocolClassification) } impairedProtocols := make([]string, 0) for protocol, count := range controller.impairedProtocolClassification { if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD { impairedProtocols = append(impairedProtocols, protocol) } } return impairedProtocols } // SignalTunnelFailure implements the TunnelOwner interface. This function // is called by Tunnel.operateTunnel when the tunnel has detected that it // has failed. The Controller will signal runTunnels to create a new // tunnel and/or remove the tunnel from the list of active tunnels. func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) { // Don't block. Assumes the receiver has a buffer large enough for // the typical number of operated tunnels. In case there's no room, // terminate the tunnel (runTunnels won't get a signal in this case, // but the tunnel will be removed from the list of active tunnels). select { case controller.failedTunnels <- tunnel: default: controller.terminateTunnel(tunnel) } } // discardTunnel disposes of a successful connection that is no longer required. func (controller *Controller) discardTunnel(tunnel *Tunnel) { NoticeInfo("discard tunnel: %s", tunnel.serverEntry.IpAddress) // TODO: not calling PromoteServerEntry, since that would rank the // discarded tunnel before fully active tunnels. Can a discarded tunnel // be promoted (since it connects), but with lower rank than all active // tunnels? tunnel.Close(true) } // registerTunnel adds the connected tunnel to the pool of active tunnels // which are candidates for port forwarding. Returns true if the pool has an // empty slot and false if the pool is full (caller should discard the tunnel). func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() if len(controller.tunnels) >= controller.config.TunnelPoolSize { return len(controller.tunnels), false } // Perform a final check just in case we've established // a duplicate connection. for _, activeTunnel := range controller.tunnels { if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress { NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress) return len(controller.tunnels), false } } controller.establishedOnce = true controller.tunnels = append(controller.tunnels, tunnel) NoticeTunnels(len(controller.tunnels)) // Promote this successful tunnel to first rank so it's one // of the first candidates next time establish runs. // Connecting to a TargetServerEntry does not change the // ranking. if controller.config.TargetServerEntry == "" { PromoteServerEntry(tunnel.serverEntry.IpAddress) } return len(controller.tunnels), true } // hasEstablishedOnce indicates if at least one active tunnel has // been established up to this point. This is regardeless of how many // tunnels are presently active. func (controller *Controller) hasEstablishedOnce() bool { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() return controller.establishedOnce } // isFullyEstablished indicates if the pool of active tunnels is full. func (controller *Controller) isFullyEstablished() bool { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() return len(controller.tunnels) >= controller.config.TunnelPoolSize } // terminateTunnel removes a tunnel from the pool of active tunnels // and closes the tunnel. The next-tunnel state used by getNextActiveTunnel // is adjusted as required. func (controller *Controller) terminateTunnel(tunnel *Tunnel) { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() for index, activeTunnel := range controller.tunnels { if tunnel == activeTunnel { controller.tunnels = append( controller.tunnels[:index], controller.tunnels[index+1:]...) if controller.nextTunnel > index { controller.nextTunnel-- } if controller.nextTunnel >= len(controller.tunnels) { controller.nextTunnel = 0 } activeTunnel.Close(false) NoticeTunnels(len(controller.tunnels)) break } } } // terminateAllTunnels empties the tunnel pool, closing all active tunnels. // This is used when shutting down the controller. func (controller *Controller) terminateAllTunnels() { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() // Closing all tunnels in parallel. In an orderly shutdown, each tunnel // may take a few seconds to send a final status request. We only want // to wait as long as the single slowest tunnel. closeWaitGroup := new(sync.WaitGroup) closeWaitGroup.Add(len(controller.tunnels)) for _, activeTunnel := range controller.tunnels { tunnel := activeTunnel go func() { defer closeWaitGroup.Done() tunnel.Close(false) }() } closeWaitGroup.Wait() controller.tunnels = make([]*Tunnel, 0) controller.nextTunnel = 0 NoticeTunnels(len(controller.tunnels)) } // getNextActiveTunnel returns the next tunnel from the pool of active // tunnels. Currently, tunnel selection order is simple round-robin. func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() for i := len(controller.tunnels); i > 0; i-- { tunnel = controller.tunnels[controller.nextTunnel] controller.nextTunnel = (controller.nextTunnel + 1) % len(controller.tunnels) return tunnel } return nil } // isActiveTunnelServerEntry is used to check if there's already // an existing tunnel to a candidate server. func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry) bool { controller.tunnelMutex.Lock() defer controller.tunnelMutex.Unlock() for _, activeTunnel := range controller.tunnels { if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress { return true } } return false } // Dial selects an active tunnel and establishes a port forward // connection through the selected tunnel. Failure to connect is considered // a port foward failure, for the purpose of monitoring tunnel health. func (controller *Controller) Dial( remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) { tunnel := controller.getNextActiveTunnel() if tunnel == nil { return nil, ContextError(errors.New("no active tunnels")) } // Perform split tunnel classification when feature is enabled, and if the remote // address is classified as untunneled, dial directly. if !alwaysTunnel && controller.config.SplitTunnelDnsServer != "" { host, _, err := net.SplitHostPort(remoteAddr) if err != nil { return nil, ContextError(err) } // Note: a possible optimization, when split tunnel is active and IsUntunneled performs // a DNS resolution in order to make its classification, is to reuse that IP address in // the following Dials so they do not need to make their own resolutions. However, the // way this is currently implemented ensures that, e.g., DNS geo load balancing occurs // relative to the outbound network. if controller.splitTunnelClassifier.IsUntunneled(host) { // !TODO! track downstreamConn and close it when the DialTCP conn closes, as with tunnel.Dial conns? return DialTCP(remoteAddr, controller.untunneledDialConfig) } } tunneledConn, err := tunnel.Dial(remoteAddr, alwaysTunnel, downstreamConn) if err != nil { return nil, ContextError(err) } return tunneledConn, nil } // startEstablishing creates a pool of worker goroutines which will // attempt to establish tunnels to candidate servers. The candidates // are generated by another goroutine. func (controller *Controller) startEstablishing() { if controller.isEstablishing { return } NoticeInfo("start establishing") controller.isEstablishing = true controller.establishWaitGroup = new(sync.WaitGroup) controller.stopEstablishingBroadcast = make(chan struct{}) controller.candidateServerEntries = make(chan *candidateServerEntry) controller.establishPendingConns.Reset() // The server affinity mechanism attempts to favor the previously // used server when reconnecting. This is beneficial for user // applications which expect consistency in user IP address (for // example, a web site which prompts for additional user // authentication when the IP address changes). // // Only the very first server, as determined by // datastore.PromoteServerEntry(), is the server affinity candidate. // Concurrent connections attempts to many servers are launched // without delay, in case the affinity server connection fails. // While the affinity server connection is outstanding, when any // other connection is established, there is a short grace period // delay before delivering the established tunnel; this allows some // time for the affinity server connection to succeed first. // When the affinity server connection fails, any other established // tunnel is registered without delay. // // Note: the establishTunnelWorker that receives the affinity // candidate is solely resonsible for closing // controller.serverAffinityDoneBroadcast. // // Note: if config.EgressRegion or config.TunnelProtocol has changed // since the top server was promoted, the first server may not actually // be the last connected server. // TODO: should not favor the first server in this case controller.serverAffinityDoneBroadcast = make(chan struct{}) for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ { controller.establishWaitGroup.Add(1) go controller.establishTunnelWorker() } controller.establishWaitGroup.Add(1) go controller.establishCandidateGenerator( controller.getImpairedProtocols()) } // stopEstablishing signals the establish goroutines to stop and waits // for the group to halt. pendingConns is used to interrupt any worker // blocked on a socket connect. func (controller *Controller) stopEstablishing() { if !controller.isEstablishing { return } NoticeInfo("stop establishing") close(controller.stopEstablishingBroadcast) // Note: interruptibleTCPClose doesn't really interrupt socket connects // and may leave goroutines running for a time after the Wait call. controller.establishPendingConns.CloseAll() // Note: establishCandidateGenerator closes controller.candidateServerEntries // (as it may be sending to that channel). controller.establishWaitGroup.Wait() controller.isEstablishing = false controller.establishWaitGroup = nil controller.stopEstablishingBroadcast = nil controller.candidateServerEntries = nil controller.serverAffinityDoneBroadcast = nil } // establishCandidateGenerator populates the candidate queue with server entries // from the data store. Server entries are iterated in rank order, so that promoted // servers with higher rank are priority candidates. func (controller *Controller) establishCandidateGenerator(impairedProtocols []string) { defer controller.establishWaitGroup.Done() defer close(controller.candidateServerEntries) iterator, err := NewServerEntryIterator(controller.config) if err != nil { NoticeAlert("failed to iterate over candidates: %s", err) controller.SignalComponentFailure() return } defer iterator.Close() isServerAffinityCandidate := true // TODO: reconcile server affinity scheme with multi-tunnel mode if controller.config.TunnelPoolSize > 1 { isServerAffinityCandidate = false close(controller.serverAffinityDoneBroadcast) } loop: // Repeat until stopped for i := 0; ; i++ { if !WaitForNetworkConnectivity( controller.config.NetworkConnectivityChecker, controller.stopEstablishingBroadcast, controller.shutdownBroadcast) { break loop } // Send each iterator server entry to the establish workers startTime := time.Now() for { serverEntry, err := iterator.Next() if err != nil { NoticeAlert("failed to get next candidate: %s", err) controller.SignalComponentFailure() break loop } if serverEntry == nil { // Completed this iteration break } // Disable impaired protocols. This is only done for the // first iteration of the ESTABLISH_TUNNEL_WORK_TIME // loop since (a) one iteration should be sufficient to // evade the attack; (b) there's a good chance of false // positives (such as short tunnel durations due to network // hopping on a mobile device). // Impaired protocols logic is not applied when // config.TunnelProtocol is specified. // The edited serverEntry is temporary copy which is not // stored or reused. if i == 0 && controller.config.TunnelProtocol == "" { serverEntry.DisableImpairedProtocols(impairedProtocols) if len(serverEntry.GetSupportedProtocols()) == 0 { // Skip this server entry, as it has no supported // protocols after disabling the impaired ones // TODO: modify ServerEntryIterator to skip these? continue } } // Note: there must be only one server affinity candidate, as it // closes the serverAffinityDoneBroadcast channel. candidate := &candidateServerEntry{serverEntry, isServerAffinityCandidate} isServerAffinityCandidate = false // TODO: here we could generate multiple candidates from the // server entry when there are many MeekFrontingAddresses. select { case controller.candidateServerEntries <- candidate: case <-controller.stopEstablishingBroadcast: break loop case <-controller.shutdownBroadcast: break loop } if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME)) { // Start over, after a brief pause, with a new shuffle of the server // entries, and potentially some newly fetched server entries. break } } // Free up resources now, but don't reset until after the pause. iterator.Close() // Trigger a fetch remote server list, since we may have failed to // connect with all known servers. Don't block sending signal, since // this signal may have already been sent. // Don't wait for fetch remote to succeed, since it may fail and // enter a retry loop and we're better off trying more known servers. // TODO: synchronize the fetch response, so it can be incorporated // into the server entry iterator as soon as available. select { case controller.signalFetchRemoteServerList <- *new(struct{}): default: } // After a complete iteration of candidate servers, pause before iterating again. // This helps avoid some busy wait loop conditions, and also allows some time for // network conditions to change. Also allows for fetch remote to complete, // in typical conditions (it isn't strictly necessary to wait for this, there will // be more rounds if required). timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD) select { case <-timeout: // Retry iterating case <-controller.stopEstablishingBroadcast: break loop case <-controller.shutdownBroadcast: break loop } iterator.Reset() } NoticeInfo("stopped candidate generator") } // establishTunnelWorker pulls candidates from the candidate queue, establishes // a connection to the tunnel server, and delivers the established tunnel to a channel. func (controller *Controller) establishTunnelWorker() { defer controller.establishWaitGroup.Done() loop: for candidateServerEntry := range controller.candidateServerEntries { // Note: don't receive from candidateServerEntries and stopEstablishingBroadcast // in the same select, since we want to prioritize receiving the stop signal if controller.isStopEstablishingBroadcast() { break loop } // There may already be a tunnel to this candidate. If so, skip it. if controller.isActiveTunnelServerEntry(candidateServerEntry.serverEntry) { continue } tunnel, err := EstablishTunnel( controller.config, controller.untunneledDialConfig, controller.sessionId, controller.establishPendingConns, candidateServerEntry.serverEntry, controller) // TunnelOwner if err != nil { // Unblock other candidates immediately when // server affinity candidate fails. if candidateServerEntry.isServerAffinityCandidate { close(controller.serverAffinityDoneBroadcast) } // Before emitting error, check if establish interrupted, in which // case the error is noise. if controller.isStopEstablishingBroadcast() { break loop } NoticeInfo("failed to connect to %s: %s", candidateServerEntry.serverEntry.IpAddress, err) continue } // Block for server affinity grace period before delivering. if !candidateServerEntry.isServerAffinityCandidate { timer := time.NewTimer(ESTABLISH_TUNNEL_SERVER_AFFINITY_GRACE_PERIOD) select { case <-timer.C: case <-controller.serverAffinityDoneBroadcast: case <-controller.stopEstablishingBroadcast: } } // Deliver established tunnel. // Don't block. Assumes the receiver has a buffer large enough for // the number of desired tunnels. If there's no room, the tunnel must // not be required so it's discarded. select { case controller.establishedTunnels <- tunnel: default: controller.discardTunnel(tunnel) } // Unblock other candidates only after delivering when // server affinity candidate succeeds. if candidateServerEntry.isServerAffinityCandidate { close(controller.serverAffinityDoneBroadcast) } } NoticeInfo("stopped establish worker") } func (controller *Controller) isStopEstablishingBroadcast() bool { select { case <-controller.stopEstablishingBroadcast: return true default: } return false }