| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922 |
- /*
- * Copyright (c) 2025, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package dsl
- import (
- "bytes"
- "context"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
- "github.com/fxamacker/cbor/v2"
- )
// FetcherRoundTripper is the pluggable transport used by a Fetcher: it sends
// one request payload to a relay and returns the matching response payload.
// The connection to the relay typically provides obfuscation and blocking
// resistance, which is what lets the client reach the DSL backend.
//
// Examples of round trippers are in-proxy broker clients, where the broker
// acts as the relay, and SSH tunnel requests, where the Psiphon server acts
// as the relay.
//
// The Fetcher passes a ctx that is bounded by FetcherConfig.RequestTimeout
// when that timeout is greater than zero. Any error returned is treated by
// the Fetcher as eligible for retry.
type FetcherRoundTripper func(
	ctx context.Context,
	requestPayload []byte) (responsePayload []byte, err error)
- // FetcherConfig specifies the configuration for a Fetcher.
- type FetcherConfig struct {
- Logger common.Logger
- BaseAPIParameters common.APIParameters
- Tunneled bool
- RoundTripper FetcherRoundTripper
- DatastoreGetLastFetchTime func() (time.Time, error)
- DatastoreSetLastFetchTime func(time time.Time) error
- DatastoreHasServerEntry func(
- tag ServerEntryTag,
- version int,
- prioritizeDial bool) bool
- DatastoreStoreServerEntry func(
- serverEntryFields protocol.PackedServerEntryFields,
- source string,
- prioritizeDial bool) error
- DatastoreGetLastActiveOSLsTime func() (time.Time, error)
- DatastoreSetLastActiveOSLsTime func(time time.Time) error
- DatastoreKnownOSLIDs func() (IDs []OSLID, err error)
- DatastoreGetOSLState func(ID OSLID) (state []byte, err error)
- DatastoreStoreOSLState func(ID OSLID, state []byte) error
- DatastoreDeleteOSLState func(ID OSLID) error
- DatastoreSLOKLookup osl.SLOKLookup
- DatastoreFatalError func(error)
- RequestTimeout time.Duration
- RequestRetryCount int
- RequestRetryDelay time.Duration
- RequestRetryDelayJitter float64
- FetchTTL time.Duration
- DiscoverServerEntriesMinCount int
- DiscoverServerEntriesMaxCount int
- GetServerEntriesMinCount int
- GetServerEntriesMaxCount int
- GetLastActiveOSLsTTL time.Duration
- GetOSLFileSpecsMinCount int
- GetOSLFileSpecsMaxCount int
- // WaitForNetworkConnectivity is an optional callback that should block
- // until there is network connectivity or shutdown. The return value is
- // true when there is network connectivity, and false for shutdown.
- WaitForNetworkConnectivity func() bool
- DoGarbageCollection func()
- }
// Values for fetcherOSLState.State. These values are persisted to the
// datastore as part of the encoded OSL state.
const (
	// oslStateNoFileSpec marks an OSL whose FileSpec is yet to be downloaded.
	oslStateNoFileSpec = 1

	// oslStateHasFileSpec marks an OSL with a downloaded FileSpec and a key
	// that is yet to be reassembled.
	oslStateHasFileSpec = 2

	// oslStateHasKey marks an OSL with a reassembled key.
	oslStateHasKey = 3
)
- // fetcherOSLState is OSL state that's persisted to the datastore. For each
- // active OSL, the Fetcher will progressively download and persist the
- // corresponding FileSpec, and then attempt to reassemble the OSL key using
- // the FileSpec, persist any reassembled keys, and ultimately prune old OSL
- // state.
- type fetcherOSLState struct {
- ID OSLID `cbor:"1,keyasint,omitempty"`
- State int32 `cbor:"2,keyasint,omitempty"`
- FileSpec OSLFileSpec `cbor:"3,keyasint,omitempty"`
- Key OSLKey `cbor:"4,keyasint,omitempty"`
- }
- // Fetcher orchestrates discovering and downloading server entries from a DSL
- // backend, via a relay. A Fetcher also synchronizes active OSL state and
- // reassembles OSL keys to be used as discovery inputs.
- type Fetcher struct {
- config *FetcherConfig
- packedAPIParameters protocol.PackedAPIParameters
- }
- // NewFetcher creates a new Fetcher.
- func NewFetcher(config *FetcherConfig) (*Fetcher, error) {
- packedAPIParameters, err := protocol.EncodePackedAPIParameters(
- config.BaseAPIParameters)
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &Fetcher{
- config: config,
- packedAPIParameters: packedAPIParameters,
- }, nil
- }
- // Run performs a server entry discovery/download and OSL synchronization
- // sequence.
- //
- // Each Run may make incremental progress. New OSL state or new server entries
- // may be downloaded and persisted even when Run ultimately fails and returns
- // an error.
- //
- // Run will stop and return immediately when the input ctx is done.
- //
- // Data is processed incrementally and DoGarbageCollection is invoked
- // periodically in order to limit the overall memory footprint of the Run.
- //
- // The caller MUST:
- //
- // - Schedule Fetcher runs when appropriate: when the client is unable to
- // connect, a full, non-frequent, untunneled fetcher run should be
- // triggered to potentially discover a large selection of servers; when the
- // client connects, a frequent fetcher run should be triggered to discover
- // a small number of servers.
- //
- // - Configure DiscoverServerEntriesMin/MaxCount using appropriate parameters
- // for the frequence/non-frequent mode.
- //
- // - Provide a cooldown time or delay between repeated Run calls when Run
- // returns an error.
- //
- // - Cease all Run invocations if the DatastoreFatalError callback is invoked.
- // In this case, a set-last-time datastore operation required for the
- // DiscoverServerEntriesTTL/GetLastActiveOSLsTTL mechanism has failed. The
- // calling client should not invoke Run again until after a Stop/Start
- // cycle.
- //
- // - Ensure that there's only one concurrent fetcher run. The datastore
- // operations are intended for incremental, persistent progress and
- // multiple concurrent runs may interleave conflicting datastore calls.
- // This requirement means that if there's an ongoing untunneled fetcher run
- // and a tunnel is established, any post-connected, frequent fetcher run
- // must be skipped or postponed.
- func (f *Fetcher) Run(ctx context.Context) error {
- lastTime, err := f.config.DatastoreGetLastFetchTime()
- if err != nil {
- return errors.Trace(err)
- }
- if time.Now().Before(lastTime.Add(f.config.FetchTTL)) {
- return nil
- }
- // processOSLs will:
- //
- // - check for new active OSLs, subject to GetLastActiveOSLsTTL
- // - download any OSL FileSpecs for known, active OSL IDs
- // - attempt to reassemble OSL keys for any unassembled OSLs
- // - return the list of assembled, active OSL keys
- OSLKeys, oslErr := f.processOSLs(ctx)
- if oslErr != nil {
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "error": oslErr.Error(),
- }).Warning("DSL: process OSLs failed")
- // Proceed without OSL keys
- }
- // Discover server entries, identified by tag.
- // Vary the size of the requested response to avoid a trivial traffic
- // fingerprint.
- discoverCount := prng.Range(
- f.config.DiscoverServerEntriesMinCount,
- f.config.DiscoverServerEntriesMaxCount)
- versionedTags, err := f.doDiscoverServerEntriesRequest(
- ctx,
- OSLKeys,
- discoverCount)
- if err != nil {
- return errors.Trace(err)
- }
- // Check each discovered server entry tag and version. Skip when the
- // tag/version is already in the local datastore. Fetch the unknown or
- // updated server entries in batches.
- //
- // Datastore transactions are per server entry, to allow for incremental
- // progress in case of an error.
- storeServerEntriesCount := 0
- knownServerEntriesCount := 0
- tagCount := len(versionedTags)
- defer func() {
- // Emit log even if not all fetches succeed.
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "tags": tagCount,
- "updated": storeServerEntriesCount,
- "known": knownServerEntriesCount,
- }).Info("DSL: fetched server entries")
- }()
- // A subset of versionedTags containing both the tag and prioritize flag
- // could be used here instead of two slices, but the memory impact of two
- // slices should be less, considering the DoGarbageCollection can reclaim
- // versionedTags, and we need a slice of tags (slice of getTags) to send
- // in GetServerEntries.
- //
- // As GetServerEntriesResponse will contain an entry (potentially nil) for
- // every requested server entry tag, in requested order, each index in
- // prioritizeDials corresponds both to the same index in getTags and the
- // sourcedServerEntries returned from doGetServerEntriesRequest.
- var getTags []ServerEntryTag
- var prioritizeDials []bool
- for _, v := range versionedTags {
- hasServerEntry := f.config.DatastoreHasServerEntry(
- v.Tag,
- int(v.Version),
- v.PrioritizeDial)
- if hasServerEntry {
- knownServerEntriesCount += 1
- continue
- }
- getTags = append(getTags, v.Tag)
- prioritizeDials = append(prioritizeDials, v.PrioritizeDial)
- }
- // Allow garbage collection.
- versionedTags = nil
- for len(getTags) > 0 {
- // Vary the size of the request and response.
- getCount := prng.Range(
- f.config.GetServerEntriesMinCount,
- f.config.GetServerEntriesMaxCount)
- getBatch := getTags
- if len(getBatch) > getCount {
- getBatch = getBatch[:getCount]
- }
- sourcedServerEntries, err := f.doGetServerEntriesRequest(ctx, getBatch)
- if err != nil {
- return errors.Trace(err)
- }
- for i, sourcedEntry := range sourcedServerEntries {
- if sourcedEntry == nil {
- // The requested server entry is no longer distributable or
- // doesn't exist.
- continue
- }
- err := f.config.DatastoreStoreServerEntry(
- sourcedEntry.ServerEntryFields,
- sourcedEntry.Source,
- prioritizeDials[i])
- if err != nil {
- return errors.Trace(err)
- }
- storeServerEntriesCount += 1
- }
- // doGetServerEntriesRequest will retry failed requests and reduces
- // the number of requested server entries in each retry. Adjust
- // getTags in case less than the initial getBatch were fetched.
- // Unfetched server entries will be added to the next batch.
- getTags = getTags[len(sourcedServerEntries):]
- prioritizeDials = prioritizeDials[len(sourcedServerEntries):]
- f.config.DoGarbageCollection()
- }
- err = f.config.DatastoreSetLastFetchTime(time.Now())
- if err != nil {
- err = errors.Trace(err)
- // Signal a fatal datastore error. The caller should not run any
- // Fetcher again, for the duration of its process, since the
- // LastDiscoverTime mechanism won't prevent excess repeats.
- f.config.DatastoreFatalError(err)
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "error": err.Error(),
- }).Warning("DSL: datastore failed")
- // Proceed with this one run
- }
- if oslErr != nil {
- return errors.Trace(oslErr)
- }
- return nil
- }
- func (f *Fetcher) processOSLs(ctx context.Context) ([]OSLKey, error) {
- lastTime, err := f.config.DatastoreGetLastActiveOSLsTime()
- if err != nil {
- // TODO: proceed, but skip GetActiveOSLsRequest?
- return nil, errors.Trace(err)
- }
- now := time.Now()
- if now.After(lastTime.Add(f.config.GetLastActiveOSLsTTL)) {
- // When the last GetActiveOSLsRequest fetch expires, request the
- // current active OSLs again. Prune any locally stored OSL states for
- // OSLs that are no longer active. Add new OSL states for previously
- // unknown OSLs. These new OSLs states will trigger OSL FileSpec
- // fetches in the next step.
- // The size of the request and response is not varied in this case. In
- // practise, the number of active OSL IDs is expected to be
- // relatively small. The obfuscation hops to the relay should add a
- // small amount of random padding.
- activeOSLIDs, err := f.doGetActiveOSLsRequest(ctx)
- if err != nil {
- return nil, errors.Trace(err)
- }
- // Load known OSL states without attempting to reassemble OSL keys.
- knownOSLStates, err := f.loadOSLStates(ctx, false)
- if err != nil {
- return nil, errors.Trace(err)
- }
- addedCount := 0
- removedCount := 0
- for _, activeID := range activeOSLIDs {
- isKnown := false
- for _, knownState := range knownOSLStates {
- if bytes.Equal(activeID, knownState.ID) {
- isKnown = true
- break
- }
- }
- if !isKnown {
- err := f.storeOSLState(
- activeID,
- &fetcherOSLState{
- ID: activeID,
- State: oslStateNoFileSpec,
- })
- if err != nil {
- return nil, errors.Trace(err)
- }
- addedCount += 1
- }
- }
- for _, knownState := range knownOSLStates {
- isActive := false
- for _, activeID := range activeOSLIDs {
- if bytes.Equal(activeID, knownState.ID) {
- isActive = true
- break
- }
- }
- if !isActive {
- err := f.config.DatastoreDeleteOSLState(knownState.ID)
- if err != nil {
- return nil, errors.Trace(err)
- }
- removedCount += 1
- }
- }
- f.config.DoGarbageCollection()
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "total": len(activeOSLIDs),
- "added": addedCount,
- "removed": removedCount,
- }).Info("DSL: fetched active OSL IDs")
- err = f.config.DatastoreSetLastActiveOSLsTime(now)
- if err != nil {
- err = errors.Trace(err)
- // Signal a fatal datastore error. The caller should not run any
- // Fetcher again, for the duration of its process, since the
- // LastActiveOSLsTime mechanism won't prevent excess repeats.
- f.config.DatastoreFatalError(errors.Trace(err))
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "error": err.Error(),
- }).Warning("DSL: datastore failed")
- // Proceed with this one run
- }
- }
- // Load known OSL states, attempting to reassemble OSL keys. Any newly
- // assembled keys will be stored back to the datastore, caching the
- // assembly. For OSLs in the no-FileSpec state, the missing FileSpecs
- // will be fetched.
- knownOSLStates, err := f.loadOSLStates(ctx, true)
- if err != nil {
- return nil, errors.Trace(err)
- }
- addedSpecCount := 0
- removedSpecCount := 0
- defer func() {
- // Emit log even if not all fetches succeed.
- if addedSpecCount > 0 || removedSpecCount > 0 {
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "added": addedSpecCount,
- "removed": removedSpecCount,
- }).Info("DSL: fetched OSL FileSpecs")
- }
- }()
- var getFileSpecs []OSLID
- for _, knownState := range knownOSLStates {
- if knownState.State == oslStateHasFileSpec ||
- knownState.State == oslStateHasKey {
- continue
- }
- getFileSpecs = append(getFileSpecs, knownState.ID)
- }
- for len(getFileSpecs) > 0 {
- // Vary the size of the request and response.
- getCount := prng.Range(
- f.config.GetOSLFileSpecsMinCount,
- f.config.GetOSLFileSpecsMaxCount)
- getBatch := getFileSpecs
- if len(getBatch) > getCount {
- getBatch = getBatch[:getCount]
- }
- fileSpecs, err := f.doGetOSLFileSpecsRequest(ctx, getBatch)
- if err != nil {
- return nil, errors.Trace(err)
- }
- for i, fileSpec := range fileSpecs {
- if len(fileSpec) > 0 {
- err := f.storeOSLState(
- getFileSpecs[i],
- &fetcherOSLState{
- ID: getFileSpecs[i],
- State: oslStateHasFileSpec,
- FileSpec: fileSpec})
- if err != nil {
- return nil, errors.Trace(err)
- }
- addedSpecCount += 1
- } else {
- // A nil/empty FileSpec in the response indicates that the
- // requested OSL ID is invalid or no longer active. Prune the OSL state.
- err := f.config.DatastoreDeleteOSLState(getBatch[i])
- if err != nil {
- return nil, errors.Trace(err)
- }
- removedSpecCount += 1
- }
- }
- // doGetOSLFileSpecsRequest will retry failed requests and reduces
- // the number of requested OSL FileSpecs in each retry. Adjust
- // getFileSpecs in case less than the initial getBatch were fetched.
- // Unfetched FileSpecs will be added to the next batch.
- getFileSpecs = getFileSpecs[len(fileSpecs):]
- f.config.DoGarbageCollection()
- }
- if addedSpecCount > 0 || removedSpecCount > 0 {
- // Repeat attempting to reassemble OSL keys, since new FileSpecs were
- // downloaded. This case also prunes any now-removed OSLs so their keys
- // will not be included in the return value.
- knownOSLStates, err = f.loadOSLStates(ctx, true)
- if err != nil {
- return nil, errors.Trace(err)
- }
- }
- var keys []OSLKey
- for _, knownState := range knownOSLStates {
- if knownState.State == oslStateHasKey {
- keys = append(keys, knownState.Key)
- }
- }
- return keys, nil
- }
- func (f *Fetcher) doDiscoverServerEntriesRequest(
- ctx context.Context,
- keys []OSLKey,
- discoverCount int) ([]*VersionedServerEntryTag, error) {
- // Perform the request with retries. On each retry, reduce the requested
- // response size to mitigate blocking or performance issues with larger
- // responses.
- for i := 0; ; i++ {
- // All known OSL keys are sent in the request. In practise, the number
- // of active OSL IDs is expected to be relatively small.
- request := &DiscoverServerEntriesRequest{
- BaseAPIParameters: f.packedAPIParameters,
- OSLKeys: keys,
- DiscoverCount: int32(discoverCount),
- }
- var response *DiscoverServerEntriesResponse
- doRetry, err := f.doRelayedRequest(
- ctx, requestTypeDiscoverServerEntries, request, &response)
- if err == nil {
- return response.VersionedServerEntryTags, nil
- }
- if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
- return nil, errors.Trace(err)
- }
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "discoverCount": discoverCount,
- "error": err.Error(),
- }).Warning("DSL: doDiscoverServerEntriesRequest failed")
- common.SleepWithContext(
- ctx,
- prng.JitterDuration(
- f.config.RequestRetryDelay,
- f.config.RequestRetryDelayJitter))
- if discoverCount > 1 {
- discoverCount /= 2
- }
- }
- }
- func (f *Fetcher) doGetServerEntriesRequest(
- ctx context.Context,
- tags []ServerEntryTag) ([]*SourcedServerEntry, error) {
- // Perform the request with retries. On each retry, reduce the requested
- // response size to mitigate blocking or performance issues with larger
- // responses.
- for i := 0; ; i++ {
- request := &GetServerEntriesRequest{
- BaseAPIParameters: f.packedAPIParameters,
- ServerEntryTags: tags,
- }
- var response *GetServerEntriesResponse
- doRetry, err := f.doRelayedRequest(
- ctx, requestTypeGetServerEntries, request, &response)
- if err == nil && len(tags) != len(response.SourcedServerEntries) {
- err = errors.TraceNew("unexpected server entry count")
- }
- if err == nil {
- return response.SourcedServerEntries, nil
- }
- if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
- return nil, errors.Trace(err)
- }
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "attempt": i,
- "tagCount": len(tags),
- "error": err.Error(),
- }).Warning("DSL: doGetServerEntriesRequest attempt failed")
- common.SleepWithContext(
- ctx,
- prng.JitterDuration(
- f.config.RequestRetryDelay,
- f.config.RequestRetryDelayJitter))
- if len(tags) > 1 {
- n := len(tags) / 2
- tags = tags[:n]
- }
- }
- }
- func (f *Fetcher) doGetActiveOSLsRequest(ctx context.Context) ([]OSLID, error) {
- // Perform the request with retries. The response always includes all
- // current, active OSL IDs and is not reduced on retry.
- for i := 0; ; i++ {
- request := &GetActiveOSLsRequest{
- BaseAPIParameters: f.packedAPIParameters,
- }
- var response *GetActiveOSLsResponse
- doRetry, err := f.doRelayedRequest(
- ctx, requestTypeGetActiveOSLs, request, &response)
- if err == nil {
- return response.ActiveOSLIDs, nil
- }
- if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
- return nil, errors.Trace(err)
- }
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "attempt": i,
- "error": err.Error(),
- }).Warning("DSL: doGetActiveOSLsRequest attempt failed")
- common.SleepWithContext(
- ctx,
- prng.JitterDuration(
- f.config.RequestRetryDelay,
- f.config.RequestRetryDelayJitter))
- }
- }
- func (f *Fetcher) doGetOSLFileSpecsRequest(
- ctx context.Context, IDs []OSLID) ([]OSLFileSpec, error) {
- // Perform the request with retries. On each retry, reduce the requested
- // response size to mitigate blocking or performance issues with larger
- // responses.
- for i := 0; ; i++ {
- request := &GetOSLFileSpecsRequest{
- BaseAPIParameters: f.packedAPIParameters,
- OSLIDs: IDs,
- }
- var response *GetOSLFileSpecsResponse
- doRetry, err := f.doRelayedRequest(
- ctx, requestTypeGetOSLFileSpecs, request, &response)
- if err == nil && len(IDs) != len(response.OSLFileSpecs) {
- err = errors.TraceNew("unexpected OSL file spec count")
- }
- if err == nil {
- return response.OSLFileSpecs, nil
- }
- if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
- return nil, errors.Trace(err)
- }
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- "attempt": i,
- "OSLIDCount": len(IDs),
- "error": err.Error(),
- }).Warning("DSL: doGetOSLFileSpecsRequest attempt failed")
- common.SleepWithContext(
- ctx,
- prng.JitterDuration(
- f.config.RequestRetryDelay,
- f.config.RequestRetryDelayJitter))
- if len(IDs) > 1 {
- n := len(IDs) / 2
- IDs = IDs[:n]
- }
- }
- }
- func (f *Fetcher) doRelayedRequest(
- ctx context.Context,
- requestType int32,
- request any,
- response any) (retRetry bool, retErr error) {
- // Delay attempt to fetch while there is no network connectivity.
- if f.config.WaitForNetworkConnectivity != nil &&
- !f.config.WaitForNetworkConnectivity() {
- return false, errors.TraceNew("shutdown")
- }
- // Add the relay wrapping.
- cborRequest, err := protocol.CBOREncoding.Marshal(request)
- if err != nil {
- return false, errors.Trace(err)
- }
- cborRelayedRequest, err := protocol.CBOREncoding.Marshal(
- &RelayedRequest{
- RequestType: requestType,
- Version: requestVersion,
- Request: cborRequest,
- })
- if err != nil {
- return false, errors.Trace(err)
- }
- if len(cborRelayedRequest) > MaxRelayPayloadSize {
- return false, errors.Tracef(
- "request size %d exceeds limit %d", len(cborRelayedRequest), MaxRelayPayloadSize)
- }
- // Relay the request via the supplied RoundTripper.
- requestCtx := ctx
- if f.config.RequestTimeout > 0 {
- var requestCancelFunc context.CancelFunc
- requestCtx, requestCancelFunc = context.WithTimeout(ctx, f.config.RequestTimeout)
- defer requestCancelFunc()
- }
- cborRelayedResponse, err := f.config.RoundTripper(requestCtx, cborRelayedRequest)
- if err != nil {
- // Allow retries for in case of intermittent network failures or
- // potential blocking.
- //
- // TODO: check for specific retry-eligible errors from the RoundTripper?
- return true, errors.Trace(err)
- }
- // Remove the relay wrapping.
- var relayedResponse *RelayedResponse
- err = cbor.Unmarshal(cborRelayedResponse, &relayedResponse)
- if err != nil {
- return false, errors.Trace(err)
- }
- if relayedResponse.Error != 0 {
- // No retries if a response was received from the DSL backend.
- return false, errors.Tracef(
- "RelayedResponse.Error: %d", relayedResponse.Error)
- }
- uncompressedResponse, err := common.Decompress(
- relayedResponse.Compression, relayedResponse.Response)
- if err != nil {
- return false, errors.Trace(err)
- }
- err = cbor.Unmarshal(uncompressedResponse, response)
- if err != nil {
- return false, errors.Trace(err)
- }
- return false, nil
- }
- func (f *Fetcher) loadOSLStates(ctx context.Context, reassembleKeys bool) ([]*fetcherOSLState, error) {
- // Load just the set of known OSL IDs, and then process each OSL state one
- // at a time, to avoid loading all states into memory at once.
- activeIDs, err := f.config.DatastoreKnownOSLIDs()
- if err != nil {
- return nil, errors.Trace(err)
- }
- var states []*fetcherOSLState
- for _, ID := range activeIDs {
- cborState, err := f.config.DatastoreGetOSLState(ID)
- if err != nil {
- return nil, errors.Trace(err)
- }
- if cborState == nil {
- // This case is not expected since DatastoreKnownOSLIDs returns
- // only known IDs.
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- }).Warning("DSL: unexpected unknown OSL ID")
- continue
- }
- var state *fetcherOSLState
- err = cbor.Unmarshal(cborState, &state)
- if err != nil {
- return nil, errors.Trace(err)
- }
- if !bytes.Equal(ID, state.ID) {
- return nil, errors.TraceNew("unexpected OSL ID")
- }
- // TODO: sanity check FileSpec/Key fields match State?
- if state.State == oslStateHasFileSpec {
- // When we have the FileSpec, but not the reassembled key, attempt
- // reassembly from SLOKs. A reassembled key is stored back to the
- // datastore.
- if reassembleKeys {
- var fileSpec *osl.OSLFileSpec
- err = cbor.Unmarshal(state.FileSpec, &fileSpec)
- if err != nil {
- return nil, errors.Trace(err)
- }
- ok, key, err := osl.ReassembleOSLKey(fileSpec, f.config.DatastoreSLOKLookup)
- if err != nil {
- return nil, errors.Trace(err)
- }
- if ok {
- // Without the guarantee that there's only one concurrent
- // fetcher run, it's possible, with two concurrent
- // fetchers, that one prunes an OSL state after
- // GetActiveOSLsRequest, while the other calls
- // storeOSLState and incorrectly restores the pruned state.
- state.State = oslStateHasKey
- state.Key = key
- state.FileSpec = nil
- err = f.storeOSLState(ID, state)
- if err != nil {
- return nil, errors.Trace(err)
- }
- f.config.Logger.WithTraceFields(common.LogFields{
- "tunneled": f.config.Tunneled,
- }).Info("DSL: reassembled OSL key")
- }
- }
- // Allow state.FileSpec to be garbage collected.
- state.FileSpec = nil
- f.config.DoGarbageCollection()
- }
- states = append(states, state)
- }
- return states, nil
- }
- func (f *Fetcher) storeOSLState(ID OSLID, state *fetcherOSLState) error {
- cborState, err := protocol.CBOREncoding.Marshal(state)
- if err != nil {
- return errors.Trace(err)
- }
- err = f.config.DatastoreStoreOSLState(ID, cborState)
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
|