Parcourir la source

Implement client upgrade resumable downloader component

Rod Hynes il y a 10 ans
Parent
commit
b6d1b32b9c
6 fichiers modifiés avec 114 ajouts et 14 suppressions
  1. 5 1
      psiphon/config.go
  2. 63 0
      psiphon/controller.go
  3. 6 0
      psiphon/notice.go
  4. 9 6
      psiphon/serverApi.go
  5. 1 7
      psiphon/tunnel.go
  6. 30 0
      psiphon/utils.go

+ 5 - 1
psiphon/config.go

@@ -57,7 +57,9 @@ const (
 	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES = 256
 	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES = 256
 	PSIPHON_API_CONNECTED_REQUEST_PERIOD         = 24 * time.Hour
 	PSIPHON_API_CONNECTED_REQUEST_PERIOD         = 24 * time.Hour
 	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD   = 5 * time.Second
 	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD   = 5 * time.Second
-	FETCH_ROUTES_TIMEOUT                         = 10 * time.Second
+	FETCH_ROUTES_TIMEOUT                         = 1 * time.Minute
+	DOWNLOAD_UPGRADE_TIMEOUT                     = 15 * time.Minute
+	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD          = 5 * time.Second
 )
 )
 
 
 // To distinguish omitted timeout params from explicit 0 value timeout
 // To distinguish omitted timeout params from explicit 0 value timeout
@@ -93,6 +95,8 @@ type Config struct {
 	SplitTunnelRoutesUrlFormat          string
 	SplitTunnelRoutesUrlFormat          string
 	SplitTunnelRoutesSignaturePublicKey string
 	SplitTunnelRoutesSignaturePublicKey string
 	SplitTunnelDnsServer                string
 	SplitTunnelDnsServer                string
+	UpgradeDownloadUrl                  string
+	UpgradeDownloadFilename             string
 }
 }
 
 
 // LoadConfig parses and validates a JSON format Psiphon config JSON
 // LoadConfig parses and validates a JSON format Psiphon config JSON

+ 63 - 0
psiphon/controller.go

@@ -46,6 +46,7 @@ type Controller struct {
 	tunnels                     []*Tunnel
 	tunnels                     []*Tunnel
 	nextTunnel                  int
 	nextTunnel                  int
 	startedConnectedReporter    bool
 	startedConnectedReporter    bool
+	startedUpgradeDownloader    bool
 	isEstablishing              bool
 	isEstablishing              bool
 	establishWaitGroup          *sync.WaitGroup
 	establishWaitGroup          *sync.WaitGroup
 	stopEstablishingBroadcast   chan struct{}
 	stopEstablishingBroadcast   chan struct{}
@@ -94,6 +95,7 @@ func NewController(config *Config) (controller *Controller, err error) {
 		tunnels:                  make([]*Tunnel, 0),
 		tunnels:                  make([]*Tunnel, 0),
 		establishedOnce:          false,
 		establishedOnce:          false,
 		startedConnectedReporter: false,
 		startedConnectedReporter: false,
+		startedUpgradeDownloader: false,
 		isEstablishing:           false,
 		isEstablishing:           false,
 		establishPendingConns:    new(Conns),
 		establishPendingConns:    new(Conns),
 		untunneledPendingConns:   untunneledPendingConns,
 		untunneledPendingConns:   untunneledPendingConns,
@@ -317,6 +319,66 @@ func (controller *Controller) startConnectedReporter() {
 	}
 	}
 }
 }
 
 
+// upgradeDownloader makes periodic attemps to complete a client upgrade
+// download. DownloadUpgrade() is resumable, so each attempt has potential for
+// getting closer to completion, even in conditions where the download or
+// tunnel is repeatedly interrupted.
+// Once the download is complete, the downloader exits and is not run again:
+// We're assuming that the upgrade will be applied and the entire system
+// restarted before another upgrade is to be downloaded.
+func (controller *Controller) upgradeDownloader(clientUpgradeVersion string) {
+	defer controller.runWaitGroup.Done()
+
+loop:
+	for {
+		// Pick any active tunnel and make the next download attempt. No error
+		// is logged if there's no active tunnel, as that's not an unexpected condition.
+		tunnel := controller.getNextActiveTunnel()
+		if tunnel != nil {
+			err := DownloadUpgrade(controller.config, clientUpgradeVersion, tunnel)
+			if err == nil {
+				break loop
+			}
+			NoticeAlert("upgrade download failed: ", err)
+		}
+
+		timeout := time.After(DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD)
+		select {
+		case <-timeout:
+			// Make another download attempt
+		case <-controller.shutdownBroadcast:
+			break loop
+		}
+	}
+
+	NoticeInfo("exiting upgrade downloader")
+}
+
+func (controller *Controller) startClientUpgradeDownloader(clientUpgradeVersion string) {
+	if controller.config.DisableApi {
+		return
+	}
+
+	if controller.config.UpgradeDownloadUrl == "" ||
+		controller.config.UpgradeDownloadFilename == "" {
+		// No upgrade is desired
+		return
+	}
+
+	if clientUpgradeVersion == "" {
+		// No upgrade is offered
+		return
+	}
+
+	// Start the client upgrade downloaded after the first tunnel is established.
+	// Concurrency note: only the runTunnels goroutine may access startClientUpgradeDownloader.
+	if !controller.startedUpgradeDownloader {
+		controller.startedUpgradeDownloader = true
+		controller.runWaitGroup.Add(1)
+		go controller.upgradeDownloader(clientUpgradeVersion)
+	}
+}
+
 // runTunnels is the controller tunnel management main loop. It starts and stops
 // runTunnels is the controller tunnel management main loop. It starts and stops
 // establishing tunnels based on the target tunnel pool size and the current size
 // establishing tunnels based on the target tunnel pool size and the current size
 // of the pool. Tunnels are established asynchronously using worker goroutines.
 // of the pool. Tunnels are established asynchronously using worker goroutines.
@@ -361,6 +423,7 @@ loop:
 				controller.stopEstablishing()
 				controller.stopEstablishing()
 			}
 			}
 			controller.startConnectedReporter()
 			controller.startConnectedReporter()
+			controller.startClientUpgradeDownloader(establishedTunnel.session.clientUpgradeVersion)
 
 
 		case <-controller.shutdownBroadcast:
 		case <-controller.shutdownBroadcast:
 			break loop
 			break loop

+ 6 - 0
psiphon/notice.go

@@ -185,6 +185,12 @@ func NoticeUpstreamProxyError(err error) {
 	outputNotice("UpstreamProxyError", true, "message", fmt.Sprintf("%s", err))
 	outputNotice("UpstreamProxyError", true, "message", fmt.Sprintf("%s", err))
 }
 }
 
 
+// NoticeClientUpgradeDownloaded indicates that a client upgrade download
+// is complete and available at the destination specified.
+func NoticeClientUpgradeDownloaded(filename string) {
+	outputNotice("ClientUpgradeDownloaded", false, "filename", filename)
+}
+
 type noticeObject struct {
 type noticeObject struct {
 	NoticeType string          `json:"noticeType"`
 	NoticeType string          `json:"noticeType"`
 	Data       json.RawMessage `json:"data"`
 	Data       json.RawMessage `json:"data"`

+ 9 - 6
psiphon/serverApi.go

@@ -39,12 +39,13 @@ import (
 // includes the session ID (used for Psiphon API requests) and a http
 // includes the session ID (used for Psiphon API requests) and a http
 // client configured to make tunneled Psiphon API requests.
 // client configured to make tunneled Psiphon API requests.
 type Session struct {
 type Session struct {
-	sessionId          string
-	baseRequestUrl     string
-	psiphonHttpsClient *http.Client
-	statsRegexps       *transferstats.Regexps
-	statsServerId      string
-	clientRegion       string
+	sessionId            string
+	baseRequestUrl       string
+	psiphonHttpsClient   *http.Client
+	statsRegexps         *transferstats.Regexps
+	statsServerId        string
+	clientRegion         string
+	clientUpgradeVersion string
 }
 }
 
 
 // MakeSessionId creates a new session ID. Making the session ID is not done
 // MakeSessionId creates a new session ID. Making the session ID is not done
@@ -246,6 +247,8 @@ func (session *Session) doHandshakeRequest() error {
 	for _, homepage := range handshakeConfig.Homepages {
 	for _, homepage := range handshakeConfig.Homepages {
 		NoticeHomepage(homepage)
 		NoticeHomepage(homepage)
 	}
 	}
+
+	session.clientUpgradeVersion = handshakeConfig.UpgradeClientVersion
 	if handshakeConfig.UpgradeClientVersion != "" {
 	if handshakeConfig.UpgradeClientVersion != "" {
 		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
 		NoticeClientUpgradeAvailable(handshakeConfig.UpgradeClientVersion)
 	}
 	}

+ 1 - 7
psiphon/tunnel.go

@@ -566,9 +566,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 
 
 		case <-sshKeepAliveTimer.C:
 		case <-sshKeepAliveTimer.C:
 			err = sendSshKeepAlive(tunnel.sshClient)
 			err = sendSshKeepAlive(tunnel.sshClient)
-			if err != nil {
-				err = fmt.Errorf("ssh keep alive failed: %s", err)
-			}
 			sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 
 
 		case failures := <-tunnel.portForwardFailures:
 		case failures := <-tunnel.portForwardFailures:
@@ -585,9 +582,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				// is hit. But if we can't make a simple round trip request to the
 				// is hit. But if we can't make a simple round trip request to the
 				// server, we'll immediately abort.
 				// server, we'll immediately abort.
 				err = sendSshKeepAlive(tunnel.sshClient)
 				err = sendSshKeepAlive(tunnel.sshClient)
-				if err != nil {
-					err = fmt.Errorf("ssh keep alive failed: %s", err)
-				}
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 			}
 
 
@@ -626,7 +620,7 @@ func sendSshKeepAlive(sshClient *ssh.Client) error {
 		errChannel <- err
 		errChannel <- err
 	}()
 	}()
 
 
-	return <-errChannel
+	return ContextError(<-errChannel)
 }
 }
 
 
 // sendStats is a helper for sending session stats to the server.
 // sendStats is a helper for sending session stats to the server.

+ 30 - 0
psiphon/utils.go

@@ -182,3 +182,33 @@ func IsAddressInUseError(err error) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+// SyncFileWriter wraps a file and exposes an io.Writer. At predefined
+// steps, the file is synced (flushed to disk) while writing.
+type SyncFileWriter struct {
+	file  *os.File
+	step  int
+	count int
+}
+
+// NewSyncFileWriter creates a SyncFileWriter.
+func NewSyncFileWriter(file *os.File) *SyncFileWriter {
+	return &SyncFileWriter{
+		file:  file,
+		step:  2 << 16,
+		count: 0}
+}
+
+// Write implements io.Writer with periodic file syncing.
+func (writer *SyncFileWriter) Write(p []byte) (n int, err error) {
+	n, err = writer.file.Write(p)
+	if err != nil {
+		return
+	}
+	writer.count += n
+	if writer.count >= writer.step {
+		err = writer.file.Sync()
+		writer.count = 0
+	}
+	return
+}