dsl.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /*
  2. * Copyright (c) 2025, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "sync/atomic"
  23. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/dsl"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  29. )
  30. func runUntunneledDSLFetcher(
  31. ctx context.Context,
  32. config *Config,
  33. brokerClientManager *InproxyBrokerClientManager,
  34. signal <-chan struct{}) {
  35. NoticeInfo("running untunneled DSL fetcher")
  36. fetcherLoop:
  37. for !disableDSLFetches.Load() {
  38. select {
  39. case <-signal:
  40. case <-ctx.Done():
  41. break fetcherLoop
  42. }
  43. isTunneled := false
  44. // Log the error notice for all errors in this block.
  45. err := func() error {
  46. networkID := config.GetNetworkID()
  47. brokerClient, _, err := brokerClientManager.GetBrokerClient(networkID)
  48. if err != nil {
  49. return errors.Trace(err)
  50. }
  51. roundTripper := func(
  52. ctx context.Context,
  53. requestPayload []byte) ([]byte, error) {
  54. response, err := brokerClient.ClientDSL(
  55. ctx,
  56. &inproxy.ClientDSLRequest{
  57. RequestPayload: requestPayload,
  58. })
  59. if err != nil {
  60. return nil, errors.Trace(err)
  61. }
  62. return response.ResponsePayload, nil
  63. }
  64. // Detailed logging, retries, last request times, and
  65. // WaitForNetworkConnectivity are all handled inside dsl.Fetcher.
  66. // There is no equivilent to RecordRemoteServerListStat or
  67. // remote_server_list, since the DSL backend will log DSL request events.
  68. //
  69. // TODO: add a failed_dsl_request log, similar to failed_tunnel,
  70. // to record and report failures?
  71. err = doDSLFetch(ctx, config, networkID, isTunneled, roundTripper)
  72. if err != nil {
  73. return errors.Trace(err)
  74. }
  75. return nil
  76. }()
  77. if err != nil {
  78. NoticeError("untunneled DSL fetch failed: %v", errors.Trace(err))
  79. // No cooldown pause, since controller.triggerFetches isn't be
  80. // called in a tight loop.
  81. }
  82. }
  83. NoticeInfo("exiting untunneled DSL fetcher")
  84. }
  85. func runTunneledDSLFetcher(
  86. ctx context.Context,
  87. config *Config,
  88. getActiveTunnel func() *Tunnel,
  89. signal <-chan struct{}) {
  90. NoticeInfo("running tunneled DSL fetcher")
  91. fetcherLoop:
  92. for !disableDSLFetches.Load() {
  93. select {
  94. case <-signal:
  95. case <-ctx.Done():
  96. break fetcherLoop
  97. }
  98. tunnel := getActiveTunnel()
  99. if tunnel == nil {
  100. continue
  101. }
  102. isTunneled := true
  103. networkID := config.GetNetworkID()
  104. roundTripper := func(
  105. ctx context.Context,
  106. requestPayload []byte) ([]byte, error) {
  107. // The request ctx is ignored; tunnel.SendAPIRequest does not
  108. // support a request context. In practise, the input ctx is
  109. // controller.runCtx which includes the full lifetime of the
  110. // tunnel. When a tunnel closes, any in-flight SendAPIRequest
  111. // will be interrupted and not block.
  112. responsePayload, err := tunnel.SendAPIRequest(
  113. protocol.PSIPHON_API_DSL_REQUEST_NAME, requestPayload)
  114. return responsePayload, errors.Trace(err)
  115. }
  116. // Detailed logging, retries, last request times, and
  117. // WaitForNetworkConnectivity are all handled inside dsl.Fetcher.
  118. err := doDSLFetch(ctx, config, networkID, isTunneled, roundTripper)
  119. if err != nil {
  120. NoticeError("tunneled DSL fetch failed: %v", errors.Trace(err))
  121. // No cooldown pause, since runTunneledDSLFetcher is called only
  122. // once after fully connecting.
  123. }
  124. }
  125. NoticeInfo("exiting tunneled DSL fetcher")
  126. }
  127. func doDSLFetch(
  128. ctx context.Context,
  129. config *Config,
  130. networkID string,
  131. isTunneled bool,
  132. roundTripper dsl.FetcherRoundTripper) error {
  133. p := config.GetParameters().Get()
  134. if !p.Bool(parameters.EnableDSLFetcher) {
  135. p.Close()
  136. return nil
  137. }
  138. var paddingPRNG *prng.PRNG
  139. if isTunneled {
  140. // For a tunneled request, padding is added via the params since
  141. // there's no random padding at the SSH request layer. The PRNG seed
  142. // is not replayed.
  143. paddingPRNG = prng.DefaultPRNG()
  144. }
  145. includeSessionID := true
  146. baseAPIParams := getBaseAPIParameters(
  147. baseParametersNoDialParameters,
  148. paddingPRNG,
  149. includeSessionID,
  150. config,
  151. nil)
  152. // Copied from FetchObfuscatedServerLists.
  153. //
  154. // Prevent excessive notice noise in cases such as a general database
  155. // failure, as GetSLOK may be called thousands of times per fetch.
  156. emittedGetSLOKAlert := int32(0)
  157. lookupSLOKs := func(slokID []byte) []byte {
  158. key, err := GetSLOK(slokID)
  159. if err != nil && atomic.CompareAndSwapInt32(&emittedGetSLOKAlert, 0, 1) {
  160. NoticeWarning("GetSLOK failed: %s", err)
  161. }
  162. return key
  163. }
  164. // hasServerEntry and storeServerEntry handle PrioritizeDial hints from
  165. // the DSL backend for existing or new server entries respectively.
  166. //
  167. // In each case, a probability is applied to tune the rate of DSL
  168. // prioritization since it impacts the rate of replay. DSL
  169. // prioritizations don't _replace_ existing replay dial parameter
  170. // records, but new DSLPendingPrioritizeDial dial parameters
  171. // can _displace_ regular replays in the move-to-front server entry
  172. // iterator shuffle. It's not clear a priori which out of replay or DSL
  173. // prioritization is the optimal choice; the intention is to try a mix.
  174. //
  175. // When there's already an existing replay dial parameters for a server
  176. // entry, no DSLPendingPrioritizeDial placeholder is created since any
  177. // record suffices to move-to-front, and a non-expired replay dial
  178. // parameters record can be more useful. As a result, there's no
  179. // dsl_prioritized metric reported for cases where the client is already
  180. // going to prioritize selecting a server entry.
  181. //
  182. // Limitation: For existing server entries, the client could already know
  183. // that the server entry is not successful, but that knowledge is not
  184. // applied here; instead, DSLPrioritizeDialExistingServerEntryProbability
  185. // can merely be tuned lower. There could be failed_tunnel persistent
  186. // stats with the server entry tag, but that data is only temporary, is
  187. // truncated aggressively, and is expensive to unmarshal and process. A
  188. // potential future enhancement would be to store a less ephemeral and
  189. // simpler record of recent failures.
  190. //
  191. // Another potential future enhancement may be to count the number of
  192. // existing replay records, including a TTL check, and use that count to
  193. // adjust the rate of creating DSLPendingPrioritizeDial records.
  194. prioritizeDialNewServerEntryProbability :=
  195. p.Float(parameters.DSLPrioritizeDialNewServerEntryProbability)
  196. prioritizeDialExistingServerEntryProbability :=
  197. p.Float(parameters.DSLPrioritizeDialExistingServerEntryProbability)
  198. hasServerEntry := func(
  199. tag dsl.ServerEntryTag,
  200. version int,
  201. prioritizeDial bool) bool {
  202. prioritizeDial = prioritizeDial &&
  203. prng.FlipWeightedCoin(prioritizeDialExistingServerEntryProbability)
  204. return DSLHasServerEntry(
  205. tag,
  206. version,
  207. prioritizeDial,
  208. networkID)
  209. }
  210. storeServerEntry := func(
  211. packedServerEntryFields protocol.PackedServerEntryFields,
  212. source string,
  213. prioritizeDial bool) error {
  214. prioritizeDial = prioritizeDial &&
  215. prng.FlipWeightedCoin(prioritizeDialNewServerEntryProbability)
  216. return errors.Trace(
  217. DSLStoreServerEntry(
  218. config.ServerEntrySignaturePublicKey,
  219. packedServerEntryFields,
  220. source,
  221. prioritizeDial,
  222. networkID))
  223. }
  224. c := &dsl.FetcherConfig{
  225. Logger: NoticeCommonLogger(false),
  226. BaseAPIParameters: baseAPIParams,
  227. Tunneled: isTunneled,
  228. RoundTripper: roundTripper,
  229. DatastoreHasServerEntry: hasServerEntry,
  230. DatastoreStoreServerEntry: storeServerEntry,
  231. DatastoreGetLastActiveOSLsTime: DSLGetLastActiveOSLsTime,
  232. DatastoreSetLastActiveOSLsTime: DSLSetLastActiveOSLsTime,
  233. DatastoreKnownOSLIDs: DSLKnownOSLIDs,
  234. DatastoreGetOSLState: DSLGetOSLState,
  235. DatastoreStoreOSLState: DSLStoreOSLState,
  236. DatastoreDeleteOSLState: DSLDeleteOSLState,
  237. DatastoreSLOKLookup: lookupSLOKs,
  238. DatastoreFatalError: onDSLDatastoreFatalError,
  239. DoGarbageCollection: DoGarbageCollection,
  240. }
  241. if isTunneled {
  242. c.DatastoreGetLastFetchTime = DSLGetLastTunneledFetchTime
  243. c.DatastoreSetLastFetchTime = DSLSetLastTunneledFetchTime
  244. c.RequestTimeout = p.Duration(parameters.DSLFetcherTunneledRequestTimeout)
  245. c.RequestRetryCount = p.Int(parameters.DSLFetcherTunneledRequestRetryCount)
  246. c.RequestRetryDelay = p.Duration(parameters.DSLFetcherTunneledRequestRetryDelay)
  247. c.RequestRetryDelayJitter = p.Float(parameters.DSLFetcherTunneledRequestRetryDelayJitter)
  248. c.FetchTTL = p.Duration(parameters.DSLFetcherTunneledFetchTTL)
  249. c.DiscoverServerEntriesMinCount = p.Int(parameters.DSLFetcherTunneledDiscoverServerEntriesMinCount)
  250. c.DiscoverServerEntriesMaxCount = p.Int(parameters.DSLFetcherTunneledDiscoverServerEntriesMaxCount)
  251. c.GetServerEntriesMinCount = p.Int(parameters.DSLFetcherTunneledGetServerEntriesMinCount)
  252. c.GetServerEntriesMaxCount = p.Int(parameters.DSLFetcherTunneledGetServerEntriesMaxCount)
  253. // WaitForNetworkConnectivity is not wired up in this case since
  254. // tunnel must be connected. If the tunnel becomes disconnected due
  255. // to loss of network connectivity, prefer to fail this request and
  256. // try again, with a new tunnel, after reconnecting.
  257. } else {
  258. c.DatastoreGetLastFetchTime = DSLGetLastUntunneledFetchTime
  259. c.DatastoreSetLastFetchTime = DSLSetLastUntunneledFetchTime
  260. c.RequestTimeout = p.Duration(parameters.DSLFetcherUntunneledRequestTimeout)
  261. c.RequestRetryCount = p.Int(parameters.DSLFetcherUntunneledRequestRetryCount)
  262. c.RequestRetryDelay = p.Duration(parameters.DSLFetcherUntunneledRequestRetryDelay)
  263. c.RequestRetryDelayJitter = p.Float(parameters.DSLFetcherUntunneledRequestRetryDelayJitter)
  264. c.FetchTTL = p.Duration(parameters.DSLFetcherUntunneledFetchTTL)
  265. c.DiscoverServerEntriesMinCount = p.Int(parameters.DSLFetcherUntunneledDiscoverServerEntriesMinCount)
  266. c.DiscoverServerEntriesMaxCount = p.Int(parameters.DSLFetcherUntunneledDiscoverServerEntriesMaxCount)
  267. c.GetServerEntriesMinCount = p.Int(parameters.DSLFetcherUntunneledGetServerEntriesMinCount)
  268. c.GetServerEntriesMaxCount = p.Int(parameters.DSLFetcherUntunneledGetServerEntriesMaxCount)
  269. c.WaitForNetworkConnectivity = func() bool {
  270. return WaitForNetworkConnectivity(ctx, config.NetworkConnectivityChecker, nil)
  271. }
  272. }
  273. c.GetLastActiveOSLsTTL = p.Duration(parameters.DSLFetcherGetLastActiveOSLsTTL)
  274. c.GetOSLFileSpecsMinCount = p.Int(parameters.DSLFetcherGetOSLFileSpecsMinCount)
  275. c.GetOSLFileSpecsMaxCount = p.Int(parameters.DSLFetcherGetOSLFileSpecsMaxCount)
  276. p.Close()
  277. fetcher, err := dsl.NewFetcher(c)
  278. if err != nil {
  279. return errors.Trace(err)
  280. }
  281. err = fetcher.Run(ctx)
  282. if err != nil {
  283. return errors.Trace(err)
  284. }
  285. return nil
  286. }
  287. var disableDSLFetches atomic.Bool
  288. func onDSLDatastoreFatalError(_ error) {
  289. // Halt all DSL requests for the duration of the process on a
  290. // DatastoreFatalError, which includes failure to set the last request
  291. // time. This avoids continuous DSL request in this scenario.
  292. disableDSLFetches.Store(true)
  293. }