Просмотр исходного кода

Client-side DSL request integration

Rod Hynes 5 месяцев назад
Родитель
Сommit
874f7a9860

+ 11 - 11
psiphon/common/dsl/dsl_test.go

@@ -268,8 +268,8 @@ func testDSLs(testConfig *testConfig) error {
 
 		RoundTripper: clientRelayRoundTripper,
 
-		DatastoreGetLastDiscoverTime:   dslClient.DatastoreGetLastDiscoverTime,
-		DatastoreSetLastDiscoverTime:   dslClient.DatastoreSetLastDiscoverTime,
+		DatastoreGetLastFetchTime:      dslClient.DatastoreGetLastFetchTime,
+		DatastoreSetLastFetchTime:      dslClient.DatastoreSetLastFetchTime,
 		DatastoreGetLastActiveOSLsTime: dslClient.DatastoreGetLastActiveOSLsTime,
 		DatastoreSetLastActiveOSLsTime: dslClient.DatastoreSetLastActiveOSLsTime,
 		DatastoreHasServerEntry:        dslClient.DatastoreHasServerEntry,
@@ -285,7 +285,7 @@ func testDSLs(testConfig *testConfig) error {
 		RequestRetryDelay:       1 * time.Millisecond,
 		RequestRetryDelayJitter: 0.1,
 
-		DiscoverServerEntriesTTL:      1 * time.Hour,
+		FetchTTL:                      1 * time.Hour,
 		DiscoverServerEntriesMinCount: discoverCount,
 		DiscoverServerEntriesMaxCount: discoverCount,
 		GetServerEntriesMinCount:      getCount,
@@ -345,7 +345,7 @@ func testDSLs(testConfig *testConfig) error {
 		// server entries stores, as will be checked via
 		// dslClient.serverEntryStoreCount.
 
-		dslClient.lastDiscoverTime = time.Time{}
+		dslClient.lastFetchTime = time.Time{}
 		dslClient.lastActiveOSLsTime = time.Time{}
 
 		err = fetcher.Run(ctx)
@@ -366,7 +366,7 @@ func testDSLs(testConfig *testConfig) error {
 		// dslClient.serverEntryStoreCount check will demonstrate that all
 		// remaining server entries were downloaded and stored.
 
-		dslClient.lastDiscoverTime = time.Time{}
+		dslClient.lastFetchTime = time.Time{}
 
 		discoverCount = 128
 
@@ -395,7 +395,7 @@ func testDSLs(testConfig *testConfig) error {
 		// fetcher cleans up the old, no longer active OSL state via
 		// dslClient.deleteOSLStateCount.
 
-		dslClient.lastDiscoverTime = time.Time{}
+		dslClient.lastFetchTime = time.Time{}
 		dslClient.lastActiveOSLsTime = time.Time{}
 
 		dslClient.serverEntries = make(map[string]protocol.ServerEntryFields)
@@ -428,7 +428,7 @@ func testDSLs(testConfig *testConfig) error {
 
 type dslClient struct {
 	mutex                 sync.Mutex
-	lastDiscoverTime      time.Time
+	lastFetchTime         time.Time
 	lastActiveOSLsTime    time.Time
 	serverEntries         map[string]protocol.ServerEntryFields
 	serverEntryStoreCount int
@@ -445,18 +445,18 @@ func newDSLClient(SLOKs []*osl.SLOK) *dslClient {
 	}
 }
 
-func (c *dslClient) DatastoreGetLastDiscoverTime() (time.Time, error) {
+func (c *dslClient) DatastoreGetLastFetchTime() (time.Time, error) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
-	return c.lastDiscoverTime, nil
+	return c.lastFetchTime, nil
 }
 
-func (c *dslClient) DatastoreSetLastDiscoverTime(time time.Time) error {
+func (c *dslClient) DatastoreSetLastFetchTime(time time.Time) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
-	c.lastDiscoverTime = time
+	c.lastFetchTime = time
 	return nil
 }
 

+ 8 - 8
psiphon/common/dsl/fetcher.go

@@ -51,10 +51,10 @@ type FetcherConfig struct {
 
 	RoundTripper FetcherRoundTripper
 
-	DatastoreGetLastDiscoverTime func() (time.Time, error)
-	DatastoreSetLastDiscoverTime func(time time.Time) error
-	DatastoreHasServerEntry      func(tag ServerEntryTag, version int) bool
-	DatastoreStoreServerEntry    func(
+	DatastoreGetLastFetchTime func() (time.Time, error)
+	DatastoreSetLastFetchTime func(time time.Time) error
+	DatastoreHasServerEntry   func(tag ServerEntryTag, version int) bool
+	DatastoreStoreServerEntry func(
 		serverEntryFields protocol.PackedServerEntryFields,
 		source string) error
 
@@ -72,7 +72,7 @@ type FetcherConfig struct {
 	RequestRetryDelay       time.Duration
 	RequestRetryDelayJitter float64
 
-	DiscoverServerEntriesTTL      time.Duration
+	FetchTTL                      time.Duration
 	DiscoverServerEntriesMinCount int
 	DiscoverServerEntriesMaxCount int
 	GetServerEntriesMinCount      int
@@ -170,12 +170,12 @@ func NewFetcher(config *FetcherConfig) (*Fetcher, error) {
 //     must be skipped or postponed.
 func (f *Fetcher) Run(ctx context.Context) error {
 
-	lastTime, err := f.config.DatastoreGetLastDiscoverTime()
+	lastTime, err := f.config.DatastoreGetLastFetchTime()
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	if time.Now().Before(lastTime.Add(f.config.DiscoverServerEntriesTTL)) {
+	if time.Now().Before(lastTime.Add(f.config.FetchTTL)) {
 		return nil
 	}
 
@@ -281,7 +281,7 @@ func (f *Fetcher) Run(ctx context.Context) error {
 		f.config.DoGarbageCollection()
 	}
 
-	err = f.config.DatastoreSetLastDiscoverTime(time.Now())
+	err = f.config.DatastoreSetLastFetchTime(time.Now())
 	if err != nil {
 		err = errors.Trace(err)
 

+ 30 - 0
psiphon/common/inproxy/brokerClient.go

@@ -36,6 +36,7 @@ const (
 	proxyAnswerRequestTimeout         = 10 * time.Second
 	clientOfferRequestTimeout         = 10 * time.Second
 	clientRelayedPacketRequestTimeout = 10 * time.Second
+	clientDSLRequestTimeout           = 35 * time.Second
 )
 
 // BrokerClient is used to make requests to a broker.
@@ -231,6 +232,35 @@ func (b *BrokerClient) ClientRelayedPacket(
 	return response, nil
 }
 
+// ClientRelayedPacket sends a ClientRelayedPacket request and returns the
+// response.
+func (b *BrokerClient) ClientDSL(
+	ctx context.Context,
+	request *ClientDSLRequest) (*ClientDSLResponse, error) {
+
+	requestPayload, err := MarshalClientDSLRequest(request)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	requestTimeout := common.ValueOrDefault(
+		b.coordinator.DSLRequestTimeout(),
+		clientDSLRequestTimeout)
+
+	responsePayload, _, err := b.roundTrip(
+		ctx, 0, requestTimeout, requestPayload)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	response, err := UnmarshalClientDSLResponse(responsePayload)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return response, nil
+}
+
 func (b *BrokerClient) roundTrip(
 	ctx context.Context,
 	requestDelay time.Duration,

+ 1 - 0
psiphon/common/inproxy/coordinator.go

@@ -216,6 +216,7 @@ type BrokerDialCoordinator interface {
 	OfferRetryDelay() time.Duration
 	OfferRetryJitter() float64
 	RelayedPacketRequestTimeout() time.Duration
+	DSLRequestTimeout() time.Duration
 }
 
 // WebRTCDialCoordinator provides in-proxy dial parameters and configuration,

+ 7 - 0
psiphon/common/inproxy/coordinator_test.go

@@ -60,6 +60,7 @@ type testBrokerDialCoordinator struct {
 	offerRetryDelay                   time.Duration
 	offerRetryJitter                  float64
 	relayedPacketRequestTimeout       time.Duration
+	dslRequestTimeout                 time.Duration
 }
 
 func (t *testBrokerDialCoordinator) NetworkID() string {
@@ -206,6 +207,12 @@ func (t *testBrokerDialCoordinator) RelayedPacketRequestTimeout() time.Duration
 	return t.relayedPacketRequestTimeout
 }
 
+func (t *testBrokerDialCoordinator) DSLRequestTimeout() time.Duration {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.dslRequestTimeout
+}
+
 type testWebRTCDialCoordinator struct {
 	mutex                           sync.Mutex
 	networkID                       string

+ 55 - 9
psiphon/common/parameters/parameters.go

@@ -445,7 +445,8 @@ const (
 	InproxyClientOfferRequestPersonalTimeout           = "InproxyClientOfferRequestPersonalTimeout"
 	InproxyClientOfferRetryDelay                       = "InproxyClientOfferRetryDelay"
 	InproxyClientOfferRetryJitter                      = "InproxyClientOfferRetryJitter"
-	InproxyClientRelayedPacketRequestTimeout           = "InproxyCloientRelayedPacketRequestTimeout"
+	InproxyClientRelayedPacketRequestTimeout           = "InproxyClientRelayedPacketRequestTimeout"
+	InproxyClientDSLRequestTimeout                     = "InproxyClientDSLRequestTimeout"
 	InproxyBrokerRoundTripStatusCodeFailureThreshold   = "InproxyBrokerRoundTripStatusCodeFailureThreshold"
 	InproxyDTLSRandomizationProbability                = "InproxyDTLSRandomizationProbability"
 	InproxyWebRTCMediaStreamsProbability               = "InproxyWebRTCMediaStreamsProbability"
@@ -522,6 +523,7 @@ const (
 	CheckServerEntryTagsMaxWorkTime                    = "CheckServerEntryTagsMaxWorkTime"
 	ServerEntryPruneDialPortNumberZero                 = "ServerEntryPruneDialPortNumberZero"
 	CompressTactics                                    = "CompressTactics"
+	EnableDSLFetches                                   = "EnableDSLFetches"
 	DSLRelayMaxHttpConns                               = "DSLRelayMaxHttpConns"
 	DSLRelayMaxHttpIdleConns                           = "DSLRelayMaxHttpIdleConns"
 	DSLRelayHttpIdleConnTimeout                        = "DSLRelayHttpIdleConnTimeout"
@@ -529,6 +531,27 @@ const (
 	DSLRelayRetryCount                                 = "DSLRelayRetryCount"
 	DSLRelayCacheTTL                                   = "DSLRelayCacheTTL"
 	DSLRelayCacheMaxSize                               = "DSLRelayCacheMaxSize"
+	DSLFetcherTunneledRequestTimeout                   = "DSLFetcherTunneledRequestTimeout"
+	DSLFetcherTunneledRequestRetryCount                = "DSLFetcherTunneledRequestRetryCount"
+	DSLFetcherTunneledRequestRetryDelay                = "DSLFetcherTunneledRequestRetryDelay"
+	DSLFetcherTunneledRequestRetryDelayJitter          = "DSLFetcherTunneledRequestRetryDelayJitter"
+	DSLFetcherTunneledFetchTTL                         = "DSLFetcherTunneledFetchTTL"
+	DSLFetcherTunneledDiscoverServerEntriesMinCount    = "DSLFetcherTunneledDiscoverServerEntriesMinCount"
+	DSLFetcherTunneledDiscoverServerEntriesMaxCount    = "DSLFetcherTunneledDiscoverServerEntriesMaxCount"
+	DSLFetcherTunneledGetServerEntriesMinCount         = "DSLFetcherTunneledGetServerEntriesMinCount"
+	DSLFetcherTunneledGetServerEntriesMaxCount         = "DSLFetcherTunneledGetServerEntriesMaxCount"
+	DSLFetcherUntunneledRequestTimeout                 = "DSLFetcherUntunneledRequestTimeout"
+	DSLFetcherUntunneledRequestRetryCount              = "DSLFetcherUntunneledRequestRetryCount"
+	DSLFetcherUntunneledRequestRetryDelay              = "DSLFetcherUntunneledRequestRetryDelay"
+	DSLFetcherUntunneledRequestRetryDelayJitter        = "DSLFetcherUntunneledRequestRetryDelayJitter"
+	DSLFetcherUntunneledFetchTTL                       = "DSLFetcherUntunneledFetchTTL"
+	DSLFetcherUntunneledDiscoverServerEntriesMinCount  = "DSLFetcherUntunneledDiscoverServerEntriesMinCount"
+	DSLFetcherUntunneledDiscoverServerEntriesMaxCount  = "DSLFetcherUntunneledDiscoverServerEntriesMaxCount"
+	DSLFetcherUntunneledGetServerEntriesMinCount       = "DSLFetcherUntunneledGetServerEntriesMinCount"
+	DSLFetcherUntunneledGetServerEntriesMaxCount       = "DSLFetcherUntunneledGetServerEntriesMaxCount"
+	DSLFetcherGetLastActiveOSLsTTL                     = "DSLFetcherGetLastActiveOSLsTTL"
+	DSLFetcherGetOSLFileSpecsMinCount                  = "DSLFetcherGetOSLFileSpecsMinCount"
+	DSLFetcherGetOSLFileSpecsMaxCount                  = "DSLFetcherGetOSLFileSpecsMaxCount"
 
 	// Retired parameters
 
@@ -1042,7 +1065,8 @@ var defaultParameters = map[string]struct {
 	InproxyClientOfferRequestPersonalTimeout:           {value: 5*time.Second + 10*time.Second, minimum: time.Duration(0)},
 	InproxyClientOfferRetryDelay:                       {value: 100 * time.Millisecond, minimum: time.Duration(0)},
 	InproxyClientOfferRetryJitter:                      {value: 0.5, minimum: 0.0},
-	InproxyClientRelayedPacketRequestTimeout:           {value: 10 * time.Second, minimum: time.Duration(0)},
+	InproxyClientRelayedPacketRequestTimeout:           {value: 35 * time.Second, minimum: time.Duration(0)},
+	InproxyClientDSLRequestTimeout:                     {value: 10 * time.Second, minimum: time.Duration(0)},
 	InproxyBrokerRoundTripStatusCodeFailureThreshold:   {value: 2 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyDTLSRandomizationProbability:                {value: 0.5, minimum: 0.0},
 	InproxyWebRTCMediaStreamsProbability:               {value: 0.0, minimum: 0.0},
@@ -1125,13 +1149,35 @@ var defaultParameters = map[string]struct {
 
 	CompressTactics: {value: true},
 
-	DSLRelayMaxHttpConns:        {value: 100, minimum: 1, flags: serverSideOnly},
-	DSLRelayMaxHttpIdleConns:    {value: 10, minimum: 1, flags: serverSideOnly},
-	DSLRelayHttpIdleConnTimeout: {value: 120 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
-	DSLRelayRequestTimeout:      {value: 30 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
-	DSLRelayRetryCount:          {value: 2, minimum: 0, flags: serverSideOnly},
-	DSLRelayCacheTTL:            {value: 24 * time.Hour, minimum: time.Duration(0), flags: serverSideOnly},
-	DSLRelayCacheMaxSize:        {value: 200000, minimum: 0, flags: serverSideOnly},
+	EnableDSLFetches:                                  {value: false},
+	DSLRelayMaxHttpConns:                              {value: 100, minimum: 1, flags: serverSideOnly},
+	DSLRelayMaxHttpIdleConns:                          {value: 10, minimum: 1, flags: serverSideOnly},
+	DSLRelayHttpIdleConnTimeout:                       {value: 120 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayRequestTimeout:                            {value: 30 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayRetryCount:                                {value: 2, minimum: 0, flags: serverSideOnly},
+	DSLRelayCacheTTL:                                  {value: 24 * time.Hour, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayCacheMaxSize:                              {value: 200000, minimum: 0, flags: serverSideOnly},
+	DSLFetcherTunneledRequestTimeout:                  {value: 5 * time.Second, minimum: 100 * time.Millisecond, flags: useNetworkLatencyMultiplier},
+	DSLFetcherTunneledRequestRetryCount:               {value: 0, minimum: 0},
+	DSLFetcherTunneledRequestRetryDelay:               {value: 5 * time.Second, minimum: 1 * time.Millisecond},
+	DSLFetcherTunneledRequestRetryDelayJitter:         {value: 0.3, minimum: 0.0},
+	DSLFetcherTunneledFetchTTL:                        {value: 1 * time.Hour, minimum: 1 * time.Millisecond},
+	DSLFetcherTunneledDiscoverServerEntriesMinCount:   {value: 1, minimum: 0},
+	DSLFetcherTunneledDiscoverServerEntriesMaxCount:   {value: 2, minimum: 0},
+	DSLFetcherTunneledGetServerEntriesMinCount:        {value: 30, minimum: 0},
+	DSLFetcherTunneledGetServerEntriesMaxCount:        {value: 60, minimum: 0},
+	DSLFetcherUntunneledRequestTimeout:                {value: 30 * time.Second, minimum: 100 * time.Millisecond, flags: useNetworkLatencyMultiplier},
+	DSLFetcherUntunneledRequestRetryCount:             {value: 2, minimum: 0},
+	DSLFetcherUntunneledRequestRetryDelay:             {value: 5 * time.Second, minimum: 1 * time.Millisecond},
+	DSLFetcherUntunneledRequestRetryDelayJitter:       {value: 0.3, minimum: 0.0},
+	DSLFetcherUntunneledFetchTTL:                      {value: 6 * time.Hour, minimum: 1 * time.Millisecond},
+	DSLFetcherUntunneledDiscoverServerEntriesMinCount: {value: 200, minimum: 0},
+	DSLFetcherUntunneledDiscoverServerEntriesMaxCount: {value: 400, minimum: 0},
+	DSLFetcherUntunneledGetServerEntriesMinCount:      {value: 30, minimum: 0},
+	DSLFetcherUntunneledGetServerEntriesMaxCount:      {value: 60, minimum: 0},
+	DSLFetcherGetLastActiveOSLsTTL:                    {value: 24 * time.Hour, minimum: 1 * time.Millisecond},
+	DSLFetcherGetOSLFileSpecsMinCount:                 {value: 1, minimum: 0},
+	DSLFetcherGetOSLFileSpecsMaxCount:                 {value: 1, minimum: 0},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used

+ 2 - 0
psiphon/common/protocol/protocol.go

@@ -60,6 +60,7 @@ const (
 	SERVER_ENTRY_SOURCE_TARGET     = "TARGET"
 	SERVER_ENTRY_SOURCE_OBFUSCATED = "OBFUSCATED"
 	SERVER_ENTRY_SOURCE_EXCHANGED  = "EXCHANGED"
+	SERVER_ENTRY_SOURCE_DSL        = "DSL-*"
 
 	CAPABILITY_SSH_API_REQUESTS            = "ssh-api-requests"
 	CAPABILITY_UNTUNNELED_WEB_API_REQUESTS = "handshake"
@@ -112,6 +113,7 @@ var SupportedServerEntrySources = []string{
 	SERVER_ENTRY_SOURCE_TARGET,
 	SERVER_ENTRY_SOURCE_OBFUSCATED,
 	SERVER_ENTRY_SOURCE_EXCHANGED,
+	SERVER_ENTRY_SOURCE_DSL,
 }
 
 func AllowServerEntrySourceWithUpstreamProxy(source string) bool {

+ 1 - 1
psiphon/common/protocol/serverEntry.go

@@ -1151,7 +1151,7 @@ func ValidateServerEntryFields(serverEntryFields ServerEntryFields) error {
 	// Ensure locally initialized fields have been set.
 
 	source := serverEntryFields.GetLocalSource()
-	if !common.Contains(
+	if !common.ContainsWildcard(
 		SupportedServerEntrySources, source) {
 		return errors.Tracef("server entry has invalid source: %s", source)
 	}

+ 8 - 0
psiphon/config.go

@@ -689,6 +689,10 @@ type Config struct {
 	// 60 seconds.
 	ShutdownGoroutineProfileDeadlineSeconds *int `json:",omitempty"`
 
+	// EnableDSLFetches specifies whether the client will perform DSL requests
+	// to fetch server entries.
+	EnableDSLFetches *bool `json:",omitempty"`
+
 	//
 	// The following parameters are deprecated.
 	//
@@ -2896,6 +2900,10 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.CompressTactics] = *config.CompressTactics
 	}
 
+	if config.EnableDSLFetches != nil {
+		applyParameters[parameters.EnableDSLFetches] = *config.EnableDSLFetches
+	}
+
 	// When adding new config dial parameters that may override tactics, also
 	// update setDialParametersHash.
 

+ 74 - 8
psiphon/controller.go

@@ -84,6 +84,8 @@ type Controller struct {
 	untunneledSplitTunnelClassifications    *lrucache.Cache
 	signalFetchCommonRemoteServerList       chan struct{}
 	signalFetchObfuscatedServerLists        chan struct{}
+	signalUntunneledDSLFetch                chan struct{}
+	signalTunneledDSLFetch                  chan struct{}
 	signalDownloadUpgrade                   chan string
 	signalReportServerEntries               chan *serverEntriesReportRequest
 	signalReportConnected                   chan struct{}
@@ -163,6 +165,8 @@ func NewController(config *Config) (controller *Controller, err error) {
 		// establish will eventually signal another fetch remote.
 		signalFetchCommonRemoteServerList: make(chan struct{}),
 		signalFetchObfuscatedServerLists:  make(chan struct{}),
+		signalUntunneledDSLFetch:          make(chan struct{}),
+		signalTunneledDSLFetch:            make(chan struct{}),
 		signalDownloadUpgrade:             make(chan string),
 		signalReportConnected:             make(chan struct{}),
 
@@ -315,6 +319,13 @@ func (controller *Controller) Run(ctx context.Context) {
 
 	// Start components
 
+	// Limitation: tactics parameters applied here won't reflect changes in
+	// any initial, untunneled tactics fetch or changes in tunnel handshake
+	// tactics.
+	p := controller.config.GetParameters().Get()
+	enableDSLFetches := p.Bool(parameters.EnableDSLFetches)
+	p.Close()
+
 	// Initialize a single resolver to be used by all dials. Sharing a single
 	// resolver ensures cached results are shared, and that network state
 	// query overhead is amortized over all dials. Multiple dials can resolve
@@ -417,6 +428,29 @@ func (controller *Controller) Run(ctx context.Context) {
 		}
 	}
 
+	if enableDSLFetches {
+
+		controller.runWaitGroup.Add(1)
+		go func() {
+			defer controller.runWaitGroup.Done()
+			runUntunneledDSLFetcher(
+				controller.runCtx,
+				controller.config,
+				controller.inproxyClientBrokerClientManager,
+				controller.signalUntunneledDSLFetch)
+		}()
+
+		controller.runWaitGroup.Add(1)
+		go func() {
+			defer controller.runWaitGroup.Done()
+			runTunneledDSLFetcher(
+				controller.runCtx,
+				controller.config,
+				controller.getNextActiveTunnel,
+				controller.signalTunneledDSLFetch)
+		}()
+	}
+
 	if controller.config.InproxyEnableProxy {
 		controller.runWaitGroup.Add(1)
 		go controller.runInproxyProxy()
@@ -1152,6 +1186,15 @@ loop:
 				// tunnel is established.
 				controller.signalConnectedReporter()
 
+				// Signal a tunneled DSL fetch. The tunneled fetch is similar
+				// to the handshake API discovery mechanism:
+				// opportunistically distribute a small number of new server
+				// entries to clients that are already able to connect.
+				select {
+				case controller.signalTunneledDSLFetch <- struct{}{}:
+				default:
+				}
+
 				// If the handshake indicated that a new client version is available,
 				// trigger an upgrade download.
 				// Note: serverContext is nil when DisableApi is set
@@ -1217,6 +1260,20 @@ func (controller *Controller) SignalSeededNewSLOK() {
 	case controller.signalFetchObfuscatedServerLists <- struct{}{}:
 	default:
 	}
+
+	// Reset any delay for the next tunneled DSL request. The next time a
+	// tunnel connects, the DSL request will launch, and the fetcher will
+	// attempt to reassemble OSLs, now with this new SLOK.
+	//
+	// The delay for the next untunneled DSL request is not reset since that
+	// request typically fetches many more server entries, which is more
+	// appropriate for when a client is unable to connect. Receiving a new
+	// SLOK implies the client is currently connected and is likely to
+	// reconnect again and arrive at the tunneled DSL request.
+	//
+	// TODO: launch an immediate tunneled DSL request?
+
+	_ = DSLSetLastTunneledFetchTime(time.Time{})
 }
 
 // SignalTunnelFailure implements the TunnelOwner interface. This function
@@ -1538,10 +1595,10 @@ func (controller *Controller) DirectDial(remoteAddr string) (conn net.Conn, err
 	return DialTCP(controller.runCtx, remoteAddr, controller.untunneledDialConfig)
 }
 
-// triggerFetches signals RSL, OSL, and upgrade download fetchers to begin, if
-// not already running. triggerFetches is called when tunnel establishment
-// fails to complete within a deadline and in other cases where local
-// circumvention capabilities are lacking and we may require new server
+// triggerFetches signals RSL, OSL, DSL, and upgrade download fetchers to
+// begin, if not already running. triggerFetches is called when tunnel
+// establishment fails to complete within a deadline and in other cases where
+// local circumvention capabilities are lacking and we may require new server
 // entries or client versions with new capabilities.
 func (controller *Controller) triggerFetches() {
 
@@ -1557,14 +1614,22 @@ func (controller *Controller) triggerFetches() {
 	default:
 	}
 
-	// Trigger an OSL fetch in parallel. Both fetches are run in parallel
-	// so that if one out of the common RLS and OSL set is large, it doesn't
-	// doesn't entirely block fetching the other.
+	// Trigger an OSL fetch in parallel. Server list fetches are run in
+	// parallel so that if one fetch is large or slow, it doesn't doesn't
+	// entirely block fetching the others.
 	select {
 	case controller.signalFetchObfuscatedServerLists <- struct{}{}:
 	default:
 	}
 
+	// Trigger the untunneled DSL fetch. The untunneled DSL fetch is similar
+	// to the classic RSL and OSL fetches in that it will attempt to download
+	// a larger, diverse selection of servers.
+	select {
+	case controller.signalUntunneledDSLFetch <- struct{}{}:
+	default:
+	}
+
 	// Trigger an out-of-band upgrade availability check and download.
 	// Since we may have failed to connect, we may benefit from upgrading
 	// to a new client version with new circumvention capabilities.
@@ -3313,8 +3378,9 @@ func (controller *Controller) inproxyGetProxyAPIParameters(includeTacticsParamet
 
 	// TODO: include broker fronting dial parameters to be logged by the
 	// broker.
+	includeSessionID := true
 	params := getBaseAPIParameters(
-		baseParametersNoDialParameters, true, controller.config, nil)
+		baseParametersNoDialParameters, nil, includeSessionID, controller.config, nil)
 
 	if controller.config.DisableTactics {
 		return params, "", nil

+ 304 - 38
psiphon/dataStore.go

@@ -22,6 +22,7 @@ package psiphon
 import (
 	"bytes"
 	"context"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -33,6 +34,7 @@ import (
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/dsl"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
@@ -52,6 +54,7 @@ var (
 	datastoreSpeedTestSamplesBucket             = []byte("speedTestSamples")
 	datastoreDialParametersBucket               = []byte("dialParameters")
 	datastoreNetworkReplayParametersBucket      = []byte("networkReplayParameters")
+	datastoreDSLOSLStatesBucket                 = []byte("dslOSLStates")
 	datastoreLastConnectedKey                   = "lastConnected"
 	datastoreLastServerEntryFilterKey           = []byte("lastServerEntryFilter")
 	datastoreAffinityServerEntryIDKey           = []byte("affinityServerEntryID")
@@ -59,7 +62,11 @@ var (
 	datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
 	datastorePersistentStatTypeFailedTunnel     = string(datastoreFailedTunnelStatsBucket)
 	datastoreCheckServerEntryTagsEndTimeKey     = "checkServerEntryTagsEndTime"
-	datastoreServerEntryFetchGCThreshold        = 10
+	datastoreDSLLastUntunneledFetchTimeKey      = "dslLastUntunneledDiscoverTime"
+	datastoreDSLLastTunneledFetchTimeKey        = "dslLastTunneledDiscoverTime"
+	datastoreDSLLastActiveOSLsTimeKey           = "dslLastActiveOSLsTime"
+
+	datastoreServerEntryFetchGCThreshold = 10
 
 	datastoreReferenceCountMutex sync.RWMutex
 	datastoreReferenceCount      int64
@@ -303,8 +310,10 @@ func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExi
 			}
 		}
 
+		configurationVersion := serverEntryFields.GetConfigurationVersion()
+
 		exists := existingConfigurationVersion > -1
-		newer := exists && existingConfigurationVersion < serverEntryFields.GetConfigurationVersion()
+		newer := exists && existingConfigurationVersion < configurationVersion
 		update := !exists || replaceIfExists || newer
 
 		if !update {
@@ -349,7 +358,13 @@ func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExi
 			return errors.Trace(err)
 		}
 
-		err = serverEntryTags.put(serverEntryTagBytes, serverEntryID)
+		serverEntryTagRecord, err := setServerEntryTagRecord(
+			serverEntryID, configurationVersion)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		err = serverEntryTags.put(serverEntryTagBytes, serverEntryTagRecord)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1077,7 +1092,14 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
 					return errors.Trace(err)
 				}
 
-				err = serverEntryTags.put([]byte(serverEntryTag), serverEntryID)
+				serverEntryTagRecord, err := setServerEntryTagRecord(
+					[]byte(serverEntryTag),
+					serverEntryFields.GetConfigurationVersion())
+				if err != nil {
+					return errors.Trace(err)
+				}
+
+				err = serverEntryTags.put([]byte(serverEntryTag), serverEntryTagRecord)
 				if err != nil {
 					return errors.Trace(err)
 				}
@@ -1169,18 +1191,23 @@ func pruneServerEntry(config *Config, serverEntryTag string) (bool, error) {
 
 		serverEntryTagBytes := []byte(serverEntryTag)
 
-		serverEntryID := serverEntryTags.get(serverEntryTagBytes)
-		if serverEntryID == nil {
+		serverEntryTagRecord := serverEntryTags.get(serverEntryTagBytes)
+		if serverEntryTagRecord == nil {
 			return errors.TraceNew("server entry tag not found")
 		}
 
+		serverEntryID, _, err := getServerEntryTagRecord(serverEntryTagRecord)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
 		serverEntryJson := serverEntries.get(serverEntryID)
 		if serverEntryJson == nil {
 			return errors.TraceNew("server entry not found")
 		}
 
 		var serverEntry *protocol.ServerEntry
-		err := json.Unmarshal(serverEntryJson, &serverEntry)
+		err = json.Unmarshal(serverEntryJson, &serverEntry)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1281,14 +1308,23 @@ func deleteServerEntry(config *Config, serverEntryID []byte) error {
 		}
 
 		// Remove any tags pointing to the deleted server entry.
+		var deleteKeys [][]byte
 		cursor := serverEntryTags.cursor()
-		defer cursor.close()
 		for key, value := cursor.first(); key != nil; key, value = cursor.next() {
 			if bytes.Equal(value, serverEntryID) {
-				err := serverEntryTags.delete(key)
-				if err != nil {
-					return errors.Trace(err)
-				}
+				deleteKeys = append(deleteKeys, key)
+			}
+		}
+		cursor.close()
+
+		// Mutate bucket only after cursor is closed.
+		//
+		// TODO: expose boltdb Cursor.Delete to allow for safe mutation
+		// within cursor loop.
+		for _, deleteKey := range deleteKeys {
+			err := serverEntryTags.delete(deleteKey)
+			if err != nil {
+				return errors.Trace(err)
 			}
 		}
 
@@ -1320,22 +1356,35 @@ func deleteServerEntryHelper(
 		}
 	}
 
+	// Each dial parameters key has serverID as a prefix; see
+	// makeDialParametersKey. There may be multiple keys with the
+	// serverEntryID prefix; they will be grouped together, so the loop can
+	// exit as soon as a previously found prefix is no longer found.
+	foundFirstMatch := false
+
 	// TODO: expose boltdb Seek functionality to skip to first matching record.
+	var deleteKeys [][]byte
 	cursor := dialParameters.cursor()
-	defer cursor.close()
-	foundFirstMatch := false
 	for key, _ := cursor.first(); key != nil; key, _ = cursor.next() {
-		// Dial parameters key has serverID as a prefix; see makeDialParametersKey.
 		if bytes.HasPrefix(key, serverEntryID) {
 			foundFirstMatch = true
-			err := dialParameters.delete(key)
-			if err != nil {
-				return errors.Trace(err)
-			}
+			deleteKeys = append(deleteKeys, key)
 		} else if foundFirstMatch {
 			break
 		}
 	}
+	cursor.close()
+
+	// Mutate bucket only after cursor is closed.
+	//
+	// TODO: expose boltdb Cursor.Delete to allow for safe mutation
+	// within cursor loop.
+	for _, deleteKey := range deleteKeys {
+		err := dialParameters.delete(deleteKey)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
 
 	return nil
 }
@@ -1660,6 +1709,8 @@ func TakeOutUnreportedPersistentStats(
 		for _, statType := range persistentStatTypes {
 
 			bucket := tx.bucket([]byte(statType))
+
+			var deleteKeys [][]byte
 			cursor := bucket.cursor()
 			for key, value := cursor.first(); key != nil; key, value = cursor.next() {
 
@@ -1671,7 +1722,7 @@ func TakeOutUnreportedPersistentStats(
 					NoticeWarning(
 						"Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
 						string(key), err)
-					_ = bucket.delete(key)
+					deleteKeys = append(deleteKeys, key)
 					continue
 				}
 
@@ -1695,6 +1746,11 @@ func TakeOutUnreportedPersistentStats(
 			}
 			cursor.close()
 
+			// Mutate bucket only after cursor is closed.
+			for _, deleteKey := range deleteKeys {
+				_ = bucket.delete(deleteKey)
+			}
+
 			for _, key := range stats[statType] {
 				err := bucket.put(key, persistentStatStateReporting)
 				if err != nil {
@@ -1831,25 +1887,14 @@ func IsCheckServerEntryTagsDue(config *Config) bool {
 		return false
 	}
 
-	lastEndTimeValue, err := GetKeyValue(datastoreCheckServerEntryTagsEndTimeKey)
+	lastEndTime, err := getTimeKeyValue(datastoreCheckServerEntryTagsEndTimeKey)
 	if err != nil {
-		NoticeWarning("IsCheckServerEntryTagsDue GetKeyValue failed: %s", errors.Trace(err))
+		NoticeWarning("IsCheckServerEntryTagsDue getTimeKeyValue failed: %s", errors.Trace(err))
 		disableCheckServerEntryTags.Store(true)
 		return false
 	}
 
-	if lastEndTimeValue == "" {
-		return true
-	}
-
-	lastEndTime, err := time.Parse(time.RFC3339, lastEndTimeValue)
-	if err != nil {
-		NoticeWarning("IsCheckServerEntryTagsDue time.Parse failed: %s", errors.Trace(err))
-		disableCheckServerEntryTags.Store(true)
-		return false
-	}
-
-	return time.Now().After(lastEndTime.Add(checkPeriod))
+	return lastEndTime.IsZero() || time.Now().After(lastEndTime.Add(checkPeriod))
 }
 
 // UpdateCheckServerEntryTagsEndTime should be called after a prune check is
@@ -1879,11 +1924,9 @@ func UpdateCheckServerEntryTagsEndTime(config *Config, checkCount int, pruneCoun
 		return
 	}
 
-	err := SetKeyValue(
-		datastoreCheckServerEntryTagsEndTimeKey,
-		time.Now().Format(time.RFC3339))
+	err := setTimeKeyValue(datastoreCheckServerEntryTagsEndTimeKey, time.Now())
 	if err != nil {
-		NoticeWarning("UpdateCheckServerEntryTagsEndTime SetKeyValue failed: %s", errors.Trace(err))
+		NoticeWarning("UpdateCheckServerEntryTagsEndTime setTimeKeyValue failed: %s", errors.Trace(err))
 		disableCheckServerEntryTags.Store(true)
 		return
 	}
@@ -2505,6 +2548,171 @@ func DeleteNetworkReplayParameters[R any](networkID, replayID string) error {
 	return deleteBucketValue(datastoreNetworkReplayParametersBucket, key)
 }
 
+// DSLGetLastUntunneledFetchTime returns the timestamp of the last
+// successfully completed untunneled DSL fetch.
+func DSLGetLastUntunneledFetchTime() (time.Time, error) {
+	value, err := getTimeKeyValue(datastoreDSLLastUntunneledFetchTimeKey)
+	return value, errors.Trace(err)
+}
+
+// DSLSetLastUntunneledFetchTime sets the timestamp of the most recent
+// successfully completed untunneled DSL fetch.
+func DSLSetLastUntunneledFetchTime(time time.Time) error {
+	err := setTimeKeyValue(datastoreDSLLastUntunneledFetchTimeKey, time)
+	return errors.Trace(err)
+}
+
+// DSLGetLastUntunneledFetchTime returns the timestamp of the last
+// successfully completed tunneled DSL fetch.
+func DSLGetLastTunneledFetchTime() (time.Time, error) {
+	value, err := getTimeKeyValue(datastoreDSLLastTunneledFetchTimeKey)
+	return value, errors.Trace(err)
+}
+
+// DSLSetLastTunneledFetchTime sets the timestamp of the most recent
+// successfully completed untunneled DSL fetch.
+func DSLSetLastTunneledFetchTime(time time.Time) error {
+	err := setTimeKeyValue(datastoreDSLLastTunneledFetchTimeKey, time)
+	return errors.Trace(err)
+}
+
+// DSLHasServerEntry returns whether the datastore contains the server entry
+// with the specified tag and version. DSLHasServerEntry uses a fast lookup
+// which avoids unmarshaling server entries.
+func DSLHasServerEntry(tag dsl.ServerEntryTag, version int) bool {
+
+	hasServerEntry := false
+
+	err := datastoreView(func(tx *datastoreTx) error {
+
+		serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
+
+		serverEntryTagRecord := serverEntryTags.get(tag)
+		if serverEntryTagRecord == nil {
+			return errors.TraceNew("server entry tag not found")
+		}
+
+		_, configurationVersion, err := getServerEntryTagRecord(
+			serverEntryTagRecord)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		hasServerEntry = (configurationVersion == version)
+
+		return nil
+	})
+
+	if err != nil {
+		NoticeWarning("DSLHasServerEntry failed: %s", errors.Trace(err))
+		return false
+	}
+
+	return hasServerEntry
+}
+
+// DSLStoreServerEntry adds the server entry to the datastore using
+// StoreServerEntry and populating LocalSource and LocalTimestamp.
+func DSLStoreServerEntry(
+	packedServerEntryFields protocol.PackedServerEntryFields,
+	source string) error {
+
+	serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntryFields)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// See protocol.DecodeServerEntryFields and ImportEmbeddedServerEntries
+	// for other code paths that populate SetLocalSource and SetLocalTimestamp.
+
+	serverEntryFields.SetLocalSource(source)
+	serverEntryFields.SetLocalTimestamp(common.TruncateTimestampToHour(common.GetCurrentTimestamp()))
+
+	err = StoreServerEntry(serverEntryFields, true)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+// DSLGetLastUntunneledFetchTime returns the timestamp of the last
+// successfully completed active OSL check.
+func DSLGetLastActiveOSLsTime() (time.Time, error) {
+	value, err := getTimeKeyValue(datastoreDSLLastActiveOSLsTimeKey)
+	return value, errors.Trace(err)
+}
+
+// DSLSetLastActiveOSLsTime sets the timestamp of the most recent
+// successfully completed active OSL check.
+func DSLSetLastActiveOSLsTime(time time.Time) error {
+	err := setTimeKeyValue(datastoreDSLLastActiveOSLsTimeKey, time)
+	return errors.Trace(err)
+}
+
+// DSLKnownOSLIDs returns the set of known OSL IDs retrieved from the active
+// OSL DSL request.
+func DSLKnownOSLIDs() ([]dsl.OSLID, error) {
+
+	IDs := []dsl.OSLID{}
+
+	err := getBucketKeys(datastoreDSLOSLStatesBucket, func(key []byte) {
+		// Must make a copy as slice is only valid within transaction.
+		IDs = append(IDs, append([]byte(nil), key...))
+	})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return IDs, nil
+}
+
+// DSLGetOSLState gets the current OSL state associated with an active OSL.
+// See dsl.Fetcher for more details on OSL states.
+func DSLGetOSLState(ID dsl.OSLID) ([]byte, bool, error) {
+	state, err := copyBucketValue(datastoreDSLOSLStatesBucket, ID)
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
+	notFound := state == nil
+	return state, notFound, nil
+}
+
+// DSLStoreOSLState sets the OSL state associated with an active OSL.
+func DSLStoreOSLState(ID dsl.OSLID, state []byte) error {
+	err := setBucketValue(datastoreDSLOSLStatesBucket, ID, state)
+	return errors.Trace(err)
+}
+
+// DSLDeleteOSLState deletes the specified OSL state.
+func DSLDeleteOSLState(ID dsl.OSLID) error {
+	err := deleteBucketValue(datastoreDSLOSLStatesBucket, ID)
+	return errors.Trace(err)
+}
+
+func setTimeKeyValue(key string, timevalue time.Time) error {
+	err := SetKeyValue(key, timevalue.Format(time.RFC3339))
+	return errors.Trace(err)
+}
+
+func getTimeKeyValue(key string) (time.Time, error) {
+
+	value, err := GetKeyValue(key)
+	if err != nil {
+		return time.Time{}, errors.Trace(err)
+	}
+
+	if value == "" {
+		return time.Time{}, nil
+	}
+
+	timeValue, err := time.Parse(time.RFC3339, value)
+	if err != nil {
+		return time.Time{}, errors.Trace(err)
+	}
+
+	return timeValue, nil
+}
+
 func setBucketValue(bucket, key, value []byte) error {
 
 	err := datastoreUpdate(func(tx *datastoreTx) error {
@@ -2564,3 +2772,61 @@ func copyBucketValue(bucket, key []byte) ([]byte, error) {
 	})
 	return valueCopy, err
 }
+
+func getBucketKeys(bucket []byte, keyCallback func([]byte)) error {
+
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(bucket)
+		cursor := bucket.cursor()
+		for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
+			keyCallback(key)
+		}
+		cursor.close()
+		return nil
+	})
+
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+func setServerEntryTagRecord(
+	serverEntryID []byte, configurationVersion int) ([]byte, error) {
+
+	var delimiter = [1]byte{0}
+
+	if bytes.Contains(serverEntryID, delimiter[:]) {
+		return nil, errors.TraceNew("invalid serverEntryID")
+	}
+
+	if configurationVersion < 0 || configurationVersion >= math.MaxInt32 {
+		return nil, errors.TraceNew("invalid configurationVersion")
+	}
+
+	var version [4]byte
+	binary.LittleEndian.PutUint32(version[:], uint32(configurationVersion))
+
+	return append(append(serverEntryID, delimiter[:]...), version[:]...), nil
+}
+
+func getServerEntryTagRecord(
+	record []byte) ([]byte, int, error) {
+
+	var delimiter = [1]byte{0}
+
+	i := bytes.Index(record, delimiter[:])
+	if i == -1 {
+		// Backwards compatibility: assume version 0
+		return record, 0, nil
+	}
+
+	if len(record)-i != 4 {
+		return nil, 0, errors.TraceNew("invalid configurationVersion")
+	}
+
+	configurationVersion := binary.LittleEndian.Uint32(record[i:])
+
+	return record[:i], int(configurationVersion), nil
+}

+ 1 - 0
psiphon/dataStore_bolt.go

@@ -170,6 +170,7 @@ func tryDatastoreOpenDB(
 			datastoreSpeedTestSamplesBucket,
 			datastoreDialParametersBucket,
 			datastoreNetworkReplayParametersBucket,
+			datastoreDSLOSLStatesBucket,
 		}
 		for _, bucket := range requiredBuckets {
 			_, err := tx.CreateBucketIfNotExists(bucket)

+ 278 - 0
psiphon/dsl.go

@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"context"
+	"sync/atomic"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/dsl"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+)
+
+func runUntunneledDSLFetcher(
+	ctx context.Context,
+	config *Config,
+	brokerClientManager *InproxyBrokerClientManager,
+	signal <-chan struct{}) {
+
+	NoticeInfo("running untunneled DSL fetcher")
+
+fetcherLoop:
+	for !disableDSLFetches.Load() {
+
+		select {
+		case <-signal:
+		case <-ctx.Done():
+			break fetcherLoop
+		}
+
+		isTunneled := false
+
+		err := func() error {
+
+			brokerClient, _, err := brokerClientManager.GetBrokerClient(config.GetNetworkID())
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			roundTripper := func(
+				ctx context.Context,
+				requestPayload []byte) ([]byte, error) {
+
+				response, err := brokerClient.ClientDSL(
+					ctx,
+					&inproxy.ClientDSLRequest{
+						RequestPayload: requestPayload,
+					})
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				return response.ResponsePayload, nil
+			}
+
+			// Detailed logging, retries, last request times, and
+			// WaitForNetworkConnectivity are all handled inside dsl.Fetcher.
+
+			// There is no equivilent to RecordRemoteServerListStat or
+			// remote_server_list, since the DSL backend will log DSL request events.
+			//
+			// TODO: add a failed_dsl_request log, similar to failed_tunnel,
+			// to record and report failures?
+
+			err = doDSLFetch(ctx, config, isTunneled, roundTripper)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			return nil
+		}()
+
+		if err != nil {
+			NoticeError("untunneled DSL fetch failed: %v", errors.Trace(err))
+			// No cooldown pause, since controller.triggerFetches isn't be
+			// called in a tight loop.
+		}
+	}
+
+	NoticeInfo("exiting untunneled DSL fetcher")
+}
+
+func runTunneledDSLFetcher(
+	ctx context.Context,
+	config *Config,
+	getActiveTunnel func() *Tunnel,
+	signal <-chan struct{}) {
+
+	NoticeInfo("running tunneled DSL fetcher")
+
+fetcherLoop:
+	for !disableDSLFetches.Load() {
+
+		select {
+		case <-signal:
+		case <-ctx.Done():
+			break fetcherLoop
+		}
+
+		tunnel := getActiveTunnel()
+		if tunnel == nil {
+			continue
+		}
+
+		isTunneled := true
+
+		roundTripper := func(
+			ctx context.Context,
+			requestPayload []byte) ([]byte, error) {
+
+			// The request ctx is ignored; tunnel.SendAPIRequest does not
+			// support a request context. In practise, the input ctx is
+			// controller.runCtx which includes the full lifetime of the
+			// tunnel. When a tunnel closes, any in-flight SendAPIRequest
+			// will be interrupted and not block.
+
+			responsePayload, err := tunnel.SendAPIRequest(
+				protocol.PSIPHON_API_DSL_REQUEST_NAME, requestPayload)
+			return responsePayload, errors.Trace(err)
+		}
+
+		// Detailed logging, retries, last request times, and
+		// WaitForNetworkConnectivity are all handled inside dsl.Fetcher.
+
+		err := doDSLFetch(ctx, config, isTunneled, roundTripper)
+		if err != nil {
+			NoticeError("tunneled DSL fetch failed: %v", errors.Trace(err))
+			// No cooldown pause, since runTunneledDSLFetcher is called only
+			// once after fully connecting.
+		}
+	}
+
+	NoticeInfo("exiting tunneled DSL fetcher")
+}
+
+func doDSLFetch(
+	ctx context.Context,
+	config *Config,
+	isTunneled bool,
+	roundTripper dsl.FetcherRoundTripper) error {
+
+	var paddingPRNG *prng.PRNG
+	if isTunneled {
+
+		// For a tunneled request, padding is added via the params since
+		// there's no random padding at the SSH request layer. The PRNG seed
+		// is not replayed.
+		paddingPRNG = prng.DefaultPRNG()
+	}
+
+	includeSessionID := true
+	baseAPIParams := getBaseAPIParameters(
+		baseParametersNoDialParameters,
+		paddingPRNG,
+		includeSessionID,
+		config,
+		nil)
+
+	// Copied from FetchObfuscatedServerLists.
+	//
+	// Prevent excessive notice noise in cases such as a general database
+	// failure, as GetSLOK may be called thousands of times per fetch.
+	emittedGetSLOKAlert := int32(0)
+	lookupSLOKs := func(slokID []byte) []byte {
+		key, err := GetSLOK(slokID)
+		if err != nil && atomic.CompareAndSwapInt32(&emittedGetSLOKAlert, 0, 1) {
+			NoticeWarning("GetSLOK failed: %s", err)
+		}
+		return key
+	}
+
+	c := &dsl.FetcherConfig{
+		Logger:            NoticeCommonLogger(false),
+		BaseAPIParameters: baseAPIParams,
+		RoundTripper:      roundTripper,
+
+		DatastoreHasServerEntry:        DSLHasServerEntry,
+		DatastoreStoreServerEntry:      DSLStoreServerEntry,
+		DatastoreGetLastActiveOSLsTime: DSLGetLastActiveOSLsTime,
+		DatastoreSetLastActiveOSLsTime: DSLSetLastActiveOSLsTime,
+		DatastoreKnownOSLIDs:           DSLKnownOSLIDs,
+		DatastoreGetOSLState:           DSLGetOSLState,
+		DatastoreStoreOSLState:         DSLStoreOSLState,
+		DatastoreDeleteOSLState:        DSLDeleteOSLState,
+		DatastoreSLOKLookup:            lookupSLOKs,
+		DatastoreFatalError:            onDSLDatastoreFatalError,
+
+		DoGarbageCollection: DoGarbageCollection,
+	}
+
+	p := config.GetParameters().Get()
+	if isTunneled {
+
+		c.DatastoreGetLastFetchTime = DSLGetLastTunneledFetchTime
+		c.DatastoreSetLastFetchTime = DSLSetLastTunneledFetchTime
+
+		c.RequestTimeout = p.Duration(parameters.DSLFetcherTunneledRequestTimeout)
+		c.RequestRetryCount = p.Int(parameters.DSLFetcherTunneledRequestRetryCount)
+		c.RequestRetryDelay = p.Duration(parameters.DSLFetcherTunneledRequestRetryDelay)
+		c.RequestRetryDelayJitter = p.Float(parameters.DSLFetcherTunneledRequestRetryDelayJitter)
+		c.FetchTTL = p.Duration(parameters.DSLFetcherTunneledFetchTTL)
+		c.DiscoverServerEntriesMinCount = p.Int(parameters.DSLFetcherTunneledDiscoverServerEntriesMinCount)
+		c.DiscoverServerEntriesMaxCount = p.Int(parameters.DSLFetcherTunneledDiscoverServerEntriesMaxCount)
+		c.GetServerEntriesMinCount = p.Int(parameters.DSLFetcherTunneledGetServerEntriesMinCount)
+		c.GetServerEntriesMaxCount = p.Int(parameters.DSLFetcherTunneledGetServerEntriesMaxCount)
+
+		// WaitForNetworkConnectivity is not wired up in this case since
+		// tunnel must be connected. If the tunnel becomes disconnected due
+		// to loss of network connectivity, prefer to fail this request and
+		// try again, with a new tunnel, after reconnecting.
+
+	} else {
+
+		c.DatastoreGetLastFetchTime = DSLGetLastUntunneledFetchTime
+		c.DatastoreSetLastFetchTime = DSLSetLastUntunneledFetchTime
+
+		c.RequestTimeout = p.Duration(parameters.DSLFetcherUntunneledRequestTimeout)
+		c.RequestRetryCount = p.Int(parameters.DSLFetcherUntunneledRequestRetryCount)
+		c.RequestRetryDelay = p.Duration(parameters.DSLFetcherUntunneledRequestRetryDelay)
+		c.RequestRetryDelayJitter = p.Float(parameters.DSLFetcherUntunneledRequestRetryDelayJitter)
+		c.FetchTTL = p.Duration(parameters.DSLFetcherUntunneledFetchTTL)
+		c.DiscoverServerEntriesMinCount = p.Int(parameters.DSLFetcherUntunneledDiscoverServerEntriesMinCount)
+		c.DiscoverServerEntriesMaxCount = p.Int(parameters.DSLFetcherUntunneledDiscoverServerEntriesMaxCount)
+		c.GetServerEntriesMinCount = p.Int(parameters.DSLFetcherUntunneledGetServerEntriesMinCount)
+		c.GetServerEntriesMaxCount = p.Int(parameters.DSLFetcherUntunneledGetServerEntriesMaxCount)
+
+		c.WaitForNetworkConnectivity = func() bool {
+			return WaitForNetworkConnectivity(ctx, config.NetworkConnectivityChecker, nil)
+		}
+
+	}
+	c.GetLastActiveOSLsTTL = p.Duration(parameters.DSLFetcherGetLastActiveOSLsTTL)
+	c.GetOSLFileSpecsMinCount = p.Int(parameters.DSLFetcherGetOSLFileSpecsMinCount)
+	c.GetOSLFileSpecsMaxCount = p.Int(parameters.DSLFetcherGetOSLFileSpecsMaxCount)
+	p.Close()
+
+	fetcher, err := dsl.NewFetcher(c)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = fetcher.Run(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+var disableDSLFetches atomic.Bool
+
+func onDSLDatastoreFatalError(_ error) {
+
+	// Halt all DSL requests for the duration of the process on a
+	// DatastoreFatalError, which includes failure to set the last request
+	// time. This avoids continuous DSL request in this scenario.
+
+	disableDSLFetches.Store(true)
+}

+ 8 - 1
psiphon/inproxy.go

@@ -135,7 +135,7 @@ func (b *InproxyBrokerClientManager) NetworkChanged() error {
 }
 
 // GetBrokerClient returns the current, shared broker client and its
-// corresponding dial parametrers (for metrics logging). If there is no
+// corresponding dial parameters (for metrics logging). If there is no
 // current broker client, if the network ID differs from the network ID
 // associated with the previous broker client, a new broker client is
 // initialized.
@@ -331,6 +331,7 @@ type InproxyBrokerClientInstance struct {
 	offerRetryDelay               time.Duration
 	offerRetryJitter              float64
 	relayedPacketRequestTimeout   time.Duration
+	dslRequestTimeout             time.Duration
 	replayRetainFailedProbability float64
 	replayUpdateFrequency         time.Duration
 	retryOnFailedPeriod           time.Duration
@@ -576,6 +577,7 @@ func NewInproxyBrokerClientInstance(
 		offerRetryDelay:               p.Duration(parameters.InproxyClientOfferRetryDelay),
 		offerRetryJitter:              p.Float(parameters.InproxyClientOfferRetryJitter),
 		relayedPacketRequestTimeout:   p.Duration(parameters.InproxyClientRelayedPacketRequestTimeout),
+		dslRequestTimeout:             p.Duration(parameters.InproxyClientDSLRequestTimeout),
 		replayRetainFailedProbability: p.Float(parameters.InproxyReplayBrokerRetainFailedProbability),
 		replayUpdateFrequency:         p.Duration(parameters.InproxyReplayBrokerUpdateFrequency),
 	}
@@ -1147,6 +1149,11 @@ func (b *InproxyBrokerClientInstance) RelayedPacketRequestTimeout() time.Duratio
 	return b.relayedPacketRequestTimeout
 }
 
+// Implements the inproxy.BrokerDialCoordinator interface.
+func (b *InproxyBrokerClientInstance) DSLRequestTimeout() time.Duration {
+	return b.dslRequestTimeout
+}
+
 // InproxyBrokerDialParameters represents a selected broker transport and dial
 // paramaters.
 //

+ 36 - 33
psiphon/serverApi.go

@@ -118,7 +118,7 @@ func NewServerContext(tunnel *Tunnel) (*ServerContext, error) {
 // stored -- and sponsor info (home pages, stat regexes).
 func (serverContext *ServerContext) doHandshakeRequest(ignoreStatsRegexps bool) error {
 
-	params := serverContext.getBaseAPIParameters(baseParametersAll, false)
+	params := serverContext.getBaseAPIParameters(baseParametersAll)
 
 	// The server will return a signed copy of its own server entry when the
 	// client specifies this 'missing_server_entry_signature' parameter.
@@ -507,7 +507,7 @@ func (serverContext *ServerContext) DoConnectedRequest() error {
 	defer serverContext.tunnel.SetInFlightConnectedRequest(nil)
 
 	params := serverContext.getBaseAPIParameters(
-		baseParametersOnlyUpstreamFragmentorDialParameters, false)
+		baseParametersOnlyUpstreamFragmentorDialParameters)
 
 	lastConnected, err := getLastConnected()
 	if err != nil {
@@ -580,7 +580,7 @@ func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps {
 func (serverContext *ServerContext) DoStatusRequest() error {
 
 	params := serverContext.getBaseAPIParameters(
-		baseParametersNoDialParameters, false)
+		baseParametersNoDialParameters)
 
 	// Note: ensure putBackStatusRequestPayload is called, to replace
 	// payload for future attempt, in all failure cases.
@@ -928,7 +928,8 @@ func RecordFailedTunnelStat(
 		return errors.Trace(err)
 	}
 
-	params := getBaseAPIParameters(baseParametersAll, true, config, dialParams)
+	includeSessionID := true
+	params := getBaseAPIParameters(baseParametersAll, nil, includeSessionID, config, dialParams)
 
 	delete(params, "server_secret")
 	params["server_entry_tag"] = dialParams.ServerEntry.Tag
@@ -1059,35 +1060,19 @@ const (
 )
 
 func (serverContext *ServerContext) getBaseAPIParameters(
-	filter baseParametersFilter,
-	includeSessionID bool) common.APIParameters {
+	filter baseParametersFilter) common.APIParameters {
 
+	// For tunneled SSH API requests, the session ID is omitted since the
+	// server already has that value; and padding is added via the params
+	// since there's no random padding at the SSH request layer.
+	includeSessionID := false
 	params := getBaseAPIParameters(
 		filter,
+		serverContext.paddingPRNG,
 		includeSessionID,
 		serverContext.tunnel.config,
 		serverContext.tunnel.dialParams)
 
-	// Add a random amount of padding to defend against API call traffic size
-	// fingerprints. The "pad_response" field instructs the server to pad its
-	// response accordingly.
-
-	p := serverContext.tunnel.config.GetParameters().Get()
-	minUpstreamPadding := p.Int(parameters.APIRequestUpstreamPaddingMinBytes)
-	maxUpstreamPadding := p.Int(parameters.APIRequestUpstreamPaddingMaxBytes)
-	minDownstreamPadding := p.Int(parameters.APIRequestDownstreamPaddingMinBytes)
-	maxDownstreamPadding := p.Int(parameters.APIRequestDownstreamPaddingMaxBytes)
-
-	if maxUpstreamPadding > 0 {
-		size := serverContext.paddingPRNG.Range(minUpstreamPadding, maxUpstreamPadding)
-		params["padding"] = strings.Repeat(" ", size)
-	}
-
-	if maxDownstreamPadding > 0 {
-		size := serverContext.paddingPRNG.Range(minDownstreamPadding, maxDownstreamPadding)
-		params["pad_response"] = strconv.Itoa(size)
-	}
-
 	return params
 }
 
@@ -1099,19 +1084,37 @@ func (serverContext *ServerContext) getBaseAPIParameters(
 // baseParametersNoDialParameters.
 func getBaseAPIParameters(
 	filter baseParametersFilter,
+	paddingPRNG *prng.PRNG,
 	includeSessionID bool,
 	config *Config,
 	dialParams *DialParameters) common.APIParameters {
 
 	params := make(common.APIParameters)
 
-	// Temporary measure: unconditionally include legacy session_id and
-	// client_session_id fields for compatibility with existing servers used
-	// in CI.
-	//
-	// TODO: remove once necessary servers are upgraded
-	params["session_id"] = config.SessionID
-	params["client_session_id"] = config.SessionID
+	if paddingPRNG != nil {
+
+		// Add a random amount of padding to defend against API call traffic size
+		// fingerprints. The "pad_response" field instructs the server to pad its
+		// response accordingly.
+		//
+		// Padding may be omitted if it's already provided at transport layer.
+
+		p := config.GetParameters().Get()
+		minUpstreamPadding := p.Int(parameters.APIRequestUpstreamPaddingMinBytes)
+		maxUpstreamPadding := p.Int(parameters.APIRequestUpstreamPaddingMaxBytes)
+		minDownstreamPadding := p.Int(parameters.APIRequestDownstreamPaddingMinBytes)
+		maxDownstreamPadding := p.Int(parameters.APIRequestDownstreamPaddingMaxBytes)
+
+		if maxUpstreamPadding > 0 {
+			size := paddingPRNG.Range(minUpstreamPadding, maxUpstreamPadding)
+			params["padding"] = strings.Repeat(" ", size)
+		}
+
+		if maxDownstreamPadding > 0 {
+			size := paddingPRNG.Range(minDownstreamPadding, maxDownstreamPadding)
+			params["pad_response"] = strconv.Itoa(size)
+		}
+	}
 
 	if includeSessionID {
 		// The session ID is included in non-SSH API requests only. For SSH

+ 4 - 1
psiphon/tactics.go

@@ -299,8 +299,11 @@ func fetchTactics(
 	}
 	defer meekConn.Close()
 
+	// No padding is added via the params as this is provided by the tactics
+	// request obfuscation layer.
+	includeSessionID := true
 	apiParams := getBaseAPIParameters(
-		baseParametersAll, true, config, dialParams)
+		baseParametersAll, nil, includeSessionID, config, dialParams)
 
 	tacticsRecord, err := tactics.FetchTactics(
 		ctx,

+ 6 - 2
psiphon/tunnel.go

@@ -1588,10 +1588,14 @@ func dialInproxy(
 	}
 
 	// Unlike the proxy broker case, clients already actively fetch tactics
-	// during tunnel estalishment, so tactics.SetTacticsAPIParameters are not
+	// during tunnel establishment, so tactics.SetTacticsAPIParameters are not
 	// sent to the broker and no tactics are returned by the broker.
+	//
+	// No padding is added via the params as this is provided by the broker
+	// request obfuscation layer.
+	includeSessionID := true
 	params := getBaseAPIParameters(
-		baseParametersNoDialParameters, true, config, nil)
+		baseParametersNoDialParameters, nil, includeSessionID, config, nil)
 
 	// The debugLogging flag is passed to both NoticeCommonLogger and to the
 	// inproxy package as well; skipping debug logs in the inproxy package,