Browse Source

Reduce server allocations, GC churn, and CPU overhead for tactics fetches

- Add tactics.Server.cachedTacticsData to cache prepared tactics payloads and
  hash tags for repeated tactics requests matching the same filters.

- Designate a single in-proxy proxy worker as the tactics checker and fetcher.
Rod Hynes 1 year ago
parent
commit
9d0ae9fcb0

+ 8 - 1
psiphon/common/inproxy/api.go

@@ -215,7 +215,7 @@ type ClientMetrics struct {
 // ProxyAnnounceRequest is an API request sent from a proxy to a broker,
 // announcing that it is available for a client connection. Proxies send one
 // ProxyAnnounceRequest for each available client connection. The broker will
-// match the proxy with a a client and return WebRTC connection information
+// match the proxy with a client and return WebRTC connection information
 // in the response.
 //
 // PersonalCompartmentIDs limits the clients to those that supply one of the
@@ -223,11 +223,18 @@ type ClientMetrics struct {
 // proxy operators to client users out-of-band and provide optional access
 // control.
 //
+// When CheckTactics is set, the broker will check for new tactics or indicate
+// that the proxy's cached tactics TTL may be extended. Tactics information
+// is returned in the response TacticsPayload. To minimize broker processing
+// overhead, proxies with multiple workers should designate just one worker
+// to set CheckTactics.
+//
 // The proxy's session public key is an implicit and cryptographically
 // verified proxy ID.
 type ProxyAnnounceRequest struct {
 	PersonalCompartmentIDs []ID          `cbor:"1,keyasint,omitempty"`
 	Metrics                *ProxyMetrics `cbor:"2,keyasint,omitempty"`
+	CheckTactics           bool          `cbor:"3,keyasint,omitempty"`
 }
 
 // WebRTCSessionDescription is compatible with pion/webrtc.SessionDescription

+ 15 - 12
psiphon/common/inproxy/broker.go

@@ -584,22 +584,25 @@ func (b *Broker) handleProxyAnnounce(
 	// proxy can store and apply the new tactics before announcing again.
 
 	var tacticsPayload []byte
-	tacticsPayload, newTacticsTag, err = b.config.GetTacticsPayload(geoIPData, apiParams)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	if tacticsPayload != nil && newTacticsTag != "" {
-		responsePayload, err := MarshalProxyAnnounceResponse(
-			&ProxyAnnounceResponse{
-				TacticsPayload: tacticsPayload,
-				NoMatch:        true,
-			})
+	if announceRequest.CheckTactics {
+		tacticsPayload, newTacticsTag, err =
+			b.config.GetTacticsPayload(geoIPData, apiParams)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 
-		return responsePayload, nil
+		if tacticsPayload != nil && newTacticsTag != "" {
+			responsePayload, err := MarshalProxyAnnounceResponse(
+				&ProxyAnnounceResponse{
+					TacticsPayload: tacticsPayload,
+					NoMatch:        true,
+				})
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			return responsePayload, nil
+		}
 	}
 
 	// AllowProxy may be used to disallow proxies from certain geolocations,

+ 12 - 3
psiphon/common/inproxy/proxy.go

@@ -202,14 +202,16 @@ func (p *Proxy) Run(ctx context.Context) {
 	// trip is awaited so that:
 	//
 	// - The first announce response will arrive with any new tactics,
-	//   avoiding a start up case where MaxClients initial, concurrent
-	//   announces all return with no-match and a tactics payload.
+	//   which may be applied before launching additional workers.
 	//
 	// - The first worker gets no announcement delay and is also guaranteed to
 	//   be the shared session establisher. Since the announcement delays are
 	//   applied _after_ waitToShareSession, it would otherwise be possible,
 	//   with a race of MaxClient initial, concurrent announces, for the
 	//   session establisher to be a different worker than the no-delay worker.
+	//
+	// The first worker is the only proxy worker which sets
+	// ProxyAnnounceRequest.CheckTactics.
 
 	signalFirstAnnounceCtx, signalFirstAnnounceDone :=
 		context.WithCancel(context.Background())
@@ -612,6 +614,10 @@ func (p *Proxy) proxyOneClient(
 	}
 	p.nextAnnounceMutex.Unlock()
 
+	// Only the first worker, which has signalAnnounceDone configured, checks
+	// for tactics.
+	checkTactics := signalAnnounceDone != nil
+
 	// A proxy ID is implicitly sent with requests; it's the proxy's session
 	// public key.
 	//
@@ -625,6 +631,7 @@ func (p *Proxy) proxyOneClient(
 		&ProxyAnnounceRequest{
 			PersonalCompartmentIDs: personalCompartmentIDs,
 			Metrics:                metrics,
+			CheckTactics:           checkTactics,
 		})
 	if logAnnounce() {
 		p.config.Logger.WithTraceFields(common.LogFields{
@@ -645,7 +652,9 @@ func (p *Proxy) proxyOneClient(
 		// discovery but proceed with handling the proxy announcement
 		// response as there may still be a match.
 
-		if p.config.HandleTacticsPayload(tacticsNetworkID, announceResponse.TacticsPayload) {
+		if p.config.HandleTacticsPayload(
+			tacticsNetworkID, announceResponse.TacticsPayload) {
+
 			p.resetNetworkDiscovery()
 		}
 	}

+ 120 - 21
psiphon/common/tactics/tactics.go

@@ -157,6 +157,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -164,6 +165,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	lrucache "github.com/cognusion/go-cache-lru"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -189,6 +191,7 @@ const (
 	AGGREGATION_MINIMUM                = "Minimum"
 	AGGREGATION_MAXIMUM                = "Maximum"
 	AGGREGATION_MEDIAN                 = "Median"
+	PAYLOAD_CACHE_SIZE                 = 256
 )
 
 var (
@@ -250,6 +253,9 @@ type Server struct {
 	logger                common.Logger
 	logFieldFormatter     common.APIParameterLogFieldFormatter
 	apiParameterValidator common.APIParameterValidator
+
+	cachedTacticsData *lrucache.Cache
+	filterMatches     []bool
 }
 
 const (
@@ -442,6 +448,8 @@ func NewServer(
 		logger:                logger,
 		logFieldFormatter:     logFieldFormatter,
 		apiParameterValidator: apiParameterValidator,
+		cachedTacticsData: lrucache.NewWithLRU(
+			lrucache.NoExpiration, 1*time.Minute, PAYLOAD_CACHE_SIZE),
 	}
 
 	server.ReloadableFile = common.NewReloadableFile(
@@ -467,6 +475,18 @@ func NewServer(
 			server.DefaultTactics = newServer.DefaultTactics
 			server.FilteredTactics = newServer.FilteredTactics
 
+			// Any cached, merged tactics data is flushed when the
+			// configuration changes.
+			//
+			// A single filterMatches, used in getTactics, is allocated here
+			// to avoid allocating a slice per getTactics call.
+			//
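+			// Server.ReloadableFile.RLock/RUnlock is the mutex for accessing
+			// these and other Server fields.
+			//
+			// NOTE(review): getTactics writes into filterMatches while holding
+			// only RLock. If RLock is a shared reader lock, concurrent
+			// getTactics calls may race on this slice -- confirm.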
+
+			server.cachedTacticsData.Flush()
+			server.filterMatches = make([]bool, len(server.FilteredTactics))
+
 			server.initLookups()
 
 			server.loaded = true
@@ -730,6 +750,8 @@ func (server *Server) GetFilterGeoIPScope(geoIPData common.GeoIPData) int {
 //
 // Elements of the returned Payload, e.g., tactics parameters, will point to
 // data in DefaultTactics and FilteredTactics and must not be modified.
+//
+// Callers must not mutate returned tactics data, which is cached.
 func (server *Server) GetTacticsPayload(
 	geoIPData common.GeoIPData,
 	apiParams common.APIParameters) (*Payload, error) {
@@ -737,22 +759,17 @@ func (server *Server) GetTacticsPayload(
 	// includeServerSideOnly is false: server-side only parameters are not
 	// used by the client, so including them wastes space and unnecessarily
 	// exposes the values.
-	tactics, err := server.GetTactics(false, geoIPData, apiParams)
+	tacticsData, err := server.getTactics(false, geoIPData, apiParams)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	if tactics == nil {
+	if tacticsData == nil {
 		return nil, nil
 	}
 
-	marshaledTactics, tag, err := marshalTactics(tactics)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
 	payload := &Payload{
-		Tag: tag,
+		Tag: tacticsData.tag,
 	}
 
 	// New clients should always send STORED_TACTICS_TAG_PARAMETER_NAME. When they have no
@@ -777,7 +794,7 @@ func (server *Server) GetTacticsPayload(
 	}
 
 	if sendPayloadTactics {
-		payload.Tactics = marshaledTactics
+		payload.Tactics = tacticsData.payload
 	}
 
 	return payload, nil
@@ -797,37 +814,63 @@ func marshalTactics(tactics *Tactics) ([]byte, string, error) {
 }
 
 // GetTacticsWithTag returns a GetTactics value along with the associated tag value.
+//
+// Callers must not mutate returned tactics data, which is cached.
 func (server *Server) GetTacticsWithTag(
 	includeServerSideOnly bool,
 	geoIPData common.GeoIPData,
 	apiParams common.APIParameters) (*Tactics, string, error) {
 
-	tactics, err := server.GetTactics(
+	tacticsData, err := server.getTactics(
 		includeServerSideOnly, geoIPData, apiParams)
 	if err != nil {
 		return nil, "", errors.Trace(err)
 	}
 
-	if tactics == nil {
+	if tacticsData == nil {
 		return nil, "", nil
 	}
 
-	_, tag, err := marshalTactics(tactics)
+	return tacticsData.tactics, tacticsData.tag, nil
+}
+
+// tacticsData is cached tactics data, including the merged Tactics object,
+// the JSON marshaled payload, and hashed tag.
+type tacticsData struct {
+	tactics *Tactics
+	payload []byte
+	tag     string
+}
+
+func newTacticsData(tactics *Tactics) (*tacticsData, error) {
+
+	payload, err := json.Marshal(tactics)
 	if err != nil {
-		return nil, "", errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
 
-	return tactics, tag, nil
+	// MD5 hash is used solely as a data checksum and not for any security
+	// purpose.
+	digest := md5.Sum(payload)
+	tag := hex.EncodeToString(digest[:])
+
+	return &tacticsData{
+		tactics: tactics,
+		payload: payload,
+		tag:     tag,
+	}, nil
 }
 
 // getTactics assembles and returns tactics data for a client with the
 // specified GeoIP, API parameter, and speed test attributes.
 //
 // The tactics return value may be nil.
-func (server *Server) GetTactics(
+//
+// Callers must not mutate returned tactics data, which is cached.
+func (server *Server) getTactics(
 	includeServerSideOnly bool,
 	geoIPData common.GeoIPData,
-	apiParams common.APIParameters) (*Tactics, error) {
+	apiParams common.APIParameters) (*tacticsData, error) {
 
 	server.ReloadableFile.RLock()
 	defer server.ReloadableFile.RUnlock()
@@ -837,11 +880,19 @@ func (server *Server) GetTactics(
 		return nil, nil
 	}
 
-	tactics := server.DefaultTactics.clone(includeServerSideOnly)
+	// Two passes are performed, one to get the list of matching filters, and
+	// then, if no merged tactics data is found for that filter match set,
+	// another pass to merge all the tactics parameters.
 
 	var aggregatedValues map[string]int
+	filterMatchCount := 0
 
-	for _, filteredTactics := range server.FilteredTactics {
+	// Use the preallocated slice to avoid an allocation per getTactics call.
+	filterMatches := server.filterMatches
+
+	for filterIndex, filteredTactics := range server.FilteredTactics {
+
+		filterMatches[filterIndex] = false
 
 		if len(filteredTactics.Filter.Regions) > 0 {
 			if filteredTactics.Filter.regionLookup != nil {
@@ -944,15 +995,63 @@ func (server *Server) GetTactics(
 			}
 		}
 
-		tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
+		filterMatchCount += 1
+		filterMatches[filterIndex] = true
+
+		// Continue to check for more matches. Last matching tactics filter
+		// has priority for any field.
+	}
+
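+	// For any filter match set and includeServerSideOnly value, the merged
+	// tactics parameters are the same, so the resulting merge is cached,
+	// along with the JSON encoding of the payload and hash tag. This cache
+	// reduces, for repeated tactics requests, heavy allocations from the
+	// JSON marshal and CPU load from both the marshal and hashing the
+	// marshal result.
+	//
+	// NOTE(review): the cache key below is derived from the filter match set
+	// only, although clone and merge also take includeServerSideOnly --
+	// confirm entries cannot be shared across both values.
+	//
+	// getCacheKey still allocates a strings.Builder buffer.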
+
+	cacheKey := getCacheKey(filterMatchCount > 0, filterMatches)
 
-		// Continue to apply more matches. Last matching tactics has priority for any field.
+	cacheValue, ok := server.cachedTacticsData.Get(cacheKey)
+	if ok {
+		return cacheValue.(*tacticsData), nil
+	}
+
+	tactics := server.DefaultTactics.clone(includeServerSideOnly)
+	if filterMatchCount > 0 {
+		for filterIndex, filteredTactics := range server.FilteredTactics {
+			if filterMatches[filterIndex] {
+				tactics.merge(includeServerSideOnly, &filteredTactics.Tactics)
+			}
+		}
 	}
 
 	// See Tactics.Probability doc comment.
 	tactics.Probability = 1.0
 
-	return tactics, nil
+	tacticsData, err := newTacticsData(tactics)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	server.cachedTacticsData.Set(cacheKey, tacticsData, 0)
+
+	return tacticsData, nil
+}
+
+func getCacheKey(hasFilterMatches bool, filterMatches []bool) string {
+	// When no filters match, the key is "". The input hasFilterMatches allows
+	// for skipping the strings.Builder setup and loop entirely.
+	if !hasFilterMatches {
+		return ""
+	}
+	var b strings.Builder
+	for filterIndex, match := range filterMatches {
+		if match {
+			fmt.Fprintf(&b, "%x-", filterIndex)
+		}
+	}
+	return b.String()
 }
 
 // TODO: refactor this copy of psiphon/server.getStringRequestParam into common?

+ 71 - 0
psiphon/common/tactics/tactics_test.go

@@ -93,6 +93,16 @@ func TestTactics(t *testing.T) {
             }
           }
         },
+        {
+          "Filter" : {
+            "APIParameters" : {"client_platform" : ["P2"], "client_version": ["V2"]}
+          },
+          "Tactics" : {
+            "Parameters" : {
+              "ConnectionWorkerPoolSize" : 1
+            }
+          }
+        },
         {
           "Filter" : {
             "Regions": ["R2"]
@@ -323,6 +333,24 @@ func TestTactics(t *testing.T) {
 		}
 	}
 
+	// Helper to check server-side cachedTacticsData state
+
+	checkServerCache := func(cacheEntryFilterMatches ...[]bool) {
+
+		cacheItems := server.cachedTacticsData.Items()
+		if len(cacheItems) != len(cacheEntryFilterMatches) {
+			t.Fatalf("Unexpected cachedTacticsData size: %v", len(cacheItems))
+		}
+
+		for _, filterMatches := range cacheEntryFilterMatches {
+			cacheKey := getCacheKey(true, filterMatches)
+			_, ok := server.cachedTacticsData.Get(cacheKey)
+			if !ok {
+				t.Fatalf("Unexpected missing cachedTacticsData entry: %v", filterMatches)
+			}
+		}
+	}
+
 	// Initial tactics request; will also run a speed test
 
 	// Request should complete in < 1 second
@@ -352,6 +380,10 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(initialFetchTacticsRecord)
 
+	// Server should be caching tactics data for tactics matching first two
+	// filters.
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// There should now be cached local tactics
 
 	storedTacticsRecord, err := UseStoredTactics(storer, networkID)
@@ -434,6 +466,9 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(fetchTacticsRecord)
 
+	// Server cache should be the same
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Modify tactics configuration to change payload
 
 	tacticsConnectionWorkerPoolSize = 6
@@ -474,6 +509,9 @@ func TestTactics(t *testing.T) {
 		t.Fatalf("Server config failed to reload")
 	}
 
+	// Server cache should be flushed
+	checkServerCache()
+
 	// Next fetch should return a different payload
 
 	fetchTacticsRecord, err = FetchTactics(
@@ -509,6 +547,8 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(fetchTacticsRecord)
 
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Exercise handshake transport of tactics
 
 	// Wait for tactics to expire; handshake should renew
@@ -563,6 +603,8 @@ func TestTactics(t *testing.T) {
 
 	checkParameters(handshakeTacticsRecord)
 
+	checkServerCache([]bool{true, true, false, false, false})
+
 	// Now there should be stored tactics
 
 	storedTacticsRecord, err = UseStoredTactics(storer, networkID)
@@ -596,6 +638,35 @@ func TestTactics(t *testing.T) {
 		t.Fatalf("unexpected stored tactics record")
 	}
 
+	// Server should cache a new entry for different filter matches
+
+	apiParams2 := common.APIParameters{
+		"client_platform": "P2",
+		"client_version":  "V2"}
+
+	fetchTacticsRecord, err = FetchTactics(
+		context.Background(),
+		params,
+		storer,
+		getNetworkID,
+		apiParams2,
+		endPointProtocol,
+		endPointRegion,
+		encodedRequestPublicKey,
+		encodedObfuscatedKey,
+		obfuscatedRoundTripper)
+	if err != nil {
+		t.Fatalf("FetchTactics failed: %s", err)
+	}
+
+	if fetchTacticsRecord == nil {
+		t.Fatalf("expected tactics record")
+	}
+
+	checkServerCache(
+		[]bool{true, true, false, false, false},
+		[]bool{false, false, true, false, false})
+
 	// Exercise speed test sample truncation
 
 	maxSamples := params.Get().Int(parameters.SpeedTestMaxSampleCount)

+ 11 - 0
psiphon/server/tactics.go

@@ -135,6 +135,17 @@ func (c *ServerTacticsParametersCache) Get(
 
 	// Construct parameters from tactics.
 
+	// Note: since ServerTacticsParametersCache was implemented,
+	// tactics.Server.cachedTacticsData was added. This new cache is
+	// primarily intended to reduce server allocations and computations
+	// when _clients_ request tactics. cachedTacticsData also impacts
+	// GetTacticsWithTag.
+	//
+	// ServerTacticsParametersCache still optimizes performance for
+	// server-side tactics, since cachedTacticsData doesn't avoid filter
+	// checks, and ServerTacticsParametersCache includes a prepared
+	// parameters.ParametersAccessor.
+
 	tactics, tag, err := c.support.TacticsServer.GetTacticsWithTag(
 		true, common.GeoIPData(geoIPData), make(common.APIParameters))
 	if err != nil {