fetcher.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922
  1. /*
  2. * Copyright (c) 2025, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package dsl
  20. import (
  21. "bytes"
  22. "context"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  29. "github.com/fxamacker/cbor/v2"
  30. )
  // FetcherRoundTripper is the pluggable transport a Fetcher uses to reach the
// DSL backend through a relay: it sends one encoded request payload to the
// relay and returns the relay's encoded response payload. The connection to
// the relay typically supplies obfuscation and blocking resistance.
//
// Implementations include in-proxy broker clients, where the broker acts as
// the relay, and SSH tunnel requests, where the Psiphon server is the relay.
//
// The Fetcher passes a ctx that is additionally bounded by
// FetcherConfig.RequestTimeout when that timeout is positive.
type FetcherRoundTripper func(ctx context.Context, requestPayload []byte) (responsePayload []byte, err error)
  // FetcherConfig specifies the configuration for a Fetcher.
//
// The Fetcher calls the function-typed fields below without nil checks, so
// they must all be set, except for the optional WaitForNetworkConnectivity.
// DatastoreSLOKLookup is not called directly by the Fetcher; it is passed
// through to osl.ReassembleOSLKey.
type FetcherConfig struct {
	// Logger receives the log messages emitted by the Fetcher.
	Logger common.Logger

	// BaseAPIParameters are encoded once, by NewFetcher, and attached to
	// every request sent to the DSL backend.
	BaseAPIParameters common.APIParameters

	// Tunneled is recorded as the "tunneled" field in Fetcher log messages.
	Tunneled bool

	// RoundTripper relays each encoded request to the DSL backend and returns
	// the encoded response.
	RoundTripper FetcherRoundTripper

	// DatastoreGetLastFetchTime and DatastoreSetLastFetchTime read and record
	// the time of the last completed Run, which Run compares against FetchTTL.
	DatastoreGetLastFetchTime func() (time.Time, error)
	DatastoreSetLastFetchTime func(time time.Time) error

	// DatastoreHasServerEntry reports whether a server entry matching the
	// discovered tag and version is already stored; when it returns true, the
	// server entry is not downloaded. The discovered prioritizeDial flag is
	// also passed (NOTE(review): presumably so the datastore can account for
	// a changed dial priority; confirm against the implementation).
	DatastoreHasServerEntry func(
		tag ServerEntryTag,
		version int,
		prioritizeDial bool) bool

	// DatastoreStoreServerEntry persists a downloaded server entry together
	// with its source and the prioritizeDial flag from discovery.
	DatastoreStoreServerEntry func(
		serverEntryFields protocol.PackedServerEntryFields,
		source string,
		prioritizeDial bool) error

	// DatastoreGetLastActiveOSLsTime and DatastoreSetLastActiveOSLsTime read
	// and record the time of the last active OSL IDs synchronization, which
	// is compared against GetLastActiveOSLsTTL.
	DatastoreGetLastActiveOSLsTime func() (time.Time, error)
	DatastoreSetLastActiveOSLsTime func(time time.Time) error

	// DatastoreKnownOSLIDs returns the IDs of all persisted OSL states.
	DatastoreKnownOSLIDs func() (IDs []OSLID, err error)

	// DatastoreGetOSLState returns the CBOR-encoded OSL state stored for ID;
	// a nil result is treated as "no state stored".
	DatastoreGetOSLState func(ID OSLID) (state []byte, err error)

	// DatastoreStoreOSLState persists the CBOR-encoded OSL state for ID.
	DatastoreStoreOSLState func(ID OSLID, state []byte) error

	// DatastoreDeleteOSLState removes the persisted OSL state for ID.
	DatastoreDeleteOSLState func(ID OSLID) error

	// DatastoreSLOKLookup is passed to osl.ReassembleOSLKey when attempting
	// to reassemble OSL keys from stored FileSpecs.
	DatastoreSLOKLookup osl.SLOKLookup

	// DatastoreFatalError is invoked when recording the last fetch time or
	// the last active OSLs time fails. Once it has been invoked, the caller
	// should not call Run again until after a Stop/Start cycle.
	DatastoreFatalError func(error)

	// RequestTimeout, when greater than 0, bounds each individual relayed
	// request attempt.
	RequestTimeout time.Duration

	// RequestRetryCount is the maximum number of retries, after the initial
	// attempt, for a request that fails with a retryable error.
	RequestRetryCount int

	// RequestRetryDelay is the delay between request attempts, jittered by
	// RequestRetryDelayJitter via prng.JitterDuration.
	RequestRetryDelay       time.Duration
	RequestRetryDelayJitter float64

	// FetchTTL is the minimum time between completed Runs; Run returns
	// immediately while the last completed Run is more recent than this.
	FetchTTL time.Duration

	// DiscoverServerEntriesMinCount and DiscoverServerEntriesMaxCount bound
	// the randomly selected number of server entries requested per Run in
	// the discovery request.
	DiscoverServerEntriesMinCount int
	DiscoverServerEntriesMaxCount int

	// GetServerEntriesMinCount and GetServerEntriesMaxCount bound the
	// randomly selected number of server entries downloaded per request.
	GetServerEntriesMinCount int
	GetServerEntriesMaxCount int

	// GetLastActiveOSLsTTL is the minimum time between requests for the
	// current set of active OSL IDs.
	GetLastActiveOSLsTTL time.Duration

	// GetOSLFileSpecsMinCount and GetOSLFileSpecsMaxCount bound the randomly
	// selected number of OSL FileSpecs downloaded per request.
	GetOSLFileSpecsMinCount int
	GetOSLFileSpecsMaxCount int

	// WaitForNetworkConnectivity is an optional callback that should block
	// until there is network connectivity or shutdown. The return value is
	// true when there is network connectivity, and false for shutdown.
	WaitForNetworkConnectivity func() bool

	// DoGarbageCollection is invoked after processing each batch of data, to
	// limit the Fetcher's overall memory footprint.
	DoGarbageCollection func()
}
  // OSL states stored in fetcherOSLState.State. An OSL state is created without
// a FileSpec, moves to having a FileSpec once it is downloaded, and finally
// to having a key once the key is reassembled from SLOKs.
const (
	oslStateNoFileSpec  = iota + 1 // FileSpec not yet downloaded
	oslStateHasFileSpec            // FileSpec stored; key not yet reassembled
	oslStateHasKey                 // key reassembled; FileSpec discarded
)
  // fetcherOSLState is OSL state that's persisted to the datastore. For each
// active OSL, the Fetcher will progressively download and persist the
// corresponding FileSpec, and then attempt to reassemble the OSL key using
// the FileSpec, persist any reassembled keys, and ultimately prune old OSL
// state.
//
// States are CBOR-encoded with integer map keys ("keyasint"), so the key
// numbers in the struct tags identify fields in persisted data.
type fetcherOSLState struct {
	// ID is the OSL ID. It must match the ID the state is stored under;
	// loadOSLStates rejects a mismatch.
	ID OSLID `cbor:"1,keyasint,omitempty"`

	// State is one of oslStateNoFileSpec, oslStateHasFileSpec, or
	// oslStateHasKey.
	State int32 `cbor:"2,keyasint,omitempty"`

	// FileSpec is the CBOR-encoded osl.OSLFileSpec downloaded from the DSL
	// backend. It is set in the oslStateHasFileSpec state and cleared when
	// the key is reassembled.
	FileSpec OSLFileSpec `cbor:"3,keyasint,omitempty"`

	// Key is the reassembled OSL key, set in the oslStateHasKey state.
	Key OSLKey `cbor:"4,keyasint,omitempty"`
}
  // Fetcher orchestrates discovering and downloading server entries from a DSL
// backend, via a relay. A Fetcher also synchronizes active OSL state and
// reassembles OSL keys to be used as discovery inputs.
//
// Create a Fetcher with NewFetcher. A Fetcher holds no per-run state itself;
// see Run for the caller's scheduling and concurrency requirements.
type Fetcher struct {
	// config is the configuration supplied to NewFetcher.
	config *FetcherConfig

	// packedAPIParameters is config.BaseAPIParameters, encoded once by
	// NewFetcher and sent as BaseAPIParameters in every request.
	packedAPIParameters protocol.PackedAPIParameters
}
  106. // NewFetcher creates a new Fetcher.
  107. func NewFetcher(config *FetcherConfig) (*Fetcher, error) {
  108. packedAPIParameters, err := protocol.EncodePackedAPIParameters(
  109. config.BaseAPIParameters)
  110. if err != nil {
  111. return nil, errors.Trace(err)
  112. }
  113. return &Fetcher{
  114. config: config,
  115. packedAPIParameters: packedAPIParameters,
  116. }, nil
  117. }
  118. // Run performs a server entry discovery/download and OSL synchronization
  119. // sequence.
  120. //
  121. // Each Run may make incremental progress. New OSL state or new server entries
  122. // may be downloaded and persisted even when Run ultimately fails and returns
  123. // an error.
  124. //
  125. // Run will stop and return immediately when the input ctx is done.
  126. //
  127. // Data is processed incrementally and DoGarbageCollection is invoked
  128. // periodically in order to limit the overall memory footprint of the Run.
  129. //
  130. // The caller MUST:
  131. //
  132. // - Schedule Fetcher runs when appropriate: when the client is unable to
  133. // connect, a full, non-frequent, untunneled fetcher run should be
  134. // triggered to potentially discover a large selection of servers; when the
  135. // client connects, a frequent fetcher run should be triggered to discover
  136. // a small number of servers.
  137. //
  138. // - Configure DiscoverServerEntriesMin/MaxCount using appropriate parameters
  139. // for the frequence/non-frequent mode.
  140. //
  141. // - Provide a cooldown time or delay between repeated Run calls when Run
  142. // returns an error.
  143. //
  144. // - Cease all Run invocations if the DatastoreFatalError callback is invoked.
  145. // In this case, a set-last-time datastore operation required for the
  146. // DiscoverServerEntriesTTL/GetLastActiveOSLsTTL mechanism has failed. The
  147. // calling client should not invoke Run again until after a Stop/Start
  148. // cycle.
  149. //
  150. // - Ensure that there's only one concurrent fetcher run. The datastore
  151. // operations are intended for incremental, persistent progress and
  152. // multiple concurrent runs may interleave conflicting datastore calls.
  153. // This requirement means that if there's an ongoing untunneled fetcher run
  154. // and a tunnel is established, any post-connected, frequent fetcher run
  155. // must be skipped or postponed.
  156. func (f *Fetcher) Run(ctx context.Context) error {
  157. lastTime, err := f.config.DatastoreGetLastFetchTime()
  158. if err != nil {
  159. return errors.Trace(err)
  160. }
  161. if time.Now().Before(lastTime.Add(f.config.FetchTTL)) {
  162. return nil
  163. }
  164. // processOSLs will:
  165. //
  166. // - check for new active OSLs, subject to GetLastActiveOSLsTTL
  167. // - download any OSL FileSpecs for known, active OSL IDs
  168. // - attempt to reassemble OSL keys for any unassembled OSLs
  169. // - return the list of assembled, active OSL keys
  170. OSLKeys, oslErr := f.processOSLs(ctx)
  171. if oslErr != nil {
  172. f.config.Logger.WithTraceFields(common.LogFields{
  173. "tunneled": f.config.Tunneled,
  174. "error": oslErr.Error(),
  175. }).Warning("DSL: process OSLs failed")
  176. // Proceed without OSL keys
  177. }
  178. // Discover server entries, identified by tag.
  179. // Vary the size of the requested response to avoid a trivial traffic
  180. // fingerprint.
  181. discoverCount := prng.Range(
  182. f.config.DiscoverServerEntriesMinCount,
  183. f.config.DiscoverServerEntriesMaxCount)
  184. versionedTags, err := f.doDiscoverServerEntriesRequest(
  185. ctx,
  186. OSLKeys,
  187. discoverCount)
  188. if err != nil {
  189. return errors.Trace(err)
  190. }
  191. // Check each discovered server entry tag and version. Skip when the
  192. // tag/version is already in the local datastore. Fetch the unknown or
  193. // updated server entries in batches.
  194. //
  195. // Datastore transactions are per server entry, to allow for incremental
  196. // progress in case of an error.
  197. storeServerEntriesCount := 0
  198. knownServerEntriesCount := 0
  199. tagCount := len(versionedTags)
  200. defer func() {
  201. // Emit log even if not all fetches succeed.
  202. f.config.Logger.WithTraceFields(common.LogFields{
  203. "tunneled": f.config.Tunneled,
  204. "tags": tagCount,
  205. "updated": storeServerEntriesCount,
  206. "known": knownServerEntriesCount,
  207. }).Info("DSL: fetched server entries")
  208. }()
  209. // A subset of versionedTags containing both the tag and prioritize flag
  210. // could be used here instead of two slices, but the memory impact of two
  211. // slices should be less, considering the DoGarbageCollection can reclaim
  212. // versionedTags, and we need a slice of tags (slice of getTags) to send
  213. // in GetServerEntries.
  214. //
  215. // As GetServerEntriesResponse will contain an entry (potentially nil) for
  216. // every requested server entry tag, in requested order, each index in
  217. // prioritizeDials corresponds both to the same index in getTags and the
  218. // sourcedServerEntries returned from doGetServerEntriesRequest.
  219. var getTags []ServerEntryTag
  220. var prioritizeDials []bool
  221. for _, v := range versionedTags {
  222. hasServerEntry := f.config.DatastoreHasServerEntry(
  223. v.Tag,
  224. int(v.Version),
  225. v.PrioritizeDial)
  226. if hasServerEntry {
  227. knownServerEntriesCount += 1
  228. continue
  229. }
  230. getTags = append(getTags, v.Tag)
  231. prioritizeDials = append(prioritizeDials, v.PrioritizeDial)
  232. }
  233. // Allow garbage collection.
  234. versionedTags = nil
  235. for len(getTags) > 0 {
  236. // Vary the size of the request and response.
  237. getCount := prng.Range(
  238. f.config.GetServerEntriesMinCount,
  239. f.config.GetServerEntriesMaxCount)
  240. getBatch := getTags
  241. if len(getBatch) > getCount {
  242. getBatch = getBatch[:getCount]
  243. }
  244. sourcedServerEntries, err := f.doGetServerEntriesRequest(ctx, getBatch)
  245. if err != nil {
  246. return errors.Trace(err)
  247. }
  248. for i, sourcedEntry := range sourcedServerEntries {
  249. if sourcedEntry == nil {
  250. // The requested server entry is no longer distributable or
  251. // doesn't exist.
  252. continue
  253. }
  254. err := f.config.DatastoreStoreServerEntry(
  255. sourcedEntry.ServerEntryFields,
  256. sourcedEntry.Source,
  257. prioritizeDials[i])
  258. if err != nil {
  259. return errors.Trace(err)
  260. }
  261. storeServerEntriesCount += 1
  262. }
  263. // doGetServerEntriesRequest will retry failed requests and reduces
  264. // the number of requested server entries in each retry. Adjust
  265. // getTags in case less than the initial getBatch were fetched.
  266. // Unfetched server entries will be added to the next batch.
  267. getTags = getTags[len(sourcedServerEntries):]
  268. prioritizeDials = prioritizeDials[len(sourcedServerEntries):]
  269. f.config.DoGarbageCollection()
  270. }
  271. err = f.config.DatastoreSetLastFetchTime(time.Now())
  272. if err != nil {
  273. err = errors.Trace(err)
  274. // Signal a fatal datastore error. The caller should not run any
  275. // Fetcher again, for the duration of its process, since the
  276. // LastDiscoverTime mechanism won't prevent excess repeats.
  277. f.config.DatastoreFatalError(err)
  278. f.config.Logger.WithTraceFields(common.LogFields{
  279. "tunneled": f.config.Tunneled,
  280. "error": err.Error(),
  281. }).Warning("DSL: datastore failed")
  282. // Proceed with this one run
  283. }
  284. if oslErr != nil {
  285. return errors.Trace(oslErr)
  286. }
  287. return nil
  288. }
  289. func (f *Fetcher) processOSLs(ctx context.Context) ([]OSLKey, error) {
  290. lastTime, err := f.config.DatastoreGetLastActiveOSLsTime()
  291. if err != nil {
  292. // TODO: proceed, but skip GetActiveOSLsRequest?
  293. return nil, errors.Trace(err)
  294. }
  295. now := time.Now()
  296. if now.After(lastTime.Add(f.config.GetLastActiveOSLsTTL)) {
  297. // When the last GetActiveOSLsRequest fetch expires, request the
  298. // current active OSLs again. Prune any locally stored OSL states for
  299. // OSLs that are no longer active. Add new OSL states for previously
  300. // unknown OSLs. These new OSLs states will trigger OSL FileSpec
  301. // fetches in the next step.
  302. // The size of the request and response is not varied in this case. In
  303. // practise, the number of active OSL IDs is expected to be
  304. // relatively small. The obfuscation hops to the relay should add a
  305. // small amount of random padding.
  306. activeOSLIDs, err := f.doGetActiveOSLsRequest(ctx)
  307. if err != nil {
  308. return nil, errors.Trace(err)
  309. }
  310. // Load known OSL states without attempting to reassemble OSL keys.
  311. knownOSLStates, err := f.loadOSLStates(ctx, false)
  312. if err != nil {
  313. return nil, errors.Trace(err)
  314. }
  315. addedCount := 0
  316. removedCount := 0
  317. for _, activeID := range activeOSLIDs {
  318. isKnown := false
  319. for _, knownState := range knownOSLStates {
  320. if bytes.Equal(activeID, knownState.ID) {
  321. isKnown = true
  322. break
  323. }
  324. }
  325. if !isKnown {
  326. err := f.storeOSLState(
  327. activeID,
  328. &fetcherOSLState{
  329. ID: activeID,
  330. State: oslStateNoFileSpec,
  331. })
  332. if err != nil {
  333. return nil, errors.Trace(err)
  334. }
  335. addedCount += 1
  336. }
  337. }
  338. for _, knownState := range knownOSLStates {
  339. isActive := false
  340. for _, activeID := range activeOSLIDs {
  341. if bytes.Equal(activeID, knownState.ID) {
  342. isActive = true
  343. break
  344. }
  345. }
  346. if !isActive {
  347. err := f.config.DatastoreDeleteOSLState(knownState.ID)
  348. if err != nil {
  349. return nil, errors.Trace(err)
  350. }
  351. removedCount += 1
  352. }
  353. }
  354. f.config.DoGarbageCollection()
  355. f.config.Logger.WithTraceFields(common.LogFields{
  356. "tunneled": f.config.Tunneled,
  357. "total": len(activeOSLIDs),
  358. "added": addedCount,
  359. "removed": removedCount,
  360. }).Info("DSL: fetched active OSL IDs")
  361. err = f.config.DatastoreSetLastActiveOSLsTime(now)
  362. if err != nil {
  363. err = errors.Trace(err)
  364. // Signal a fatal datastore error. The caller should not run any
  365. // Fetcher again, for the duration of its process, since the
  366. // LastActiveOSLsTime mechanism won't prevent excess repeats.
  367. f.config.DatastoreFatalError(errors.Trace(err))
  368. f.config.Logger.WithTraceFields(common.LogFields{
  369. "tunneled": f.config.Tunneled,
  370. "error": err.Error(),
  371. }).Warning("DSL: datastore failed")
  372. // Proceed with this one run
  373. }
  374. }
  375. // Load known OSL states, attempting to reassemble OSL keys. Any newly
  376. // assembled keys will be stored back to the datastore, caching the
  377. // assembly. For OSLs in the no-FileSpec state, the missing FileSpecs
  378. // will be fetched.
  379. knownOSLStates, err := f.loadOSLStates(ctx, true)
  380. if err != nil {
  381. return nil, errors.Trace(err)
  382. }
  383. addedSpecCount := 0
  384. removedSpecCount := 0
  385. defer func() {
  386. // Emit log even if not all fetches succeed.
  387. if addedSpecCount > 0 || removedSpecCount > 0 {
  388. f.config.Logger.WithTraceFields(common.LogFields{
  389. "tunneled": f.config.Tunneled,
  390. "added": addedSpecCount,
  391. "removed": removedSpecCount,
  392. }).Info("DSL: fetched OSL FileSpecs")
  393. }
  394. }()
  395. var getFileSpecs []OSLID
  396. for _, knownState := range knownOSLStates {
  397. if knownState.State == oslStateHasFileSpec ||
  398. knownState.State == oslStateHasKey {
  399. continue
  400. }
  401. getFileSpecs = append(getFileSpecs, knownState.ID)
  402. }
  403. for len(getFileSpecs) > 0 {
  404. // Vary the size of the request and response.
  405. getCount := prng.Range(
  406. f.config.GetOSLFileSpecsMinCount,
  407. f.config.GetOSLFileSpecsMaxCount)
  408. getBatch := getFileSpecs
  409. if len(getBatch) > getCount {
  410. getBatch = getBatch[:getCount]
  411. }
  412. fileSpecs, err := f.doGetOSLFileSpecsRequest(ctx, getBatch)
  413. if err != nil {
  414. return nil, errors.Trace(err)
  415. }
  416. for i, fileSpec := range fileSpecs {
  417. if len(fileSpec) > 0 {
  418. err := f.storeOSLState(
  419. getFileSpecs[i],
  420. &fetcherOSLState{
  421. ID: getFileSpecs[i],
  422. State: oslStateHasFileSpec,
  423. FileSpec: fileSpec})
  424. if err != nil {
  425. return nil, errors.Trace(err)
  426. }
  427. addedSpecCount += 1
  428. } else {
  429. // A nil/empty FileSpec in the response indicates that the
  430. // requested OSL ID is invalid or no longer active. Prune the OSL state.
  431. err := f.config.DatastoreDeleteOSLState(getBatch[i])
  432. if err != nil {
  433. return nil, errors.Trace(err)
  434. }
  435. removedSpecCount += 1
  436. }
  437. }
  438. // doGetOSLFileSpecsRequest will retry failed requests and reduces
  439. // the number of requested OSL FileSpecs in each retry. Adjust
  440. // getFileSpecs in case less than the initial getBatch were fetched.
  441. // Unfetched FileSpecs will be added to the next batch.
  442. getFileSpecs = getFileSpecs[len(fileSpecs):]
  443. f.config.DoGarbageCollection()
  444. }
  445. if addedSpecCount > 0 || removedSpecCount > 0 {
  446. // Repeat attempting to reassemble OSL keys, since new FileSpecs were
  447. // downloaded. This case also prunes any now-removed OSLs so their keys
  448. // will not be included in the return value.
  449. knownOSLStates, err = f.loadOSLStates(ctx, true)
  450. if err != nil {
  451. return nil, errors.Trace(err)
  452. }
  453. }
  454. var keys []OSLKey
  455. for _, knownState := range knownOSLStates {
  456. if knownState.State == oslStateHasKey {
  457. keys = append(keys, knownState.Key)
  458. }
  459. }
  460. return keys, nil
  461. }
  462. func (f *Fetcher) doDiscoverServerEntriesRequest(
  463. ctx context.Context,
  464. keys []OSLKey,
  465. discoverCount int) ([]*VersionedServerEntryTag, error) {
  466. // Perform the request with retries. On each retry, reduce the requested
  467. // response size to mitigate blocking or performance issues with larger
  468. // responses.
  469. for i := 0; ; i++ {
  470. // All known OSL keys are sent in the request. In practise, the number
  471. // of active OSL IDs is expected to be relatively small.
  472. request := &DiscoverServerEntriesRequest{
  473. BaseAPIParameters: f.packedAPIParameters,
  474. OSLKeys: keys,
  475. DiscoverCount: int32(discoverCount),
  476. }
  477. var response *DiscoverServerEntriesResponse
  478. doRetry, err := f.doRelayedRequest(
  479. ctx, requestTypeDiscoverServerEntries, request, &response)
  480. if err == nil {
  481. return response.VersionedServerEntryTags, nil
  482. }
  483. if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
  484. return nil, errors.Trace(err)
  485. }
  486. f.config.Logger.WithTraceFields(common.LogFields{
  487. "tunneled": f.config.Tunneled,
  488. "discoverCount": discoverCount,
  489. "error": err.Error(),
  490. }).Warning("DSL: doDiscoverServerEntriesRequest failed")
  491. common.SleepWithContext(
  492. ctx,
  493. prng.JitterDuration(
  494. f.config.RequestRetryDelay,
  495. f.config.RequestRetryDelayJitter))
  496. if discoverCount > 1 {
  497. discoverCount /= 2
  498. }
  499. }
  500. }
  501. func (f *Fetcher) doGetServerEntriesRequest(
  502. ctx context.Context,
  503. tags []ServerEntryTag) ([]*SourcedServerEntry, error) {
  504. // Perform the request with retries. On each retry, reduce the requested
  505. // response size to mitigate blocking or performance issues with larger
  506. // responses.
  507. for i := 0; ; i++ {
  508. request := &GetServerEntriesRequest{
  509. BaseAPIParameters: f.packedAPIParameters,
  510. ServerEntryTags: tags,
  511. }
  512. var response *GetServerEntriesResponse
  513. doRetry, err := f.doRelayedRequest(
  514. ctx, requestTypeGetServerEntries, request, &response)
  515. if err == nil && len(tags) != len(response.SourcedServerEntries) {
  516. err = errors.TraceNew("unexpected server entry count")
  517. }
  518. if err == nil {
  519. return response.SourcedServerEntries, nil
  520. }
  521. if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
  522. return nil, errors.Trace(err)
  523. }
  524. f.config.Logger.WithTraceFields(common.LogFields{
  525. "tunneled": f.config.Tunneled,
  526. "attempt": i,
  527. "tagCount": len(tags),
  528. "error": err.Error(),
  529. }).Warning("DSL: doGetServerEntriesRequest attempt failed")
  530. common.SleepWithContext(
  531. ctx,
  532. prng.JitterDuration(
  533. f.config.RequestRetryDelay,
  534. f.config.RequestRetryDelayJitter))
  535. if len(tags) > 1 {
  536. n := len(tags) / 2
  537. tags = tags[:n]
  538. }
  539. }
  540. }
  541. func (f *Fetcher) doGetActiveOSLsRequest(ctx context.Context) ([]OSLID, error) {
  542. // Perform the request with retries. The response always includes all
  543. // current, active OSL IDs and is not reduced on retry.
  544. for i := 0; ; i++ {
  545. request := &GetActiveOSLsRequest{
  546. BaseAPIParameters: f.packedAPIParameters,
  547. }
  548. var response *GetActiveOSLsResponse
  549. doRetry, err := f.doRelayedRequest(
  550. ctx, requestTypeGetActiveOSLs, request, &response)
  551. if err == nil {
  552. return response.ActiveOSLIDs, nil
  553. }
  554. if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
  555. return nil, errors.Trace(err)
  556. }
  557. f.config.Logger.WithTraceFields(common.LogFields{
  558. "tunneled": f.config.Tunneled,
  559. "attempt": i,
  560. "error": err.Error(),
  561. }).Warning("DSL: doGetActiveOSLsRequest attempt failed")
  562. common.SleepWithContext(
  563. ctx,
  564. prng.JitterDuration(
  565. f.config.RequestRetryDelay,
  566. f.config.RequestRetryDelayJitter))
  567. }
  568. }
  569. func (f *Fetcher) doGetOSLFileSpecsRequest(
  570. ctx context.Context, IDs []OSLID) ([]OSLFileSpec, error) {
  571. // Perform the request with retries. On each retry, reduce the requested
  572. // response size to mitigate blocking or performance issues with larger
  573. // responses.
  574. for i := 0; ; i++ {
  575. request := &GetOSLFileSpecsRequest{
  576. BaseAPIParameters: f.packedAPIParameters,
  577. OSLIDs: IDs,
  578. }
  579. var response *GetOSLFileSpecsResponse
  580. doRetry, err := f.doRelayedRequest(
  581. ctx, requestTypeGetOSLFileSpecs, request, &response)
  582. if err == nil && len(IDs) != len(response.OSLFileSpecs) {
  583. err = errors.TraceNew("unexpected OSL file spec count")
  584. }
  585. if err == nil {
  586. return response.OSLFileSpecs, nil
  587. }
  588. if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
  589. return nil, errors.Trace(err)
  590. }
  591. f.config.Logger.WithTraceFields(common.LogFields{
  592. "tunneled": f.config.Tunneled,
  593. "attempt": i,
  594. "OSLIDCount": len(IDs),
  595. "error": err.Error(),
  596. }).Warning("DSL: doGetOSLFileSpecsRequest attempt failed")
  597. common.SleepWithContext(
  598. ctx,
  599. prng.JitterDuration(
  600. f.config.RequestRetryDelay,
  601. f.config.RequestRetryDelayJitter))
  602. if len(IDs) > 1 {
  603. n := len(IDs) / 2
  604. IDs = IDs[:n]
  605. }
  606. }
  607. }
  608. func (f *Fetcher) doRelayedRequest(
  609. ctx context.Context,
  610. requestType int32,
  611. request any,
  612. response any) (retRetry bool, retErr error) {
  613. // Delay attempt to fetch while there is no network connectivity.
  614. if f.config.WaitForNetworkConnectivity != nil &&
  615. !f.config.WaitForNetworkConnectivity() {
  616. return false, errors.TraceNew("shutdown")
  617. }
  618. // Add the relay wrapping.
  619. cborRequest, err := protocol.CBOREncoding.Marshal(request)
  620. if err != nil {
  621. return false, errors.Trace(err)
  622. }
  623. cborRelayedRequest, err := protocol.CBOREncoding.Marshal(
  624. &RelayedRequest{
  625. RequestType: requestType,
  626. Version: requestVersion,
  627. Request: cborRequest,
  628. })
  629. if err != nil {
  630. return false, errors.Trace(err)
  631. }
  632. if len(cborRelayedRequest) > MaxRelayPayloadSize {
  633. return false, errors.Tracef(
  634. "request size %d exceeds limit %d", len(cborRelayedRequest), MaxRelayPayloadSize)
  635. }
  636. // Relay the request via the supplied RoundTripper.
  637. requestCtx := ctx
  638. if f.config.RequestTimeout > 0 {
  639. var requestCancelFunc context.CancelFunc
  640. requestCtx, requestCancelFunc = context.WithTimeout(ctx, f.config.RequestTimeout)
  641. defer requestCancelFunc()
  642. }
  643. cborRelayedResponse, err := f.config.RoundTripper(requestCtx, cborRelayedRequest)
  644. if err != nil {
  645. // Allow retries for in case of intermittent network failures or
  646. // potential blocking.
  647. //
  648. // TODO: check for specific retry-eligible errors from the RoundTripper?
  649. return true, errors.Trace(err)
  650. }
  651. // Remove the relay wrapping.
  652. var relayedResponse *RelayedResponse
  653. err = cbor.Unmarshal(cborRelayedResponse, &relayedResponse)
  654. if err != nil {
  655. return false, errors.Trace(err)
  656. }
  657. if relayedResponse.Error != 0 {
  658. // No retries if a response was received from the DSL backend.
  659. return false, errors.Tracef(
  660. "RelayedResponse.Error: %d", relayedResponse.Error)
  661. }
  662. uncompressedResponse, err := common.Decompress(
  663. relayedResponse.Compression, relayedResponse.Response)
  664. if err != nil {
  665. return false, errors.Trace(err)
  666. }
  667. err = cbor.Unmarshal(uncompressedResponse, response)
  668. if err != nil {
  669. return false, errors.Trace(err)
  670. }
  671. return false, nil
  672. }
  673. func (f *Fetcher) loadOSLStates(ctx context.Context, reassembleKeys bool) ([]*fetcherOSLState, error) {
  674. // Load just the set of known OSL IDs, and then process each OSL state one
  675. // at a time, to avoid loading all states into memory at once.
  676. activeIDs, err := f.config.DatastoreKnownOSLIDs()
  677. if err != nil {
  678. return nil, errors.Trace(err)
  679. }
  680. var states []*fetcherOSLState
  681. for _, ID := range activeIDs {
  682. cborState, err := f.config.DatastoreGetOSLState(ID)
  683. if err != nil {
  684. return nil, errors.Trace(err)
  685. }
  686. if cborState == nil {
  687. // This case is not expected since DatastoreKnownOSLIDs returns
  688. // only known IDs.
  689. f.config.Logger.WithTraceFields(common.LogFields{
  690. "tunneled": f.config.Tunneled,
  691. }).Warning("DSL: unexpected unknown OSL ID")
  692. continue
  693. }
  694. var state *fetcherOSLState
  695. err = cbor.Unmarshal(cborState, &state)
  696. if err != nil {
  697. return nil, errors.Trace(err)
  698. }
  699. if !bytes.Equal(ID, state.ID) {
  700. return nil, errors.TraceNew("unexpected OSL ID")
  701. }
  702. // TODO: sanity check FileSpec/Key fields match State?
  703. if state.State == oslStateHasFileSpec {
  704. // When we have the FileSpec, but not the reassembled key, attempt
  705. // reassembly from SLOKs. A reassembled key is stored back to the
  706. // datastore.
  707. if reassembleKeys {
  708. var fileSpec *osl.OSLFileSpec
  709. err = cbor.Unmarshal(state.FileSpec, &fileSpec)
  710. if err != nil {
  711. return nil, errors.Trace(err)
  712. }
  713. ok, key, err := osl.ReassembleOSLKey(fileSpec, f.config.DatastoreSLOKLookup)
  714. if err != nil {
  715. return nil, errors.Trace(err)
  716. }
  717. if ok {
  718. // Without the guarantee that there's only one concurrent
  719. // fetcher run, it's possible, with two concurrent
  720. // fetchers, that one prunes an OSL state after
  721. // GetActiveOSLsRequest, while the other calls
  722. // storeOSLState and incorrectly restores the pruned state.
  723. state.State = oslStateHasKey
  724. state.Key = key
  725. state.FileSpec = nil
  726. err = f.storeOSLState(ID, state)
  727. if err != nil {
  728. return nil, errors.Trace(err)
  729. }
  730. f.config.Logger.WithTraceFields(common.LogFields{
  731. "tunneled": f.config.Tunneled,
  732. }).Info("DSL: reassembled OSL key")
  733. }
  734. }
  735. // Allow state.FileSpec to be garbage collected.
  736. state.FileSpec = nil
  737. f.config.DoGarbageCollection()
  738. }
  739. states = append(states, state)
  740. }
  741. return states, nil
  742. }
  743. func (f *Fetcher) storeOSLState(ID OSLID, state *fetcherOSLState) error {
  744. cborState, err := protocol.CBOREncoding.Marshal(state)
  745. if err != nil {
  746. return errors.Trace(err)
  747. }
  748. err = f.config.DatastoreStoreOSLState(ID, cborState)
  749. if err != nil {
  750. return errors.Trace(err)
  751. }
  752. return nil
  753. }