Просмотр исходного кода

Fix: don't block on full ScanServerEntries for InitialLimitTunnelProtocols

- With 1000s of servers, the scan often delayed dialing tunnels by 10s of
  seconds.
- Also applies to TunnelPoolSize > 1.
Rod Hynes 1 год назад
Родитель
Commit
03e91e536c
1 измененных файлов с 160 добавлено и 159 удалено
  1. 160 159
      psiphon/controller.go

+ 160 - 159
psiphon/controller.go

@@ -644,16 +644,7 @@ downloadLoop:
 }
 
 type serverEntriesReportRequest struct {
-	constraints   *protocolSelectionConstraints
-	awaitResponse chan *serverEntriesReportResponse
-}
-
-type serverEntriesReportResponse struct {
-	err                              error
-	candidates                       int
-	initialCandidates                int
-	initialCandidatesAnyEgressRegion int
-	availableEgressRegions           []string
+	constraints *protocolSelectionConstraints
 }
 
 // serverEntriesReporter performs scans over all server entries to report on
@@ -669,21 +660,16 @@ type serverEntriesReportResponse struct {
 // serverEntriesReporter also serves to combine these scans, which would
 // otherwise be logically independent, due to the performance impact of scans.
 //
-// The underlying datastore implementation _may_ block write transactions
-// while there are open read transactions. For example, bolt write
-// transactions which need to  re-map the data file (when the datastore grows)
-// will block on open read transactions. In these scenarios, a slow scan will
-// still block other operations.
+// Limitation: The underlying datastore implementation _may_ block write
+// transactions while there are open read transactions. For example, bolt
+// write transactions which need to re-map the data file (when the datastore
+// grows) will block on open read transactions. In these scenarios, a slow
+// scan will still block other operations.
 //
 // serverEntriesReporter runs beyond the establishment phase, since it's
 // important for notices such as AvailableEgressRegions to eventually emit
 // even if already established. serverEntriesReporter scans are cancellable,
 // so controller shutdown is not blocked by slow scans.
-//
-// In some special cases, establishment cannot begin without candidate counts
-// up front. In these cases only, the request contains a non-nil
-// awaitResponse, a channel which is used by the requester to block until the
-// scan is complete and the candidate counts are available.
 func (controller *Controller) serverEntriesReporter() {
 	defer controller.runWaitGroup.Done()
 
@@ -701,10 +687,11 @@ loop:
 		egressRegion := controller.config.EgressRegion
 		constraints := request.constraints
 
-		var response serverEntriesReportResponse
-
 		regions := make(map[string]bool)
 
+		initialCandidates := 0
+		candidates := 0
+
 		callback := func(serverEntry *protocol.ServerEntry) bool {
 
 			// In establishment, excludeIntensive depends on what set of protocols are
@@ -716,16 +703,12 @@ loop:
 			isInitialCandidate := constraints.isInitialCandidate(excludeIntensive, serverEntry)
 			isCandidate := constraints.isCandidate(excludeIntensive, serverEntry)
 
-			if isInitialCandidate {
-				response.initialCandidatesAnyEgressRegion += 1
-			}
-
 			if egressRegion == "" || serverEntry.Region == egressRegion {
 				if isInitialCandidate {
-					response.initialCandidates += 1
+					initialCandidates += 1
 				}
 				if isCandidate {
-					response.candidates += 1
+					candidates += 1
 				}
 			}
 
@@ -754,70 +737,44 @@ loop:
 
 		startTime := time.Now()
 
-		response.err = ScanServerEntries(callback)
+		err := ScanServerEntries(callback)
+		if err != nil {
+			NoticeWarning("ScanServerEntries failed: %v", errors.Trace(err))
+			continue
+		}
 
 		// Report this duration in CandidateServers as an indication of datastore
 		// performance.
 		duration := time.Since(startTime)
 
-		response.availableEgressRegions = make([]string, 0, len(regions))
-		for region := range regions {
-			response.availableEgressRegions = append(response.availableEgressRegions, region)
-		}
-
-		if response.err != nil {
-
-			// For diagnostics, we'll post this even when cancelled due to shutdown.
-			NoticeWarning("ScanServerEntries failed: %v", errors.Trace(response.err))
-
-			// Continue and send error reponse. Clear any partial data to avoid
-			// misuse.
-			response.candidates = 0
-			response.initialCandidates = 0
-			response.initialCandidatesAnyEgressRegion = 0
-			response.availableEgressRegions = []string{}
-		}
+		NoticeCandidateServers(
+			controller.config.EgressRegion,
+			constraints,
+			initialCandidates,
+			candidates,
+			duration)
 
-		if request.awaitResponse != nil {
-			select {
-			case request.awaitResponse <- &response:
-			case <-controller.runCtx.Done():
-				// The receiver may be gone when shutting down.
-			}
+		availableEgressRegions := make([]string, 0, len(regions))
+		for region := range regions {
+			availableEgressRegions = append(availableEgressRegions, region)
 		}
 
-		if response.err == nil {
-
-			NoticeCandidateServers(
-				controller.config.EgressRegion,
-				constraints,
-				response.initialCandidates,
-				response.candidates,
-				duration)
-
-			NoticeAvailableEgressRegions(
-				response.availableEgressRegions)
-		}
+		NoticeAvailableEgressRegions(
+			availableEgressRegions)
 	}
 
 	NoticeInfo("exiting server entries reporter")
 }
 
-// signalServerEntriesReporter triggers a new server entry report. Set
-// request.awaitResponse to obtain the report output. When awaitResponse is
-// set, signalServerEntriesReporter blocks until the reporter receives the
-// request, guaranteeing the new report runs. Otherwise, the report is
-// considered to be informational and may or may not run, depending on whether
-// another run is already in progress.
-func (controller *Controller) signalServerEntriesReporter(request *serverEntriesReportRequest) {
+// signalServerEntriesReporter triggers a new server entry report. The report
+// is considered to be informational and may or may not run, depending on
+// whether another run is already in progress.
+func (controller *Controller) signalServerEntriesReporter(
+	request *serverEntriesReportRequest) {
 
-	if request.awaitResponse == nil {
-		select {
-		case controller.signalReportServerEntries <- request:
-		default:
-		}
-	} else {
-		controller.signalReportServerEntries <- request
+	select {
+	case controller.signalReportServerEntries <- request:
+	default:
 	}
 }
 
@@ -1955,18 +1912,14 @@ func (controller *Controller) launchEstablishing() {
 		controller.config.TargetServerEntry != "" {
 		tunnelPoolSize = 1
 	}
+	controller.setTunnelPoolSize(tunnelPoolSize)
 
 	p.Close()
 
 	// Trigger CandidateServers and AvailableEgressRegions notices. By default,
 	// this is an asynchronous operation, as the underlying full server entry
-	// list enumeration may be a slow operation. In certain cases, where
-	// candidate counts are required up front, await the result before
-	// proceeding.
-
-	awaitResponse := tunnelPoolSize > 1 ||
-		controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount > 0
-
+	// list enumeration may be a slow operation.
+	//
 	// AvailableEgressRegions: after a fresh install, the outer client may not
 	// have a list of regions to display; and LimitTunnelProtocols may reduce the
 	// number of available regions.
@@ -1989,97 +1942,141 @@ func (controller *Controller) launchEstablishing() {
 	// necessary protocol capabilities (either locally or from a remote server
 	// list fetch).
 
-	// Concurrency note: controller.protocolSelectionConstraints may be
-	// overwritten before serverEntriesReporter reads it, and so cannot be
-	// accessed directly by serverEntriesReporter.
-	reportRequest := &serverEntriesReportRequest{
-		constraints: controller.protocolSelectionConstraints,
+	// Concurrency note: controller.protocolSelectionConstraints and its
+	// fields may be overwritten before serverEntriesReporter reads it, and
+	// so cannot be accessed directly by serverEntriesReporter.
+	//
+	// Limitation: the non-deep copy here shares slices (tunnel protocol
+	// lists) with the original; the contents of these slices don't change
+	// past this point. The rate limiter should not be used by
+	// serverEntriesReporter, but is cleared just in case.
+	copyConstraints := *controller.protocolSelectionConstraints
+	copyConstraints.inproxyClientDialRateLimiter = nil
+	controller.signalServerEntriesReporter(
+		&serverEntriesReportRequest{
+			constraints: &copyConstraints,
+		})
+
+	if controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount > 0 ||
+		tunnelPoolSize > 1 {
+
+		// Perform a synchronous scan over server entries in order to check if
+		// there are sufficient candidates to satisfy any initial tunnel
+		// protocol limit constraint and/or tunnel pool size > 1. If these
+		// requirements can't be met, the constraint and/or pool size are
+		// adjusted in order to avoid spinning unable to select any protocol
+		// or trying to establish more tunnels than is possible.
+		controller.doConstraintsScan()
 	}
 
-	if awaitResponse {
-		// Buffer size of 1 ensures the sender, serverEntryReporter, won't block on
-		// sending the response in the case where launchEstablishing exits due to
-		// stopping establishment.
-		reportRequest.awaitResponse = make(chan *serverEntriesReportResponse, 1)
+	for i := 0; i < workerPoolSize; i++ {
+		controller.establishWaitGroup.Add(1)
+		go controller.establishTunnelWorker()
 	}
 
-	controller.signalServerEntriesReporter(reportRequest)
+	controller.establishWaitGroup.Add(1)
+	go controller.establishCandidateGenerator()
+}
 
-	if awaitResponse {
+func (controller *Controller) doConstraintsScan() {
 
-		var reportResponse *serverEntriesReportResponse
-		select {
-		case reportResponse = <-reportRequest.awaitResponse:
-		case <-controller.establishCtx.Done():
-			// The sender may be gone when shutting down, or may not send until after
-			// stopping establishment.
-			return
-		}
-		if reportResponse.err != nil {
-			NoticeError("failed to report server entries: %v",
-				errors.Trace(reportResponse.err))
-			controller.SignalComponentFailure()
-			return
-		}
+	// Scan over server entries in order to check and adjust any initial
+	// tunnel protocol limit and tunnel pool size.
+	//
+	// The scan in serverEntriesReporter is _not_ used for these checks,
+	// since it takes too long to complete with 1000s of server entries,
+	// greatly delaying the start (or restart, if already scanning) of
+	// establishment. Instead a 2nd ScanServerEntries is run here, with an
+	// early exit when sufficient candidates are found, which is expected
+	// to happen quickly in the typical case.
+
+	hasInitialLimitTunnelProtocols :=
+		controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount > 0
+	tunnelPoolSize := controller.getTunnelPoolSize()
 
-		// Make adjustments based on candidate counts.
+	scanCount := 0
+	scanCancelled := false
+	candidates := 0
 
-		if tunnelPoolSize > 1 {
-			// Initial canidate count is ignored as count candidates will eventually
-			// become available.
-			if reportResponse.candidates < tunnelPoolSize {
-				tunnelPoolSize = reportResponse.candidates
-			}
-			if tunnelPoolSize < 1 {
-				tunnelPoolSize = 1
-			}
-		}
-		controller.setTunnelPoolSize(tunnelPoolSize)
+	callback := func(serverEntry *protocol.ServerEntry) bool {
 
-		// If InitialLimitTunnelProtocols is configured but cannot be satisfied,
-		// skip the initial phase in this establishment. This avoids spinning,
-		// unable to connect, in this case. InitialLimitTunnelProtocols is
-		// intended to prioritize certain protocols, but not strictly select them.
-		//
-		// The candidate count check ignores egress region selection. When an egress
-		// region is selected, it's the responsibility of the outer client to react
-		// to the following ReportAvailableRegions output and clear the user's
-		// selected region to prevent spinning, unable to connect. The initial phase
-		// is skipped only when InitialLimitTunnelProtocols cannot be satisfied
-		// _regardless_ of region selection.
-		//
-		// We presume that, in practise, most clients will have embedded server
-		// entries with capabilities for most protocols; and that clients will
-		// often perform RSL checks. So clients should most often have the
-		// necessary capabilities to satisfy InitialLimitTunnelProtocols. When
-		// this check fails, RSL/OSL/upgrade checks are triggered in order to gain
-		// new capabilities.
-		//
-		// LimitTunnelProtocols remains a hard limit, as using prohibited
-		// protocols may have some bad effect, such as a firewall blocking all
-		// traffic from a host.
+		scanCount += 1
 
-		if controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount > 0 {
+		// As in serverEntriesReporter:
+		// - egress region is ignored, since AvailableEgressRegion alerts
+		//   the front end client when unable to connect due to egress
+		//   region constraints.
+		// - excludeIntensive is false, as any intensive candidate will
+		//   eventually be an available candidate.
 
-			if reportResponse.initialCandidatesAnyEgressRegion == 0 {
-				NoticeWarning("skipping initial limit tunnel protocols")
-				controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount = 0
+		excludeIntensive := false
+		if (hasInitialLimitTunnelProtocols &&
+			controller.protocolSelectionConstraints.isInitialCandidate(excludeIntensive, serverEntry)) ||
+			(!hasInitialLimitTunnelProtocols &&
+				controller.protocolSelectionConstraints.isCandidate(excludeIntensive, serverEntry)) {
+			candidates += 1
+		}
 
-				// Since we were unable to satisfy the InitialLimitTunnelProtocols
-				// tactic, trigger RSL, OSL, and upgrade fetches to potentially
-				// gain new capabilities.
-				controller.triggerFetches()
-			}
+		if candidates >= tunnelPoolSize {
+			// Exit the scan early once sufficient candidates have been found.
+			scanCancelled = true
+			return false
+		}
+
+		select {
+		case <-controller.runCtx.Done():
+			// Don't block controller shutdown: cancel the scan.
+			return false
+		default:
+			return true
 		}
 	}
 
-	for i := 0; i < workerPoolSize; i++ {
-		controller.establishWaitGroup.Add(1)
-		go controller.establishTunnelWorker()
+	startTime := time.Now()
+	scanErr := ScanServerEntries(callback)
+	if scanErr != nil && !scanCancelled {
+		NoticeWarning("ScanServerEntries failed: %v", errors.Trace(scanErr))
+		// Continue and make adjustments based on any partial results.
 	}
+	NoticeInfo("Awaited ScanServerEntries: scanned %d entries in %v", scanCount, time.Since(startTime))
 
-	controller.establishWaitGroup.Add(1)
-	go controller.establishCandidateGenerator()
+	// Make adjustments based on candidate counts.
+
+	if tunnelPoolSize > candidates && candidates > 0 {
+		tunnelPoolSize = candidates
+	}
+
+	// If InitialLimitTunnelProtocols is configured but cannot be satisfied,
+	// skip the initial phase in this establishment. This avoids spinning,
+	// unable to connect, in this case. InitialLimitTunnelProtocols is
+	// intended to prioritize certain protocols, but not strictly select them.
+	//
+	// The candidate count check ignores egress region selection. When an egress
+	// region is selected, it's the responsibility of the outer client to react
+	// to the following ReportAvailableRegions output and clear the user's
+	// selected region to prevent spinning, unable to connect. The initial phase
+	// is skipped only when InitialLimitTunnelProtocols cannot be satisfied
+	// _regardless_ of region selection.
+	//
+	// We presume that, in practise, most clients will have embedded server
+	// entries with capabilities for most protocols; and that clients will
+	// often perform RSL checks. So clients should most often have the
+	// necessary capabilities to satisfy InitialLimitTunnelProtocols. When
+	// this check fails, RSL/OSL/upgrade checks are triggered in order to gain
+	// new capabilities.
+	//
+	// LimitTunnelProtocols remains a hard limit, as using prohibited
+	// protocols may have some bad effect, such as a firewall blocking all
+	// traffic from a host.
+
+	if hasInitialLimitTunnelProtocols && candidates == 0 {
+		NoticeWarning("skipping initial limit tunnel protocols")
+		controller.protocolSelectionConstraints.initialLimitTunnelProtocolsCandidateCount = 0
+		// Since we were unable to satisfy the InitialLimitTunnelProtocols
+		// tactic, trigger RSL, OSL, and upgrade fetches to potentially
+		// gain new capabilities.
+		controller.triggerFetches()
+	}
 }
 
 // stopEstablishing signals the establish goroutines to stop and waits
@@ -2583,6 +2580,10 @@ loop:
 		// Increment establishConnectTunnelCount only after selectProtocol has
 		// succeeded to ensure InitialLimitTunnelProtocolsCandidateCount
 		// candidates use InitialLimitTunnelProtocols.
+		//
+		// TODO: add escape from initial limit to cover cases where the
+		// initial scan indicates there are sufficient candidates, but then
+		// server entries are deleted.
 		establishConnectTunnelCount := controller.establishConnectTunnelCount
 		controller.establishConnectTunnelCount += 1