|
|
@@ -47,7 +47,10 @@ const (
|
|
|
defaultRequestRetryCount = 1
|
|
|
|
|
|
defaultServerEntryCacheTTL = 24 * time.Hour
|
|
|
- defaultServerEntryCacheMaxSize = 200000
|
|
|
+ defaultServerEntryCacheMaxSize = 250000
|
|
|
+
|
|
|
+ defaultOSLFileSpecCacheTTL = 24 * time.Hour
|
|
|
+ defaultOSLFileSpecCacheMaxSize = 250000
|
|
|
)
|
|
|
|
|
|
// RelayConfig specifies the configuration for a Relay.
|
|
|
@@ -94,6 +97,7 @@ type Relay struct {
|
|
|
hostKeyFile common.ReloadableFile
|
|
|
|
|
|
mutex sync.Mutex
|
|
|
+ tlsSessionCache tls.ClientSessionCache
|
|
|
tlsConfig *tls.Config
|
|
|
httpClient *http.Client
|
|
|
requestTimeout time.Duration
|
|
|
@@ -101,6 +105,12 @@ type Relay struct {
|
|
|
serverEntryCache *lrucache.Cache
|
|
|
serverEntryCacheTTL time.Duration
|
|
|
serverEntryCacheMaxSize int
|
|
|
+ oslFileSpecCache *lrucache.Cache
|
|
|
+ oslFileSpecCacheTTL time.Duration
|
|
|
+ oslFileSpecCacheMaxSize int
|
|
|
+
|
|
|
+ getServerEntriesBufferPool sync.Pool
|
|
|
+ getOSLFileSpecsBufferPool sync.Pool
|
|
|
}
|
|
|
|
|
|
// NewRelay creates a new Relay.
|
|
|
@@ -112,6 +122,8 @@ func NewRelay(config *RelayConfig) (*Relay, error) {
|
|
|
caCertificatesFile: common.NewReloadableFile(config.CACertificatesFilename, false, nil),
|
|
|
hostCertificateFile: common.NewReloadableFile(config.HostCertificateFilename, false, nil),
|
|
|
hostKeyFile: common.NewReloadableFile(config.HostKeyFilename, false, nil),
|
|
|
+
|
|
|
+ tlsSessionCache: tls.NewLRUClientSessionCache(0),
|
|
|
}
|
|
|
|
|
|
_, err := relay.Reload()
|
|
|
@@ -128,7 +140,12 @@ func NewRelay(config *RelayConfig) (*Relay, error) {
|
|
|
|
|
|
relay.SetCacheParameters(
|
|
|
defaultServerEntryCacheTTL,
|
|
|
- defaultServerEntryCacheMaxSize)
|
|
|
+ defaultServerEntryCacheMaxSize,
|
|
|
+ defaultOSLFileSpecCacheTTL,
|
|
|
+ defaultOSLFileSpecCacheMaxSize)
|
|
|
+
|
|
|
+ relay.getServerEntriesBufferPool.New = func() any { return []*SourcedServerEntry{} }
|
|
|
+ relay.getOSLFileSpecsBufferPool.New = func() any { return []OSLFileSpec{} }
|
|
|
|
|
|
return relay, nil
|
|
|
}
|
|
|
@@ -187,9 +204,13 @@ func (r *Relay) Reload() (bool, error) {
|
|
|
r.mutex.Lock()
|
|
|
defer r.mutex.Unlock()
|
|
|
|
|
|
+ r.tlsSessionCache = tls.NewLRUClientSessionCache(0)
|
|
|
+
|
|
|
r.tlsConfig = &tls.Config{
|
|
|
RootCAs: caCertificates,
|
|
|
Certificates: []tls.Certificate{hostCertificate},
|
|
|
+
|
|
|
+ ClientSessionCache: r.tlsSessionCache,
|
|
|
}
|
|
|
|
|
|
if r.httpClient != nil {
|
|
|
@@ -273,22 +294,24 @@ func (r *Relay) SetRequestParameters(
|
|
|
// entry caching. When the parameters change, any existing cache is flushed
|
|
|
// and replaced.
|
|
|
func (r *Relay) SetCacheParameters(
|
|
|
- TTL time.Duration,
|
|
|
- maxSize int) {
|
|
|
+ serverEntryCacheTTL time.Duration,
|
|
|
+ serverEntryCacheMaxSize int,
|
|
|
+ oslFileSpecCacheTTL time.Duration,
|
|
|
+ oslFileSpecCacheMaxSize int) {
|
|
|
|
|
|
r.mutex.Lock()
|
|
|
defer r.mutex.Unlock()
|
|
|
|
|
|
if r.serverEntryCache == nil ||
|
|
|
- r.serverEntryCacheTTL != TTL ||
|
|
|
- r.serverEntryCacheMaxSize != maxSize {
|
|
|
+ r.serverEntryCacheTTL != serverEntryCacheTTL ||
|
|
|
+ r.serverEntryCacheMaxSize != serverEntryCacheMaxSize {
|
|
|
|
|
|
if r.serverEntryCache != nil {
|
|
|
r.serverEntryCache.Flush()
|
|
|
}
|
|
|
|
|
|
- r.serverEntryCacheTTL = TTL
|
|
|
- r.serverEntryCacheMaxSize = maxSize
|
|
|
+ r.serverEntryCacheTTL = serverEntryCacheTTL
|
|
|
+ r.serverEntryCacheMaxSize = serverEntryCacheMaxSize
|
|
|
|
|
|
if r.serverEntryCacheTTL > 0 {
|
|
|
|
|
|
@@ -302,6 +325,30 @@ func (r *Relay) SetCacheParameters(
|
|
|
r.serverEntryCache = nil
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if r.oslFileSpecCache == nil ||
|
|
|
+ r.oslFileSpecCacheTTL != oslFileSpecCacheTTL ||
|
|
|
+ r.oslFileSpecCacheMaxSize != oslFileSpecCacheMaxSize {
|
|
|
+
|
|
|
+ if r.oslFileSpecCache != nil {
|
|
|
+ r.oslFileSpecCache.Flush()
|
|
|
+ }
|
|
|
+
|
|
|
+ r.oslFileSpecCacheTTL = oslFileSpecCacheTTL
|
|
|
+ r.oslFileSpecCacheMaxSize = oslFileSpecCacheMaxSize
|
|
|
+
|
|
|
+ if r.oslFileSpecCacheTTL > 0 {
|
|
|
+
|
|
|
+ r.oslFileSpecCache = lrucache.NewWithLRU(
|
|
|
+ r.oslFileSpecCacheTTL,
|
|
|
+ 1*time.Minute,
|
|
|
+ r.oslFileSpecCacheMaxSize)
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ r.oslFileSpecCache = nil
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// HandleRequest relays a DSL request.
|
|
|
@@ -393,13 +440,31 @@ func (r *Relay) HandleRequest(
|
|
|
// DiscoverServerEntriesResponses and, for each tag/version pair, if
|
|
|
// the tag is in the cache and the cached entry is an old version,
|
|
|
// delete from the cache. This would require unpacking each server entry.
|
|
|
+ //
|
|
|
+ // Similarly, for requestTypeGetOSLFileSpecs, peek at the
|
|
|
+ // RelayedResponse.Response and extract OSL file specs and add to the
|
|
|
+ // local cache, keyed by OSL ID; and peek at RelayedRequest.Request, and
|
|
|
+ // if all requested OSL file specs are in the cache, serve the request
|
|
|
+ // entirely from the local cache.
|
|
|
|
|
|
var response []byte
|
|
|
cachedResponse := false
|
|
|
|
|
|
- if relayedRequest.RequestType == requestTypeGetServerEntries {
|
|
|
+ var serveCachedResponse func([]byte, common.GeoIPData) ([]byte, error)
|
|
|
+ var updateCache func([]byte, []byte) error
|
|
|
+
|
|
|
+ switch relayedRequest.RequestType {
|
|
|
+ case requestTypeGetServerEntries:
|
|
|
+ serveCachedResponse = r.getCachedGetServerEntriesResponse
|
|
|
+ updateCache = r.cacheGetServerEntriesResponse
|
|
|
+ case requestTypeGetOSLFileSpecs:
|
|
|
+ serveCachedResponse = r.getCachedGetOSLFileSpecsResponse
|
|
|
+ updateCache = r.cacheGetOSLFileSpecsResponse
|
|
|
+ }
|
|
|
+
|
|
|
+ if serveCachedResponse != nil {
|
|
|
var err error
|
|
|
- response, err = r.getCachedGetServerEntriesResponse(
|
|
|
+ response, err = serveCachedResponse(
|
|
|
relayedRequest.Request, clientGeoIPData)
|
|
|
if err != nil {
|
|
|
r.config.Logger.WithTraceFields(common.LogFields{
|
|
|
@@ -471,13 +536,13 @@ func (r *Relay) HandleRequest(
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
- if relayedRequest.RequestType == requestTypeGetServerEntries {
|
|
|
- err := r.cacheGetServerEntriesResponse(
|
|
|
+ if updateCache != nil {
|
|
|
+ err := updateCache(
|
|
|
relayedRequest.Request, response)
|
|
|
if err != nil {
|
|
|
r.config.Logger.WithTraceFields(common.LogFields{
|
|
|
"error": err.Error(),
|
|
|
- }).Warning("DSL: cache response failed")
|
|
|
+ }).Warning("DSL: update cache failed")
|
|
|
// Proceed with relaying response
|
|
|
}
|
|
|
}
|
|
|
@@ -542,7 +607,11 @@ func (r *Relay) cacheGetServerEntriesResponse(
|
|
|
cborRequest []byte,
|
|
|
cborResponse []byte) error {
|
|
|
|
|
|
- if r.serverEntryCacheTTL == 0 {
|
|
|
+ r.mutex.Lock()
|
|
|
+ cache := r.serverEntryCache
|
|
|
+ r.mutex.Unlock()
|
|
|
+
|
|
|
+ if cache == nil {
|
|
|
// Caching is disabled
|
|
|
return nil
|
|
|
}
|
|
|
@@ -571,7 +640,7 @@ func (r *Relay) cacheGetServerEntriesResponse(
|
|
|
// this tag, in case the server entry version is new. This also
|
|
|
// extends the cache TTL, since the server entry is fresh.
|
|
|
|
|
|
- r.serverEntryCache.Set(
|
|
|
+ cache.Set(
|
|
|
string(serverEntryTag),
|
|
|
response.SourcedServerEntries[i],
|
|
|
lrucache.DefaultExpiration)
|
|
|
@@ -584,7 +653,7 @@ func (r *Relay) cacheGetServerEntriesResponse(
|
|
|
// is an edge case since DiscoverServerEntries won't return
|
|
|
// invalid tags and so the "nil" value/state isn't cached.
|
|
|
|
|
|
- r.serverEntryCache.Delete(string(serverEntryTag))
|
|
|
+ cache.Delete(string(serverEntryTag))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -595,7 +664,11 @@ func (r *Relay) getCachedGetServerEntriesResponse(
|
|
|
cborRequest []byte,
|
|
|
clientGeoIPData common.GeoIPData) ([]byte, error) {
|
|
|
|
|
|
- if r.serverEntryCacheTTL == 0 {
|
|
|
+ r.mutex.Lock()
|
|
|
+ cache := r.serverEntryCache
|
|
|
+ r.mutex.Unlock()
|
|
|
+
|
|
|
+ if cache == nil {
|
|
|
// Caching is disabled
|
|
|
return nil, nil
|
|
|
}
|
|
|
@@ -607,19 +680,29 @@ func (r *Relay) getCachedGetServerEntriesResponse(
|
|
|
}
|
|
|
|
|
|
// Since we anticipate that most server entries will be cached, allocate
|
|
|
- // response slices optimistically.
|
|
|
+ // response slices optimistically. Use buffer pools to mitigate GC churn.
|
|
|
//
|
|
|
// TODO: check for sufficient cache entries before allocating these
|
|
|
// response slices? Would doubling the cache lookups use less resources
|
|
|
// than unused allocations?
|
|
|
|
|
|
- serverEntryTags := make([]string, len(request.ServerEntryTags))
|
|
|
+ buffer := r.getServerEntriesBufferPool.Get().([]*SourcedServerEntry)
|
|
|
+ size := len(request.ServerEntryTags)
|
|
|
+ if cap(buffer) < size {
|
|
|
+ buffer = make([]*SourcedServerEntry, size)
|
|
|
+ } else {
|
|
|
+ buffer = buffer[:size]
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ clear(buffer)
|
|
|
+ r.getServerEntriesBufferPool.Put(buffer)
|
|
|
+ }()
|
|
|
|
|
|
var response GetServerEntriesResponse
|
|
|
- response.SourcedServerEntries = make([]*SourcedServerEntry, len(request.ServerEntryTags))
|
|
|
+ response.SourcedServerEntries = buffer
|
|
|
|
|
|
for i, serverEntryTag := range request.ServerEntryTags {
|
|
|
- cacheEntry, ok := r.serverEntryCache.Get(string(serverEntryTag))
|
|
|
+ cacheEntry, ok := cache.Get(string(serverEntryTag))
|
|
|
if !ok {
|
|
|
|
|
|
// The request can't be served from the cache, as some server
|
|
|
@@ -634,10 +717,6 @@ func (r *Relay) getCachedGetServerEntriesResponse(
|
|
|
|
|
|
// The cached entry's TTL is not extended on a hit.
|
|
|
|
|
|
- // serverEntryTags are used for logging the request event when served
|
|
|
- // from the cache.
|
|
|
- serverEntryTags[i] = serverEntryTag.String()
|
|
|
-
|
|
|
response.SourcedServerEntries[i] = cacheEntry.(*SourcedServerEntry)
|
|
|
}
|
|
|
|
|
|
@@ -646,7 +725,7 @@ func (r *Relay) getCachedGetServerEntriesResponse(
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- // Log the request event. Since this request is server from the relay
|
|
|
+ // Log the request event. Since this request is served from the relay
|
|
|
// cache, the DSL backend will not see the request and log the event
|
|
|
// itself. This log should match the DSL log format and can be shipped to
|
|
|
// the same log aggregator.
|
|
|
@@ -662,12 +741,132 @@ func (r *Relay) getCachedGetServerEntriesResponse(
|
|
|
}
|
|
|
|
|
|
logFields := r.config.APIParameterLogFieldFormatter("", clientGeoIPData, baseParams)
|
|
|
- logFields["server_entry_tags"] = serverEntryTags
|
|
|
+ logFields["server_entry_tag_count"] = len(response.SourcedServerEntries)
|
|
|
r.config.Logger.LogMetric("dsl_relay_get_server_entries", logFields)
|
|
|
|
|
|
return cborResponse, nil
|
|
|
}
|
|
|
|
|
|
+func (r *Relay) cacheGetOSLFileSpecsResponse(
|
|
|
+ cborRequest []byte,
|
|
|
+ cborResponse []byte) error {
|
|
|
+
|
|
|
+ r.mutex.Lock()
|
|
|
+ cache := r.oslFileSpecCache
|
|
|
+ r.mutex.Unlock()
|
|
|
+
|
|
|
+ if cache == nil {
|
|
|
+ // Caching is disabled
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var request GetOSLFileSpecsRequest
|
|
|
+ err := cbor.Unmarshal(cborRequest, &request)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ var response GetOSLFileSpecsResponse
|
|
|
+ err = cbor.Unmarshal(cborResponse, &response)
|
|
|
+ if err != nil {
|
|
|
+ return errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(request.OSLIDs) != len(response.OSLFileSpecs) {
|
|
|
+ return errors.TraceNew("unexpected spec count mismatch")
|
|
|
+ }
|
|
|
+
|
|
|
+ for i, oslID := range request.OSLIDs {
|
|
|
+
|
|
|
+ if response.OSLFileSpecs[i] != nil {
|
|
|
+
|
|
|
+ // This will extend the cache TTL for existing entries.
|
|
|
+
|
|
|
+ cache.Set(
|
|
|
+ string(oslID),
|
|
|
+ response.OSLFileSpecs[i],
|
|
|
+ lrucache.DefaultExpiration)
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ // In this case, the DSL backend is indicating that the OSL file
|
|
|
+ // spec is not longer active or available for distribution.
|
|
|
+
|
|
|
+ cache.Delete(string(oslID))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *Relay) getCachedGetOSLFileSpecsResponse(
|
|
|
+ cborRequest []byte,
|
|
|
+ clientGeoIPData common.GeoIPData) ([]byte, error) {
|
|
|
+
|
|
|
+ r.mutex.Lock()
|
|
|
+ cache := r.oslFileSpecCache
|
|
|
+ r.mutex.Unlock()
|
|
|
+
|
|
|
+ if cache == nil {
|
|
|
+ // Caching is disabled
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var request GetOSLFileSpecsRequest
|
|
|
+ err := cbor.Unmarshal(cborRequest, &request)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // This logic mirrors getCachedGetServerEntriesResponse. See the comments
|
|
|
+ // in that function.
|
|
|
+
|
|
|
+ buffer := r.getOSLFileSpecsBufferPool.Get().([]OSLFileSpec)
|
|
|
+ size := len(request.OSLIDs)
|
|
|
+ if cap(buffer) < size {
|
|
|
+ buffer = make([]OSLFileSpec, size)
|
|
|
+ } else {
|
|
|
+ buffer = buffer[:size]
|
|
|
+ }
|
|
|
+ defer func() {
|
|
|
+ clear(buffer)
|
|
|
+ r.getOSLFileSpecsBufferPool.Put(buffer)
|
|
|
+ }()
|
|
|
+
|
|
|
+ var response GetOSLFileSpecsResponse
|
|
|
+ response.OSLFileSpecs = buffer
|
|
|
+
|
|
|
+ for i, oslID := range request.OSLIDs {
|
|
|
+ cacheEntry, ok := cache.Get(string(oslID))
|
|
|
+ if !ok {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ response.OSLFileSpecs[i] = cacheEntry.(OSLFileSpec)
|
|
|
+ }
|
|
|
+
|
|
|
+ cborResponse, err := protocol.CBOREncoding.Marshal(&response)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ baseParams, err := protocol.DecodePackedAPIParameters(request.BaseAPIParameters)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = r.config.APIParameterValidator(baseParams)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ logFields := r.config.APIParameterLogFieldFormatter("", clientGeoIPData, baseParams)
|
|
|
+ logFields["osl_id_count"] = len(response.OSLFileSpecs)
|
|
|
+ r.config.Logger.LogMetric("dsl_relay_get_osl_file_specs", logFields)
|
|
|
+
|
|
|
+ return cborResponse, nil
|
|
|
+}
|
|
|
+
|
|
|
var relayGenericErrorResponse []byte
|
|
|
|
|
|
func init() {
|