فهرست منبع

Add dsl package

Rod Hynes 7 ماه پیش
والد
کامیت
25f9f9380b
4فایلهای تغییر یافته به همراه2655 افزوده شده و 0 حذف شده
  1. 213 0
      psiphon/common/dsl/api.go
  2. 1207 0
      psiphon/common/dsl/dsl_test.go
  3. 872 0
      psiphon/common/dsl/fetcher.go
  4. 363 0
      psiphon/common/dsl/relay.go

+ 213 - 0
psiphon/common/dsl/api.go

@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+// Package dsl implements the Dynamic Server List (DSL) mechanism.
+//
+// Unlike Remote Server Lists (RSLs) and Obfuscated Server Lists (OSLs), which
+// are based on static file downloads, with DSLs the client requests
+// discovery and download of server entries from a DSL backend that actively
+// selects from compartmentalized servers based on the client's inputs and
+// other properties.
+//
+// Clients use relays with obfuscation and blocking resistance properties to
+// transport requests to a DSL backend.
+//
+// The discovery concepts of OSLs are retained with the client reporting its
+// known OSL keys to the DSL backend, as a proof-of-knowledge used to access
+// certain compartments of servers.
+package dsl
+
+import (
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/fxamacker/cbor/v2"
+)
+
// OSLID is the binary ID of an Obfuscated Server List (OSL).
type OSLID []byte

// OSLKey is a reassembled OSL file key, presented to the DSL backend as
// proof-of-knowledge (see DiscoverServerEntriesRequest.OSLKeys).
type OSLKey []byte

// OSLFileSpec is a CBOR-encoded OSL FileSpec. It is kept as raw bytes, not
// an osl.FileSpec, since it need not be unmarshaled immediately (see
// GetOSLFileSpecsResponse).
type OSLFileSpec []byte
+
// DiscoverServerEntriesRequest is a request from a client to potentially
// discover new server entries. The DSL backend serving the request selects
// discoverable server entries using a combination of inputs in
// BaseAPIParameters; active OSL keys known to the client; and client GeoIP
// data.
//
// The response contains a list of server entry tags and versions, and
// the client will then proceed to request full server entries for unknown or
// stale server entries, based on the tags and versions. DiscoverCount
// specifies a maximum number of server entry tags/versions to return; the
// DSL backend may return less, but not more.
type DiscoverServerEntriesRequest struct {
	BaseAPIParameters protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
	OSLKeys           []OSLKey                     `cbor:"2,keyasint,omitempty"`
	DiscoverCount     int32                        `cbor:"3,keyasint,omitempty"`
}

// ServerEntryTag is a binary representation of a protocol.ServerEntry.Tag
// value. Hex- or base64-encoded tag strings should be converted to binary
// for compactness.
type ServerEntryTag []byte

// VersionedServerEntryTag is a server entry tag and version pair.
type VersionedServerEntryTag struct {
	Tag     ServerEntryTag `cbor:"1,keyasint,omitempty"`
	Version int32          `cbor:"2,keyasint,omitempty"`
}

// DiscoverServerEntriesResponse is the set of server entries revealed to the
// client, specified as server entry tag and version pairs, which enable the
// client to determine if it already has the server entry, and has the latest
// version. For new or updated server entries, the client will proceed to
// send a GetServerEntriesRequest to fetch the server entries.
type DiscoverServerEntriesResponse struct {
	VersionedServerEntryTags []VersionedServerEntryTag `cbor:"1,keyasint,omitempty"`
}
+
// GetServerEntriesRequest is a request from a client to download the
// specified server entries, identified by the tags previously returned in a
// DiscoverServerEntriesResponse.
type GetServerEntriesRequest struct {
	BaseAPIParameters protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
	ServerEntryTags   []ServerEntryTag             `cbor:"2,keyasint,omitempty"`
}

// SourcedServerEntry is a server entry and server entry source pair. The
// client stores the server entry source as protocol.ServerEntry.LocalSource,
// which is used for server_entry_source stats reporting.
type SourcedServerEntry struct {
	ServerEntryFields protocol.PackedServerEntryFields `cbor:"1,keyasint,omitempty"`
	Source            string                           `cbor:"2,keyasint,omitempty"`
}

// GetServerEntriesResponse includes the list of server entries requested by
// the client. Each requested tag has a corresponding entry in
// SourcedServerEntries. When a requested tag is no longer available for
// distribution, there is a nil/empty entry.
type GetServerEntriesResponse struct {
	SourcedServerEntries []*SourcedServerEntry `cbor:"1,keyasint,omitempty"`
}
+
// GetActiveOSLsRequest is a request from a client to get the list of
// currently active OSL IDs.
//
// Clients maintain local copies of the OSL FileSpec for each active OSL,
// using SLOKs to reassemble the keys for the OSLs using the key split
// definitions in each OSL FileSpec. These current OSL keys, reassembled by
// the client, are then included in DiscoverServerEntriesRequest requests,
// demonstrating that the client can decrypt the OSL in the classic scheme;
// the DSL backend uses the keys as proof-of-knowledge to grant access to
// compartmentalized server entries.
//
// For new and unknown OSL IDs, clients will use GetOSLFileSpecsRequest to
// download the corresponding OSL FileSpecs.
//
// It is assumed that the number of OSL schemes and scheme pave counts
// (see common/osl.Config) produces an OSL ID list size that is appropriate
// to return in full in a single response.
type GetActiveOSLsRequest struct {
	BaseAPIParameters protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
}

// GetActiveOSLsResponse is a list of the currently active OSL IDs.
type GetActiveOSLsResponse struct {
	ActiveOSLIDs []OSLID `cbor:"1,keyasint,omitempty"`
}

// GetOSLFileSpecsRequest is a request from a client to download the
// OSL FileSpecs for the OSLs specified by ID.
type GetOSLFileSpecsRequest struct {
	BaseAPIParameters protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
	OSLIDs            []OSLID                      `cbor:"2,keyasint,omitempty"`
}

// GetOSLFileSpecsResponse includes the list of OSL FileSpecs requested by the
// client. Each requested OSL ID has a corresponding entry in OSLFileSpecs.
// When a requested OSL is no longer active or available for distribution,
// there is a nil/empty entry.
//
// Here, OSLFileSpec is a []byte, not an osl.FileSpec, as this value doesn't
// need to be unmarshaled immediately in the fetcher processing.
type GetOSLFileSpecsResponse struct {
	OSLFileSpecs []OSLFileSpec `cbor:"1,keyasint,omitempty"`
}
+
+// Relay API layer
+//
+// DSL clients send requests to the DSL backend via a relay, which provides
+// circumvention and blocking resistance. Relays include in-proxy brokers,
+// with untunneled domain fronting over a secure Noise session; and Psiphon
+// servers, via SSH requests within an established tunnel. The relays remove
+// the RelayedRequest layer and forward requests to the DSL backend over
+// HTTPS with mutually authenticated TLS; and wrap responses with
+// RelayedResponse.
+//
+// The trusted relays will attach the original client IP and GeoIP data to
+// relayed requests; these inputs may be used by the DSL backend when
+// selecting server entries that the client may discover.
+//
+// 1. client -> broker/psiphond relay
+//    CBOR[RelayedRequest(requestTypeDiscoverServerEntries, v1, CBOR[DiscoverServerEntriesRequest])]
+//
+// 2. broker/psiphond -> DSL
+//    POST /v1/DiscoverServerEntries HTTP/1.1
+//    X-Psiphon-Client-IP: x.x.x.x
+//    CBOR[DiscoverServerEntriesRequest]
+//
+// 3. DSL -> broker/psiphond
+//    HTTP/1.1 200 OK
+//    CBOR[DiscoverServerEntriesResponse]
+//
+// 4. broker/psiphond -> client
+//    CBOR[RelayedResponse(ErrorCode, CBOR[DiscoverServerEntriesResponse])]
+//
+
// MaxRelayPayloadSize is bounded by inproxy.BrokerMaxRequestBodySize,
// 64K, and the common/crypto/ssh maxPacket, 256K.
const MaxRelayPayloadSize = 65536

const (
	// Headers attached by trusted relays to forward the original client IP
	// and GeoIP data to the DSL backend. These are stored in canonical MIME
	// header key form so they can be used in direct header map lookups.
	psiphonClientIPHeader        = "X-Psiphon-Client-Ip"
	psiphonClientGeoIPDataHeader = "X-Psiphon-Client-Geoipdata"

	// requestVersion is the current RelayedRequest.Version value.
	requestVersion = 1

	// RelayedRequest.RequestType values, one per DSL API request type.
	requestTypeDiscoverServerEntries = 1
	requestTypeGetServerEntries      = 2
	requestTypeGetActiveOSLs         = 3
	requestTypeGetOSLFileSpecs       = 4
)

// requestTypeToHTTPPath maps a RelayedRequest.RequestType to the
// corresponding DSL backend HTTP endpoint path.
var requestTypeToHTTPPath = map[int32]string{
	requestTypeDiscoverServerEntries: "/v1/DiscoverServerEntries",
	requestTypeGetServerEntries:      "/v1/GetServerEntries",
	requestTypeGetActiveOSLs:         "/v1/GetActiveOSLs",
	requestTypeGetOSLFileSpecs:       "/v1/GetOSLFileSpecs",
}
+
// RelayedRequest wraps a DSL request to be relayed. RequestType indicates the
// type of the wrapped request. Version must be 1. Request is the CBOR
// encoding of the wrapped request, deferred as a raw message until the
// RequestType is known.
type RelayedRequest struct {
	RequestType int32           `cbor:"1,keyasint,omitempty"`
	Version     int32           `cbor:"2,keyasint,omitempty"`
	Request     cbor.RawMessage `cbor:"3,keyasint,omitempty"`
}

// RelayedResponse wraps a DSL response value or error. Error is an error
// code; Response is the CBOR encoding of the wrapped response.
type RelayedResponse struct {
	Error    int32           `cbor:"1,keyasint,omitempty"`
	Response cbor.RawMessage `cbor:"2,keyasint,omitempty"`
}

+ 1207 - 0
psiphon/common/dsl/dsl_test.go

@@ -0,0 +1,1207 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package dsl
+
+import (
+	"bytes"
+	"context"
+	"crypto/rand"
+	"crypto/rsa"
+	"crypto/tls"
+	"crypto/x509"
+	"crypto/x509/pkix"
+	"encoding/base64"
+	"encoding/hex"
+	"encoding/json"
+	"encoding/pem"
+	"fmt"
+	"io"
+	"math/big"
+	"net"
+	"net/http"
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server"
+	"github.com/fxamacker/cbor/v2"
+)
+
// testConfig parameterizes a single testDSLs scenario.
type testConfig struct {
	name               string // subtest name
	alreadyDiscovered  bool   // rerun fetch after TTL reset, expecting no new stores
	requireOSLKeys     bool   // backend requires OSL keys as proof-of-knowledge
	interruptDownloads bool   // round tripper fails large responses
	enableRetries      bool   // fetcher retries failed requests
	repeatBeforeTTL    bool   // second fetch before TTL expiry must send no requests
	isConnected        bool   // first fetch runs in is-connected mode
	expectFailure      bool   // the fetch is expected to fail
}
+
+func TestDSLs(t *testing.T) {
+
+	tests := []*testConfig{
+		{
+			name: "undiscovered server entries",
+		},
+		{
+			name: "require OSL keys",
+
+			requireOSLKeys: true,
+		},
+		{
+			name: "interruptions without retry",
+
+			interruptDownloads: true,
+			expectFailure:      true,
+		},
+		{
+			name: "interruptions with retry",
+
+			interruptDownloads: true,
+			enableRetries:      true,
+		},
+		{
+			name: "require OSL keys with interruptions",
+
+			requireOSLKeys:     true,
+			interruptDownloads: true,
+			enableRetries:      true,
+		},
+		{
+			name: "repeat before TTL",
+
+			repeatBeforeTTL: true,
+		},
+		{
+			name: "previously discovered server entries",
+
+			alreadyDiscovered: true,
+		},
+		{
+			name: "first request is-connected",
+
+			isConnected: true,
+		},
+	}
+
+	for _, testConfig := range tests {
+		t.Run(testConfig.name, func(t *testing.T) {
+			err := testDSLs(testConfig)
+			if err != nil && !testConfig.expectFailure {
+				t.Fatal(err.Error())
+			}
+		})
+	}
+}
+
// testClientIP and testClientGeoIPData are the fixed values the stub relay
// round tripper supplies; the backend handlers verify that exactly these
// values arrive in the relay-added headers.
var testClientIP = "192.168.0.1"
var testClientGeoIPData = common.GeoIPData{"Country", "City", "ISP", "ASN", "ASO"}
+
// testDSLs runs one end-to-end DSL fetch scenario: it stands up an
// in-process DSL backend and relay, then drives the client Fetcher against
// them via a stub round tripper, checking store/delete counters according
// to the testConfig flags.
func testDSLs(testConfig *testConfig) error {

	// Initialize OSLs

	var backendOSLPaveData1 []*osl.PaveData
	var backendOSLPaveData2 []*osl.PaveData
	var clientSLOKs []*osl.SLOK
	if testConfig.requireOSLKeys {
		var err error
		backendOSLPaveData1, backendOSLPaveData2, clientSLOKs, err = initializeOSLs()
		if err != nil {
			return errors.Trace(err)
		}
	}

	// Initialize backend

	tlsConfig, err := initializeTLSConfiguration()
	if err != nil {
		return errors.Trace(err)
	}

	backend, err := initializeDSLBackend(backendOSLPaveData1)
	if err != nil {
		return errors.Trace(err)
	}

	err = backend.start(tlsConfig)
	if err != nil {
		return errors.Trace(err)
	}
	defer backend.stop()

	// Initialize relay

	relayConfig := &RelayConfig{
		Logger:                      newTestLoggerWithComponent("relay"),
		CACertificates:              []*x509.Certificate{tlsConfig.CACertificate},
		HostCertificate:             tlsConfig.relayCertificate,
		DynamicServerListServiceURL: backend.getAddress(),
	}

	relay, err := NewRelay(relayConfig)
	if err != nil {
		return errors.Trace(err)
	}

	// Initialize client fetcher

	// Set transfer targets that will exercise various scenarios, including
	// requiring request size backoff (e.g. see Fetcher.doGetServerEntriesRequest)
	// to succeed.

	if len(backend.serverEntries) != 128 {
		return errors.TraceNew("unexpected server entry count")
	}
	discoverCount := 128
	getCount := 64
	oslCount := 1
	interruptLimit := 0
	if testConfig.interruptDownloads {
		interruptLimit = 8192
	}
	retryCount := 0
	if testConfig.enableRetries {
		retryCount = 20
	}
	isConnected := testConfig.isConnected
	if isConnected {
		// In is-connected mode, only a single server entry is discovered.
		discoverCount = 1
	}

	dslClient := newDSLClient(clientSLOKs)

	clientRelayRoundTripper := func(
		ctx context.Context,
		requestPayload []byte) ([]byte, error) {

		// Normally, the Fetcher.RoundTripper would add a circumvention,
		// blocking resistant first hop. For this test, it's just a stub that
		// directly invokes the relay.

		responsePayload := relay.HandleRequest(
			ctx,
			testClientIP,
			testClientGeoIPData,
			requestPayload)

		// Simulate interruption of large response.
		if interruptLimit > 0 && len(responsePayload) > interruptLimit {
			return nil, errors.TraceNew("interrupted")
		}

		return responsePayload, nil
	}

	// TODO: exercise BaseAPIParameters?

	fetcherConfig := &FetcherConfig{
		Logger: newTestLoggerWithComponent("fetcher"),

		RoundTripper: clientRelayRoundTripper,

		DatastoreGetLastDiscoverTime:   dslClient.DatastoreGetLastDiscoverTime,
		DatastoreSetLastDiscoverTime:   dslClient.DatastoreSetLastDiscoverTime,
		DatastoreGetLastActiveOSLsTime: dslClient.DatastoreGetLastActiveOSLsTime,
		DatastoreSetLastActiveOSLsTime: dslClient.DatastoreSetLastActiveOSLsTime,
		DatastoreHasServerEntry:        dslClient.DatastoreHasServerEntry,
		DatastoreStoreServerEntry:      dslClient.DatastoreStoreServerEntry,
		DatastoreKnownOSLIDs:           dslClient.DatastoreKnownOSLIDs,
		DatastoreGetOSLState:           dslClient.DatastoreGetOSLState,
		DatastoreStoreOSLState:         dslClient.DatastoreStoreOSLState,
		DatastoreDeleteOSLState:        dslClient.DatastoreDeleteOSLState,
		DatastoreSLOKLookup:            dslClient.DatastoreSLOKLookup,

		RequestTimeout:          1 * time.Second,
		RequestRetryCount:       retryCount,
		RequestRetryDelay:       1 * time.Millisecond,
		RequestRetryDelayJitter: 0.1,

		DiscoverServerEntriesTTL:      1 * time.Hour,
		DiscoverServerEntriesMinCount: discoverCount,
		DiscoverServerEntriesMaxCount: discoverCount,
		GetServerEntriesMinCount:      getCount,
		GetServerEntriesMaxCount:      getCount,
		GetLastActiveOSLsTTL:          1 * time.Hour,
		GetOSLFileSpecsMinCount:       oslCount,
		GetOSLFileSpecsMaxCount:       oslCount,

		DoGarbageCollection: debug.FreeOSMemory,
	}

	fetcher, err := NewFetcher(fetcherConfig)
	if err != nil {
		return errors.Trace(err)
	}

	// Fetch server entries

	ctx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancelFunc()

	err = fetcher.Run(ctx, isConnected)
	if testConfig.expectFailure && err == nil {
		err = errors.TraceNew("unexpected success")
	}
	if err != nil {
		return errors.Trace(err)
	}

	if testConfig.repeatBeforeTTL {

		// Invoke fetch again before the last discover time TTL expires.
		// The always-failing round tripper will be hit if an unexpected
		// request is sent.

		fetcherConfig.RoundTripper = func(
			context.Context,
			[]byte) ([]byte, error) {
			return nil, errors.TraceNew("round trip not permitted")
		}

		err = fetcher.Run(ctx, isConnected)
		if err != nil {
			return errors.Trace(err)
		}
	}

	if testConfig.alreadyDiscovered && testConfig.isConnected {
		return errors.TraceNew("invalid test configuration")
	}

	if testConfig.alreadyDiscovered {

		// Fetch again after resetting the last discover time TTL. A
		// DiscoverServerEntries request will be sent, but all tags should be
		// known, and no GetServerEntries requests should be sent or any
		// server entries stores, as will be checked via
		// dslClient.serverEntryStoreCount.

		dslClient.lastDiscoverTime = time.Time{}
		dslClient.lastActiveOSLsTime = time.Time{}

		err = fetcher.Run(ctx, isConnected)
		if err != nil {
			return errors.Trace(err)
		}
	}

	if testConfig.isConnected {

		// If the first request was isConnected, only one server entry will
		// have been fetched and the last discover time TTL should not be
		// set. Do another full fetch, and the
		// dslClient.serverEntryStoreCount check will demonstrate that all
		// remaining server entries were downloaded and stored.

		discoverCount = 128
		isConnected = false

		fetcherConfig.DiscoverServerEntriesMinCount = discoverCount
		fetcherConfig.DiscoverServerEntriesMaxCount = discoverCount

		err = fetcher.Run(ctx, isConnected)
		if err != nil {
			return errors.Trace(err)
		}
	}

	// TODO: check "updated" and "known" counters in "DSL: fetched server
	// entries" logs.

	if dslClient.serverEntryStoreCount != len(backend.serverEntries) {
		return errors.Tracef(
			"unexpected server entry store count: %d", dslClient.serverEntryStoreCount)
	}

	if testConfig.requireOSLKeys {

		// Rotate to the next OSL period and clear all server entries. The
		// fetcher will download the new, unknown OSL and reassemble the key,
		// or else no server entries will be downloaded. Check that the
		// fetcher cleans up the old, no longer active OSL state via
		// dslClient.deleteOSLStateCount.

		dslClient.lastDiscoverTime = time.Time{}
		dslClient.lastActiveOSLsTime = time.Time{}

		dslClient.serverEntries = make(map[string]protocol.ServerEntryFields)

		backend.oslPaveData = backendOSLPaveData2

		err = fetcher.Run(ctx, isConnected)
		if err != nil {
			return errors.Trace(err)
		}

		if dslClient.serverEntryStoreCount != len(backend.serverEntries) {
			return errors.Tracef(
				"unexpected server entry store count: %d", dslClient.serverEntryStoreCount)
		}

		if dslClient.deleteOSLStateCount < 1 {
			return errors.Tracef(
				"unexpected delete OSL state count: %d", dslClient.deleteOSLStateCount)
		}
	}

	return nil
}
+
// dslClient is an in-memory implementation of the Fetcher's datastore
// callbacks, plus counters used for test assertions. All fields are guarded
// by mutex.
type dslClient struct {
	mutex                 sync.Mutex
	lastDiscoverTime      time.Time
	lastActiveOSLsTime    time.Time
	serverEntries         map[string]protocol.ServerEntryFields // keyed by server entry tag
	serverEntryStoreCount int                                   // total DatastoreStoreServerEntry calls
	oslStates             map[string][]byte                     // keyed by hex-encoded OSL ID
	deleteOSLStateCount   int                                   // total DatastoreDeleteOSLState calls
	SLOKs                 []*osl.SLOK
}
+
+func newDSLClient(SLOKs []*osl.SLOK) *dslClient {
+	return &dslClient{
+		serverEntries: make(map[string]protocol.ServerEntryFields),
+		oslStates:     make(map[string][]byte),
+		SLOKs:         SLOKs,
+	}
+}
+
// DatastoreGetLastDiscoverTime returns the recorded time of the last
// DiscoverServerEntries request.
func (c *dslClient) DatastoreGetLastDiscoverTime() (time.Time, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	return c.lastDiscoverTime, nil
}

// DatastoreSetLastDiscoverTime records the time of the last
// DiscoverServerEntries request.
func (c *dslClient) DatastoreSetLastDiscoverTime(time time.Time) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.lastDiscoverTime = time
	return nil
}

// DatastoreGetLastActiveOSLsTime returns the recorded time of the last
// GetActiveOSLs request.
func (c *dslClient) DatastoreGetLastActiveOSLsTime() (time.Time, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	return c.lastActiveOSLsTime, nil
}

// DatastoreSetLastActiveOSLsTime records the time of the last GetActiveOSLs
// request.
func (c *dslClient) DatastoreSetLastActiveOSLsTime(time time.Time) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.lastActiveOSLsTime = time
	return nil
}
+
// DatastoreHasServerEntry reports whether the client already has the server
// entry with the given tag.
//
// NOTE(review): the version parameter is ignored here, so an updated version
// of a known tag would not be re-fetched by this test datastore — confirm
// that is acceptable for these test scenarios.
func (c *dslClient) DatastoreHasServerEntry(tag ServerEntryTag, version int) bool {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	_, ok := c.serverEntries[base64.StdEncoding.EncodeToString(tag)]
	return ok
}
+
// DatastoreStoreServerEntry decodes and stores a fetched server entry,
// recording the given source as its local source. Each call increments
// serverEntryStoreCount, which the test uses to verify exactly how many
// entries were stored.
func (c *dslClient) DatastoreStoreServerEntry(
	packedServerEntryFields protocol.PackedServerEntryFields, source string) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.serverEntryStoreCount += 1

	serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntryFields)
	if err != nil {
		return errors.Trace(err)
	}

	serverEntryFields.SetLocalSource(source)
	serverEntryFields.SetLocalTimestamp(
		common.TruncateTimestampToHour(common.GetCurrentTimestamp()))

	c.serverEntries[serverEntryFields.GetTag()] = serverEntryFields

	return nil
}
+
+func (c *dslClient) DatastoreKnownOSLIDs() ([]OSLID, error) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	var IDs []OSLID
+	for IDStr, _ := range c.oslStates {
+		ID, _ := hex.DecodeString(IDStr)
+		IDs = append(IDs, ID)
+	}
+
+	return IDs, nil
+}
+
// DatastoreGetOSLState returns the stored state for the given OSL ID, and
// whether any state exists.
func (c *dslClient) DatastoreGetOSLState(ID OSLID) ([]byte, bool, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	state, ok := c.oslStates[hex.EncodeToString(ID)]
	return state, ok, nil
}

// DatastoreStoreOSLState stores state for the given OSL ID, replacing any
// existing state.
func (c *dslClient) DatastoreStoreOSLState(ID OSLID, state []byte) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.oslStates[hex.EncodeToString(ID)] = state
	return nil
}

// DatastoreDeleteOSLState removes state for the given OSL ID. Each call
// increments deleteOSLStateCount, which the test uses to verify that the
// fetcher cleans up state for no longer active OSLs.
func (c *dslClient) DatastoreDeleteOSLState(ID OSLID) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.deleteOSLStateCount += 1

	delete(c.oslStates, hex.EncodeToString(ID))
	return nil
}
+
+func (c *dslClient) DatastoreSLOKLookup(SLOKID []byte) []byte {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	for _, slok := range c.SLOKs {
+		if bytes.Equal(slok.ID, SLOKID) {
+			return slok.Key
+		}
+	}
+
+	return nil
+}
+
// DatastoreFatalError panics, immediately failing the test when a datastore
// fatal error is reported.
func (c *dslClient) DatastoreFatalError(err error) {
	panic(err.Error())
}
+
+// TODO: move dslBackend to an internal/testing package to share with an
+// eventual psiphon/server end-to-end test? Also move testLogger to
+// internal/testing to share with common/inproxy and other packages.
+
// dslBackend is an in-process stand-in for the DSL backend service, serving
// the four DSL API endpoints over mutually authenticated TLS.
type dslBackend struct {
	serverEntries map[string]*SourcedServerEntry // keyed by base64-encoded server entry tag
	oslPaveData   []*osl.PaveData                // active OSLs; nil disables the OSL key check
	listener      net.Listener                   // set by start
}
+
// initializeDSLBackend creates a dslBackend populated with 128 generated
// test server entries and the given OSL pave data. The returned backend is
// not yet serving; call start.
func initializeDSLBackend(backendOSLPaveData []*osl.PaveData) (*dslBackend, error) {

	backend := &dslBackend{
		serverEntries: make(map[string]*SourcedServerEntry),
		oslPaveData:   backendOSLPaveData,
	}

	// Run GenerateConfig concurrently to try to take advantage of multiple
	// CPU cores.

	var initMutex sync.Mutex
	var initGroup sync.WaitGroup
	var initErr error

	for i := 1; i <= 128; i++ {

		initGroup.Add(1)
		// The goroutine's named retErr return is consumed only by the defer
		// below, which records it in initErr under initMutex; if multiple
		// goroutines fail, one error is retained.
		go func(i int) (retErr error) {
			defer initGroup.Done()
			defer func() {
				if retErr != nil {
					initMutex.Lock()
					initErr = retErr
					initMutex.Unlock()
				}
			}()

			_, _, _, _, encodedServerEntry, err := server.GenerateConfig(
				&server.GenerateConfigParams{
					ServerIPAddress:     fmt.Sprintf("192.0.2.%d", i),
					TunnelProtocolPorts: map[string]int{"OSSH": 1},
				})
			if err != nil {
				return errors.Trace(err)
			}

			serverEntryFields, err := protocol.DecodeServerEntryFields(
				string(encodedServerEntry), "", "")
			if err != nil {
				return errors.Trace(err)
			}

			tag := serverEntryFields.GetTag()
			if tag == "" {
				return errors.TraceNew("unexpected tag")
			}

			packed, err := protocol.EncodePackedServerEntryFields(serverEntryFields)
			if err != nil {
				return errors.Trace(err)
			}

			// Each server entry gets a distinct source, exercising the
			// SourcedServerEntry.Source plumbing.
			source := fmt.Sprintf("compartment-%d", i)

			initMutex.Lock()

			if backend.serverEntries[tag] != nil {
				initMutex.Unlock()
				return errors.TraceNew("duplicate tag")
			}

			backend.serverEntries[tag] = &SourcedServerEntry{
				ServerEntryFields: packed,
				Source:            source,
			}

			initMutex.Unlock()

			return nil
		}(i)
	}
	initGroup.Wait()

	if initErr != nil {
		return nil, errors.Trace(initErr)
	}

	return backend, nil
}
+
+func (b *dslBackend) start(tlsConfig *tlsConfig) error {
+
+	logger := newTestLoggerWithComponent("backend")
+
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	certificatePool := x509.NewCertPool()
+	certificatePool.AddCert(tlsConfig.CACertificate)
+
+	listener = tls.NewListener(
+		listener,
+		&tls.Config{
+			Certificates: []tls.Certificate{*tlsConfig.backendCertificate},
+			ClientAuth:   tls.RequireAndVerifyClientCert,
+			ClientCAs:    certificatePool,
+		})
+
+	mux := http.NewServeMux()
+
+	handlerAdapter := func(
+		w http.ResponseWriter,
+		r *http.Request,
+		handler func([]byte) ([]byte, error)) (retErr error) {
+
+		defer func() {
+			if retErr != nil {
+				logger.WithTrace().Warning(fmt.Sprintf("handler failed: %s\n", retErr))
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+			}
+		}()
+
+		clientIPHeader, ok := r.Header[psiphonClientIPHeader]
+		if !ok {
+			return errors.Tracef("missing header: psiphonClientIPHeader")
+		}
+		if len(clientIPHeader) != 1 || clientIPHeader[0] != testClientIP {
+			return errors.Tracef("invalid header: psiphonClientIPHeader")
+		}
+
+		clientGeoIPDataHeader, ok := r.Header[psiphonClientGeoIPDataHeader]
+		if !ok {
+			return errors.Tracef("missing header: psiphonClientGeoIPDataHeader")
+		}
+		var geoIPData common.GeoIPData
+		if len(clientGeoIPDataHeader) != 1 ||
+			json.Unmarshal([]byte(clientGeoIPDataHeader[0]), &geoIPData) != nil ||
+			geoIPData != testClientGeoIPData {
+			return errors.Tracef("invalid header: psiphonClientGeoIPDataHeader")
+		}
+
+		request, err := io.ReadAll(r.Body)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		r.Body.Close()
+
+		response, err := handler(request)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		_, err = w.Write(response)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		return nil
+	}
+
+	mux.HandleFunc(requestTypeToHTTPPath[requestTypeDiscoverServerEntries],
+		func(w http.ResponseWriter, r *http.Request) {
+			_ = handlerAdapter(w, r, b.handleDiscoverServerEntries)
+		})
+	mux.HandleFunc(requestTypeToHTTPPath[requestTypeGetServerEntries],
+		func(w http.ResponseWriter, r *http.Request) {
+			_ = handlerAdapter(w, r, b.handleGetServerEntries)
+		})
+	mux.HandleFunc(requestTypeToHTTPPath[requestTypeGetActiveOSLs],
+		func(w http.ResponseWriter, r *http.Request) {
+			_ = handlerAdapter(w, r, b.handleGetActiveOSLs)
+		})
+	mux.HandleFunc(requestTypeToHTTPPath[requestTypeGetOSLFileSpecs],
+		func(w http.ResponseWriter, r *http.Request) {
+			_ = handlerAdapter(w, r, b.handleGetOSLFileSpecs)
+		})
+
+	server := &http.Server{
+		Handler: mux,
+	}
+
+	go func() {
+		_ = server.Serve(listener)
+	}()
+
+	b.listener = listener
+
+	return nil
+}
+
+func (b *dslBackend) getAddress() string {
+	if b.listener == nil {
+		return ""
+	}
+	return b.listener.Addr().String()
+}
+
+func (b *dslBackend) stop() {
+	if b.listener == nil {
+		return
+	}
+	_ = b.listener.Close()
+}
+
+func (b *dslBackend) handleDiscoverServerEntries(cborRequest []byte) ([]byte, error) {
+
+	var request *DiscoverServerEntriesRequest
+	err := cbor.Unmarshal(cborRequest, &request)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	response := &DiscoverServerEntriesResponse{}
+
+	missingOSLs := false
+	if b.oslPaveData != nil {
+
+		// When b.oslPaveData is set, the client must provide the expected OSL
+		// keys in order to discover any server entries.
+
+		for _, oslPaveData := range b.oslPaveData {
+			found := false
+			for _, key := range request.OSLKeys {
+				if bytes.Equal(key, oslPaveData.FileKey) {
+					found = true
+					break
+				}
+
+			}
+			if !found {
+				missingOSLs = true
+				break
+			}
+		}
+	}
+
+	if !missingOSLs {
+
+		count := 0
+		for tag, _ := range b.serverEntries {
+			if count >= int(request.DiscoverCount) {
+				break
+			}
+			count += 1
+
+			// Test server entry tags are base64-encoded random byte strings.
+			serverEntryTag, err := base64.StdEncoding.DecodeString(tag)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			response.VersionedServerEntryTags = append(
+				response.VersionedServerEntryTags,
+				VersionedServerEntryTag{Tag: serverEntryTag, Version: 1})
+		}
+	}
+
+	cborResponse, err := protocol.CBOREncoding.Marshal(response)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return cborResponse, nil
+}
+
// handleGetServerEntries serves a GetServerEntriesRequest, returning one
// SourcedServerEntry per requested tag.
func (b *dslBackend) handleGetServerEntries(cborRequest []byte) ([]byte, error) {

	var request *GetServerEntriesRequest
	err := cbor.Unmarshal(cborRequest, &request)
	if err != nil {
		return nil, errors.Trace(err)
	}

	response := &GetServerEntriesResponse{}

	for _, serverEntryTag := range request.ServerEntryTags {

		// serverEntries is keyed by the base64 encoding of the binary tag.
		tag := base64.StdEncoding.EncodeToString(serverEntryTag)

		sourcedServerEntry, ok := b.serverEntries[tag]
		if !ok {

			// An actual DSL backend must return empty slot in this case, as
			// the requested server entry could be pruned or unavailable. For
			// this test, this case is unexpected.

			return nil, errors.TraceNew("unknown server entry tag")
		}

		response.SourcedServerEntries = append(
			response.SourcedServerEntries, sourcedServerEntry)
	}

	cborResponse, err := protocol.CBOREncoding.Marshal(response)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return cborResponse, nil
}
+
// handleGetActiveOSLs serves a GetActiveOSLsRequest, returning the IDs of
// all currently configured OSL pave data.
func (b *dslBackend) handleGetActiveOSLs(cborRequest []byte) ([]byte, error) {

	var request *GetActiveOSLsRequest
	err := cbor.Unmarshal(cborRequest, &request)
	if err != nil {
		return nil, errors.Trace(err)
	}

	response := &GetActiveOSLsResponse{}
	for _, oslPaveData := range b.oslPaveData {
		response.ActiveOSLIDs = append(
			response.ActiveOSLIDs,
			oslPaveData.FileSpec.ID)
	}

	cborResponse, err := protocol.CBOREncoding.Marshal(response)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return cborResponse, nil
}
+
+// handleGetOSLFileSpecs services a GetOSLFileSpecsRequest, returning the
+// CBOR-encoded FileSpec for each requested, active OSL ID.
+func (b *dslBackend) handleGetOSLFileSpecs(cborRequest []byte) ([]byte, error) {
+
+	var request *GetOSLFileSpecsRequest
+	err := cbor.Unmarshal(cborRequest, &request)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	response := &GetOSLFileSpecsResponse{}
+
+	for _, oslID := range request.OSLIDs {
+
+		// Linear scan over the paved OSLs; the test set is small.
+		var matchingPaveData *osl.PaveData
+		for _, oslPaveData := range b.oslPaveData {
+			if bytes.Equal(oslID, oslPaveData.FileSpec.ID) {
+				matchingPaveData = oslPaveData
+				break
+			}
+		}
+		if matchingPaveData == nil {
+
+			// An actual DSL backend must return empty slot in this case, as
+			// the requested OSL may no longer be active. For this test, this
+			// case is unexpected.
+
+			return nil, errors.TraceNew("unknown OSL ID")
+		}
+
+		cborOSLFileSpec, err := protocol.CBOREncoding.Marshal(matchingPaveData.FileSpec)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		response.OSLFileSpecs = append(
+			response.OSLFileSpecs, cborOSLFileSpec)
+	}
+
+	cborResponse, err := protocol.CBOREncoding.Marshal(response)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return cborResponse, nil
+}
+
+// initializeOSLs builds a test OSL configuration with a 1-second seed
+// period, paves OSLs for two consecutive periods, and mock-seeds the
+// corresponding SLOKs. It returns the backend pave data for each period and
+// the combined client SLOKs.
+func initializeOSLs() ([]*osl.PaveData, []*osl.PaveData, []*osl.SLOK, error) {
+
+	// Adapted from testObfuscatedRemoteServerLists in psiphon/remoteServerList_test.go
+
+	oslConfigJSONTemplate := `
+    {
+      "Schemes" : [
+        {
+          "Epoch" : "%s",
+          "PaveDataOSLCount" : 2,
+          "Regions" : [],
+          "PropagationChannelIDs" : ["%s"],
+          "MasterKey" : "vwab2WY3eNyMBpyFVPtsivMxF4MOpNHM/T7rHJIXctg=",
+          "SeedSpecs" : [
+            {
+              "ID" : "KuP2V6gLcROIFzb/27fUVu4SxtEfm2omUoISlrWv1mA=",
+              "UpstreamSubnets" : ["0.0.0.0/0"],
+              "Targets" :
+              {
+                  "BytesRead" : 1,
+                  "BytesWritten" : 1,
+                  "PortForwardDurationNanoseconds" : 1
+              }
+            }
+          ],
+          "SeedSpecThreshold" : 1,
+          "SeedPeriodNanoseconds" : %d,
+          "SeedPeriodKeySplits": [
+            {
+              "Total": 1,
+              "Threshold": 1
+            }
+          ]
+        }
+      ]
+    }`
+
+	// The epoch is aligned to the seed period boundary, as the OSL scheme
+	// requires.
+	now := time.Now().UTC()
+	seedPeriod := 1 * time.Second
+	epoch := now.Truncate(seedPeriod)
+	epochStr := epoch.Format(time.RFC3339Nano)
+
+	propagationChannelID := prng.HexString(8)
+
+	oslConfigJSON := fmt.Sprintf(
+		oslConfigJSONTemplate,
+		epochStr,
+		propagationChannelID,
+		seedPeriod)
+
+	oslConfig, err := osl.LoadConfig([]byte(oslConfigJSON))
+	if err != nil {
+		return nil, nil, nil, errors.Trace(err)
+	}
+
+	oslPaveData, err := oslConfig.GetPaveData(0)
+	if err != nil {
+		return nil, nil, nil, errors.Trace(err)
+	}
+
+	backendPaveData1, ok := oslPaveData[propagationChannelID]
+	if !ok {
+		return nil, nil, nil, errors.TraceNew("unexpected missing OSL file data")
+	}
+
+	// Mock seeding SLOKs
+	//
+	// Normally, clients supplying the specified propagation channel ID would
+	// receive SLOKs via the psiphond tunnel connection
+
+	seedState := oslConfig.NewClientSeedState("", propagationChannelID, nil)
+	seedPortForward := seedState.NewClientSeedPortForward(net.ParseIP("0.0.0.0"), nil)
+	seedPortForward.UpdateProgress(1, 1, 1)
+	payload := seedState.GetSeedPayload()
+	if len(payload.SLOKs) != 1 {
+		return nil, nil, nil, errors.Tracef("unexpected SLOK count %d", len(payload.SLOKs))
+	}
+	clientSLOKs := payload.SLOKs
+
+	// Rollover to the next OSL time period and generate a new set of active
+	// OSLs and SLOKs. The short, 1-second seed period keeps this fast.
+
+	time.Sleep(2 * seedPeriod)
+
+	oslPaveData, err = oslConfig.GetPaveData(0)
+	if err != nil {
+		return nil, nil, nil, errors.Trace(err)
+	}
+
+	backendPaveData2, ok := oslPaveData[propagationChannelID]
+	if !ok {
+		return nil, nil, nil, errors.TraceNew("unexpected missing OSL file data")
+	}
+
+	seedState = oslConfig.NewClientSeedState("", propagationChannelID, nil)
+	seedPortForward = seedState.NewClientSeedPortForward(net.ParseIP("0.0.0.0"), nil)
+	seedPortForward.UpdateProgress(1, 1, 1)
+	payload = seedState.GetSeedPayload()
+	if len(payload.SLOKs) != 1 {
+		return nil, nil, nil, errors.Tracef("unexpected SLOK count %d", len(payload.SLOKs))
+	}
+	clientSLOKs = append(clientSLOKs, payload.SLOKs...)
+
+	// Double check that PaveData periods don't overlap.
+	for _, paveData1 := range backendPaveData1 {
+		for _, paveData2 := range backendPaveData2 {
+			if bytes.Equal(paveData1.FileSpec.ID, paveData2.FileSpec.ID) {
+				return nil, nil, nil, errors.TraceNew("unexpected pave data overlap")
+			}
+		}
+	}
+
+	return backendPaveData1, backendPaveData2, clientSLOKs, nil
+}
+
+// tlsConfig bundles the ephemeral test CA certificate together with the leaf
+// certificates issued to the mock backend (server role) and relay (client
+// role) for mutually authenticated TLS.
+type tlsConfig struct {
+	CACertificate      *x509.Certificate
+	backendCertificate *tls.Certificate
+	relayCertificate   *tls.Certificate
+}
+
+// initializeTLSConfiguration generates an ephemeral root CA and uses it to
+// issue a server certificate for the mock backend (bound to 127.0.0.1) and a
+// client certificate for the relay. All certificates are valid for one day.
+func initializeTLSConfiguration() (*tlsConfig, error) {
+
+	CAPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	// Self-signed CA certificate, permitted to sign leaf certificates.
+	now := time.Now()
+	template := &x509.Certificate{
+		SerialNumber: big.NewInt(1),
+		Subject: pkix.Name{
+			Organization: []string{"test root CA"},
+		},
+		NotBefore:             now,
+		NotAfter:              now.AddDate(0, 0, 1),
+		IsCA:                  true,
+		BasicConstraintsValid: true,
+		KeyUsage:              x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature,
+	}
+
+	CACertificateDER, err := x509.CreateCertificate(
+		rand.Reader, template, template, &CAPrivateKey.PublicKey, CAPrivateKey)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	CACertificate, err := x509.ParseCertificate(CACertificateDER)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	// issueCertificate creates a CA-signed leaf certificate; isServer selects
+	// server-auth usage (with a 127.0.0.1 SAN) versus client-auth usage.
+	issueCertificate := func(
+		name string, isServer bool) (*tls.Certificate, error) {
+
+		privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		now := time.Now()
+		template := &x509.Certificate{
+			SerialNumber: big.NewInt(time.Now().UnixNano()),
+			Subject: pkix.Name{
+				CommonName: name,
+			},
+			NotBefore: now,
+			NotAfter:  now.AddDate(0, 0, 1),
+			KeyUsage:  x509.KeyUsageDigitalSignature,
+		}
+		if isServer {
+			template.IPAddresses = []net.IP{net.ParseIP("127.0.0.1")}
+			template.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}
+		} else {
+			template.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}
+		}
+
+		certificateDER, err := x509.CreateCertificate(
+			rand.Reader, template, CACertificate, &privateKey.PublicKey, CAPrivateKey)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// Round-trip through PEM to produce a tls.Certificate.
+		keyPEM := pem.EncodeToMemory(
+			&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
+
+		certPEM := pem.EncodeToMemory(
+			&pem.Block{Type: "CERTIFICATE", Bytes: certificateDER})
+
+		tlsCertificate, err := tls.X509KeyPair(certPEM, keyPEM)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		return &tlsCertificate, nil
+	}
+
+	backendCertificate, err := issueCertificate("backend", true)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	relayCertificate, err := issueCertificate("relay", false)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return &tlsConfig{
+		CACertificate:      CACertificate,
+		backendCertificate: backendCertificate,
+		relayCertificate:   relayCertificate,
+	}, nil
+}
+
+// testLogger writes timestamped log lines to stdout for tests. component,
+// when set, is included as a bracketed prefix on each line. logLevelDebug is
+// a 0/1 flag accessed atomically.
+type testLogger struct {
+	component     string
+	logLevelDebug int32
+}
+
+// newTestLogger returns a testLogger with no component label and debug
+// logging disabled.
+func newTestLogger() *testLogger {
+	logger := &testLogger{}
+	logger.logLevelDebug = 0
+	return logger
+}
+
+// newTestLoggerWithComponent returns a testLogger that labels its output
+// with the given component name; debug logging is disabled.
+func newTestLoggerWithComponent(component string) *testLogger {
+	logger := newTestLogger()
+	logger.component = component
+	return logger
+}
+
+// WithTrace returns a LogTrace capturing the calling function's name.
+// NOTE: the trace is captured via stacktrace.GetParentFunctionName, which is
+// sensitive to call depth, so this method must be called directly by the
+// code being traced and must not be wrapped.
+func (logger *testLogger) WithTrace() common.LogTrace {
+	return &testLoggerTrace{
+		logger: logger,
+		trace:  stacktrace.GetParentFunctionName(),
+	}
+}
+
+// WithTraceFields is WithTrace with additional structured fields attached to
+// the emitted log line. The same call-depth sensitivity applies.
+func (logger *testLogger) WithTraceFields(fields common.LogFields) common.LogTrace {
+	return &testLoggerTrace{
+		logger: logger,
+		trace:  stacktrace.GetParentFunctionName(),
+		fields: fields,
+	}
+}
+
+func (logger *testLogger) LogMetric(metric string, fields common.LogFields) {
+	jsonFields, _ := json.Marshal(fields)
+	var component string
+	if len(logger.component) > 0 {
+		component = fmt.Sprintf("[%s]", logger.component)
+	}
+	fmt.Printf(
+		"[%s]%s METRIC: %s: %s\n",
+		time.Now().UTC().Format(time.RFC3339),
+		component,
+		metric,
+		string(jsonFields))
+}
+
+func (logger *testLogger) IsLogLevelDebug() bool {
+	return atomic.LoadInt32(&logger.logLevelDebug) == 1
+}
+
+func (logger *testLogger) SetLogLevelDebug(logLevelDebug bool) {
+	value := int32(0)
+	if logLevelDebug {
+		value = 1
+	}
+	atomic.StoreInt32(&logger.logLevelDebug, value)
+}
+
+// testLoggerTrace couples a testLogger with a captured call-site trace and
+// optional structured fields; it implements the common.LogTrace methods.
+type testLoggerTrace struct {
+	logger *testLogger
+	trace  string
+	fields common.LogFields
+}
+
+// log writes one formatted line at the given priority. When structured
+// fields are present they are appended as JSON, with error-typed values
+// flattened to their message strings first.
+func (logger *testLoggerTrace) log(priority, message string) {
+	now := time.Now().UTC().Format(time.RFC3339)
+	var component string
+	if len(logger.logger.component) > 0 {
+		component = fmt.Sprintf("[%s]", logger.logger.component)
+	}
+	if len(logger.fields) == 0 {
+		fmt.Printf(
+			"[%s]%s %s: %s: %s\n",
+			now, component, priority, logger.trace, message)
+	} else {
+		fields := common.LogFields{}
+		for k, v := range logger.fields {
+			switch v := v.(type) {
+			case error:
+				// Workaround for Go issue 5161: error types marshal to "{}"
+				fields[k] = v.Error()
+			default:
+				fields[k] = v
+			}
+		}
+		jsonFields, _ := json.Marshal(fields)
+		fmt.Printf(
+			"[%s]%s %s: %s: %s %s\n",
+			now, component, priority, logger.trace, message, string(jsonFields))
+	}
+}
+
+// Debug logs at DEBUG priority, only when debug logging is enabled.
+func (logger *testLoggerTrace) Debug(args ...interface{}) {
+	if !logger.logger.IsLogLevelDebug() {
+		return
+	}
+	logger.log("DEBUG", fmt.Sprint(args...))
+}
+
+// Info logs at INFO priority.
+func (logger *testLoggerTrace) Info(args ...interface{}) {
+	logger.log("INFO", fmt.Sprint(args...))
+}
+
+// Warning logs at WARNING priority.
+func (logger *testLoggerTrace) Warning(args ...interface{}) {
+	logger.log("WARNING", fmt.Sprint(args...))
+}
+
+// Error logs at ERROR priority.
+func (logger *testLoggerTrace) Error(args ...interface{}) {
+	logger.log("ERROR", fmt.Sprint(args...))
+}

+ 872 - 0
psiphon/common/dsl/fetcher.go

@@ -0,0 +1,872 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package dsl
+
+import (
+	"bytes"
+	"context"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/fxamacker/cbor/v2"
+)
+
+// FetcherRoundTripper is a pluggable round trip transport that sends requests
+// to a relay and returns the corresponding response. The FetcherRoundTripper
+// connection to a relay typically provides obfuscation and blocking
+// resistance, enabling the client to reach the DSL backend via the relay.
+//
+// Round trippers include in-proxy broker clients, where the broker is a
+// relay; and SSH tunnel requests, where the Psiphon server is the relay.
+//
+// The round tripper should honor cancellation of ctx; the Fetcher may apply
+// a RequestTimeout-derived deadline to ctx before each round trip.
+type FetcherRoundTripper func(
+	ctx context.Context,
+	requestPayload []byte) (responsePayload []byte, err error)
+
+// FetcherConfig specifies the configuration for a Fetcher.
+type FetcherConfig struct {
+	Logger common.Logger
+
+	// BaseAPIParameters are packed once and included with every request.
+	BaseAPIParameters common.APIParameters
+
+	// RoundTripper transports relayed requests to the DSL backend.
+	RoundTripper FetcherRoundTripper
+
+	// Datastore callbacks persist server entry fetch progress, including
+	// the last non-frequent discover time used with DiscoverServerEntriesTTL.
+	DatastoreGetLastDiscoverTime func() (time.Time, error)
+	DatastoreSetLastDiscoverTime func(time time.Time) error
+	DatastoreHasServerEntry      func(tag ServerEntryTag, version int) bool
+	DatastoreStoreServerEntry    func(
+		serverEntryFields protocol.PackedServerEntryFields,
+		source string) error
+
+	// Datastore callbacks persist per-OSL state and SLOK lookups.
+	// DatastoreFatalError is invoked when a set-last-time operation fails;
+	// the caller must then cease Run invocations (see Run).
+	DatastoreGetLastActiveOSLsTime func() (time.Time, error)
+	DatastoreSetLastActiveOSLsTime func(time time.Time) error
+	DatastoreKnownOSLIDs           func() (IDs []OSLID, err error)
+	DatastoreGetOSLState           func(ID OSLID) (state []byte, notFound bool, err error)
+	DatastoreStoreOSLState         func(ID OSLID, state []byte) error
+	DatastoreDeleteOSLState        func(ID OSLID) error
+	DatastoreSLOKLookup            osl.SLOKLookup
+	DatastoreFatalError            func(error)
+
+	// Per-request timeout and retry tuning.
+	RequestTimeout          time.Duration
+	RequestRetryCount       int
+	RequestRetryDelay       time.Duration
+	RequestRetryDelayJitter float64
+
+	// Min/Max count pairs bound the randomized request/response sizes, which
+	// are varied to avoid a trivial traffic fingerprint.
+	DiscoverServerEntriesTTL      time.Duration
+	DiscoverServerEntriesMinCount int
+	DiscoverServerEntriesMaxCount int
+	GetServerEntriesMinCount      int
+	GetServerEntriesMaxCount      int
+	GetLastActiveOSLsTTL          time.Duration
+	GetOSLFileSpecsMinCount       int
+	GetOSLFileSpecsMaxCount       int
+
+	// DoGarbageCollection is invoked between processing batches to limit the
+	// memory footprint of a Run.
+	DoGarbageCollection func()
+}
+
+// fetcherOSLState.State values, tracking per-OSL download progress.
+const (
+	oslStateNoFileSpec  = 1 // active OSL known; FileSpec not yet fetched
+	oslStateHasFileSpec = 2 // FileSpec stored; key not yet reassembled
+	oslStateHasKey      = 3 // key reassembled and cached; FileSpec discarded
+)
+
+// fetcherOSLState is OSL state that's persisted to the datastore. For each
+// active OSL, the Fetcher will progressively download and persist the
+// corresponding FileSpec, and then attempt to reassemble the OSL key using
+// the FileSpec, persist any reassembled keys, and ultimately prune old OSL
+// state.
+type fetcherOSLState struct {
+	ID       OSLID       `cbor:"1,keyasint,omitempty"` // matches the datastore record key
+	State    int32       `cbor:"2,keyasint,omitempty"` // one of the oslState* constants
+	FileSpec OSLFileSpec `cbor:"3,keyasint,omitempty"` // CBOR-encoded osl.OSLFileSpec; set in oslStateHasFileSpec
+	Key      OSLKey      `cbor:"4,keyasint,omitempty"` // set in oslStateHasKey
+}
+
+// Fetcher orchestrates discovering and downloading server entries from a DSL
+// backend, via a relay. A Fetcher also synchronizes active OSL state and
+// reassembles OSL keys to be used as discovery inputs.
+type Fetcher struct {
+	config              *FetcherConfig
+	packedAPIParameters protocol.PackedAPIParameters // BaseAPIParameters, packed once at construction
+}
+
+// NewFetcher creates a new Fetcher, pre-packing the configured base API
+// parameters for inclusion in every request.
+func NewFetcher(config *FetcherConfig) (*Fetcher, error) {
+
+	packedParams, err := protocol.EncodePackedAPIParameters(
+		config.BaseAPIParameters)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	fetcher := &Fetcher{
+		config:              config,
+		packedAPIParameters: packedParams,
+	}
+	return fetcher, nil
+}
+
+// Run performs a server entry discovery/download and OSL synchronization
+// sequence.
+//
+// Run supports two modes:
+//   - Frequent, intended for fetching via an established SSH tunnel, and
+//     discovering only a small number of servers. Frequent fetches can be
+//     repeated often.
+//   - Non-frequent, intended for fetching via an untunneled relay, and invoked
+//     after a client is unable to connect with its known servers. This fetch
+//     mode is intended for discovering a larger number of servers, and is
+//     subject to the DiscoverServerEntriesTTL, which skips repeated runs.
+//
+// Each Run may make incremental progress. New OSL state or new server entries
+// may be downloaded and persisted even when Run ultimately fails and returns
+// an error.
+//
+// Run will stop and return immediately when the input ctx is done.
+//
+// Data is processed incrementally and DoGarbageCollection is invoked
+// periodically in order to limit the overall memory footprint of the Run.
+//
+// The caller MUST:
+//
+//   - Schedule Fetcher runs when appropriate: when the client is unable to
+//     connect, a full, non-frequent, untunneled fetcher run should be
+//     triggered to potentially discover a large selection of servers; when the
+//     client connects, a frequent fetcher run should be triggered to discover
+//     a small number of servers.
+//
+//   - Configure DiscoverServerEntriesMin/MaxCount using appropriate parameters
+//     for the frequent/non-frequent mode.
+//
+//   - Provide a cooldown time or delay between repeated Run calls when Run
+//     returns an error.
+//
+//   - Cease all Run invocations if the DatastoreFatalError callback is invoked.
+//     In this case, a set-last-time datastore operation required for the
+//     DiscoverServerEntriesTTL/GetLastActiveOSLsTTL mechanism has failed. The
+//     calling client should not invoke Run again until after a Stop/Start
+//     cycle.
+//
+//   - Ensure that there's only one concurrent fetcher run. The datastore
+//     operations are intended for incremental, persistent progress and
+//     multiple concurrent runs may interleave conflicting datastore calls.
+//     This requirement means that if there's an ongoing untunneled fetcher run
+//     and a tunnel is established, any post-connected, frequent fetcher run
+//     must be skipped or postponed.
+func (f *Fetcher) Run(ctx context.Context, isFrequent bool) error {
+
+	if !isFrequent {
+
+		// Non-frequent (untunneled) runs are rate-limited by
+		// DiscoverServerEntriesTTL; skip silently while within the TTL.
+
+		lastTime, err := f.config.DatastoreGetLastDiscoverTime()
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		if time.Now().Before(lastTime.Add(f.config.DiscoverServerEntriesTTL)) {
+			return nil
+		}
+	}
+
+	// processOSLs will:
+	//
+	// - check for new active OSLs, subject to GetLastActiveOSLsTTL
+	// - download any OSL FileSpecs for known, active OSL IDs
+	// - attempt to reassemble OSL keys for any unassembled OSLs
+	// - return the list of assembled, active OSL keys
+
+	OSLKeys, oslErr := f.processOSLs(ctx)
+	if oslErr != nil {
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"error": oslErr.Error(),
+		}).Warning("DSL: process OSLs failed")
+		// Proceed without OSL keys; oslErr is still returned at the end.
+	}
+
+	// Discover server entries, identified by tag.
+
+	// Vary the size of the requested response to avoid a trivial traffic
+	// fingerprint.
+	discoverCount := prng.Range(
+		f.config.DiscoverServerEntriesMinCount,
+		f.config.DiscoverServerEntriesMaxCount)
+
+	versionedTags, err := f.doDiscoverServerEntriesRequest(
+		ctx,
+		OSLKeys,
+		discoverCount)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Check each discovered server entry tag and version. Skip when the
+	// tag/version is already in the local datastore. Fetch the unknown or
+	// updated server entries in batches.
+	//
+	// Datastore transactions are per server entry, to allow for incremental
+	// progress in case of an error.
+
+	storeServerEntriesCount := 0
+	knownServerEntriesCount := 0
+	defer func() {
+		// Emit log even if not all fetches succeed.
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"tags":    len(versionedTags),
+			"updated": storeServerEntriesCount,
+			"known":   knownServerEntriesCount,
+		}).Info("DSL: fetched server entries")
+	}()
+
+	var getTags []ServerEntryTag
+	for _, v := range versionedTags {
+		if f.config.DatastoreHasServerEntry(v.Tag, int(v.Version)) {
+			knownServerEntriesCount += 1
+			continue
+		}
+		getTags = append(getTags, v.Tag)
+	}
+
+	for len(getTags) > 0 {
+
+		// Vary the size of the request and response.
+		getCount := prng.Range(
+			f.config.GetServerEntriesMinCount,
+			f.config.GetServerEntriesMaxCount)
+
+		// getBatch aliases a prefix of getTags; getTags is advanced below by
+		// the number of entries actually fetched.
+		getBatch := getTags
+		if len(getBatch) > getCount {
+			getBatch = getBatch[:getCount]
+		}
+
+		sourcedServerEntries, err := f.doGetServerEntriesRequest(ctx, getBatch)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for _, sourcedEntry := range sourcedServerEntries {
+
+			if sourcedEntry == nil {
+				// The requested server entry is no longer distributable or
+				// doesn't exist.
+				continue
+			}
+
+			err := f.config.DatastoreStoreServerEntry(
+				sourcedEntry.ServerEntryFields,
+				sourcedEntry.Source)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			storeServerEntriesCount += 1
+		}
+
+		// doGetServerEntriesRequest will retry failed requests and reduces
+		// the number of requested server entries in each retry. Adjust
+		// getTags in case less than the initial getBatch were fetched.
+		// Unfetched server entries will be added to the next batch.
+
+		getTags = getTags[len(sourcedServerEntries):]
+
+		f.config.DoGarbageCollection()
+	}
+
+	if !isFrequent {
+		err = f.config.DatastoreSetLastDiscoverTime(time.Now())
+		if err != nil {
+			err = errors.Trace(err)
+
+			// Signal a fatal datastore error. The caller should not run any
+			// Fetcher again, for the duration of its process, since the
+			// LastDiscoverTime mechanism won't prevent excess repeats.
+
+			f.config.DatastoreFatalError(err)
+			f.config.Logger.WithTraceFields(common.LogFields{
+				"error": err.Error(),
+			}).Warning("DSL: datastore failed")
+			// Proceed with this one run
+		}
+	}
+
+	// Surface any deferred OSL processing error now that server entry
+	// fetching has made what progress it can.
+	if oslErr != nil {
+		return errors.Trace(oslErr)
+	}
+
+	return nil
+}
+
+// processOSLs synchronizes local OSL state with the DSL backend and returns
+// the keys of all OSLs whose keys have been reassembled:
+//
+//   - refreshes the active OSL ID set, subject to GetLastActiveOSLsTTL,
+//     adding new OSL states and pruning inactive ones
+//   - fetches missing OSL FileSpecs in randomized batches
+//   - reassembles and caches OSL keys from stored FileSpecs and SLOKs
+func (f *Fetcher) processOSLs(ctx context.Context) ([]OSLKey, error) {
+
+	lastTime, err := f.config.DatastoreGetLastActiveOSLsTime()
+	if err != nil {
+		// TODO: proceed, but skip GetActiveOSLsRequest?
+		return nil, errors.Trace(err)
+	}
+
+	now := time.Now()
+
+	if now.After(lastTime.Add(f.config.GetLastActiveOSLsTTL)) {
+
+		// When the last GetActiveOSLsRequest fetch expires, request the
+		// current active OSLs again. Prune any locally stored OSL states for
+		// OSLs that are no longer active. Add new OSL states for previously
+		// unknown OSLs. These new OSLs states will trigger OSL FileSpec
+		// fetches in the next step.
+
+		// The size of the request and response is not varied in this case. In
+		// practise, the number of active OSL IDs is expected to be
+		// relatively small. The obfuscation hops to the relay should add a
+		// small amount of random padding.
+		activeOSLIDs, err := f.doGetActiveOSLsRequest(ctx)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// Load known OSL states without attempting to reassemble OSL keys.
+
+		knownOSLStates, err := f.loadOSLStates(ctx, false)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		addedCount := 0
+		removedCount := 0
+
+		// Add states for active OSLs not yet known locally (O(n*m) scans;
+		// acceptable for the expected small active OSL set).
+		for _, activeID := range activeOSLIDs {
+			isKnown := false
+			for _, knownState := range knownOSLStates {
+				if bytes.Equal(activeID, knownState.ID) {
+					isKnown = true
+					break
+				}
+			}
+			if !isKnown {
+				err := f.storeOSLState(
+					activeID,
+					&fetcherOSLState{
+						ID:    activeID,
+						State: oslStateNoFileSpec,
+					})
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				addedCount += 1
+			}
+		}
+
+		// Prune states for OSLs no longer active.
+		for _, knownState := range knownOSLStates {
+			isActive := false
+			for _, activeID := range activeOSLIDs {
+				if bytes.Equal(activeID, knownState.ID) {
+					isActive = true
+					break
+				}
+			}
+			if !isActive {
+				err := f.config.DatastoreDeleteOSLState(knownState.ID)
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				removedCount += 1
+			}
+		}
+
+		f.config.DoGarbageCollection()
+
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"total":   len(activeOSLIDs),
+			"added":   addedCount,
+			"removed": removedCount,
+		}).Info("DSL: fetched active OSL IDs")
+
+		err = f.config.DatastoreSetLastActiveOSLsTime(now)
+		if err != nil {
+			err = errors.Trace(err)
+
+			// Signal a fatal datastore error. The caller should not run any
+			// Fetcher again, for the duration of its process, since the
+			// LastActiveOSLsTime mechanism won't prevent excess repeats.
+
+			// NOTE(review): err is already traced just above; the extra
+			// errors.Trace here is harmless but inconsistent with the
+			// equivalent path in Run, which passes err directly.
+			f.config.DatastoreFatalError(errors.Trace(err))
+
+			f.config.Logger.WithTraceFields(common.LogFields{
+				"error": err.Error(),
+			}).Warning("DSL: datastore failed")
+			// Proceed with this one run
+		}
+	}
+
+	// Load known OSL states, attempting to reassemble OSL keys. Any newly
+	// assembled keys will be stored back to the datastore, caching the
+	// assembly. For OSLs in the no-FileSpec state, the missing FileSpecs
+	// will be fetched.
+
+	knownOSLStates, err := f.loadOSLStates(ctx, true)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	addedSpecCount := 0
+	removedSpecCount := 0
+	defer func() {
+		// Emit log even if not all fetches succeed.
+		if addedSpecCount > 0 || removedSpecCount > 0 {
+			f.config.Logger.WithTraceFields(common.LogFields{
+				"added":   addedSpecCount,
+				"removed": removedSpecCount,
+			}).Info("DSL: fetched OSL FileSpecs")
+		}
+	}()
+
+	var getFileSpecs []OSLID
+	for _, knownState := range knownOSLStates {
+		if knownState.State == oslStateHasFileSpec ||
+			knownState.State == oslStateHasKey {
+			continue
+		}
+		getFileSpecs = append(getFileSpecs, knownState.ID)
+	}
+
+	for len(getFileSpecs) > 0 {
+
+		// Vary the size of the request and response.
+		getCount := prng.Range(
+			f.config.GetOSLFileSpecsMinCount,
+			f.config.GetOSLFileSpecsMaxCount)
+
+		getBatch := getFileSpecs
+		if len(getBatch) > getCount {
+			getBatch = getBatch[:getCount]
+		}
+
+		fileSpecs, err := f.doGetOSLFileSpecsRequest(ctx, getBatch)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// NOTE(review): getFileSpecs[i] and getBatch[i] are used
+		// interchangeably below; they are equal since getBatch aliases a
+		// prefix of getFileSpecs, but using one consistently would be
+		// clearer.
+		for i, fileSpec := range fileSpecs {
+
+			if len(fileSpec) > 0 {
+
+				err := f.storeOSLState(
+					getFileSpecs[i],
+					&fetcherOSLState{
+						ID:       getFileSpecs[i],
+						State:    oslStateHasFileSpec,
+						FileSpec: fileSpec})
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				addedSpecCount += 1
+
+			} else {
+
+				// A nil/empty FileSpec in the response indicates that the
+				// requested OSL ID is invalid or no longer active. Prune the OSL state.
+				err := f.config.DatastoreDeleteOSLState(getBatch[i])
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				removedSpecCount += 1
+			}
+		}
+
+		// doGetOSLFileSpecsRequest will retry failed requests and reduces
+		// the number of requested OSL FileSpecs in each retry. Adjust
+		// getFileSpecs in case less than the initial getBatch were fetched.
+		// Unfetched FileSpecs will be added to the next batch.
+
+		getFileSpecs = getFileSpecs[len(fileSpecs):]
+
+		f.config.DoGarbageCollection()
+	}
+
+	if addedSpecCount > 0 || removedSpecCount > 0 {
+
+		// Repeat attempting to reassemble OSL keys, since new FileSpecs were
+		// downloaded. This case also prunes any now-removed OSLs so their keys
+		// will not be included in the return value.
+
+		knownOSLStates, err = f.loadOSLStates(ctx, true)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+
+	// Collect the keys of all fully reassembled OSLs for use as discovery
+	// inputs.
+	var keys []OSLKey
+	for _, knownState := range knownOSLStates {
+		if knownState.State == oslStateHasKey {
+			keys = append(keys, knownState.Key)
+		}
+	}
+
+	return keys, nil
+}
+
+// doDiscoverServerEntriesRequest requests discovery of up to discoverCount
+// versioned server entry tags, presenting all known OSL keys as
+// proof-of-knowledge inputs. On each retry, discoverCount is halved to
+// mitigate blocking or performance issues with larger responses.
+func (f *Fetcher) doDiscoverServerEntriesRequest(
+	ctx context.Context,
+	keys []OSLKey,
+	discoverCount int) ([]VersionedServerEntryTag, error) {
+
+	// Perform the request with retries. On each retry, reduce the requested
+	// response size to mitigate blocking or performance issues with larger
+	// responses.
+
+	for i := 0; ; i++ {
+
+		// All known OSL keys are sent in the request. In practise, the number
+		// of active OSL IDs is expected to be relatively small.
+
+		request := &DiscoverServerEntriesRequest{
+			BaseAPIParameters: f.packedAPIParameters,
+			OSLKeys:           keys,
+			DiscoverCount:     int32(discoverCount),
+		}
+
+		var response *DiscoverServerEntriesResponse
+		doRetry, err := f.doRelayedRequest(
+			ctx, requestTypeDiscoverServerEntries, request, &response)
+
+		if err == nil {
+			return response.VersionedServerEntryTags, nil
+		}
+
+		if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// Include the attempt number and use "attempt failed" phrasing, for
+		// consistency with the other do*Request helpers.
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"attempt":       i,
+			"discoverCount": discoverCount,
+			"error":         err.Error(),
+		}).Warning("DSL: doDiscoverServerEntriesRequest attempt failed")
+
+		common.SleepWithContext(
+			ctx,
+			prng.JitterDuration(
+				f.config.RequestRetryDelay,
+				f.config.RequestRetryDelayJitter))
+
+		if discoverCount > 1 {
+			discoverCount /= 2
+		}
+	}
+}
+
+// doGetServerEntriesRequest downloads the server entries for the given tags.
+// On each retry the tag list is halved, so the returned slice may cover only
+// a prefix of the input tags; callers advance by the returned length. A nil
+// slot indicates the corresponding entry is not distributable.
+func (f *Fetcher) doGetServerEntriesRequest(
+	ctx context.Context,
+	tags []ServerEntryTag) ([]*SourcedServerEntry, error) {
+
+	// Perform the request with retries. On each retry, reduce the requested
+	// response size to mitigate blocking or performance issues with larger
+	// responses.
+
+	for i := 0; ; i++ {
+
+		request := &GetServerEntriesRequest{
+			BaseAPIParameters: f.packedAPIParameters,
+			ServerEntryTags:   tags,
+		}
+
+		var response *GetServerEntriesResponse
+		doRetry, err := f.doRelayedRequest(
+			ctx, requestTypeGetServerEntries, request, &response)
+
+		// The backend must return exactly one slot (possibly nil) per
+		// requested tag.
+		if err == nil && len(tags) != len(response.SourcedServerEntries) {
+			err = errors.TraceNew("unexpected server entry count")
+		}
+
+		if err == nil {
+			return response.SourcedServerEntries, nil
+		}
+
+		if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
+			return nil, errors.Trace(err)
+		}
+
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"attempt":  i,
+			"tagCount": len(tags),
+			"error":    err.Error(),
+		}).Warning("DSL: doGetServerEntriesRequest attempt failed")
+
+		common.SleepWithContext(
+			ctx,
+			prng.JitterDuration(
+				f.config.RequestRetryDelay,
+				f.config.RequestRetryDelayJitter))
+
+		if len(tags) > 1 {
+			n := len(tags) / 2
+			tags = tags[:n]
+		}
+	}
+}
+
+// doGetActiveOSLsRequest fetches the complete set of currently active OSL
+// IDs from the DSL backend, with retries.
+func (f *Fetcher) doGetActiveOSLsRequest(ctx context.Context) ([]OSLID, error) {
+
+	// Perform the request with retries. The response always includes all
+	// current, active OSL IDs and is not reduced on retry.
+
+	for i := 0; ; i++ {
+
+		request := &GetActiveOSLsRequest{
+			BaseAPIParameters: f.packedAPIParameters,
+		}
+
+		var response *GetActiveOSLsResponse
+		doRetry, err := f.doRelayedRequest(
+			ctx, requestTypeGetActiveOSLs, request, &response)
+		if err == nil {
+			return response.ActiveOSLIDs, nil
+		}
+
+		if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
+			return nil, errors.Trace(err)
+		}
+
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"attempt": i,
+			"error":   err.Error(),
+		}).Warning("DSL: doGetActiveOSLsRequest attempt failed")
+
+		common.SleepWithContext(
+			ctx,
+			prng.JitterDuration(
+				f.config.RequestRetryDelay,
+				f.config.RequestRetryDelayJitter))
+	}
+}
+
+// doGetOSLFileSpecsRequest downloads FileSpecs for the given OSL IDs. On
+// each retry the ID list is halved, so the returned slice may cover only a
+// prefix of the input IDs; callers advance by the returned length. An empty
+// slot indicates the corresponding OSL is invalid or no longer active.
+func (f *Fetcher) doGetOSLFileSpecsRequest(
+	ctx context.Context, IDs []OSLID) ([]OSLFileSpec, error) {
+
+	// Perform the request with retries. On each retry, reduce the requested
+	// response size to mitigate blocking or performance issues with larger
+	// responses.
+
+	for i := 0; ; i++ {
+
+		request := &GetOSLFileSpecsRequest{
+			BaseAPIParameters: f.packedAPIParameters,
+			OSLIDs:            IDs,
+		}
+
+		var response *GetOSLFileSpecsResponse
+		doRetry, err := f.doRelayedRequest(
+			ctx, requestTypeGetOSLFileSpecs, request, &response)
+
+		// The backend must return exactly one slot (possibly empty) per
+		// requested OSL ID.
+		if err == nil && len(IDs) != len(response.OSLFileSpecs) {
+			err = errors.TraceNew("unexpected OSL file spec count")
+		}
+
+		if err == nil {
+			return response.OSLFileSpecs, nil
+		}
+
+		if i >= f.config.RequestRetryCount || !doRetry || ctx.Err() != nil {
+			return nil, errors.Trace(err)
+		}
+
+		f.config.Logger.WithTraceFields(common.LogFields{
+			"attempt":    i,
+			"OSLIDCount": len(IDs),
+			"error":      err.Error(),
+		}).Warning("DSL: doGetOSLFileSpecsRequest attempt failed")
+
+		common.SleepWithContext(
+			ctx,
+			prng.JitterDuration(
+				f.config.RequestRetryDelay,
+				f.config.RequestRetryDelayJitter))
+
+		if len(IDs) > 1 {
+			n := len(IDs) / 2
+			IDs = IDs[:n]
+		}
+	}
+}
+
+// doRelayedRequest CBOR-encodes request, wraps it in a RelayedRequest, sends
+// it via the configured RoundTripper (applying RequestTimeout when set), and
+// decodes the wrapped response into response. retRetry indicates whether the
+// failure is retry-eligible; currently only round trip transport errors are.
+func (f *Fetcher) doRelayedRequest(
+	ctx context.Context,
+	requestType int32,
+	request any,
+	response any) (retRetry bool, retErr error) {
+
+	// Add the relay wrapping.
+
+	cborRequest, err := protocol.CBOREncoding.Marshal(request)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+
+	cborRelayedRequest, err := protocol.CBOREncoding.Marshal(
+		&RelayedRequest{
+			RequestType: requestType,
+			Version:     requestVersion,
+			Request:     cborRequest,
+		})
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+
+	if len(cborRelayedRequest) > MaxRelayPayloadSize {
+		return false, errors.Tracef(
+			"request size %d exceeds limit %d", len(cborRelayedRequest), MaxRelayPayloadSize)
+	}
+
+	// Relay the request via the supplied RoundTripper.
+
+	requestCtx := ctx
+	if f.config.RequestTimeout > 0 {
+		var requestCancelFunc context.CancelFunc
+		requestCtx, requestCancelFunc = context.WithTimeout(ctx, f.config.RequestTimeout)
+		defer requestCancelFunc()
+	}
+
+	cborRelayedResponse, err := f.config.RoundTripper(requestCtx, cborRelayedRequest)
+	if err != nil {
+		// Allow retries in case of intermittent network failures or
+		// potential blocking.
+		//
+		// TODO: check for specific retry-eligible errors from the RoundTripper?
+		return true, errors.Trace(err)
+	}
+
+	// Remove the relay wrapping.
+
+	var relayedResponse *RelayedResponse
+	err = cbor.Unmarshal(cborRelayedResponse, &relayedResponse)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+
+	if relayedResponse.Error != 0 {
+		// No retries if a response was received from the DSL backend.
+		return false, errors.Tracef(
+			"RelayedResponse.Error: %d", relayedResponse.Error)
+	}
+
+	err = cbor.Unmarshal(relayedResponse.Response, response)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+
+	return false, nil
+}
+
+// loadOSLStates loads all persisted OSL states, one at a time to limit
+// memory use. When reassembleKeys is set, states holding a FileSpec are
+// additionally processed: OSL key reassembly is attempted from locally known
+// SLOKs, and any newly reassembled key is stored back, replacing the
+// FileSpec.
+func (f *Fetcher) loadOSLStates(ctx context.Context, reassembleKeys bool) ([]*fetcherOSLState, error) {
+
+	// Load just the set of known OSL IDs, and then process each OSL state one
+	// at a time, to avoid loading all states into memory at once.
+
+	activeIDs, err := f.config.DatastoreKnownOSLIDs()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	var states []*fetcherOSLState
+
+	for _, ID := range activeIDs {
+
+		cborState, found, err := f.config.DatastoreGetOSLState(ID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		if !found {
+			// This case is not expected since DatastoreKnownOSLIDs returns
+			// only known IDs.
+			continue
+		}
+
+		var state *fetcherOSLState
+		err = cbor.Unmarshal(cborState, &state)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		if !bytes.Equal(ID, state.ID) {
+			return nil, errors.TraceNew("unexpected OSL ID")
+		}
+		// TODO: sanity check FileSpec/Key fields match State?
+
+		if state.State == oslStateHasFileSpec {
+
+			// When we have the FileSpec, but not the reassembled key, attempt
+			// reassembly from SLOKs. A reassembled key is stored back to the
+			// datastore.
+
+			if reassembleKeys {
+
+				var fileSpec *osl.OSLFileSpec
+				err = cbor.Unmarshal(state.FileSpec, &fileSpec)
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+
+				ok, key, err := osl.ReassembleOSLKey(fileSpec, f.config.DatastoreSLOKLookup)
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+				if ok {
+
+					// Without the guarantee that there's only one concurrent
+					// fetcher run, it's possible, with two concurrent
+					// fetchers, that one prunes an OSL state after
+					// GetActiveOSLsRequest, while the other calls
+					// storeOSLState and incorrectly restores the pruned state.
+
+					state.State = oslStateHasKey
+					state.Key = key
+					state.FileSpec = nil
+					err = f.storeOSLState(ID, state)
+					if err != nil {
+						return nil, errors.Trace(err)
+					}
+
+					// Fix: log only on successful reassembly; previously this
+					// log was emitted for every FileSpec-state OSL, even when
+					// no key was reassembled.
+					f.config.Logger.WithTrace().Info("DSL: reassembled OSL key")
+				}
+			}
+
+			// Allow state.FileSpec to be garbage collected.
+			state.FileSpec = nil
+
+			f.config.DoGarbageCollection()
+		}
+
+		states = append(states, state)
+	}
+
+	return states, nil
+}
+
+// storeOSLState CBOR-encodes and persists a single OSL state record under ID.
+func (f *Fetcher) storeOSLState(ID OSLID, state *fetcherOSLState) error {
+
+	encodedState, err := protocol.CBOREncoding.Marshal(state)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	if err := f.config.DatastoreStoreOSLState(ID, encodedState); err != nil {
+		return errors.Trace(err)
+	}
+
+	return nil
+}

+ 363 - 0
psiphon/common/dsl/relay.go

@@ -0,0 +1,363 @@
+/*
+ * Copyright (c) 2025, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package dsl
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	lrucache "github.com/cognusion/go-cache-lru"
+	"github.com/fxamacker/cbor/v2"
+)
+
const (
	// Defaults for the upstream HTTP connection pool and request behavior;
	// adjustable at runtime via Relay.SetRequestParameters.
	defaultMaxHttpConns        = 100
	defaultMaxHttpIdleConns    = 100
	defaultHttpIdleConnTimeout = 120 * time.Second
	defaultRequestTimeout      = 30 * time.Second
	defaultRequestRetryCount   = 2

	// Defaults for the transparent server entry cache; adjustable at runtime
	// via Relay.SetCacheParameters.
	defaultServerEntryCacheTTL     = 24 * time.Hour
	defaultServerEntryCacheMaxSize = 100000
)
+
// RelayConfig specifies the configuration for a Relay.
//
// The CACertificates and HostCertificate parameters are used for mutually
// authenticated TLS between the Relay and the DSL backend.
type RelayConfig struct {
	// Logger receives warning and info log events from the Relay.
	Logger common.Logger

	// CACertificates are the root CAs used to verify the DSL backend's TLS
	// certificate; HostCertificate is this relay's client certificate,
	// presented to the backend for mutual authentication.
	CACertificates  []*x509.Certificate
	HostCertificate *tls.Certificate

	// DynamicServerListServiceURL is the DSL backend host (scheme and
	// request path are appended by the Relay when making requests).
	DynamicServerListServiceURL string
}
+
// Relay is an intermediary between a DSL client and the DSL backend which
// provides circumvention and blocking resistance. Relays include in-proxy
// brokers, and Psiphon servers. See the "Relay API layer" comment section
// in api.go for more details.
//
// The Relay maintains a pool of persistent HTTP connections for making
// requests.
//
// The Relay supports transparent caching of server entries, where
// GetServerEntriesRequest requests may be fully or partially served out of
// the local cache.
type Relay struct {
	config        *RelayConfig
	tlsConfig     *tls.Config
	errorResponse []byte // pre-marshaled, non-revealing error reply

	// mutex guards all of the following fields; httpClient and
	// serverEntryCache may be replaced at runtime by SetRequestParameters
	// and SetCacheParameters respectively.
	mutex                      sync.Mutex
	httpClient                 *http.Client
	requestTimeout             time.Duration
	requestRetryCount          int
	serverEntryCache           *lrucache.Cache
	serverEntryCacheDefaultTTL time.Duration
	serverEntryCacheMaxSize    int
}
+
+// NewRelay creates a new Relay.
+func NewRelay(config *RelayConfig) (*Relay, error) {
+
+	certPool := x509.NewCertPool()
+	for _, cert := range config.CACertificates {
+		certPool.AddCert(cert)
+	}
+
+	tlsConfig := &tls.Config{
+		RootCAs:      certPool,
+		Certificates: []tls.Certificate{*config.HostCertificate},
+	}
+
+	// Pre-marshal a generic, non-revealing error code to return on any
+	// upstream failure.
+	cborErrorResponse, err := protocol.CBOREncoding.Marshal(
+		&RelayedResponse{
+			Error: 1,
+		})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	relay := &Relay{
+		config:        config,
+		tlsConfig:     tlsConfig,
+		errorResponse: cborErrorResponse,
+	}
+
+	relay.SetRequestParameters(
+		defaultMaxHttpConns,
+		defaultMaxHttpIdleConns,
+		defaultHttpIdleConnTimeout,
+		defaultRequestTimeout,
+		defaultRequestRetryCount)
+
+	relay.SetCacheParameters(
+		defaultServerEntryCacheTTL,
+		defaultServerEntryCacheMaxSize)
+
+	return relay, nil
+}
+
+// SetRequestParameters updates the HTTP request parameters used for upstream
+// requests.
+func (r *Relay) SetRequestParameters(
+	maxHttpConns int,
+	maxHttpIdleConns int,
+	httpIdleConnTimeout time.Duration,
+	requestTimeout time.Duration,
+	requestRetryCount int) {
+
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	r.requestTimeout = requestTimeout
+	r.requestRetryCount = requestRetryCount
+
+	// The http.Client client is replaced when the net/http configuration has
+	// changed. Any in-flight requests using the previous http.Client will
+	// continue until complete and eventually the previous http.Client will
+	// be garbage collected.
+	//
+	// TODO: don't retain the previous http.Client as long as
+	// http.Transport.IdleConnTimeout.
+
+	var httpTransport *http.Transport
+	if r.httpClient != nil {
+		httpTransport = r.httpClient.Transport.(*http.Transport)
+	}
+
+	if r.httpClient == nil ||
+		httpTransport.MaxConnsPerHost != maxHttpConns ||
+		httpTransport.MaxIdleConns != maxHttpIdleConns ||
+		httpTransport.IdleConnTimeout != httpIdleConnTimeout {
+
+		r.httpClient = &http.Client{
+			Transport: &http.Transport{
+				TLSClientConfig:     r.tlsConfig,
+				MaxConnsPerHost:     maxHttpConns,
+				MaxIdleConns:        maxHttpIdleConns,
+				MaxIdleConnsPerHost: maxHttpIdleConns,
+				IdleConnTimeout:     httpIdleConnTimeout,
+			},
+		}
+
+	}
+}
+
// SetCacheParameters updates the parameters used for transparent server
// entry caching. When the parameters change, any existing cache is flushed
// and replaced.
func (r *Relay) SetCacheParameters(
	defaultTTL time.Duration,
	maxSize int) {

	r.mutex.Lock()
	defer r.mutex.Unlock()

	if r.serverEntryCache == nil ||
		r.serverEntryCacheDefaultTTL != defaultTTL ||
		r.serverEntryCacheMaxSize != maxSize {

		// Discard previously cached entries before replacing the cache.
		if r.serverEntryCache != nil {
			r.serverEntryCache.Flush()
		}

		r.serverEntryCacheDefaultTTL = defaultTTL
		r.serverEntryCacheMaxSize = maxSize

		// The 1 minute value is the expired-item janitor interval.
		r.serverEntryCache = lrucache.NewWithLRU(
			r.serverEntryCacheDefaultTTL,
			1*time.Minute,
			r.serverEntryCacheMaxSize)
	}
}
+
+// HandleRequest relays a DSL request.
+//
+// On request failure, HandleRequest logs to the provided logger. There's
+// always a response to be relayed back to the client.
+func (r *Relay) HandleRequest(
+	ctx context.Context,
+	clientIP string,
+	clientGeoIPData common.GeoIPData,
+	cborRelayedRequest []byte) []byte {
+
+	response, err := r.handleRequest(
+		ctx,
+		clientIP,
+		clientGeoIPData,
+		cborRelayedRequest)
+	if err != nil {
+		r.config.Logger.WithTraceFields(common.LogFields{
+			"error": err.Error(),
+		}).Warning("DSL: handle request failed")
+
+		return r.errorResponse
+	}
+
+	return response
+}
+
+func (r *Relay) handleRequest(
+	ctx context.Context,
+	clientIP string,
+	clientGeoIPData common.GeoIPData,
+	cborRelayedRequest []byte) ([]byte, error) {
+
+	r.mutex.Lock()
+	httpClient := r.httpClient
+	requestTimeout := r.requestTimeout
+	requestRetryCount := r.requestRetryCount
+	r.mutex.Unlock()
+
+	if httpClient == nil {
+		return nil, errors.TraceNew("missing http client")
+	}
+
+	if len(cborRelayedRequest) > MaxRelayPayloadSize {
+		return nil, errors.Tracef(
+			"request size %d exceeds limit %d",
+			len(cborRelayedRequest), MaxRelayPayloadSize)
+	}
+
+	var relayedRequest *RelayedRequest
+	err := cbor.Unmarshal(cborRelayedRequest, &relayedRequest)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	if relayedRequest.Version != requestVersion {
+		return nil, errors.Tracef(
+			"unexpected request version %d", relayedRequest.Version)
+	}
+
+	path, ok := requestTypeToHTTPPath[relayedRequest.RequestType]
+	if !ok {
+		return nil, errors.Tracef(
+			"unknown request type %d", relayedRequest.RequestType)
+	}
+
+	// TODO: implement transparent server entry caching.
+	//
+	// For requestTypeGetServerEntries, peek at the RelayedResponse.Response
+	// and extract server entries and add to the local cache, keyed by server
+	// entry tag. When the server entry has a specific TTL, use that as the
+	// cache TTL, otherwise using serverEntryCacheDefaultTTL.
+	//
+	// Peek at RelayedRequest.Request, and if all requested server entries are
+	// in the cache, serve the request entirely from the local cache.
+	// Consider also modifying requests to only fetch server entries that are
+	// not cached.
+	//
+	// Also handle for changes to server entry version.
+
+	requestCtx := ctx
+	if requestTimeout > 0 {
+		var requestCancelFunc context.CancelFunc
+		requestCtx, requestCancelFunc = context.WithTimeout(ctx, requestTimeout)
+		defer requestCancelFunc()
+	}
+
+	url := fmt.Sprintf("https://%s%s", r.config.DynamicServerListServiceURL, path)
+
+	for i := 0; ; i++ {
+
+		httpRequest, err := http.NewRequestWithContext(
+			requestCtx, "POST", url, bytes.NewBuffer(relayedRequest.Request))
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// Attach the client IP and GeoIPData. The raw IP may be used, by the
+		// DSL backend, in server entry selection logic; the GeoIP data is
+		// for stats, and may also be used in server entry selection logic.
+		// Sending preresolved GeoIP data saves the DSL backend from needing
+		// its own GeoIP resolver, and ensures, for a given client a
+		// consistent GeoIP view between the Psiphon server and the DSL backend.
+
+		jsonGeoIPData, err := json.Marshal(clientGeoIPData)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		httpRequest.Header.Set(psiphonClientIPHeader, clientIP)
+		httpRequest.Header.Set(psiphonClientGeoIPDataHeader, string(jsonGeoIPData))
+
+		startTime := time.Now()
+		httpResponse, err := r.httpClient.Do(httpRequest)
+		duration := time.Since(startTime)
+
+		if err == nil && httpResponse.StatusCode != http.StatusOK {
+			httpResponse.Body.Close()
+			err = errors.Tracef("unexpected response code: %d", httpResponse.StatusCode)
+		}
+
+		var response []byte
+		if err == nil {
+			response, err = io.ReadAll(httpResponse.Body)
+			httpResponse.Body.Close()
+		}
+
+		if err != nil {
+
+			r.config.Logger.WithTraceFields(common.LogFields{
+				"duration": duration.String(),
+				"error":    err.Error(),
+			}).Warning("DSL: service request attempt failed")
+
+			// Retry on network errors.
+			if i < requestRetryCount && ctx.Err() == nil {
+				continue
+			}
+
+			return nil, errors.Tracef("all attempts failed")
+		}
+
+		cborRelayedResponse, err := protocol.CBOREncoding.Marshal(
+			&RelayedResponse{
+				Response: response,
+			})
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		if len(cborRelayedResponse) > MaxRelayPayloadSize {
+			return nil, errors.Tracef(
+				"response size %d exceeds limit %d",
+				len(cborRelayedRequest), MaxRelayPayloadSize)
+		}
+
+		return cborRelayedResponse, nil
+	}
+}