Browse Source

Add steering IP mechanism

In addition, this commit:

- Adds support for running test FRONTED-MEEK-OSSH servers on ports other than
  443

- Adds support for adding arbitrary HTTP headers to HTTPS requests

- Fixes the "already done" context case in GetTactics, which failed to apply
  local tactics

- Switches the meek rate limiter to use the unspoofable listener tunnel
  protocol
Rod Hynes 2 years ago
parent
commit
10aae8ce38

+ 7 - 0
psiphon/common/parameters/parameters.go

@@ -365,6 +365,9 @@ const (
 	TLSTunnelMaxTLSPadding                           = "TLSTunnelMaxTLSPadding"
 	TLSFragmentClientHelloProbability                = "TLSFragmentClientHelloProbability"
 	TLSFragmentClientHelloLimitProtocols             = "TLSFragmentClientHelloLimitProtocols"
+	SteeringIPCacheTTL                               = "SteeringIPCacheTTL"
+	SteeringIPCacheMaxEntries                        = "SteeringIPCacheMaxEntries"
+	SteeringIPProbability                            = "SteeringIPProbability"
 
 	// Retired parameters
 
@@ -779,6 +782,10 @@ var defaultParameters = map[string]struct {
 
 	TLSFragmentClientHelloProbability:    {value: 0.0, minimum: 0.0},
 	TLSFragmentClientHelloLimitProtocols: {value: protocol.TunnelProtocols{}},
+
+	SteeringIPCacheTTL:        {value: 1 * time.Hour, minimum: time.Duration(0)},
+	SteeringIPCacheMaxEntries: {value: 65536, minimum: 0},
+	SteeringIPProbability:     {value: 1.0, minimum: 0.0},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used

+ 1 - 0
psiphon/common/protocol/protocol.go

@@ -600,6 +600,7 @@ type HandshakeResponse struct {
 	TacticsPayload           json.RawMessage     `json:"tactics_payload"`
 	UpstreamBytesPerSecond   int64               `json:"upstream_bytes_per_second"`
 	DownstreamBytesPerSecond int64               `json:"downstream_bytes_per_second"`
+	SteeringIP               string              `json:"steering_ip"`
 	Padding                  string              `json:"padding"`
 }
 

+ 11 - 1
psiphon/common/protocol/serverEntry.go

@@ -33,6 +33,7 @@ import (
 	"io"
 	"net"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -675,7 +676,7 @@ func (serverEntry *ServerEntry) GetDialPortNumber(tunnelProtocol string) (int, e
 
 	case TUNNEL_PROTOCOL_FRONTED_MEEK,
 		TUNNEL_PROTOCOL_FRONTED_MEEK_QUIC_OBFUSCATED_SSH:
-		return 443, nil
+		return int(atomic.LoadInt32(&frontedMeekHTTPSDialPortNumber)), nil
 
 	case TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
 		return 80, nil
@@ -689,6 +690,15 @@ func (serverEntry *ServerEntry) GetDialPortNumber(tunnelProtocol string) (int, e
 	return 0, errors.TraceNew("unknown protocol")
 }
 
+var frontedMeekHTTPSDialPortNumber = int32(443)
+
+// SetFrontedMeekHTTPDialPortNumber sets the FRONTED-MEEK-OSSH dial port
+// number, which defaults to 443. Overriding the port number enables running
+// test servers where binding to port 443 is not possible.
+func SetFrontedMeekHTTPDialPortNumber(port int) {
+	atomic.StoreInt32(&frontedMeekHTTPSDialPortNumber, int32(port))
+}
+
 // GetSupportedTacticsProtocols returns a list of tunnel protocols,
 // supported by the ServerEntry's capabilities, that may be used
 // for tactics requests.

+ 25 - 0
psiphon/config.go

@@ -271,6 +271,11 @@ type Config struct {
 	// upstream proxy when specified by UpstreamProxyURL.
 	CustomHeaders http.Header
 
+	// MeekAdditionalHeaders is a set of additional arbitrary HTTP headers
+	// that are added to all meek HTTP requests. An additional header is
+	// ignored when the header name is already present in a meek request.
+	MeekAdditionalHeaders http.Header
+
 	// NetworkConnectivityChecker is an interface that enables tunnel-core to
 	// call into the host application to check for network connectivity. See:
 	// NetworkConnectivityChecker doc.
@@ -899,6 +904,11 @@ type Config struct {
 	// AdditionalParameters is used for testing.
 	AdditionalParameters string
 
+	// SteeringIP fields are for testing purposes only.
+	SteeringIPCacheTTLSeconds *int
+	SteeringIPCacheMaxEntries *int
+	SteeringIPProbability     *float64
+
 	// params is the active parameters.Parameters with defaults, config values,
 	// and, optionally, tactics applied.
 	//
@@ -2145,6 +2155,18 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.TLSFragmentClientHelloLimitProtocols] = protocol.TunnelProtocols(config.TLSFragmentClientHelloLimitProtocols)
 	}
 
+	if config.SteeringIPCacheTTLSeconds != nil {
+		applyParameters[parameters.SteeringIPCacheTTL] = fmt.Sprintf("%ds", *config.SteeringIPCacheTTLSeconds)
+	}
+
+	if config.SteeringIPCacheMaxEntries != nil {
+		applyParameters[parameters.SteeringIPCacheMaxEntries] = *config.SteeringIPCacheMaxEntries
+	}
+
+	if config.SteeringIPProbability != nil {
+		applyParameters[parameters.SteeringIPProbability] = *config.SteeringIPProbability
+	}
+
 	// When adding new config dial parameters that may override tactics, also
 	// update setDialParametersHash.
 
@@ -2756,6 +2778,9 @@ func (config *Config) setDialParametersHash() {
 		}
 	}
 
+	// Steering IPs are ephemeral and not replayed, so steering IP parameters
+	// are excluded here.
+
 	config.dialParametersHash = hash.Sum(nil)
 }
 

+ 14 - 0
psiphon/controller.go

@@ -87,6 +87,7 @@ type Controller struct {
 	packetTunnelTransport                   *PacketTunnelTransport
 	staggerMutex                            sync.Mutex
 	resolver                                *resolver.Resolver
+	steeringIPCache                         *lrucache.Cache
 }
 
 // NewController initializes a new controller.
@@ -114,6 +115,8 @@ func NewController(config *Config) (controller *Controller, err error) {
 		p.Duration(parameters.SplitTunnelClassificationTTL)
 	splitTunnelClassificationMaxEntries :=
 		p.Int(parameters.SplitTunnelClassificationMaxEntries)
+	steeringIPCacheTTL := p.Duration(parameters.SteeringIPCacheTTL)
+	steeringIPCacheMaxEntries := p.Int(parameters.SteeringIPCacheMaxEntries)
 
 	controller = &Controller{
 		config:       config,
@@ -149,6 +152,11 @@ func NewController(config *Config) (controller *Controller, err error) {
 		// signalRestartEstablishing has a buffer of 1 to ensure sending the
 		// signal doesn't block and receiving won't miss a signal.
 		signalRestartEstablishing: make(chan struct{}, 1),
+
+		steeringIPCache: lrucache.NewWithLRU(
+			steeringIPCacheTTL,
+			1*time.Minute,
+			steeringIPCacheMaxEntries),
 	}
 
 	// Initialize untunneledDialConfig, used by untunneled dials including
@@ -235,6 +243,11 @@ func (controller *Controller) Run(ctx context.Context) {
 	defer controller.resolver.Stop()
 	controller.config.SetResolver(controller.resolver)
 
+	// Maintain a cache of steering IPs to be applied to dials. A steering IP
+	// is an alternate dial IP; for example, steering IPs may be specified by
+	// a CDN service and used to load balance CDN traffic.
+	controller.steeringIPCache.Flush()
+
 	// TODO: IPv6 support
 	var listenIP string
 	if controller.config.ListenInterface == "" {
@@ -2180,6 +2193,7 @@ loop:
 
 		dialParams, err := MakeDialParameters(
 			controller.config,
+			controller.steeringIPCache,
 			upstreamProxyErrorCallback,
 			canReplay,
 			selectProtocol,

+ 125 - 15
psiphon/dialParameters.go

@@ -24,6 +24,7 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/binary"
+	"fmt"
 	"net"
 	"net/http"
 	"strconv"
@@ -42,6 +43,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/resolver"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
+	lrucache "github.com/cognusion/go-cache-lru"
 	"golang.org/x/net/bpf"
 )
 
@@ -159,6 +161,10 @@ type DialParameters struct {
 
 	HTTPTransformerParameters *transforms.HTTPTransformerParameters
 
+	SteeringIP         string
+	steeringIPCache    *lrucache.Cache `json:"-"`
+	steeringIPCacheKey string          `json:"-"`
+
 	dialConfig *DialConfig `json:"-"`
 	meekConfig *MeekConfig `json:"-"`
 }
@@ -182,6 +188,7 @@ type DialParameters struct {
 // when establishment is cancelled.
 func MakeDialParameters(
 	config *Config,
+	steeringIPCache *lrucache.Cache,
 	upstreamProxyErrorCallback func(error),
 	canReplay func(serverEntry *protocol.ServerEntry, replayProtocol string) bool,
 	selectProtocol func(serverEntry *protocol.ServerEntry) (string, bool),
@@ -337,12 +344,6 @@ func MakeDialParameters(
 		dialParams = &DialParameters{}
 	}
 
-	// Point to the current resolver to be used in dials.
-	dialParams.resolver = config.GetResolver()
-	if dialParams.resolver == nil {
-		return nil, errors.TraceNew("missing resolver")
-	}
-
 	if isExchanged {
 		// Set isReplay to false to cause all non-exchanged values to be
 		// initialized; this also causes the exchange case to not log as replay.
@@ -353,6 +354,14 @@ func MakeDialParameters(
 	// upon success.
 	dialParams.IsExchanged = false
 
+	// Point to the current resolver to be used in dials.
+	dialParams.resolver = config.GetResolver()
+	if dialParams.resolver == nil {
+		return nil, errors.TraceNew("missing resolver")
+	}
+
+	dialParams.steeringIPCache = steeringIPCache
+
 	dialParams.ServerEntry = serverEntry
 	dialParams.NetworkID = networkID
 	dialParams.IsReplay = isReplay
@@ -1142,6 +1151,94 @@ func MakeDialParameters(
 
 	// Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
 
+	var resolveIP func(ctx context.Context, hostname string) ([]net.IP, error)
+
+	// Determine whether to use a steering IP, and whether to indicate that
+	// this dial remains a replay or not.
+	//
+	// Steering IPs are used only for fronted tunnels and not lower-traffic
+	// tactics requests and signalling steps such as Conjure registration.
+	//
+	// The scope of the steering IP, and the corresponding cache key, is the
+	// fronting provider, tunnel protocol, and the current network ID.
+	//
+	// Currently, steering IPs are obtained and cached in the Psiphon API
+	// handshake response. A modest TTL is applied to cache entries, and, in
+	// the case of a failed tunnel, any corresponding cached steering IP is
+	// removed.
+	//
+	// DialParameters.SteeringIP is set and persisted, but is not used to dial
+	// in a replay case; it's used to determine whether this dial should be
+	// classified as a replay or not. A replay dial remains classified as
+	// replay if a steering IP is not used and no steering IP was used
+	// before; or when a steering IP is used and the same steering IP was
+	// used before.
+	//
+	// When a steering IP is used and none was used before, or vice versa,
+	// DialParameters.IsReplay is cleared so that is_replay is reported as
+	// false, since the dial may be very different in nature: using a
+	// different POP; skipping DNS; etc. Even if DialParameters.IsReplay was
+	// true and is cleared, this MakeDialParameters will have wired up all
+	// other dial parameters with replay values, so the benefit of those
+	// values is not lost.
+
+	var previousSteeringIP, currentSteeringIP string
+	if isReplay {
+		previousSteeringIP = dialParams.SteeringIP
+	}
+	dialParams.SteeringIP = ""
+
+	if !isTactics &&
+		protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) &&
+		dialParams.ServerEntry.FrontingProviderID != "" {
+
+		dialParams.steeringIPCacheKey = fmt.Sprintf("%s %s %s",
+			dialParams.NetworkID,
+			dialParams.ServerEntry.FrontingProviderID,
+			dialParams.TunnelProtocol)
+
+		steeringIPValue, ok := dialParams.steeringIPCache.Get(
+			dialParams.steeringIPCacheKey)
+		if ok {
+			currentSteeringIP = steeringIPValue.(string)
+		}
+
+		// A steering IP probability is applied and may be used to gradually
+		// apply steering IPs. The coin flip is made only to decide to start
+		// using a steering IP, avoiding flip flopping between dials. For any
+		// probability > 0.0, a long enough continuous session will
+		// eventually flip to true and then keep using steering IPs as long
+		// as they remain in the cache.
+
+		if previousSteeringIP == "" && currentSteeringIP != "" &&
+			!p.WeightedCoinFlip(parameters.SteeringIPProbability) {
+
+			currentSteeringIP = ""
+		}
+	}
+
+	if currentSteeringIP != "" {
+		IP := net.ParseIP(currentSteeringIP)
+		if IP == nil {
+			return nil, errors.TraceNew("invalid steering IP")
+		}
+
+		// Since tcpDial and NewUDPConn invoke ResolveIP unconditionally, even
+		// when the hostname is an IP address, a steering IP will be applied
+		// even in that case.
+		resolveIP = func(ctx context.Context, hostname string) ([]net.IP, error) {
+			return []net.IP{IP}, nil
+		}
+
+		// dialParams.SteeringIP will be used as the "previous" steering IP in
+		// the next replay.
+		dialParams.SteeringIP = currentSteeringIP
+	}
+
+	if currentSteeringIP != previousSteeringIP {
+		dialParams.IsReplay = false
+	}
+
 	// Custom ResolveParameters are set only when useResolver is true, but
 	// DialConfig.ResolveIP is required and wired up unconditionally. Any
 	// misconfigured or miscoded domain dial cases will use default
@@ -1150,16 +1247,18 @@ func MakeDialParameters(
 	// ResolveIP will use the networkID obtained above, as it will be used
 	// almost immediately, instead of incurring the overhead of calling
 	// GetNetworkID again.
-	resolveIP := func(ctx context.Context, hostname string) ([]net.IP, error) {
-		IPs, err := dialParams.resolver.ResolveIP(
-			ctx,
-			networkID,
-			dialParams.ResolveParameters,
-			hostname)
-		if err != nil {
-			return nil, errors.Trace(err)
+	if resolveIP == nil {
+		resolveIP = func(ctx context.Context, hostname string) ([]net.IP, error) {
+			IPs, err := dialParams.resolver.ResolveIP(
+				ctx,
+				networkID,
+				dialParams.ResolveParameters,
+				hostname)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			return IPs, nil
 		}
-		return IPs, nil
 	}
 
 	// Fragmentor configuration.
@@ -1232,6 +1331,7 @@ func MakeDialParameters(
 			MeekObfuscatorPaddingSeed:     dialParams.MeekObfuscatorPaddingSeed,
 			NetworkLatencyMultiplier:      dialParams.NetworkLatencyMultiplier,
 			HTTPTransformerParameters:     dialParams.HTTPTransformerParameters,
+			AdditionalHeaders:             config.MeekAdditionalHeaders,
 		}
 
 		// Use an asynchronous callback to record the resolved IP address when
@@ -1350,6 +1450,16 @@ func (dialParams *DialParameters) Failed(config *Config) {
 			NoticeWarning("DeleteDialParameters failed: %s", err)
 		}
 	}
+
+	// When a failed tunnel dialed with steering IP, remove the corresponding
+	// cache entry to avoid continuously redialing a potentially blocked or
+	// degraded POP.
+	//
+	// TODO: don't remove, but reduce the TTL to allow for one more dial?
+
+	if dialParams.steeringIPCacheKey != "" {
+		dialParams.steeringIPCache.Delete(dialParams.steeringIPCacheKey)
+	}
 }
 
 func (dialParams *DialParameters) GetTLSVersionForMetrics() string {

+ 154 - 18
psiphon/dialParameters_test.go

@@ -21,6 +21,7 @@ package psiphon
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -35,6 +36,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
+	lrucache "github.com/cognusion/go-cache-lru"
 )
 
 func TestDialParametersAndReplay(t *testing.T) {
@@ -150,10 +152,12 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	// Test: expected dial parameter fields set
 
+	steeringIPCache := lrucache.NewWithLRU(1*time.Hour, 1*time.Hour, 0)
+
 	upstreamProxyErrorCallback := func(_ error) {}
 
 	dialParams, err := MakeDialParameters(
-		clientConfig, upstreamProxyErrorCallback, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		clientConfig, steeringIPCache, upstreamProxyErrorCallback, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -249,8 +253,12 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	expectHoldOffTunnelProtocols := common.Contains(holdOffTunnelProtocols, tunnelProtocol)
 	expectHoldOffTunnelFrontingProviderIDs := protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol)
-	expectHoldOffDirectServerEntryRegions := protocol.TunnelProtocolIsDirect(tunnelProtocol) && common.Contains(holdOffDirectServerEntryRegions, dialParams.ServerEntry.Region)
-	expectHoldOffDirectServerEntryProviderRegion := protocol.TunnelProtocolIsDirect(tunnelProtocol) && common.ContainsAny(holdOffDirectServerEntryProviderRegions[dialParams.ServerEntry.ProviderID], []string{"", dialParams.ServerEntry.Region})
+	expectHoldOffDirectServerEntryRegions := protocol.TunnelProtocolIsDirect(tunnelProtocol) &&
+		common.Contains(holdOffDirectServerEntryRegions, dialParams.ServerEntry.Region)
+	expectHoldOffDirectServerEntryProviderRegion := protocol.TunnelProtocolIsDirect(tunnelProtocol) &&
+		common.ContainsAny(
+			holdOffDirectServerEntryProviderRegions[dialParams.ServerEntry.ProviderID],
+			[]string{"", dialParams.ServerEntry.Region})
 
 	if expectHoldOffTunnelProtocols ||
 		expectHoldOffTunnelFrontingProviderIDs ||
@@ -275,7 +283,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	dialParams.Failed(clientConfig)
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -290,7 +299,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	testNetworkID = prng.HexString(8)
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -307,7 +317,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	dialParams.Succeeded()
 
-	replayDialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	replayDialParams, err := MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -418,7 +429,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -436,7 +448,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -451,7 +464,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	time.Sleep(1 * time.Second)
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -466,7 +480,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 	serverEntries[0].ConfigurationVersion += 1
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -489,14 +504,16 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
 
 	dialParams.Succeeded()
 
-	replayDialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	replayDialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 	if err != nil {
 		t.Fatalf("MakeDialParameters failed: %s", err)
 	}
@@ -528,7 +545,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 
 	if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
 		if err == nil {
@@ -557,7 +575,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 
 	if protocol.TunnelProtocolIsDirect(tunnelProtocol) {
 		if err == nil {
@@ -587,7 +606,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
-	dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+	dialParams, err = MakeDialParameters(
+		clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
 
 	if protocol.TunnelProtocolIsDirect(tunnelProtocol) {
 		if err == nil {
@@ -608,6 +628,119 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 		t.Fatalf("SetParameters failed: %s", err)
 	}
 
+	if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
+
+		steeringIPCache.Flush()
+
+		// Test: steering IP used in non-replay case
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+
+		dialParams.Failed(clientConfig)
+
+		getCacheKey := func() string {
+			return fmt.Sprintf("%s %s %s", testNetworkID, frontingProviderID, tunnelProtocol)
+		}
+
+		setCacheEntry := func(steeringIP string) {
+			steeringIPCache.Set(getCacheKey(), steeringIP, lrucache.DefaultExpiration)
+		}
+
+		setCacheEntry("127.0.0.1")
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		if err != nil {
+			t.Fatalf("MakeDialParameters failed: %s", err)
+		}
+
+		if dialParams.IsReplay {
+			t.Fatalf("unexpected replay")
+		}
+
+		checkSteeringIP := func(expectedSteeringIP string) {
+			ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Microsecond)
+			defer cancelFunc()
+			IPs, err := dialParams.dialConfig.ResolveIP(ctx, "example.com")
+			if err != nil {
+				t.Fatalf("ResolveIP failed: %s", err)
+			}
+			if IPs[0].String() != expectedSteeringIP {
+				t.Fatalf("missing expected steering IP")
+			}
+		}
+
+		checkSteeringIP("127.0.0.1")
+
+		// Test: steering IP used in replay case
+
+		dialParams.Succeeded()
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		if err != nil {
+			t.Fatalf("MakeDialParameters failed: %s", err)
+		}
+
+		if !dialParams.IsReplay {
+			t.Fatalf("unexpected non-replay")
+		}
+
+		checkSteeringIP("127.0.0.1")
+
+		// Test: different steering IP clears replay flag
+
+		dialParams.Succeeded()
+
+		setCacheEntry("127.0.0.2")
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		if err != nil {
+			t.Fatalf("MakeDialParameters failed: %s", err)
+		}
+
+		if dialParams.IsReplay {
+			t.Fatalf("unexpected replay")
+		}
+
+		checkSteeringIP("127.0.0.2")
+
+		// Test: newly present steering IP clears replay flag
+
+		steeringIPCache.Flush()
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		if err != nil {
+			t.Fatalf("MakeDialParameters failed: %s", err)
+		}
+
+		dialParams.Succeeded()
+
+		setCacheEntry("127.0.0.3")
+
+		dialParams, err = MakeDialParameters(
+			clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
+		if err != nil {
+			t.Fatalf("MakeDialParameters failed: %s", err)
+		}
+
+		if dialParams.IsReplay {
+			t.Fatalf("unexpected replay")
+		}
+
+		// Test: steering IP cleared from cache after failure
+
+		dialParams.Failed(clientConfig)
+
+		_, ok := steeringIPCache.Get(getCacheKey())
+		if ok {
+			t.Fatalf("unexpected steering IP cache entry")
+		}
+	}
+
 	// Test: iterator shuffles
 
 	for i, serverEntry := range serverEntries {
@@ -630,7 +763,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 
 		if i%10 == 0 {
 
-			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(
+				clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}
@@ -659,7 +793,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 				t.Fatalf("ServerEntryIterator.Next failed: %s", err)
 			}
 
-			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(
+				clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}
@@ -681,7 +816,8 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
 				t.Fatalf("ServerEntryIterator.Next failed: %s", err)
 			}
 
-			dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
+			dialParams, err := MakeDialParameters(
+				clientConfig, steeringIPCache, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
 			if err != nil {
 				t.Fatalf("MakeDialParameters failed: %s", err)
 			}

+ 1 - 0
psiphon/exchange_test.go

@@ -186,6 +186,7 @@ func TestServerEntryExchange(t *testing.T) {
 			dialParams, err := MakeDialParameters(
 				config,
 				nil,
+				nil,
 				canReplay,
 				selectProtocol,
 				serverEntry,

+ 13 - 0
psiphon/meekConn.go

@@ -210,6 +210,11 @@ type MeekConfig struct {
 	// HTTPTransformerParameters specifies an HTTP transformer to apply to the
 	// meek connection if it uses HTTP.
 	HTTPTransformerParameters *transforms.HTTPTransformerParameters
+
+	// AdditionalHeaders is a set of additional arbitrary HTTP headers that
+	// are added to all meek HTTP requests. An additional header is ignored
+	// when the header name is already present in a meek request.
+	AdditionalHeaders http.Header
 }
 
 // MeekConn is a network connection that tunnels net.Conn flows over HTTP and supports
@@ -681,6 +686,14 @@ func DialMeek(
 		additionalHeaders.Set("X-Psiphon-Fronting-Address", host)
 	}
 
+	if meekConfig.AdditionalHeaders != nil {
+		for name, value := range meekConfig.AdditionalHeaders {
+			if _, ok := additionalHeaders[name]; !ok {
+				additionalHeaders[name] = value
+			}
+		}
+	}
+
 	meek.url = url
 	meek.additionalHeaders = additionalHeaders
 	meek.cachedTLSDialer = cachedTLSDialer

+ 10 - 1
psiphon/notice.go

@@ -490,6 +490,7 @@ func noticeWithDialParameters(noticeType string, dialParams *DialParameters, pos
 		}
 
 		if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
+
 			meekResolvedIPAddress := dialParams.MeekResolvedIPAddress.Load().(string)
 			if meekResolvedIPAddress != "" {
 				nonredacted := common.EscapeRedactIPAddressString(meekResolvedIPAddress)
@@ -571,7 +572,15 @@ func noticeWithDialParameters(noticeType string, dialParams *DialParameters, pos
 			args = append(args, "conjureTransport", dialParams.ConjureTransport)
 		}
 
-		if dialParams.ResolveParameters != nil {
+		usedSteeringIP := false
+
+		if dialParams.SteeringIP != "" {
+			nonredacted := common.EscapeRedactIPAddressString(dialParams.SteeringIP)
+			args = append(args, "steeringIP", nonredacted)
+			usedSteeringIP = true
+		}
+
+		if dialParams.ResolveParameters != nil && !usedSteeringIP {
 
 			// See dialParams.ResolveParameters comment in getBaseAPIParameters.
 

+ 2 - 0
psiphon/server/api.go

@@ -401,6 +401,7 @@ func handshakeAPIRequestHandler(
 		TacticsPayload:           marshaledTacticsPayload,
 		UpstreamBytesPerSecond:   handshakeStateInfo.upstreamBytesPerSecond,
 		DownstreamBytesPerSecond: handshakeStateInfo.downstreamBytesPerSecond,
+		SteeringIP:               handshakeStateInfo.steeringIP,
 		Padding:                  strings.Repeat(" ", pad_response),
 	}
 
@@ -972,6 +973,7 @@ var baseDialParams = []requestParamSpec{
 	{"tls_padding", isIntString, requestParamOptional | requestParamLogStringAsInt},
 	{"tls_ossh_sni_server_name", isDomain, requestParamOptional},
 	{"tls_ossh_transformed_host_name", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+	{"steering_ip", isIPAddress, requestParamOptional | requestParamLogOnlyForFrontedMeekOrConjure},
 }
 
 // baseSessionAndDialParams adds baseDialParams to baseSessionParams.

+ 8 - 0
psiphon/server/config.go

@@ -792,6 +792,7 @@ type GenerateConfigParams struct {
 	LegacyPassthrough                  bool
 	LimitQUICVersions                  protocol.QUICVersions
 	EnableGQUIC                        bool
+	FrontingProviderID                 string
 }
 
 // GenerateConfig creates a new Psiphon server config. It returns JSON encoded
@@ -1090,6 +1091,8 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, []byte, []byt
 		capabilities = append(capabilities, protocol.CAPABILITY_UNTUNNELED_WEB_API_REQUESTS)
 	}
 
+	var frontingProviderID string
+
 	for tunnelProtocol := range params.TunnelProtocolPorts {
 
 		capability := protocol.GetCapability(tunnelProtocol)
@@ -1115,6 +1118,10 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, []byte, []byt
 
 			capabilities = append(capabilities, protocol.GetTacticsCapability(tunnelProtocol))
 		}
+
+		if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
+			frontingProviderID = params.FrontingProviderID
+		}
 	}
 
 	sshPort := params.TunnelProtocolPorts[protocol.TUNNEL_PROTOCOL_SSH]
@@ -1165,6 +1172,7 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, []byte, []byt
 		Capabilities:                  capabilities,
 		Region:                        "US",
 		ProviderID:                    prng.HexString(8),
+		FrontingProviderID:            frontingProviderID,
 		MeekServerPort:                meekPort,
 		MeekCookieEncryptionPublicKey: meekCookieEncryptionPublicKey,
 		MeekObfuscatedKey:             meekObfuscatedKey,

+ 56 - 22
psiphon/server/meek.go

@@ -117,7 +117,7 @@ type MeekServer struct {
 	httpClientIOTimeout             time.Duration
 	tlsConfig                       *tris.Config
 	obfuscatorSeedHistory           *obfuscator.SeedHistory
-	clientHandler                   func(clientTunnelProtocol string, clientConn net.Conn)
+	clientHandler                   func(clientConn net.Conn, data *additionalTransportData)
 	openConns                       *common.Conns
 	stopBroadcast                   <-chan struct{}
 	sessionsLock                    sync.RWMutex
@@ -138,7 +138,7 @@ func NewMeekServer(
 	listenerTunnelProtocol string,
 	listenerPort int,
 	useTLS, isFronted, useObfuscatedSessionTickets, useHTTPNormalizer bool,
-	clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
+	clientHandler func(clientConn net.Conn, data *additionalTransportData),
 	stopBroadcast <-chan struct{}) (*MeekServer, error) {
 
 	passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
@@ -709,8 +709,7 @@ func (server *MeekServer) getSessionOrEndpoint(
 		return "", nil, nil, "", nil, errors.TraceNew("invalid IP address")
 	}
 
-	if protocol.TunnelProtocolUsesFrontedMeek(server.listenerTunnelProtocol) &&
-		len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
+	if server.isFronted && len(server.support.Config.MeekProxyForwardedForHeaders) > 0 {
 
 		// When there are multiple header names in MeekProxyForwardedForHeaders,
 		// the first valid match is preferred. MeekProxyForwardedForHeaders should be
@@ -746,6 +745,27 @@ func (server *MeekServer) getSessionOrEndpoint(
 
 	geoIPData := server.support.GeoIPService.Lookup(clientIP)
 
+	// Check for a steering IP header, which contains an alternate dial IP to
+	// be returned to the client via the secure API handshake response.
+	// Steering may be used to load balance CDN traffic.
+	//
+	// The steering IP header is added by a CDN or CDN service process. To
+	// prevent steering IP spoofing, the service process must filter out any
+	// steering IP headers injected into ingress requests.
+	//
+	// Steering IP headers must appear in the first request of a meek session
+	// in order to be recorded here and relayed to the client.
+
+	var steeringIP string
+	if server.isFronted {
+		steeringIP = request.Header.Get("X-Psiphon-Steering-Ip")
+		IP := net.ParseIP(steeringIP)
+		if IP == nil || common.IsBogon(IP) {
+			steeringIP = ""
+			log.WithTraceFields(LogFields{"steeringIP": steeringIP}).Warning("invalid steering IP")
+		}
+	}
+
 	// The session is new (or expired). Treat the cookie value as a new meek
 	// cookie, extract the payload, and create a new session.
 
@@ -785,29 +805,13 @@ func (server *MeekServer) getSessionOrEndpoint(
 		return "", nil, nil, "", nil, errors.Trace(err)
 	}
 
-	tunnelProtocol := server.listenerTunnelProtocol
-
-	if clientSessionData.ClientTunnelProtocol != "" {
-
-		if !protocol.IsValidClientTunnelProtocol(
-			clientSessionData.ClientTunnelProtocol,
-			server.listenerTunnelProtocol,
-			server.support.Config.GetRunningProtocols()) {
-
-			return "", nil, nil, "", nil, errors.Tracef(
-				"invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
-		}
-
-		tunnelProtocol = clientSessionData.ClientTunnelProtocol
-	}
-
 	// Any rate limit is enforced after the meek cookie is validated, so a prober
 	// without the obfuscation secret will be unable to fingerprint the server
 	// based on response time combined with the rate limit configuration. The
 	// rate limit is primarily intended to limit memory resource consumption and
 	// not the overhead incurred by cookie validation.
 
-	if server.rateLimit(clientIP, geoIPData, tunnelProtocol) {
+	if server.rateLimit(clientIP, geoIPData, server.listenerTunnelProtocol) {
 		return "", nil, nil, "", nil, errors.TraceNew("rate limit exceeded")
 	}
 
@@ -862,6 +866,28 @@ func (server *MeekServer) getSessionOrEndpoint(
 		}
 	}
 
+	// The tunnel protocol name is used for stats and traffic rules. In many
+	// cases, its value is unambiguously determined by the listener port. In
+	// certain cases, such as multiple fronted protocols with a single
+	// backend listener, the client's reported tunnel protocol value is used.
+	// The caller must validate clientTunnelProtocol with
+	// protocol.IsValidClientTunnelProtocol.
+
+	var clientTunnelProtocol string
+	if clientSessionData.ClientTunnelProtocol != "" {
+
+		if !protocol.IsValidClientTunnelProtocol(
+			clientSessionData.ClientTunnelProtocol,
+			server.listenerTunnelProtocol,
+			server.support.Config.GetRunningProtocols()) {
+
+			return "", nil, nil, "", nil, errors.Tracef(
+				"invalid client tunnel protocol: %s", clientSessionData.ClientTunnelProtocol)
+		}
+
+		clientTunnelProtocol = clientSessionData.ClientTunnelProtocol
+	}
+
 	// Create a new session
 
 	bufferLength := MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH
@@ -919,9 +945,17 @@ func (server *MeekServer) getSessionOrEndpoint(
 	server.sessions[sessionID] = session
 	server.sessionsLock.Unlock()
 
+	var additionalData *additionalTransportData
+	if clientTunnelProtocol != "" || steeringIP != "" {
+		additionalData = &additionalTransportData{
+			overrideTunnelProtocol: clientTunnelProtocol,
+			steeringIP:             steeringIP,
+		}
+	}
+
 	// Note: from the tunnel server's perspective, this client connection
 	// will close when session.delete calls Close() on the meekConn.
-	server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
+	server.clientHandler(session.clientConn, additionalData)
 
 	return sessionID, session, underlyingConn, "", nil, nil
 }

+ 2 - 2
psiphon/server/meek_test.go

@@ -288,7 +288,7 @@ func testMeekResiliency(t *testing.T, spec *transforms.HTTPTransformerParameters
 
 	var serverClientConn atomic.Value
 
-	clientHandler := func(_ string, conn net.Conn) {
+	clientHandler := func(conn net.Conn, _ *additionalTransportData) {
 		serverClientConn.Store(conn)
 		name := "server"
 		relayWaitGroup.Add(1)
@@ -583,7 +583,7 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider, missing
 		isFronted,
 		useObfuscatedSessionTickets,
 		useHTTPNormalizer,
-		func(_ string, conn net.Conn) {
+		func(conn net.Conn, _ *additionalTransportData) {
 			go func() {
 				for {
 					buffer := make([]byte, 1)

+ 107 - 14
psiphon/server/server_test.go

@@ -44,6 +44,7 @@ import (
 	"syscall"
 	"testing"
 	"time"
+	"unsafe"
 
 	socks "github.com/Psiphon-Labs/goptlib"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
@@ -57,6 +58,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
+	lrucache "github.com/cognusion/go-cache-lru"
 	"github.com/miekg/dns"
 	"golang.org/x/net/proxy"
 )
@@ -567,6 +569,21 @@ func TestOmitProvider(t *testing.T) {
 		})
 }
 
+func TestSteeringIP(t *testing.T) {
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:       "FRONTED-MEEK-OSSH",
+			enableSSHAPIRequests: true,
+			requireAuthorization: true,
+			doTunneledWebRequest: true,
+			doTunneledNTPRequest: true,
+			forceFragmenting:     true,
+			doDanglingTCPConn:    true,
+			doLogHostProvider:    true,
+			doSteeringIP:         true,
+		})
+}
+
 type runServerConfig struct {
 	tunnelProtocol       string
 	clientTunnelProtocol string
@@ -593,6 +610,7 @@ type runServerConfig struct {
 	doChangeBytesConfig  bool
 	doLogHostProvider    bool
 	inspectFlows         bool
+	doSteeringIP         bool
 }
 
 var (
@@ -602,6 +620,9 @@ var (
 	testCustomHostNameRegex              = `[a-z0-9]{5,10}\.example\.org`
 	testClientFeatures                   = []string{"feature 1", "feature 2"}
 	testDisallowedTrafficAlertActionURLs = []string{"https://example.org/disallowed"}
+
+	// A steering IP must not be a bogon; this address is not dialed.
+	testSteeringIP = "1.1.1.1"
 )
 
 var serverRuns = 0
@@ -711,6 +732,10 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		generateConfigParams.TacticsRequestObfuscatedKey = tacticsRequestObfuscatedKey
 	}
 
+	if protocol.TunnelProtocolUsesFrontedMeek(runConfig.tunnelProtocol) {
+		generateConfigParams.FrontingProviderID = prng.HexString(8)
+	}
+
 	serverConfigJSON, _, _, _, encodedServerEntry, err := GenerateConfig(generateConfigParams)
 	if err != nil {
 		t.Fatalf("error generating server config: %s", err)
@@ -985,7 +1010,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// Use a distinct suffix for network ID for each test run to ensure tactics
 	// from different runs don't apply; this is a workaround for the singleton
 	// datastore.
-	jsonNetworkID := fmt.Sprintf(`,"NetworkID" : "WIFI-%s"`, time.Now().String())
+	networkID := fmt.Sprintf("WIFI-%s", time.Now().String())
+	jsonNetworkID := fmt.Sprintf(`,"NetworkID" : "%s"`, networkID)
 
 	jsonLimitTLSProfiles := ""
 	if runConfig.tlsProfile != "" {
@@ -1065,6 +1091,25 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		clientConfig.CustomHostNameLimitProtocols = []string{clientTunnelProtocol}
 	}
 
+	if runConfig.doSteeringIP {
+
+		if runConfig.tunnelProtocol != protocol.TUNNEL_PROTOCOL_FRONTED_MEEK {
+			t.Fatalf("steering IP test requires FRONTED-MEEK-OSSH")
+		}
+
+		protocol.SetFrontedMeekHTTPDialPortNumber(psiphonServerPort)
+
+		// Note that in an actual fronting deployment, the steering IP header
+		// is added to the HTTP request by the CDN and any ingress steering
+		// IP header would be stripped to avoid spoofing. To facilitate this
+		// test case, we just have the client add the steering IP header as
+		// if it were the CDN.
+
+		headers := make(http.Header)
+		headers.Set("X-Psiphon-Steering-Ip", testSteeringIP)
+		clientConfig.MeekAdditionalHeaders = headers
+	}
+
 	err = clientConfig.Commit(false)
 	if err != nil {
 		t.Fatalf("error committing configuration file: %s", err)
@@ -1592,6 +1637,35 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			}
 		}
 	}
+
+	if runConfig.doSteeringIP {
+
+		// Access the unexported controller.steeringIPCache
+		controllerStruct := reflect.ValueOf(controller).Elem()
+		steeringIPCacheField := controllerStruct.Field(40)
+		steeringIPCacheField = reflect.NewAt(
+			steeringIPCacheField.Type(), unsafe.Pointer(steeringIPCacheField.UnsafeAddr())).Elem()
+		steeringIPCache := steeringIPCacheField.Interface().(*lrucache.Cache)
+
+		if steeringIPCache.ItemCount() != 1 {
+			t.Fatalf("unexpected steering IP cache size: %d", steeringIPCache.ItemCount())
+		}
+
+		key := fmt.Sprintf(
+			"%s %s %s",
+			networkID,
+			generateConfigParams.FrontingProviderID,
+			runConfig.tunnelProtocol)
+
+		entry, ok := steeringIPCache.Get(key)
+		if !ok {
+			t.Fatalf("no entry for steering IP cache key: %s", key)
+		}
+
+		if entry.(string) != testSteeringIP {
+			t.Fatalf("unexpected cached steering IP: %v", entry)
+		}
+	}
 }
 
 func sendNotificationReceived(c chan<- struct{}) {
@@ -1849,12 +1923,14 @@ func checkExpectedServerTunnelLogFields(
 			return fmt.Errorf("unexpected meek_host_header '%s'", fields["meek_host_header"])
 		}
 
-		for _, name := range []string{
-			"meek_dial_ip_address",
-			"meek_resolved_ip_address",
-		} {
-			if fields[name] != nil {
-				return fmt.Errorf("unexpected field '%s'", name)
+		if !protocol.TunnelProtocolUsesFrontedMeek(runConfig.tunnelProtocol) {
+			for _, name := range []string{
+				"meek_dial_ip_address",
+				"meek_resolved_ip_address",
+			} {
+				if fields[name] != nil {
+					return fmt.Errorf("unexpected field '%s'", name)
+				}
 			}
 		}
 	}
@@ -1876,13 +1952,15 @@ func checkExpectedServerTunnelLogFields(
 			return fmt.Errorf("unexpected meek_sni_server_name '%s'", fields["meek_sni_server_name"])
 		}
 
-		for _, name := range []string{
-			"meek_dial_ip_address",
-			"meek_resolved_ip_address",
-			"meek_host_header",
-		} {
-			if fields[name] != nil {
-				return fmt.Errorf("unexpected field '%s'", name)
+		if !protocol.TunnelProtocolUsesFrontedMeek(runConfig.tunnelProtocol) {
+			for _, name := range []string{
+				"meek_dial_ip_address",
+				"meek_resolved_ip_address",
+				"meek_host_header",
+			} {
+				if fields[name] != nil {
+					return fmt.Errorf("unexpected field '%s'", name)
+				}
 			}
 		}
 
@@ -2126,6 +2204,20 @@ func checkExpectedServerTunnelLogFields(
 		}
 	}
 
+	if runConfig.doSteeringIP {
+		name := "relayed_steering_ip"
+		if fields[name] == nil {
+			return fmt.Errorf("missing expected field '%s'", name)
+		}
+		if fields[name] != testSteeringIP {
+			return fmt.Errorf("unexpected field value %s: %v != %v", name, fields[name], testSteeringIP)
+		}
+		name = "steering_ip"
+		if fields[name] != nil {
+			return fmt.Errorf("unexpected field '%s'", name)
+		}
+	}
+
 	return nil
 }
 
@@ -3056,6 +3148,7 @@ func storePruneServerEntriesTest(
 		dialParams, err := psiphon.MakeDialParameters(
 			clientConfig,
 			nil,
+			nil,
 			func(_ *protocol.ServerEntry, _ string) bool { return true },
 			func(serverEntry *protocol.ServerEntry) (string, bool) {
 				return runConfig.tunnelProtocol, true

+ 1 - 0
psiphon/server/sessionID_test.go

@@ -167,6 +167,7 @@ func TestDuplicateSessionID(t *testing.T) {
 		dialParams, err := psiphon.MakeDialParameters(
 			clientConfig,
 			nil,
+			nil,
 			func(_ *protocol.ServerEntry, _ string) bool { return false },
 			func(_ *protocol.ServerEntry) (string, bool) { return "OSSH", true },
 			serverEntry,

+ 52 - 19
psiphon/server/tunnelServer.go

@@ -474,12 +474,20 @@ func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
 		atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
 }
 
+// additionalTransportData is additional data gathered at transport level,
+// such as in MeekServer at the HTTP layer, and relayed to the
+// sshServer/sshClient.
+type additionalTransportData struct {
+	overrideTunnelProtocol string
+	steeringIP             string
+}
+
 // runListener is intended to run an a goroutine; it blocks
 // running a particular listener. If an unrecoverable error
 // occurs, it will send the error to the listenerError channel.
 func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
 
-	handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {
+	handleClient := func(clientConn net.Conn, transportData *additionalTransportData) {
 
 		// Note: establish tunnel limiter cannot simply stop TCP
 		// listeners in all cases (e.g., meek) since SSH tunnels can
@@ -491,17 +499,6 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 			return
 		}
 
-		// tunnelProtocol is used for stats and traffic rules. In many cases, its
-		// value is unambiguously determined by the listener port. In certain cases,
-		// such as multiple fronted protocols with a single backend listener, the
-		// client's reported tunnel protocol value is used. The caller must validate
-		// clientTunnelProtocol with protocol.IsValidClientTunnelProtocol.
-
-		tunnelProtocol := sshListener.tunnelProtocol
-		if clientTunnelProtocol != "" {
-			tunnelProtocol = clientTunnelProtocol
-		}
-
 		// sshListener.tunnelProtocol indictes the tunnel protocol run by the
 		// listener. For direct protocols, this is also the client tunnel protocol.
 		// For fronted protocols, the client may use a different protocol to connect
@@ -520,7 +517,7 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 		// client may dial a different port for its first hop.
 
 		// Process each client connection concurrently.
-		go sshServer.handleClient(sshListener, tunnelProtocol, clientConn)
+		go sshServer.handleClient(sshListener, clientConn, transportData)
 	}
 
 	// Note: when exiting due to a unrecoverable error, be sure
@@ -568,7 +565,7 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 
 // runMeekTLSOSSHDemuxListener blocks running a listener which demuxes meek and
 // TLS-OSSH connections received on the same port.
-func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(sshListener *sshListener, listenerError chan<- error, handleClient func(clientTunnelProtocol string, clientConn net.Conn)) {
+func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(sshListener *sshListener, listenerError chan<- error, handleClient func(clientConn net.Conn, transportData *additionalTransportData)) {
 
 	meekClassifier := protocolClassifier{
 		minBytesToMatch: 4,
@@ -687,7 +684,7 @@ func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(sshListener *sshListener
 	wg.Wait()
 }
 
-func runListener(listener net.Listener, shutdownBroadcast <-chan struct{}, listenerError chan<- error, overrideTunnelProtocol string, handleClient func(clientTunnelProtocol string, clientConn net.Conn)) {
+func runListener(listener net.Listener, shutdownBroadcast <-chan struct{}, listenerError chan<- error, overrideTunnelProtocol string, handleClient func(clientConn net.Conn, transportData *additionalTransportData)) {
 	for {
 		conn, err := listener.Accept()
 
@@ -718,7 +715,14 @@ func runListener(listener net.Listener, shutdownBroadcast <-chan struct{}, liste
 			return
 		}
 
-		handleClient(overrideTunnelProtocol, conn)
+		var transportData *additionalTransportData
+		if overrideTunnelProtocol != "" {
+			transportData = &additionalTransportData{
+				overrideTunnelProtocol: overrideTunnelProtocol,
+			}
+		}
+
+		handleClient(conn, transportData)
 	}
 }
 
@@ -1370,7 +1374,19 @@ func (sshServer *sshServer) stopClients() {
 }
 
 func (sshServer *sshServer) handleClient(
-	sshListener *sshListener, tunnelProtocol string, clientConn net.Conn) {
+	sshListener *sshListener,
+	clientConn net.Conn,
+	transportData *additionalTransportData) {
+
+	// overrideTunnelProtocol sets the tunnel protocol to a value other than
+	// the listener tunnel protocol. This is used in fronted meek
+	// configuration, where a single HTTPS listener also handles fronted HTTP
+	// and QUIC traffic; and in the protocol demux case.
+
+	tunnelProtocol := sshListener.tunnelProtocol
+	if transportData != nil && transportData.overrideTunnelProtocol != "" {
+		tunnelProtocol = transportData.overrideTunnelProtocol
+	}
 
 	// Calling clientConn.RemoteAddr at this point, before any Read calls,
 	// satisfies the constraint documented in tapdance.Listen.
@@ -1507,6 +1523,7 @@ func (sshServer *sshServer) handleClient(
 		sshServer,
 		sshListener,
 		tunnelProtocol,
+		transportData,
 		serverPacketManipulation,
 		replayedServerPacketManipulation,
 		clientAddr,
@@ -1562,6 +1579,7 @@ type sshClient struct {
 	sshServer                            *sshServer
 	sshListener                          *sshListener
 	tunnelProtocol                       string
+	additionalTransportData              *additionalTransportData
 	sshConn                              ssh.Conn
 	throttledConn                        *common.ThrottledConn
 	serverPacketManipulation             string
@@ -1706,6 +1724,7 @@ type handshakeStateInfo struct {
 	authorizedAccessTypes    []string
 	upstreamBytesPerSecond   int64
 	downstreamBytesPerSecond int64
+	steeringIP               string
 }
 
 type handshakeState struct {
@@ -1805,6 +1824,7 @@ func newSshClient(
 	sshServer *sshServer,
 	sshListener *sshListener,
 	tunnelProtocol string,
+	transportData *additionalTransportData,
 	serverPacketManipulation string,
 	replayedServerPacketManipulation bool,
 	clientAddr net.Addr,
@@ -1820,6 +1840,7 @@ func newSshClient(
 		sshServer:                        sshServer,
 		sshListener:                      sshListener,
 		tunnelProtocol:                   tunnelProtocol,
+		additionalTransportData:          transportData,
 		serverPacketManipulation:         serverPacketManipulation,
 		replayedServerPacketManipulation: replayedServerPacketManipulation,
 		clientAddr:                       clientAddr,
@@ -3178,6 +3199,11 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
 		}
 	}
 
+	if sshClient.additionalTransportData != nil &&
+		sshClient.additionalTransportData.steeringIP != "" {
+		logFields["relayed_steering_ip"] = sshClient.additionalTransportData.steeringIP
+	}
+
 	// Retain lock when invoking LogRawFieldsWithTimestamp to block any
 	// concurrent writes to variables referenced by logFields.
 	log.LogRawFieldsWithTimestamp(logFields)
@@ -3583,12 +3609,19 @@ func (sshClient *sshClient) setHandshakeState(
 	// be applied gradually, handling mid-tunnel changes is not a priority.
 	sshClient.setDestinationBytesMetrics()
 
-	return &handshakeStateInfo{
+	info := &handshakeStateInfo{
 		activeAuthorizationIDs:   authorizationIDs,
 		authorizedAccessTypes:    authorizedAccessTypes,
 		upstreamBytesPerSecond:   upstreamBytesPerSecond,
 		downstreamBytesPerSecond: downstreamBytesPerSecond,
-	}, nil
+	}
+
+	// Relay the steering IP to the API handshake handler.
+	if sshClient.additionalTransportData != nil {
+		info.steeringIP = sshClient.additionalTransportData.steeringIP
+	}
+
+	return info, nil
 }
 
 // getHandshaked returns whether the client has completed a handshake API

+ 34 - 1
psiphon/serverApi.go

@@ -44,6 +44,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
+	lrucache "github.com/cognusion/go-cache-lru"
 )
 
 // ServerContext is a utility struct which holds all of the data associated
@@ -398,6 +399,31 @@ func (serverContext *ServerContext) doHandshakeRequest(
 		}
 	}
 
+	if serverContext.tunnel.dialParams.steeringIPCacheKey != "" {
+
+		// Cache any received steering IP, which will also extend the TTL for
+		// an existing entry.
+		//
+		// As typical tunnel duration is short and dialing can be challenging,
+		// this established tunnel is retained and the steering IP will be
+		// used on any subsequent dial to the same fronting provider,
+		// assuming the TTL has not expired.
+		//
+		// Note: to avoid TTL expiry for long-lived tunnels, the TTL could be
+		// set or extended at the end of the tunnel lifetime; however that
+		// may result in unintended steering.
+
+		IP := net.ParseIP(handshakeResponse.SteeringIP)
+		if IP != nil && !common.IsBogon(IP) {
+			serverContext.tunnel.dialParams.steeringIPCache.Set(
+				serverContext.tunnel.dialParams.steeringIPCacheKey,
+				handshakeResponse.SteeringIP,
+				lrucache.DefaultExpiration)
+		} else {
+			NoticeInfo("ignoring invalid steering IP")
+		}
+	}
+
 	return nil
 }
 
@@ -994,6 +1020,7 @@ func getBaseAPIParameters(
 		}
 
 		if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
+
 			meekResolvedIPAddress := dialParams.MeekResolvedIPAddress.Load().(string)
 			if meekResolvedIPAddress != "" {
 				params["meek_resolved_ip_address"] = meekResolvedIPAddress
@@ -1102,7 +1129,13 @@ func getBaseAPIParameters(
 			params["conjure_transport"] = dialParams.ConjureTransport
 		}
 
-		if dialParams.ResolveParameters != nil {
+		usedSteeringIP := false
+		if dialParams.SteeringIP != "" {
+			params["steering_ip"] = dialParams.SteeringIP
+			usedSteeringIP = true
+		}
+
+		if dialParams.ResolveParameters != nil && !usedSteeringIP {
 
 			// Log enough information to distinguish several successful or
 			// failed circumvention cases of interest, including preferring

+ 6 - 5
psiphon/tactics.go

@@ -73,13 +73,13 @@ func GetTactics(ctx context.Context, config *Config) {
 		return
 	}
 
-	// If the context is already Done, don't even start the request.
-	if ctx.Err() != nil {
-		return
-	}
-
 	if tacticsRecord == nil {
 
+		// If the context is already Done, don't even start the request.
+		if ctx.Err() != nil {
+			return
+		}
+
 		iterator, err := NewTacticsServerEntryIterator(config)
 		if err != nil {
 			NoticeWarning("tactics iterator failed: %s", err)
@@ -217,6 +217,7 @@ func fetchTactics(
 	dialParams, err := MakeDialParameters(
 		config,
 		nil,
+		nil,
 		canReplay,
 		selectProtocol,
 		serverEntry,