Преглед изворни кода

Server-side DSL relay integration

- Add relay GetServerEntriesResponse compression
Rod Hynes пре 5 месеци
родитељ
комит
440265713d

+ 2 - 1
psiphon/common/authPackage.go

@@ -203,7 +203,8 @@ func NewAuthenticatedDataPackageReader(
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		// TODO: need to Close decompressor to ensure zlib checksum is verified?
+		defer decompressor.Close()
+		// TODO: need to check Close error to ensure zlib checksum is verified?
 
 		hash := sha256.New()
 

+ 11 - 6
psiphon/common/dsl/api.go

@@ -35,7 +35,6 @@ package dsl
 
 import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
-	"github.com/fxamacker/cbor/v2"
 )
 
 type OSLID []byte
@@ -202,13 +201,19 @@ var requestTypeToHTTPPath = map[int32]string{
 // RelayedRequest wraps a DSL request to be relayed. RequestType indicates the
 // type of the wrapped request. Version must be 1.
 type RelayedRequest struct {
-	RequestType int32           `cbor:"1,keyasint,omitempty"`
-	Version     int32           `cbor:"2,keyasint,omitempty"`
-	Request     cbor.RawMessage `cbor:"3,keyasint,omitempty"`
+	RequestType int32  `cbor:"1,keyasint,omitempty"`
+	Version     int32  `cbor:"2,keyasint,omitempty"`
+	Request     []byte `cbor:"3,keyasint,omitempty"`
 }
 
+const (
+	relayedResponseNoCompression   = 0
+	relayedResponseZlibCompression = 1
+)
+
 // RelayedResponse wraps a DSL response value or error.
 type RelayedResponse struct {
-	Error    int32           `cbor:"1,keyasint,omitempty"`
-	Response cbor.RawMessage `cbor:"2,keyasint,omitempty"`
+	Error       int32  `cbor:"1,keyasint,omitempty"`
+	Compression int32  `cbor:"2,keyasint,omitempty"`
+	Response    []byte `cbor:"3,keyasint,omitempty"`
 }

+ 29 - 23
psiphon/common/dsl/dsl_test.go

@@ -37,6 +37,7 @@ import (
 	"net"
 	"net/http"
 	"runtime/debug"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -48,7 +49,6 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server"
 	"github.com/fxamacker/cbor/v2"
 )
 
@@ -181,9 +181,12 @@ func testDSLs(testConfig *testConfig) error {
 
 	relayLogger := newTestLoggerWithMetricValidator("relay", metricsValidator)
 
+	certPool := x509.NewCertPool()
+	certPool.AddCert(tlsConfig.CACertificate)
+
 	relayConfig := &RelayConfig{
 		Logger:                      relayLogger,
-		CACertificates:              []*x509.Certificate{tlsConfig.CACertificate},
+		CACertificates:              certPool,
 		HostCertificate:             tlsConfig.relayCertificate,
 		DynamicServerListServiceURL: backend.getAddress(),
 		HostID:                      testHostID,
@@ -198,10 +201,7 @@ func testDSLs(testConfig *testConfig) error {
 		},
 	}
 
-	relay, err := NewRelay(relayConfig)
-	if err != nil {
-		return errors.Trace(err)
-	}
+	relay := NewRelay(relayConfig)
 
 	if !testConfig.cacheServerEntries {
 		relay.SetCacheParameters(0, 0)
@@ -242,11 +242,15 @@ func testDSLs(testConfig *testConfig) error {
 		// blocking resistant first hop. For this test, it's just a stub that
 		// directly invokes the relay.
 
-		responsePayload := relay.HandleRequest(
+		responsePayload, err := relay.HandleRequest(
 			ctx,
+			nil,
 			testClientIP,
 			testClientGeoIPData,
 			requestPayload)
+		if err != nil {
+			return GetRelayGenericErrorResponse(), errors.Trace(err)
+		}
 
 		// Simulate interruption of large response.
 		if interruptLimit > 0 && len(responsePayload) > interruptLimit {
@@ -569,6 +573,8 @@ func initializeDSLBackend(backendOSLPaveData []*osl.PaveData) (*dslBackend, erro
 
 	// Run GenerateConfig concurrently to try to take advantage of multiple
 	// CPU cores.
+	//
+	// Update: no longer using server.GenerateConfig due to import cycle.
 
 	var initMutex sync.Mutex
 	var initGroup sync.WaitGroup
@@ -587,26 +593,26 @@ func initializeDSLBackend(backendOSLPaveData []*osl.PaveData) (*dslBackend, erro
 				}
 			}()
 
-			_, _, _, _, encodedServerEntry, err := server.GenerateConfig(
-				&server.GenerateConfigParams{
-					ServerIPAddress:     fmt.Sprintf("192.0.2.%d", i),
-					TunnelProtocolPorts: map[string]int{"OSSH": 1},
-				})
-			if err != nil {
-				return errors.Trace(err)
+			serverEntry := &protocol.ServerEntry{
+				Tag:                  prng.Base64String(32),
+				IpAddress:            fmt.Sprintf("192.0.2.%d", i),
+				SshUsername:          prng.HexString(8),
+				SshPassword:          prng.HexString(32),
+				SshHostKey:           prng.Base64String(280),
+				SshObfuscatedPort:    prng.Range(1, 65535),
+				SshObfuscatedKey:     prng.HexString(32),
+				Capabilities:         []string{"OSSH"},
+				Region:               prng.HexString(1),
+				ProviderID:           strings.ToUpper(prng.HexString(8)),
+				ConfigurationVersion: 0,
+				Signature:            prng.Base64String(80),
 			}
 
-			serverEntryFields, err := protocol.DecodeServerEntryFields(
-				string(encodedServerEntry), "", "")
+			serverEntryFields, err := serverEntry.GetServerEntryFields()
 			if err != nil {
 				return errors.Trace(err)
 			}
 
-			tag := serverEntryFields.GetTag()
-			if tag == "" {
-				return errors.TraceNew("unexpected tag")
-			}
-
 			packed, err := protocol.EncodePackedServerEntryFields(serverEntryFields)
 			if err != nil {
 				return errors.Trace(err)
@@ -616,12 +622,12 @@ func initializeDSLBackend(backendOSLPaveData []*osl.PaveData) (*dslBackend, erro
 
 			initMutex.Lock()
 
-			if backend.serverEntries[tag] != nil {
+			if backend.serverEntries[serverEntry.Tag] != nil {
 				initMutex.Unlock()
 				return errors.TraceNew("duplicate tag")
 			}
 
-			backend.serverEntries[tag] = &SourcedServerEntry{
+			backend.serverEntries[serverEntry.Tag] = &SourcedServerEntry{
 				ServerEntryFields: packed,
 				Source:            source,
 			}

+ 28 - 1
psiphon/common/dsl/fetcher.go

@@ -21,7 +21,9 @@ package dsl
 
 import (
 	"bytes"
+	"compress/zlib"
 	"context"
+	"io"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -763,7 +765,32 @@ func (f *Fetcher) doRelayedRequest(
 			"RelayedResponse.Error: %d", relayedResponse.Error)
 	}
 
-	err = cbor.Unmarshal(relayedResponse.Response, response)
+	var uncompressedResponse []byte
+
+	switch relayedResponse.Compression {
+
+	case relayedResponseNoCompression:
+		uncompressedResponse = relayedResponse.Response
+
+	case relayedResponseZlibCompression:
+		r, err := zlib.NewReader(bytes.NewReader(relayedResponse.Response))
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		defer r.Close()
+		var b bytes.Buffer
+		_, err = io.Copy(&b, r)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		uncompressedResponse = b.Bytes()
+
+	default:
+		return false, errors.Tracef(
+			"unknown RelayedResponse.Compression: %d", relayedResponse.Compression)
+	}
+
+	err = cbor.Unmarshal(uncompressedResponse, response)
 	if err != nil {
 		return false, errors.Trace(err)
 	}

+ 85 - 52
psiphon/common/dsl/relay.go

@@ -21,6 +21,7 @@ package dsl
 
 import (
 	"bytes"
+	"compress/zlib"
 	"context"
 	"crypto/tls"
 	"crypto/x509"
@@ -41,13 +42,13 @@ import (
 
 const (
 	defaultMaxHttpConns        = 100
-	defaultMaxHttpIdleConns    = 100
+	defaultMaxHttpIdleConns    = 10
 	defaultHttpIdleConnTimeout = 120 * time.Second
 	defaultRequestTimeout      = 30 * time.Second
 	defaultRequestRetryCount   = 2
 
 	defaultServerEntryCacheTTL     = 24 * time.Hour
-	defaultServerEntryCacheMaxSize = 100000
+	defaultServerEntryCacheMaxSize = 200000
 )
 
 // RelayConfig specifies the configuration for a Relay.
@@ -59,7 +60,7 @@ const (
 type RelayConfig struct {
 	Logger common.Logger
 
-	CACertificates []*x509.Certificate
+	CACertificates *x509.CertPool
 
 	HostCertificate *tls.Certificate
 
@@ -86,9 +87,8 @@ type RelayConfig struct {
 // GetServerEntriesRequest requests may be fully or partially served out of
 // the local cache.
 type Relay struct {
-	config        *RelayConfig
-	tlsConfig     *tls.Config
-	errorResponse []byte
+	config    *RelayConfig
+	tlsConfig *tls.Config
 
 	mutex                   sync.Mutex
 	httpClient              *http.Client
@@ -100,32 +100,16 @@ type Relay struct {
 }
 
 // NewRelay creates a new Relay.
-func NewRelay(config *RelayConfig) (*Relay, error) {
-
-	certPool := x509.NewCertPool()
-	for _, cert := range config.CACertificates {
-		certPool.AddCert(cert)
-	}
+func NewRelay(config *RelayConfig) *Relay {
 
 	tlsConfig := &tls.Config{
-		RootCAs:      certPool,
+		RootCAs:      config.CACertificates,
 		Certificates: []tls.Certificate{*config.HostCertificate},
 	}
 
-	// Pre-marshal a generic, non-revealing error code to return on any
-	// upstream failure.
-	cborErrorResponse, err := protocol.CBOREncoding.Marshal(
-		&RelayedResponse{
-			Error: 1,
-		})
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
 	relay := &Relay{
-		config:        config,
-		tlsConfig:     tlsConfig,
-		errorResponse: cborErrorResponse,
+		config:    config,
+		tlsConfig: tlsConfig,
 	}
 
 	relay.SetRequestParameters(
@@ -139,7 +123,7 @@ func NewRelay(config *RelayConfig) (*Relay, error) {
 		defaultServerEntryCacheTTL,
 		defaultServerEntryCacheMaxSize)
 
-	return relay, nil
+	return relay
 }
 
 // SetRequestParameters updates the HTTP request parameters used for upstream
@@ -225,32 +209,17 @@ func (r *Relay) SetCacheParameters(
 
 // HandleRequest relays a DSL request.
 //
-// On request failure, HandleRequest logs to the provided logger. There's
-// always a response to be relayed back to the client.
+// If an extendTimeout callback is specified, it will be called with the
+// expected maximum request timeout, including retries; this callback may be
+// used to customize the response timeout for a transport handler.
+//
+// In the case of an error, the caller must log the error and send
+// dsl.GenericErrorResponse to the client. This generic error response
+// ensures that the client receives a DSL response and doesn't consider the
+// DSL FetcherRoundTripper to have failed.
 func (r *Relay) HandleRequest(
 	ctx context.Context,
-	clientIP string,
-	clientGeoIPData common.GeoIPData,
-	cborRelayedRequest []byte) []byte {
-
-	response, err := r.handleRequest(
-		ctx,
-		clientIP,
-		clientGeoIPData,
-		cborRelayedRequest)
-	if err != nil {
-		r.config.Logger.WithTraceFields(common.LogFields{
-			"error": err.Error(),
-		}).Warning("DSL: handle request failed")
-
-		return r.errorResponse
-	}
-
-	return response
-}
-
-func (r *Relay) handleRequest(
-	ctx context.Context,
+	extendTimeout func(time.Duration),
 	clientIP string,
 	clientGeoIPData common.GeoIPData,
 	cborRelayedRequest []byte) ([]byte, error) {
@@ -261,6 +230,10 @@ func (r *Relay) handleRequest(
 	requestRetryCount := r.requestRetryCount
 	r.mutex.Unlock()
 
+	if extendTimeout != nil {
+		extendTimeout(requestTimeout * time.Duration(requestRetryCount))
+	}
+
 	if httpClient == nil {
 		return nil, errors.TraceNew("missing http client")
 	}
@@ -412,9 +385,48 @@ func (r *Relay) handleRequest(
 		return nil, errors.Tracef("all attempts failed")
 	}
 
+	// Compress GetServerEntriesResponse responses.
+	//
+	// The CBOR-encoded SourcedServerEntry/protocol.PackedServerEntryFields
+	// items in GetServerEntriesResponse benefit from compression due to
+	// repeating server entry values. Only this response is compressed, as
+	// other responses almost completely consist of non-repeating random
+	// values.
+	//
+	// Compression is only added at the relay->client hop, to avoid additonal
+	// CPU load on the DSL backend, and avoid relays having to always
+	// decompress the backend response in cacheGetServerEntriesResponse.
+
+	compression := int32(relayedResponseNoCompression)
+	if relayedRequest.RequestType == requestTypeGetServerEntries {
+		compression = relayedResponseZlibCompression
+	}
+
+	var compressedResponse []byte
+
+	switch compression {
+
+	case relayedResponseNoCompression:
+		compressedResponse = response
+
+	case relayedResponseZlibCompression:
+		var b bytes.Buffer
+		w := zlib.NewWriter(&b)
+		_, err := w.Write(response)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		err = w.Close()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		compressedResponse = b.Bytes()
+	}
+
 	cborRelayedResponse, err := protocol.CBOREncoding.Marshal(
 		&RelayedResponse{
-			Response: response,
+			Compression: compression,
+			Response:    compressedResponse,
 		})
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -561,3 +573,24 @@ func (r *Relay) getCachedGetServerEntriesResponse(
 
 	return cborResponse, nil
 }
+
+var relayGenericErrorResponse []byte
+
+func init() {
+
+	// Pre-marshal a generic, non-revealing error code to return on any
+	// upstream failure.
+	cborErrorResponse, err := protocol.CBOREncoding.Marshal(
+		&RelayedResponse{
+			Error: 1,
+		})
+	if err != nil {
+		panic(err.Error())
+	}
+
+	relayGenericErrorResponse = cborErrorResponse
+}
+
+func GetRelayGenericErrorResponse() []byte {
+	return relayGenericErrorResponse
+}

+ 34 - 0
psiphon/common/inproxy/api.go

@@ -514,6 +514,18 @@ type BrokerServerReport struct {
 	ProxyPortMappingTypes PortMappingTypes `cbor:"6,keyasint,omitempty"`
 }
 
+// ClientDSLRequest is a client DSL request that the broker relays to the DSL
+// backend. The broker's role is to provide a blocking resistant initial
+// hop; DSL requests are not direct components of the in-proxy protocol.
+type ClientDSLRequest struct {
+	RequestPayload []byte `cbor:"1,keyasint,omitempty"`
+}
+
+// ClientDSLResponse is a DSL response relayed back to the client.
+type ClientDSLResponse struct {
+	ResponsePayload []byte `cbor:"1,keyasint,omitempty"`
+}
+
 // ProxyQualityKey is the key that proxy quality is indexed on a proxy ID and
 // a proxy ASN. Quality is tracked at a fine-grained level, with the proxy ID
 // representing, typically, an individual device, and the proxy ASN
@@ -1164,3 +1176,25 @@ func UnmarshalServerProxyQualityResponse(payload []byte) (*ServerProxyQualityRes
 	err := unmarshalRecord(recordTypeAPIServerProxyQualityResponse, payload, &response)
 	return response, errors.Trace(err)
 }
+
+func MarshalClientDSLRequest(request *ClientDSLRequest) ([]byte, error) {
+	payload, err := marshalRecord(request, recordTypeAPIClientDSLRequest)
+	return payload, errors.Trace(err)
+}
+
+func UnmarshalClientDSLRequest(payload []byte) (*ClientDSLRequest, error) {
+	var request *ClientDSLRequest
+	err := unmarshalRecord(recordTypeAPIClientDSLRequest, payload, &request)
+	return request, errors.Trace(err)
+}
+
+func MarshalClientDSLResponse(response *ClientDSLResponse) ([]byte, error) {
+	payload, err := marshalRecord(response, recordTypeAPIClientDSLResponse)
+	return payload, errors.Trace(err)
+}
+
+func UnmarshalClientDSLResponse(payload []byte) (*ClientDSLResponse, error) {
+	var response *ClientDSLResponse
+	err := unmarshalRecord(recordTypeAPIClientDSLResponse, payload, &response)
+	return response, errors.Trace(err)
+}

+ 106 - 1
psiphon/common/inproxy/broker.go

@@ -65,7 +65,18 @@ type ExtendTransportTimeout func(timeout time.Duration)
 // GetTacticsPayload is a callback which returns the appropriate tactics
 // payload for the specified client/proxy GeoIP data and API parameters.
 type GetTacticsPayload func(
-	common.GeoIPData, common.APIParameters) ([]byte, string, error)
+	geoIPData common.GeoIPData,
+	apiParams common.APIParameters) ([]byte, string, error)
+
+// RelayDSLRequest is a callback that provides DSL request relaying. The
+// callback must always return a response payload, even in the case of an
+// error. See dsl.Relay.HandleRequest.
+type RelayDSLRequest func(
+	ctx context.Context,
+	extendTimeout ExtendTransportTimeout,
+	clientIP string,
+	clientGeoIPData common.GeoIPData,
+	requestPayload []byte) ([]byte, error)
 
 // Broker is the in-proxy broker component, which matches clients and proxies
 // and provides WebRTC signaling functionalty.
@@ -182,6 +193,12 @@ type BrokerConfig struct {
 	// depending on the state of the corresponding queues.
 	IsLoadLimiting func() bool
 
+	// RelayDSLRequest provides DSL request relay support.
+	//
+	// RelayDSLRequest must be set; return an error if no DSL relay is
+	// configured.
+	RelayDSLRequest RelayDSLRequest
+
 	// PrivateKey is the broker's secure session long term private key.
 	PrivateKey SessionPrivateKey
 
@@ -515,6 +532,18 @@ func (b *Broker) HandleSessionPacket(
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
+		case recordTypeAPIClientDSLRequest:
+			responsePayload, err = b.handleClientDSL(
+				ctx,
+				extendTransportTimeout,
+				transportLogFields,
+				brokerClientIP,
+				geoIPData,
+				initiatorID,
+				unwrappedRequestPayload)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
 		default:
 			return nil, errors.Tracef("unexpected API record type %v", recordType)
 		}
@@ -1608,6 +1637,82 @@ func (b *Broker) handleClientRelayedPacket(
 	return responsePayload, nil
 }
 
+// handleClientDSL relays client DSL requests. The broker's role is to provide
+// a blocking resistant hop through to the DSL backend; DSL requests are not
+// direct components of the in-proxy protocol.
+func (b *Broker) handleClientDSL(
+	ctx context.Context,
+	extendTransportTimeout ExtendTransportTimeout,
+	transportLogFields common.LogFields,
+	clientIP string,
+	geoIPData common.GeoIPData,
+	initiatorID ID,
+	requestPayload []byte) (retResponse []byte, retErr error) {
+
+	startTime := time.Now()
+
+	var logFields common.LogFields
+	var requestSize, responseSize int
+
+	// Always log the outcome.
+	defer func() {
+		if logFields == nil {
+			logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
+		}
+		logFields["broker_event"] = "client-dsl"
+		logFields["broker_id"] = b.brokerID
+		logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
+		if retErr != nil {
+			logFields["error"] = retErr.Error()
+		}
+		logFields["request_size"] = requestSize
+		logFields["response_size"] = responseSize
+		if retErr != nil {
+			logFields["error"] = retErr.Error()
+		}
+		logFields.Add(transportLogFields)
+		b.config.Logger.LogMetric(brokerMetricName, logFields)
+		if retErr != nil {
+			retErr = NewBrokerLoggedEvent(retErr)
+		}
+	}()
+
+	dslRequest, err := UnmarshalClientDSLRequest(requestPayload)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	requestSize = len(dslRequest.RequestPayload)
+
+	// There is no ValidateAndGetLogFields call in this handler as the DSL
+	// backend (or relay) validates inputs and logs events.
+
+	dslResponsePayload, err := b.config.RelayDSLRequest(
+		ctx,
+		extendTransportTimeout,
+		clientIP,
+		geoIPData,
+		dslRequest.RequestPayload)
+	if err != nil {
+		// RelayDSLRequest will always return a generic error response payload
+		// to send to the client, to ensure it retains its broker client
+		// round tripper. Any DSL relay errors, including missing
+		// configuration, will be logged to the broker_event.
+		retErr = err
+	}
+
+	responseSize = len(dslResponsePayload)
+
+	responsePayload, err := MarshalClientDSLResponse(
+		&ClientDSLResponse{
+			ResponsePayload: dslResponsePayload,
+		})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return responsePayload, nil
+}
+
 func (b *Broker) adjustRequestTimeout(
 	logFields common.LogFields, timeout time.Duration) time.Duration {
 

+ 3 - 1
psiphon/common/inproxy/records.go

@@ -49,7 +49,9 @@ const (
 	recordTypeAPIBrokerServerReport          = 11
 	recordTypeAPIServerProxyQualityRequest   = 12
 	recordTypeAPIServerProxyQualityResponse  = 13
-	recordTypeLast                           = 13
+	recordTypeAPIClientDSLRequest            = 14
+	recordTypeAPIClientDSLResponse           = 15
+	recordTypeLast                           = 15
 )
 
 func marshalRecord(record interface{}, recordType int) ([]byte, error) {

+ 15 - 0
psiphon/common/parameters/parameters.go

@@ -521,6 +521,13 @@ const (
 	CheckServerEntryTagsMaxSendBytes                   = "CheckServerEntryTagsMaxSendBytes"
 	CheckServerEntryTagsMaxWorkTime                    = "CheckServerEntryTagsMaxWorkTime"
 	ServerEntryPruneDialPortNumberZero                 = "ServerEntryPruneDialPortNumberZero"
+	DSLRelayMaxHttpConns                               = "DSLRelayMaxHttpConns"
+	DSLRelayMaxHttpIdleConns                           = "DSLRelayMaxHttpIdleConns"
+	DSLRelayHttpIdleConnTimeout                        = "DSLRelayHttpIdleConnTimeout"
+	DSLRelayRequestTimeout                             = "DSLRelayRequestTimeout"
+	DSLRelayRetryCount                                 = "DSLRelayRetryCount"
+	DSLRelayCacheTTL                                   = "DSLRelayCacheTTL"
+	DSLRelayCacheMaxSize                               = "DSLRelayCacheMaxSize"
 
 	// Retired parameters
 
@@ -1114,6 +1121,14 @@ var defaultParameters = map[string]struct {
 	CheckServerEntryTagsMaxSendBytes:   {value: 65536, minimum: 1},
 	CheckServerEntryTagsMaxWorkTime:    {value: 60 * time.Second, minimum: time.Duration(0)},
 	ServerEntryPruneDialPortNumberZero: {value: true},
+
+	DSLRelayMaxHttpConns:        {value: 100, minimum: 1, flags: serverSideOnly},
+	DSLRelayMaxHttpIdleConns:    {value: 10, minimum: 1, flags: serverSideOnly},
+	DSLRelayHttpIdleConnTimeout: {value: 120 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayRequestTimeout:      {value: 30 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayRetryCount:          {value: 2, minimum: 0, flags: serverSideOnly},
+	DSLRelayCacheTTL:            {value: 24 * time.Hour, minimum: time.Duration(0), flags: serverSideOnly},
+	DSLRelayCacheMaxSize:        {value: 200000, minimum: 0, flags: serverSideOnly},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used

+ 1 - 0
psiphon/common/protocol/protocol.go

@@ -73,6 +73,7 @@ const (
 	PSIPHON_API_OSL_REQUEST_NAME           = "psiphon-osl"
 	PSIPHON_API_ALERT_REQUEST_NAME         = "psiphon-alert"
 	PSIPHON_API_INPROXY_RELAY_REQUEST_NAME = "psiphon-inproxy-relay"
+	PSIPHON_API_DSL_REQUEST_NAME           = "psiphon-dsl"
 
 	PSIPHON_API_ALERT_DISALLOWED_TRAFFIC = "disallowed-traffic"
 	PSIPHON_API_ALERT_UNSAFE_TRAFFIC     = "unsafe-traffic"

+ 22 - 0
psiphon/common/protocol/serverEntry.go

@@ -485,6 +485,28 @@ func GetTacticsCapability(protocol string) string {
 	return GetCapability(protocol) + "-TACTICS"
 }
 
+// GetServerEntryFields converts a ServerEntry to ServerEntryFields.
+//
+// Note that a conversion in this direction doesn't retain unrecognized
+// fields; see ServerEntryFields comment. Clients should only store
+// ServerEntryFields obtained from DecodeServerEntryFields or
+// DecodePackedServerEntryFields.
+func (serverEntry *ServerEntry) GetServerEntryFields() (ServerEntryFields, error) {
+
+	marshaledServerEntry, err := json.Marshal(serverEntry)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	var serverEntryFields ServerEntryFields
+	err = json.Unmarshal(marshaledServerEntry, &serverEntryFields)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return serverEntryFields, nil
+}
+
 // hasCapability indicates if the server entry has the specified capability.
 //
 // Any internal "PASSTHROUGH-v2 or "PASSTHROUGH" component in the server

+ 90 - 29
psiphon/server/api.go

@@ -62,6 +62,42 @@ func sshAPIRequestHandler(
 	name string,
 	requestPayload []byte) ([]byte, error) {
 
+	// Before invoking the handlers, enforce some preconditions:
+	//
+	// - A handshake request must precede any other requests.
+	// - When the handshake results in a traffic rules state where
+	//   the client is immediately exhausted, no requests
+	//   may succeed. This case ensures that blocked clients do
+	//   not log "connected", etc.
+	//
+	// Only one handshake request may be made. There is no check here
+	// to enforce that handshakeAPIRequestHandler will be called at
+	// most once. The SetHandshakeState call in handshakeAPIRequestHandler
+	// enforces that only a single handshake is made; enforcing that there
+	// ensures no race condition even if concurrent requests are
+	// in flight.
+
+	if name != protocol.PSIPHON_API_HANDSHAKE_REQUEST_NAME {
+
+		completed, exhausted := sshClient.getHandshaked()
+		if !completed {
+			return nil, errors.TraceNew("handshake not completed")
+		}
+		if exhausted {
+			return nil, errors.TraceNew("exhausted after handshake")
+		}
+	}
+
+	// Here, the DSL request is and opaque payload. The DSL backend (or relay)
+	// will handle the API parameters and this case skips APIParameters
+	// processing.
+
+	if name == protocol.PSIPHON_API_DSL_REQUEST_NAME {
+		responsePayload, err := dslAPIRequestHandler(
+			support, sshClient, requestPayload)
+		return responsePayload, errors.Trace(err)
+	}
+
 	// Notes:
 	//
 	// - For SSH requests, MAX_API_PARAMS_SIZE is implicitly enforced
@@ -92,32 +128,6 @@ func sshAPIRequestHandler(
 		}
 	}
 
-	// Before invoking the handlers, enforce some preconditions:
-	//
-	// - A handshake request must precede any other requests.
-	// - When the handshake results in a traffic rules state where
-	//   the client is immediately exhausted, no requests
-	//   may succeed. This case ensures that blocked clients do
-	//   not log "connected", etc.
-	//
-	// Only one handshake request may be made. There is no check here
-	// to enforce that handshakeAPIRequestHandler will be called at
-	// most once. The SetHandshakeState call in handshakeAPIRequestHandler
-	// enforces that only a single handshake is made; enforcing that there
-	// ensures no race condition even if concurrent requests are
-	// in flight.
-
-	if name != protocol.PSIPHON_API_HANDSHAKE_REQUEST_NAME {
-
-		completed, exhausted := sshClient.getHandshaked()
-		if !completed {
-			return nil, errors.TraceNew("handshake not completed")
-		}
-		if exhausted {
-			return nil, errors.TraceNew("exhausted after handshake")
-		}
-	}
-
 	switch name {
 
 	case protocol.PSIPHON_API_HANDSHAKE_REQUEST_NAME:
@@ -131,16 +141,19 @@ func sshAPIRequestHandler(
 		return responsePayload, nil
 
 	case protocol.PSIPHON_API_CONNECTED_REQUEST_NAME:
-		return connectedAPIRequestHandler(
+		responsePayload, err := connectedAPIRequestHandler(
 			support, sshClient, params)
+		return responsePayload, errors.Trace(err)
 
 	case protocol.PSIPHON_API_STATUS_REQUEST_NAME:
-		return statusAPIRequestHandler(
+		responsePayload, err := statusAPIRequestHandler(
 			support, sshClient, params)
+		return responsePayload, errors.Trace(err)
 
 	case protocol.PSIPHON_API_CLIENT_VERIFICATION_REQUEST_NAME:
-		return clientVerificationAPIRequestHandler(
+		responsePayload, err := clientVerificationAPIRequestHandler(
 			support, sshClient, params)
+		return responsePayload, errors.Trace(err)
 	}
 
 	return nil, errors.Tracef("invalid request name: %s", name)
@@ -987,6 +1000,23 @@ func clientVerificationAPIRequestHandler(
 	return make([]byte, 0), nil
 }
 
+// dslAPIRequestHandler forwards DSL relay requests. The DSL backend
+// (or relay) will handle API parameter processing and event logging.
+func dslAPIRequestHandler(
+	support *SupportServices,
+	sshClient *sshClient,
+	requestPayload []byte) ([]byte, error) {
+
+	responsePayload, err := dslHandleRequest(
+		sshClient.runCtx,
+		support,
+		nil, // no extendTimeout
+		sshClient.getClientIP(),
+		common.GeoIPData(sshClient.getClientGeoIPData()),
+		requestPayload)
+	return responsePayload, errors.Trace(err)
+}
+
 var tacticsParams = []requestParamSpec{
 	{tactics.STORED_TACTICS_TAG_PARAMETER_NAME, isAnyString, requestParamOptional},
 	{tactics.SPEED_TEST_SAMPLES_PARAMETER_NAME, nil, requestParamOptional | requestParamJSON},
@@ -1070,6 +1100,37 @@ func getInproxyBrokerServerReportParameterLogFieldFormatter() common.APIParamete
 	}
 }
 
+var dslRequestParams = append(
+	append(
+		[]requestParamSpec{
+			{"session_id", isHexDigits, 0},
+			{"fronting_provider_id", isAnyString, requestParamOptional}},
+		tacticsParams...),
+	baseParams...)
+
+func getDSLAPIParameterValidator(config *Config) common.APIParameterValidator {
+	return func(params common.APIParameters) error {
+		return validateRequestParams(config, params, tacticsRequestParams)
+	}
+}
+
+func getDSLAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormatter {
+
+	return func(prefix string, geoIPData common.GeoIPData, params common.APIParameters) common.LogFields {
+
+		logFields := getRequestLogFields(
+			"dsl_relay",
+			prefix,
+			"", // Use the session_id the client reported
+			GeoIPData(geoIPData),
+			nil,
+			params,
+			dslRequestParams)
+
+		return common.LogFields(logFields)
+	}
+}
+
 // requestParamSpec defines a request parameter. Each param is expected to be
 // a string, unless requestParamArray is specified, in which case an array of
 // strings is expected.

+ 50 - 1
psiphon/server/config.go

@@ -22,6 +22,7 @@ package server
 import (
 	"crypto/rand"
 	"crypto/rsa"
+	"crypto/tls"
 	"crypto/x509"
 	"encoding/base64"
 	"encoding/hex"
@@ -534,6 +535,23 @@ type Config struct {
 	// values.
 	IptablesAcceptRateLimitTunnelProtocolRateLimits map[string][2]int `json:",omitempty"`
 
+	// DSLRelayServiceURL specifies the DSL backend address to use for
+	// relaying client DSL requests. When specified, the following DSL relay
+	// PKI parameters must also be specified.
+	DSLRelayServiceURL string `json:",omitempty"`
+
+	// DSLRelayCACertificatesFilename is part of the mutual authentication PKI
+	// for DSL relaying.
+	DSLRelayCACertificatesFilename string `json:",omitempty"`
+
+	// DSLRelayHostCertificateFilename is part of the mutual authentication
+	// PKI for DSL relaying.
+	DSLRelayHostCertificateFilename string `json:",omitempty"`
+
+	// DSLRelayHostKeyFilename is part of the mutual authentication PKI for
+	// DSL relaying.
+	DSLRelayHostKeyFilename string `json:",omitempty"`
+
 	sshBeginHandshakeTimeout                       time.Duration
 	sshHandshakeTimeout                            time.Duration
 	peakUpstreamFailureRateMinimumSampleSize       int
@@ -545,6 +563,8 @@ type Config struct {
 	region                                         string
 	runningProtocols                               []string
 	runningOnlyInproxyBroker                       bool
+	dslRelayCACertificates                         *x509.CertPool
+	dslRelayHostCertificate                        *tls.Certificate
 }
 
 // GetLogFileReopenConfig gets the reopen retries, and create/mode inputs for
@@ -835,6 +855,35 @@ func LoadConfig(configJSON []byte) (*Config, error) {
 		}
 	}
 
+	if config.DSLRelayServiceURL != "" {
+		if config.DSLRelayCACertificatesFilename == "" ||
+			config.DSLRelayHostCertificateFilename == "" ||
+			config.DSLRelayHostKeyFilename == "" {
+			return nil, errors.TraceNew(
+				"DSL relay requires mutual TLS configuration")
+		}
+
+		caCertsPEM, err := os.ReadFile(config.DSLRelayCACertificatesFilename)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		dslRelayCACertificates := x509.NewCertPool()
+		if !dslRelayCACertificates.AppendCertsFromPEM(caCertsPEM) {
+			return nil, errors.Trace(err)
+		}
+
+		dslRelayHostCertificate, err := tls.LoadX509KeyPair(
+			config.DSLRelayHostCertificateFilename,
+			config.DSLRelayHostKeyFilename)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		config.dslRelayCACertificates = dslRelayCACertificates
+		config.dslRelayHostCertificate = &dslRelayHostCertificate
+	}
+
 	// Limitation: the following is a shortcut which extracts the server's
 	// fronting provider ID from the server's OwnEncodedServerEntries. This logic
 	// assumes a server has only one fronting provider. In principle, it's
@@ -1340,7 +1389,7 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, []byte, []byt
 		MeekFrontingDisableSNI:              false,
 		TacticsRequestPublicKey:             tacticsRequestPublicKey,
 		TacticsRequestObfuscatedKey:         tacticsRequestObfuscatedKey,
-		ConfigurationVersion:                1,
+		ConfigurationVersion:                0,
 		InproxySessionPublicKey:             inproxyServerSessionPublicKey,
 		InproxySessionRootObfuscationSecret: inproxyServerObfuscationRootSecret,
 		InproxySSHPort:                      inproxySSHPort,

+ 20 - 0
psiphon/server/meek.go

@@ -362,6 +362,7 @@ func NewMeekServer(
 				IsValidServerEntryTag:          support.PsinetDatabase.IsValidServerEntryTag,
 				GetTacticsPayload:              meekServer.inproxyBrokerGetTacticsPayload,
 				IsLoadLimiting:                 meekServer.support.TunnelServer.CheckLoadLimiting,
+				RelayDSLRequest:                meekServer.inproxyBrokerRelayDSLRequest,
 				PrivateKey:                     sessionPrivateKey,
 				ObfuscationRootSecret:          obfuscationRootSecret,
 				ServerEntrySignaturePublicKey:  support.Config.InproxyBrokerServerEntrySignaturePublicKey,
@@ -2054,6 +2055,25 @@ func (server *MeekServer) inproxyBrokerGetTacticsPayload(
 	return marshaledTacticsPayload, newTacticsTag, nil
 }
 
+// inproxyBrokerRelayDSLRequest is a callback used by the in-proxy broker to
+// relay client DSL requests.
+func (server *MeekServer) inproxyBrokerRelayDSLRequest(
+	ctx context.Context,
+	extendTimeout inproxy.ExtendTransportTimeout,
+	clientIP string,
+	clientGeoIPData common.GeoIPData,
+	requestPayload []byte) ([]byte, error) {
+
+	responsePayload, err := dslHandleRequest(
+		ctx,
+		server.support,
+		extendTimeout,
+		clientIP,
+		clientGeoIPData,
+		requestPayload)
+	return responsePayload, errors.Trace(err)
+}
+
 // inproxyBrokerHandler reads an in-proxy broker session protocol message from
 // the HTTP request body, dispatches the message to the broker, and writes
 // the broker session response message to the HTTP response body.

+ 30 - 2
psiphon/server/services.go

@@ -36,6 +36,7 @@ import (
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/buildinfo"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/dsl"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/packetman"
@@ -138,6 +139,23 @@ func RunServices(configJSON []byte) (retErr error) {
 		support.PacketManipulator = packetManipulator
 	}
 
+	if config.DSLRelayServiceURL != "" {
+		support.dslRelay = dsl.NewRelay(&dsl.RelayConfig{
+			Logger:                        CommonLogger(log),
+			CACertificates:                config.dslRelayCACertificates,
+			HostCertificate:               config.dslRelayHostCertificate,
+			DynamicServerListServiceURL:   config.DSLRelayServiceURL,
+			HostID:                        config.HostID,
+			APIParameterValidator:         getDSLAPIParameterValidator(config),
+			APIParameterLogFieldFormatter: getDSLAPIParameterLogFieldFormatter(),
+		})
+
+		err := dslReloadRelayTactics(support)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
 	support.discovery = makeDiscovery(support)
 
 	// After this point, errors should be delivered to the errors channel and
@@ -580,8 +598,8 @@ type SupportServices struct {
 	PacketManipulator            *packetman.Manipulator
 	ReplayCache                  *ReplayCache
 	ServerTacticsParametersCache *ServerTacticsParametersCache
-
-	discovery *Discovery
+	dslRelay                     *dsl.Relay
+	discovery                    *Discovery
 }
 
 // NewSupportServices initializes a new SupportServices.
@@ -699,6 +717,16 @@ func (support *SupportServices) Reload() {
 		}
 
 		reloadDiscovery(true)
+
+		if support.dslRelay != nil {
+			err := dslReloadRelayTactics(support)
+			if err != nil {
+				log.WithTraceFields(
+					LogFields{"error": errors.Trace(err)}).Warning(
+					"failed to reload DSL relay tactics")
+			}
+
+		}
 	}
 
 	// Take these actions only after the corresponding Reloader has reloaded.