Просмотр исходного кода

Add server-side obfuscation parameter replay mechanism

Rod Hynes 5 лет назад
Родитель
Сommit
4c710f9ccf

+ 37 - 10
psiphon/common/fragmentor/fragmentor.go

@@ -159,6 +159,7 @@ type Conn struct {
 	isClosed        int32
 	writeMutex      sync.Mutex
 	numNotices      int
+	isReplay        bool
 	fragmentPRNG    *prng.PRNG
 	bytesToFragment int
 	bytesFragmented int
@@ -227,25 +228,51 @@ func GetUpstreamMetricsNames() []string {
 	return upstreamMetricsNames
 }
 
-// SetPRNG sets the PRNG to be used by the fragmentor. Specifying a PRNG
-// allows for optional replay of a fragmentor sequence. SetPRNG is intended to
-// be used with obfuscator.GetDerivedPRNG and allows for setting the PRNG
-// after a conn has already been wrapped with a fragmentor.Conn (but before
-// the first Write).
+// SetReplay sets the PRNG to be used by the fragmentor, allowing for replay
+// of a fragmentor sequence. SetPRNG may be used to set the PRNG after a conn
+// has already been wrapped with a fragmentor.Conn, when no PRNG is specified
+// in the config, and before the first Write. SetReplay sets the fragmentor
+// isReplay flag to true.
 //
-// If no seed is specified in NewUp/DownstreamConfig and SetPRNG is not called
-// before the first Write, the Write will fail. If a seed was specified, or
-// SetPRNG was already called, SetPRNG has no effect.
-func (c *Conn) SetPRNG(PRNG *prng.PRNG) {
+// For replay coordinated with a peer, SetReplay may be used with
+// obfuscator.GetDerivedPRNG, using a seed provided by the peer.
+//
+// If no seed is specified in NewUp/DownstreamConfig and SetReplay is not
+// called before the first Write, the Write will fail. If a seed was specified
+// in the config, or SetReplay was already called, or the input PRNG is nil,
+// SetReplay has no effect.
+//
+// SetReplay implements FragmentorReplayAccessor.
+func (c *Conn) SetReplay(PRNG *prng.PRNG) {
 
 	c.writeMutex.Lock()
 	defer c.writeMutex.Unlock()
 
-	if c.fragmentPRNG == nil {
+	if c.fragmentPRNG == nil && PRNG != nil {
+		c.isReplay = true
 		c.fragmentPRNG = PRNG
 	}
 }
 
+// GetReplay returns the seed for the fragmentor PRNG, and whether the
+// fragmentor was configured to replay. The seed return value may be nil when
+// isReplay is false.
+//
+// GetReplay implements GetReplay.
+func (c *Conn) GetReplay() (*prng.Seed, bool) {
+
+	c.writeMutex.Lock()
+	defer c.writeMutex.Unlock()
+
+	var seed *prng.Seed
+
+	if c.fragmentPRNG != nil {
+		seed = c.fragmentPRNG.GetSeed()
+	}
+
+	return seed, c.isReplay
+}
+
 func (c *Conn) Write(buffer []byte) (int, error) {
 
 	c.writeMutex.Lock()

+ 1 - 1
psiphon/common/fragmentor/fragmentor_test.go

@@ -118,7 +118,7 @@ func TestFragmentor(t *testing.T) {
 		if err != nil {
 			return errors.Trace(err)
 		}
-		fragConn.SetPRNG(PRNG)
+		fragConn.SetReplay(PRNG)
 		_, err = fragConn.Write(data)
 		if err != nil {
 			return errors.Trace(err)

+ 11 - 0
psiphon/common/logger.go

@@ -43,6 +43,17 @@ type LogTrace interface {
 // and logrus.LogFields.
 type LogFields map[string]interface{}
 
+// Add copies log fields from b to a, skipping fields which already exist,
+// regardless of value, in a.
+func (a LogFields) Add(b LogFields) {
+	for name, value := range b {
+		_, ok := a[name]
+		if !ok {
+			a[name] = value
+		}
+	}
+}
+
 // MetricsSource is an object that provides metrics to be logged
 type MetricsSource interface {
 

+ 23 - 4
psiphon/common/net.go

@@ -31,6 +31,7 @@ import (
 
 	"github.com/Psiphon-Labs/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/miekg/dns"
 	"github.com/wader/filtertransport"
 )
@@ -60,6 +61,24 @@ type IrregularIndicator interface {
 	IrregularTunnelError() error
 }
 
+// UnderlyingTCPAddrSource defines the interface for a type, typically a
+// net.Conn, such as a server meek Conn, which has an underlying TCP conn(s),
+// providing access to the LocalAddr and RemoteAddr properties of the
+// underlying TCP conn.
+type UnderlyingTCPAddrSource interface {
+
+	// GetUnderlyingTCPAddrs returns the LocalAddr and RemoteAddr properties of
+	// the underlying TCP conn.
+	GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool)
+}
+
+// FragmentorReplayAccessor defines the interface for accessing replay properties
+// of a fragmentor Conn.
+type FragmentorReplayAccessor interface {
+	SetReplay(*prng.PRNG)
+	GetReplay() (*prng.Seed, bool)
+}
+
 // TerminateHTTPConnection sends a 404 response to a client and also closes
 // the persistent connection.
 func TerminateHTTPConnection(
@@ -337,19 +356,19 @@ func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
 			}
 		}
 
+		lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
 		readActivityTime := int64(monotime.Now())
 
+		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
+
 		if conn.activityUpdater != nil {
 			conn.activityUpdater.UpdateProgress(
-				int64(n), 0, readActivityTime-atomic.LoadInt64(&conn.lastReadActivityTime))
+				int64(n), 0, readActivityTime-lastReadActivityTime)
 		}
 
 		if conn.lruEntry != nil {
 			conn.lruEntry.Touch()
 		}
-
-		atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
-
 	}
 	// Note: no context error to preserve error type
 	return n, err

+ 16 - 15
psiphon/common/packetman/packetman.go

@@ -42,14 +42,14 @@ algorithm component.
 Other notable differences:
 
 - We intercept, parse, and transform only server-side outbound SYN-ACK
-  packets. Geneva supports client-side packet manipulation with a more diverse
-  set of trigger packets, but in practise we cannot execute most low-level
-  packet operations on client platforms such as Android and iOS.
+packets. Geneva supports client-side packet manipulation with a more diverse
+set of trigger packets, but in practise we cannot execute most low-level
+packet operations on client platforms such as Android and iOS.
 
 - For expediancy, we use a simplified strategy syntax (called transformation
-  specs, to avoid confusion with the more general original). As we do not
-  evolve strategies, we do not use a tree representation and some
-  randomization tranformations are simplified.
+specs, to avoid confusion with the more general original). As we do not
+evolve strategies, we do not use a tree representation and some
+randomization tranformations are simplified.
 
 At this time, full functionality is limited to the Linux platform.
 
@@ -101,11 +101,13 @@ type Config struct {
 	// SelectSpecName is a callback invoked for each intercepted SYN-ACK packet.
 	// SelectSpecName must return a name of a Spec, in Specs, to apply that
 	// transformation spec, or "" to send the SYN-ACK packet unmodified.
+	// The second return value is arbitrary extra data that is associated
+	// with the packet's connection; see GetAppliedSpecName.
 	//
 	// The inputs protocolPort and clientIP allow the callback to select a Spec
 	// based on the protocol running at the intercepted packet's port and/or
 	// client GeoIP.
-	SelectSpecName func(protocolPort int, clientIP net.IP) string
+	SelectSpecName func(protocolPort int, clientIP net.IP) (string, interface{})
 
 	// SudoNetworkConfigCommands specifies whether to use "sudo" when executing
 	// network configuration commands. See comment for same parameter in
@@ -130,17 +132,16 @@ type Config struct {
 // Syntax of individual tranformations:
 //
 // "TCP-flags random|<flags>"
-// "TCP-<field> random|<base64>"
-// "TCP-option-<option> random|omit|<base64>"
-// "TCP-payload random|<base64>"
-//
-// flags:   FSRPAUECN
+// flags: FSRPAUECN
 //
-// fields:  srcport, dstport, seq, ack, dataoffset, window, checksum, urgent
+// "TCP-<field> random|<base64>"
+// field: srcport, dstport, seq, ack, dataoffset, window, checksum, urgent
 //
-// options: eol, nop, mss, windowscale, sackpermitted, sack, timestamps,
-//          altchecksum, altchecksumdata, md5header, usertimeout
+// "TCP-option-<option> random|omit|<base64>"
+// option: eol, nop, mss, windowscale, sackpermitted, sack, timestamps,
+// altchecksum, altchecksumdata, md5header, usertimeout
 //
+// "TCP-payload random|<base64>"
 //
 // For example, this Geneva strategy:
 //   [TCP:flags:SA]-duplicate(tamper{TCP:flags:replace:R},tamper{TCP:flags:replace:S})-| \/

+ 32 - 15
psiphon/common/packetman/packetman_linux.go

@@ -363,9 +363,15 @@ func makeConnectionID(
 	return string(connID[:])
 }
 
+type appliedSpec struct {
+	specName  string
+	extraData interface{}
+}
+
 // GetAppliedSpecName returns the packet manipulation spec name applied to the
 // TCP connection, represented by its local and remote address components,
-// that was ultimately accepted by a network listener.
+// that was ultimately accepted by a network listener. The second return value
+// is the arbitrary extra data returned by GetSpecName.
 //
 // This allows SelectSpecName, the spec selector, to be non-deterministic
 // while also allowing for accurate packet manipulation metrics to be
@@ -382,7 +388,7 @@ func makeConnectionID(
 // psiphon/server.meekConn) the true peer RemoteAddr must instead be
 // provided.
 func (m *Manipulator) GetAppliedSpecName(
-	localAddr, remoteAddr *net.TCPAddr) (string, error) {
+	localAddr, remoteAddr *net.TCPAddr) (string, interface{}, error) {
 
 	connID := makeConnectionID(
 		localAddr.IP,
@@ -390,18 +396,22 @@ func (m *Manipulator) GetAppliedSpecName(
 		remoteAddr.IP,
 		uint16(remoteAddr.Port))
 
-	specName, found := m.appliedSpecCache.Get(connID)
+	value, found := m.appliedSpecCache.Get(connID)
 	if !found {
-		return "", errors.TraceNew("connection not found")
+		return "", nil, errors.TraceNew("connection not found")
 	}
 
+	appliedSpec := value.(appliedSpec)
+
 	m.appliedSpecCache.Delete(connID)
 
-	return specName.(string), nil
+	return appliedSpec.specName, appliedSpec.extraData, nil
 }
 
 func (m *Manipulator) setAppliedSpecName(
-	interceptedPacket gopacket.Packet, specName string) {
+	interceptedPacket gopacket.Packet,
+	specName string,
+	extraData interface{}) {
 
 	srcIP, dstIP, _, _ := m.getPacketAddressInfo(interceptedPacket)
 
@@ -413,7 +423,13 @@ func (m *Manipulator) setAppliedSpecName(
 		dstIP,
 		uint16(interceptedTCP.DstPort))
 
-	m.appliedSpecCache.Set(connID, specName, cache.DefaultExpiration)
+	m.appliedSpecCache.Set(
+		connID,
+		appliedSpec{
+			specName:  specName,
+			extraData: extraData,
+		},
+		cache.DefaultExpiration)
 }
 
 func (m *Manipulator) getSocketMark() int {
@@ -448,7 +464,7 @@ func (m *Manipulator) handleInterceptedPacket(attr nfqueue.Attribute) int {
 		return 0
 	}
 
-	spec, err := m.getCompiledSpec(packet)
+	spec, extraData, err := m.getCompiledSpec(packet)
 	if err != nil {
 
 		// Fail open in this case.
@@ -466,12 +482,12 @@ func (m *Manipulator) handleInterceptedPacket(attr nfqueue.Attribute) int {
 	if spec == nil {
 
 		// No packet manipulation in this case.
-		m.setAppliedSpecName(packet, "")
+		m.setAppliedSpecName(packet, "", extraData)
 		m.nfqueue.SetVerdict(*attr.PacketID, nfqueue.NfAccept)
 		return 0
 	}
 
-	m.setAppliedSpecName(packet, spec.name)
+	m.setAppliedSpecName(packet, spec.name, extraData)
 	m.nfqueue.SetVerdict(*attr.PacketID, nfqueue.NfDrop)
 
 	err = m.injectPackets(packet, spec)
@@ -539,7 +555,8 @@ func (m *Manipulator) parseInterceptedPacket(packetData []byte) (gopacket.Packet
 	return packet, nil
 }
 
-func (m *Manipulator) getCompiledSpec(interceptedPacket gopacket.Packet) (*compiledSpec, error) {
+func (m *Manipulator) getCompiledSpec(
+	interceptedPacket gopacket.Packet) (*compiledSpec, interface{}, error) {
 
 	_, dstIP, _, _ := m.getPacketAddressInfo(interceptedPacket)
 
@@ -548,9 +565,9 @@ func (m *Manipulator) getCompiledSpec(interceptedPacket gopacket.Packet) (*compi
 	protocolPort := interceptedTCP.SrcPort
 	clientIP := dstIP
 
-	specName := m.config.SelectSpecName(int(protocolPort), clientIP)
+	specName, extraData := m.config.SelectSpecName(int(protocolPort), clientIP)
 	if specName == "" {
-		return nil, nil
+		return nil, extraData, nil
 	}
 
 	// Concurrency note: m.compiledSpecs may be replaced by SetSpecs, but any
@@ -562,10 +579,10 @@ func (m *Manipulator) getCompiledSpec(interceptedPacket gopacket.Packet) (*compi
 	m.compiledSpecsMutex.Unlock()
 
 	if !ok {
-		return nil, errors.Tracef("invalid spec name: %s", specName)
+		return nil, nil, errors.Tracef("invalid spec name: %s", specName)
 	}
 
-	return spec, nil
+	return spec, extraData, nil
 }
 
 func (m *Manipulator) injectPackets(interceptedPacket gopacket.Packet, spec *compiledSpec) error {

+ 9 - 4
psiphon/common/packetman/packetman_linux_test.go

@@ -77,15 +77,16 @@ func testPacketManipulator(useIPv6 bool, t *testing.T) {
 	// SYN packet, implementing TCP simultaneous open.
 
 	testSpecName := "test-spec"
+	extraDataValue := "extra-data"
 	config := &Config{
 		Logger:        newTestLogger(),
 		ProtocolPorts: []int{listenerPort},
 		Specs:         []*Spec{&Spec{Name: testSpecName, PacketSpecs: [][]string{[]string{"TCP-flags S"}}}},
-		SelectSpecName: func(protocolPort int, _ net.IP) string {
+		SelectSpecName: func(protocolPort int, _ net.IP) (string, interface{}) {
 			if protocolPort == listenerPort {
-				return testSpecName
+				return testSpecName, extraDataValue
 			}
-			return ""
+			return "", nil
 		},
 		QueueNumber: 1,
 	}
@@ -113,13 +114,17 @@ func testPacketManipulator(useIPv6 bool, t *testing.T) {
 				if state == http.StateNew {
 					localAddr := conn.LocalAddr().(*net.TCPAddr)
 					remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
-					specName, err := m.GetAppliedSpecName(localAddr, remoteAddr)
+					specName, extraData, err := m.GetAppliedSpecName(localAddr, remoteAddr)
 					if err != nil {
 						t.Fatalf("GetAppliedSpecName failed: %v", err)
 					}
 					if specName != testSpecName {
 						t.Fatalf("unexpected spec name: %s", specName)
 					}
+					extraDataStr, ok := extraData.(string)
+					if !ok || extraDataStr != extraDataValue {
+						t.Fatalf("unexpected extra data value: %v", extraData)
+					}
 				}
 			},
 		}

+ 4 - 2
psiphon/common/packetman/packetman_unsupported.go

@@ -52,6 +52,8 @@ func (m *Manipulator) SetSpecs(_ []*Spec) error {
 	return errors.Trace(errUnsupported)
 }
 
-func (m *Manipulator) GetAppliedSpecName(_, _ *net.TCPAddr) (string, error) {
-	return "", errors.Trace(errUnsupported)
+func (m *Manipulator) GetAppliedSpecName(
+	_, _ *net.TCPAddr) (string, interface{}, error) {
+
+	return "", nil, errors.Trace(errUnsupported)
 }

+ 139 - 46
psiphon/common/parameters/clientParameters.go

@@ -249,6 +249,14 @@ const (
 	FeedbackUploadRetryMinDelaySeconds               = "FeedbackUploadRetryMinDelaySeconds"
 	FeedbackUploadRetryMaxDelaySeconds               = "FeedbackUploadRetryMaxDelaySeconds"
 	FeedbackUploadTimeoutSeconds                     = "FeedbackUploadTimeoutSeconds"
+	ServerReplayPacketManipulation                   = "ServerReplayPacketManipulation"
+	ServerReplayFragmentor                           = "ServerReplayFragmentor"
+	ServerReplayTTL                                  = "ServerReplayTTL"
+	ServerReplayTargetWaitDuration                   = "ServerReplayTargetWaitDuration"
+	ServerReplayTargetTunnelDuration                 = "ServerReplayTargetTunnelDuration"
+	ServerReplayTargetUpstreamBytes                  = "ServerReplayTargetUpstreamBytes"
+	ServerReplayTargetDownstreamBytes                = "ServerReplayTargetDownstreamBytes"
+	ServerReplayFailedCountThreshold                 = "ServerReplayFailedCountThreshold"
 )
 
 const (
@@ -517,6 +525,15 @@ var defaultClientParameters = map[string]struct {
 	FeedbackUploadRetryMinDelaySeconds: {value: 1 * time.Minute, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	FeedbackUploadRetryMaxDelaySeconds: {value: 5 * time.Minute, minimum: 1 * time.Second, flags: useNetworkLatencyMultiplier},
 	FeedbackUploadTimeoutSeconds:       {value: 30 * time.Second, minimum: 0 * time.Second, flags: useNetworkLatencyMultiplier},
+
+	ServerReplayPacketManipulation:    {value: true, flags: serverSideOnly},
+	ServerReplayFragmentor:            {value: true, flags: serverSideOnly},
+	ServerReplayTTL:                   {value: time.Duration(0), minimum: time.Duration(0), flags: serverSideOnly},
+	ServerReplayTargetWaitDuration:    {value: time.Duration(0), minimum: time.Duration(0), flags: serverSideOnly},
+	ServerReplayTargetTunnelDuration:  {value: time.Duration(0), minimum: time.Duration(0), flags: serverSideOnly},
+	ServerReplayTargetUpstreamBytes:   {value: 0, minimum: 0, flags: serverSideOnly},
+	ServerReplayTargetDownstreamBytes: {value: 0, minimum: 0, flags: serverSideOnly},
+	ServerReplayFailedCountThreshold:  {value: 0, minimum: 0, flags: serverSideOnly},
 }
 
 // IsServerSideOnly indicates if the parameter specified by name is used
@@ -599,6 +616,61 @@ func makeDefaultParameters() (map[string]interface{}, error) {
 func (p *ClientParameters) Set(
 	tag string, skipOnError bool, applyParameters ...map[string]interface{}) ([]int, error) {
 
+	makeTypedValue := func(templateValue, value interface{}) (interface{}, error) {
+
+		// Accept strings such as "1h" for duration parameters.
+
+		switch templateValue.(type) {
+		case time.Duration:
+			if s, ok := value.(string); ok {
+				if d, err := time.ParseDuration(s); err == nil {
+					value = d
+				}
+			}
+		}
+
+		// A JSON remarshal resolves cases where applyParameters is a
+		// result of unmarshal-into-interface, in which case non-scalar
+		// values will not have the expected types; see:
+		// https://golang.org/pkg/encoding/json/#Unmarshal. This remarshal
+		// also results in a deep copy.
+
+		marshaledValue, err := json.Marshal(value)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		newValuePtr := reflect.New(reflect.TypeOf(templateValue))
+
+		err = json.Unmarshal(marshaledValue, newValuePtr.Interface())
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		return newValuePtr.Elem().Interface(), nil
+	}
+
+	getAppliedValue := func(
+		name string,
+		parameters map[string]interface{},
+		applyParameters []map[string]interface{}) (interface{}, error) {
+
+		templateValue := parameters[name]
+		if templateValue == nil {
+			return nil, errors.Tracef("unknown parameter: %s", name)
+		}
+
+		value := templateValue
+		for i := len(applyParameters) - 1; i >= 0; i-- {
+			if v := applyParameters[i][name]; v != nil {
+				value = v
+				break
+			}
+		}
+
+		return makeTypedValue(templateValue, value)
+	}
+
 	var counts []int
 
 	parameters, err := makeDefaultParameters()
@@ -606,26 +678,31 @@ func (p *ClientParameters) Set(
 		return nil, errors.Trace(err)
 	}
 
-	// Special case: TLSProfiles/LabeledTLSProfiles may reference CustomTLSProfiles names.
-	// Inspect the CustomTLSProfiles parameter and extract its names. Do not
-	// call Get().CustomTLSProfilesNames() as CustomTLSProfiles may not yet be
-	// validated.
-
-	var customTLSProfileNames []string
+	// Special case: TLSProfiles/LabeledTLSProfiles may reference
+	// CustomTLSProfiles names. Inspect the CustomTLSProfiles parameter and
+	// extract its names. Do not call Get().CustomTLSProfilesNames() as
+	// CustomTLSProfiles may not yet be validated.
 
-	customTLSProfilesValue := parameters[CustomTLSProfiles]
-	for i := len(applyParameters) - 1; i >= 0; i-- {
-		if v := applyParameters[i][CustomTLSProfiles]; v != nil {
-			customTLSProfilesValue = v
-			break
-		}
+	customTLSProfilesValue, err := getAppliedValue(
+		CustomTLSProfiles, parameters, applyParameters)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
-	if customTLSProfiles, ok := customTLSProfilesValue.(protocol.CustomTLSProfiles); ok {
-		customTLSProfileNames = make([]string, len(customTLSProfiles))
-		for i := 0; i < len(customTLSProfiles); i++ {
-			customTLSProfileNames[i] = customTLSProfiles[i].Name
-		}
+	customTLSProfiles, _ := customTLSProfilesValue.(protocol.CustomTLSProfiles)
+	customTLSProfileNames := make([]string, len(customTLSProfiles))
+	for i, profile := range customTLSProfiles {
+		customTLSProfileNames[i] = profile.Name
+	}
+
+	// Special case: PacketManipulations will reference PacketManipulationSpecs.
+
+	serverPacketManipulationSpecsValue, err := getAppliedValue(
+		ServerPacketManipulationSpecs, parameters, applyParameters)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
+	serverPacketManipulationSpecs, _ :=
+		serverPacketManipulationSpecsValue.(PacketManipulationSpecs)
 
 	for i := 0; i < len(applyParameters); i++ {
 
@@ -633,7 +710,7 @@ func (p *ClientParameters) Set(
 
 		for name, value := range applyParameters[i] {
 
-			existingValue, ok := parameters[name]
+			templateValue, ok := parameters[name]
 			if !ok {
 				if skipOnError {
 					continue
@@ -641,40 +718,15 @@ func (p *ClientParameters) Set(
 				return nil, errors.Tracef("unknown parameter: %s", name)
 			}
 
-			// Accept strings such as "1h" for duration parameters.
-
-			switch existingValue.(type) {
-			case time.Duration:
-				if s, ok := value.(string); ok {
-					if d, err := time.ParseDuration(s); err == nil {
-						value = d
-					}
-				}
-			}
-
-			// A JSON remarshal resolves cases where applyParameters is a
-			// result of unmarshal-into-interface, in which case non-scalar
-			// values will not have the expected types; see:
-			// https://golang.org/pkg/encoding/json/#Unmarshal. This remarshal
-			// also results in a deep copy.
-
-			marshaledValue, err := json.Marshal(value)
-			if err != nil {
-				continue
-			}
-
-			newValuePtr := reflect.New(reflect.TypeOf(existingValue))
-
-			err = json.Unmarshal(marshaledValue, newValuePtr.Interface())
+			newValue, err := makeTypedValue(templateValue, value)
 			if err != nil {
 				if skipOnError {
 					continue
 				}
-				return nil, errors.Tracef("unmarshal parameter %s failed: %s", name, err)
+				return nil, errors.Tracef(
+					"unmarshal parameter %s failed: %v", name, err)
 			}
 
-			newValue := newValuePtr.Elem().Interface()
-
 			// Perform type-specific validation for some cases.
 
 			// TODO: require RemoteServerListSignaturePublicKey when
@@ -761,6 +813,28 @@ func (p *ClientParameters) Set(
 						return nil, errors.Trace(err)
 					}
 				}
+			case PacketManipulationSpecs:
+				err := v.Validate()
+				if err != nil {
+					if skipOnError {
+						continue
+					}
+					return nil, errors.Trace(err)
+				}
+			case ProtocolPacketManipulations:
+
+				var packetManipulationSpecs PacketManipulationSpecs
+				if name == ServerProtocolPacketManipulations {
+					packetManipulationSpecs = serverPacketManipulationSpecs
+				}
+
+				err := v.Validate(packetManipulationSpecs)
+				if err != nil {
+					if skipOnError {
+						continue
+					}
+					return nil, errors.Trace(err)
+				}
 			}
 
 			// Enforce any minimums. Assumes defaultClientParameters[name]
@@ -916,6 +990,25 @@ type ClientParametersAccessor struct {
 	customNetworkLatencyMultiplier float64
 }
 
+// MakeNilClientParametersAccessor produces a stub ClientParametersAccessor
+// which returns true for IsNil. This may be used where a
+// ClientParametersAccessor value is required, but ClientParameters.Get may
+// not succeed. In contexts where MakeNilClientParametersAccessor may be used,
+// calls to ClientParametersAccessor must first check IsNil before calling
+// accessor functions.
+func MakeNilClientParametersAccessor() ClientParametersAccessor {
+	return ClientParametersAccessor{}
+}
+
+// IsNil indicates that this ClientParametersAccessor is a stub and its
+// accessor functions may not be called. A ClientParametersAccessor produced
+// by ClientParameters.Get will never return true for IsNil and IsNil guards
+// are not required for ClientParametersAccessors known to be produced by
+// ClientParameters.Get.
+func (p ClientParametersAccessor) IsNil() bool {
+	return p.snapshot == nil
+}
+
 // Close clears internal references to large memory objects, allowing them to
 // be garbage collected. Call Close when done using a
 // ClientParametersAccessor, where memory footprint is a concern, and where

+ 8 - 0
psiphon/common/prng/prng.go

@@ -134,6 +134,14 @@ func NewPRNGWithSaltedSeed(seed *Seed, salt string) (*PRNG, error) {
 	return NewPRNGWithSeed(saltedSeed), nil
 }
 
+// GetSeed returns the seed for the PRNG. The returned value must not be mutated.
+func (p *PRNG) GetSeed() *Seed {
+	// Concurrency note: p.randomStreamSeed is not mutated after creationg, and
+	// is safe for concurrent reads. p.randomStreamSeed is reread internally, and
+	// so must not be mutated.
+	return p.randomStreamSeed
+}
+
 // Read reads random bytes from the PRNG stream into b. Read conforms to
 // io.Reader and always returns len(p), nil.
 func (p *PRNG) Read(b []byte) (int, error) {

+ 0 - 118
psiphon/common/tactics/tactics.go

@@ -161,18 +161,15 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"net"
 	"net/http"
 	"sort"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -1094,121 +1091,6 @@ func (server *Server) handleTacticsRequest(
 	server.logger.LogMetric(TACTICS_METRIC_EVENT_NAME, logFields)
 }
 
-// Listener wraps a net.Listener and applies server-side implementation of
-// certain tactics parameters to accepted connections. Tactics filtering is
-// limited to GeoIP attributes as the client has not yet sent API paramaters.
-type Listener struct {
-	net.Listener
-	server         *Server
-	tunnelProtocol string
-	geoIPLookup    func(IPaddress string) common.GeoIPData
-}
-
-// NewListener creates a new Listener.
-func NewListener(
-	listener net.Listener,
-	server *Server,
-	tunnelProtocol string,
-	geoIPLookup func(IPaddress string) common.GeoIPData) *Listener {
-
-	return &Listener{
-		Listener:       listener,
-		server:         server,
-		tunnelProtocol: tunnelProtocol,
-		geoIPLookup:    geoIPLookup,
-	}
-}
-
-// Accept calls the underlying listener's Accept, and then checks if tactics
-// for the connection set LimitTunnelProtocols.
-//
-// If LimitTunnelProtocols is set and does not include the tunnel protocol the
-// listener is running, the accepted connection is immediately closed and the
-// underlying Accept is called again.
-//
-// For retained connections, fragmentation is applied when specified by
-// tactics.
-func (listener *Listener) Accept() (net.Conn, error) {
-
-	conn, err := listener.Listener.Accept()
-	if err != nil {
-		// Don't modify error from net.Listener
-		return nil, err
-	}
-
-	geoIPData := listener.geoIPLookup(common.IPAddressFromAddr(conn.RemoteAddr()))
-
-	tactics, err := listener.server.GetTactics(true, geoIPData, make(common.APIParameters))
-	if err != nil {
-		listener.server.logger.WithTraceFields(
-			common.LogFields{"error": err}).Warning("failed to get tactics for connection")
-		// If tactics is somehow misconfigured, keep handling connections.
-		// Other error cases that follow below take the same approach.
-		return conn, nil
-	}
-
-	if tactics == nil {
-		// This server isn't configured with tactics.
-		return conn, nil
-	}
-
-	if !prng.FlipWeightedCoin(tactics.Probability) {
-		// Skip tactics with the configured probability.
-		return conn, nil
-	}
-
-	clientParameters, err := parameters.NewClientParameters(nil)
-	if err != nil {
-		return conn, nil
-	}
-
-	_, err = clientParameters.Set("", false, tactics.Parameters)
-	if err != nil {
-		return conn, nil
-	}
-
-	p := clientParameters.Get()
-
-	// Wrap the conn in a fragmentor.Conn, subject to tactics parameters.
-	//
-	// Limitation: this server-side fragmentation is not synchronized with
-	// client-side; where client-side will make a single coin flip to fragment
-	// or not fragment all TCP connections for a one meek session, the server
-	// will make a coin flip per connection.
-	//
-	// Delay seeding the fragmentor PRNG when we can derive a seed from the
-	// client's initial obfuscation message. This enables server-side replay
-	// of fragmentation when initiated by the client. Currently this is only
-	// supported for OSSH: SSH lacks the initial obfuscation message, and
-	// meek and other protocols transmit downstream data before the initial
-	// obfuscation message arrives.
-
-	var seed *prng.Seed
-	if listener.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
-		seed, err = prng.NewSeed()
-		if err != nil {
-			listener.server.logger.WithTraceFields(
-				common.LogFields{"error": err}).Warning("failed to seed fragmentor PRNG")
-			return conn, nil
-		}
-	}
-
-	fragmentorConfig := fragmentor.NewDownstreamConfig(
-		p, listener.tunnelProtocol, seed)
-
-	if fragmentorConfig.MayFragment() {
-		conn = fragmentor.NewConn(
-			fragmentorConfig,
-			func(message string) {
-				listener.server.logger.WithTraceFields(
-					common.LogFields{"message": message}).Debug("Fragmentor")
-			},
-			conn)
-	}
-
-	return conn, nil
-}
-
 // RoundTripper performs a round trip to the specified endpoint, sending the
 // request body and returning the response body. The context may be used to
 // set a timeout or cancel the rount trip.

+ 0 - 138
psiphon/common/tactics/tactics_test.go

@@ -33,7 +33,6 @@ import (
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
@@ -98,23 +97,6 @@ func TestTactics(t *testing.T) {
               "ConnectionWorkerPoolSize" : %d
             }
           }
-        },
-        {
-          "Filter" : {
-            "Regions": ["R7"],
-            "ISPs": ["I1"],
-            "Cities": ["C1"]
-          },
-          "Tactics" : {
-            "Parameters" : {
-              "FragmentorDownstreamProbability" : 1.0,
-              "FragmentorDownstreamMinTotalBytes" : 1,
-              "FragmentorDownstreamMaxTotalBytes" : 1,
-              "FragmentorDownstreamMinWriteBytes" : 1,
-              "FragmentorDownstreamMaxWriteBytes" : 1,
-              "FragmentorDownstreamLimitProtocols" : ["OSSH"]
-            }
-          }
         }
       ]
     }
@@ -136,20 +118,6 @@ func TestTactics(t *testing.T) {
 
 	expectedApplyCount := 3
 
-	listenerProtocol := "OSSH"
-	listenerFragmentedGeoIP := func(string) common.GeoIPData {
-		return common.GeoIPData{Country: "R7", ISP: "I1", City: "C1"}
-	}
-	listenerUnfragmentedGeoIPWrongRegion := func(string) common.GeoIPData {
-		return common.GeoIPData{Country: "R8", ISP: "I1", City: "C1"}
-	}
-	listenerUnfragmentedGeoIPWrongISP := func(string) common.GeoIPData {
-		return common.GeoIPData{Country: "R7", ISP: "I2", City: "C1"}
-	}
-	listenerUnfragmentedGeoIPWrongCity := func(string) common.GeoIPData {
-		return common.GeoIPData{Country: "R7", ISP: "I1", City: "C2"}
-	}
-
 	tacticsConfig := fmt.Sprintf(
 		tacticsConfigTemplate,
 		encodedRequestPublicKey,
@@ -726,112 +694,6 @@ func TestTactics(t *testing.T) {
 		t.Fatalf("HandleEndPoint unexpectedly handled request")
 	}
 
-	// Test Listener
-
-	tacticsProbability = 1.0
-
-	tacticsConfig = fmt.Sprintf(
-		tacticsConfigTemplate,
-		"",
-		"",
-		"",
-		tacticsProbability,
-		tacticsNetworkLatencyMultiplier,
-		tacticsConnectionWorkerPoolSize,
-		jsonTacticsLimitTunnelProtocols,
-		tacticsConnectionWorkerPoolSize+1)
-
-	err = ioutil.WriteFile(configFileName, []byte(tacticsConfig), 0600)
-	if err != nil {
-		t.Fatalf("WriteFile failed: %s", err)
-	}
-
-	reloaded, err = server.Reload()
-	if err != nil {
-		t.Fatalf("Reload failed: %s", err)
-	}
-
-	if !reloaded {
-		t.Fatalf("Server config failed to reload")
-	}
-
-	listenerTestCases := []struct {
-		description      string
-		geoIPLookup      func(string) common.GeoIPData
-		expectFragmentor bool
-	}{
-		{
-			"fragmented",
-			listenerFragmentedGeoIP,
-			true,
-		},
-		{
-			"unfragmented-region",
-			listenerUnfragmentedGeoIPWrongRegion,
-			false,
-		},
-		{
-			"unfragmented-ISP",
-			listenerUnfragmentedGeoIPWrongISP,
-			false,
-		},
-		{
-			"unfragmented-city",
-			listenerUnfragmentedGeoIPWrongCity,
-			false,
-		},
-	}
-
-	for _, testCase := range listenerTestCases {
-		t.Run(testCase.description, func(t *testing.T) {
-
-			tcpListener, err := net.Listen("tcp", ":0")
-			if err != nil {
-				t.Fatalf(" net.Listen failed: %s", err)
-			}
-
-			tacticsListener := NewListener(
-				tcpListener,
-				server,
-				listenerProtocol,
-				testCase.geoIPLookup)
-
-			clientConn, err := net.Dial("tcp", tacticsListener.Addr().String())
-			if err != nil {
-				t.Fatalf(" net.Dial failed: %s", err)
-				return
-			}
-
-			result := make(chan net.Conn, 1)
-
-			go func() {
-				serverConn, err := tacticsListener.Accept()
-				if err == nil {
-					result <- serverConn
-				}
-			}()
-
-			timer := time.NewTimer(3 * time.Second)
-			defer timer.Stop()
-
-			select {
-			case serverConn := <-result:
-				_, isFragmentor := serverConn.(*fragmentor.Conn)
-				if testCase.expectFragmentor && !isFragmentor {
-					t.Fatalf("unexpected non-fragmentor: %T", serverConn)
-				} else if !testCase.expectFragmentor && isFragmentor {
-					t.Fatalf("unexpected fragmentor:  %T", serverConn)
-				}
-				serverConn.Close()
-			case <-timer.C:
-				t.Fatalf("timeout before expected accepted connection")
-			}
-
-			clientConn.Close()
-			tacticsListener.Close()
-		})
-	}
-
 	// TODO: test replay attack defence
 	// TODO: test Server.Validate with invalid tactics configurations
 }

+ 2 - 1
psiphon/dataStore.go

@@ -569,7 +569,8 @@ func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *Server
 				config.UseUpstreamProxy(),
 				limitTunnelProtocols,
 				false)) == 0 {
-				return false, nil, errors.TraceNew("TargetServerEntry does not support LimitTunnelProtocols")
+				return false, nil, errors.Tracef(
+					"TargetServerEntry does not support LimitTunnelProtocols: %v", limitTunnelProtocols)
 			}
 		}
 	}

+ 19 - 0
psiphon/server/config.go

@@ -214,6 +214,25 @@ type Config struct {
 	// used as the client IP.
 	MeekProxyForwardedForHeaders []string
 
+	// MeekTurnAroundTimeoutMilliseconds specifies the amount of time meek will
+	// wait for downstream bytes before responding to a request. The default is
+	// MEEK_DEFAULT_TURN_AROUND_TIMEOUT.
+	MeekTurnAroundTimeoutMilliseconds *int
+
+	// MeekExtendedTurnAroundTimeoutMilliseconds specifies the extended amount of
+	// time meek will wait for downstream bytes, as long as bytes arrive every
+	// MeekTurnAroundTimeoutMilliseconds, before responding to a request. The
+	// default is MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT.
+	MeekExtendedTurnAroundTimeoutMilliseconds *int
+
+	// MeekMaxSessionStalenessMilliseconds specifies the TTL for meek sessions.
+	// The default is MEEK_DEFAULT_MAX_SESSION_STALENESS.
+	MeekMaxSessionStalenessMilliseconds *int
+
+	// MeekHTTPClientIOTimeoutMilliseconds specifies meek HTTP server I/O
+	// timeouts. The default is MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT.
+	MeekHTTPClientIOTimeoutMilliseconds *int
+
 	// MeekCachedResponseBufferSize is the size of a private,
 	// fixed-size buffer allocated for every meek client. The buffer
 	// is used to cache response payload, allowing the client to retry

+ 147 - 0
psiphon/server/listener.go

@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"net"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+)
+
+// TacticsListener wraps a net.Listener and applies server-side implementation
+// of certain tactics parameters to accepted connections. Tactics filtering is
+// limited to GeoIP attributes as the client has not yet sent API paramaters.
+type TacticsListener struct {
+	net.Listener
+	support        *SupportServices
+	tunnelProtocol string
+	geoIPLookup    func(IPaddress string) GeoIPData
+}
+
+// NewTacticsListener creates a new TacticsListener.
+func NewTacticsListener(
+	support *SupportServices,
+	listener net.Listener,
+	tunnelProtocol string,
+	geoIPLookup func(IPaddress string) GeoIPData) *TacticsListener {
+
+	return &TacticsListener{
+		Listener:       listener,
+		support:        support,
+		tunnelProtocol: tunnelProtocol,
+		geoIPLookup:    geoIPLookup,
+	}
+}
+
+// Accept calls the underlying listener's Accept, and then checks if tactics
+// for the connection set LimitTunnelProtocols.
+//
+// If LimitTunnelProtocols is set and does not include the tunnel protocol the
+// listener is running, the accepted connection is immediately closed and the
+// underlying Accept is called again.
+//
+// For retained connections, fragmentation is applied when specified by
+// tactics.
+func (listener *TacticsListener) Accept() (net.Conn, error) {
+
+	conn, err := listener.Listener.Accept()
+	if err != nil {
+		// Don't modify error from net.Listener
+		return nil, err
+	}
+
+	geoIPData := listener.geoIPLookup(
+		common.IPAddressFromAddr(conn.RemoteAddr()))
+
+	p, err := GetServerTacticsParameters(
+		listener.support, geoIPData)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	if p.IsNil() {
+		// No tactics are configured; use the accepted conn without customization.
+		return conn, nil
+	}
+
+	// Server-side fragmentation may be synchronized with client-side in two ways.
+	//
+	// In the OSSH case, replay is always activated and it is seeded using the
+	// content of the client's OSSH seed message, which is fully delivered before
+	// the server sends any bytes. SetReplay is deferred until after the seed
+	// message is read by obfuscator.NewServerObfuscatedSSHConn. doReplay is set
+	// to true so no seed is set at this time.
+	//
+	// SSH lacks the initial obfuscation message, and meek and other protocols
+	// transmit downstream data before the initial obfuscation message arrives.
+	// For these protocols, server-side fragmentation will happen, initially,
+	// with an uncoordinated coin flip, based on server-side tactics
+	// configuration. For protocols with multiple underlying TCP connections,
+	// such as meek, the coin flip is performed independently once per
+	// TCP connection.
+	//
+	// The server-side replay mechanism is used to replay successful server-side
+	// fragmentation for uncoordinated protocols, subject to replay configuration
+	// parameters. In this case, the replay seed returned by GetReplayFragmentor
+	// below is applied.
+
+	replaySeed, doReplay := listener.support.ReplayCache.GetReplayFragmentor(
+		listener.tunnelProtocol, geoIPData)
+
+	if listener.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
+		replaySeed = nil
+		doReplay = true
+	}
+
+	var newSeed *prng.Seed
+	if !doReplay {
+		var err error
+		newSeed, err = prng.NewSeed()
+		if err != nil {
+			log.WithTraceFields(
+				LogFields{"error": err}).Warning("failed to seed fragmentor PRNG")
+			return conn, nil
+		}
+	}
+
+	fragmentorConfig := fragmentor.NewDownstreamConfig(
+		p, listener.tunnelProtocol, newSeed)
+
+	if fragmentorConfig.MayFragment() {
+		conn = fragmentor.NewConn(
+			fragmentorConfig,
+			func(message string) {
+				log.WithTraceFields(
+					LogFields{"message": message}).Debug("Fragmentor")
+			},
+			conn)
+
+		if doReplay && replaySeed != nil {
+			conn.(common.FragmentorReplayAccessor).SetReplay(
+				prng.NewPRNGWithSeed(replaySeed))
+		}
+	}
+
+	return conn, nil
+}

+ 193 - 0
psiphon/server/listener_test.go

@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
+)
+
+func TestListener(t *testing.T) {
+
+	tunnelProtocol := protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH
+
+	tacticsConfigJSONFormat := `
+    {
+      "RequestPublicKey" : "%s",
+      "RequestPrivateKey" : "%s",
+      "RequestObfuscatedKey" : "%s",
+      "DefaultTactics" : {
+        "TTL" : "60s",
+        "Probability" : 1.0
+      },
+      "FilteredTactics" : [
+        {
+          "Filter" : {
+            "Regions": ["R1"],
+            "ISPs": ["I1"],
+            "Cities": ["C1"]
+          },
+          "Tactics" : {
+            "Parameters" : {
+              "LimitTunnelProtocols" : ["%s"],
+              "FragmentorDownstreamLimitProtocols" : ["%s"],
+              "FragmentorDownstreamProbability" : 1.0,
+              "FragmentorDownstreamMinTotalBytes" : 1,
+              "FragmentorDownstreamMaxTotalBytes" : 1,
+              "FragmentorDownstreamMinWriteBytes" : 1,
+              "FragmentorDownstreamMaxWriteBytes" : 1,
+              "FragmentorDownstreamLimitProtocols" : ["OSSH"]
+            }
+          }
+        }
+      ]
+    }
+    `
+
+	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
+		tactics.GenerateKeys()
+	if err != nil {
+		t.Fatalf("error generating tactics keys: %s", err)
+	}
+
+	tacticsConfigJSON := fmt.Sprintf(
+		tacticsConfigJSONFormat,
+		tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
+		tunnelProtocol, tunnelProtocol)
+
+	tacticsConfigFilename := filepath.Join(testDataDirName, "tactics_config.json")
+
+	err = ioutil.WriteFile(tacticsConfigFilename, []byte(tacticsConfigJSON), 0600)
+	if err != nil {
+		t.Fatalf("error paving tactics config file: %s", err)
+	}
+
+	tacticsServer, err := tactics.NewServer(
+		nil,
+		nil,
+		nil,
+		tacticsConfigFilename)
+	if err != nil {
+		t.Fatalf("NewServer failed: %s", err)
+	}
+
+	listenerFragmentedGeoIP := func(string) GeoIPData {
+		return GeoIPData{Country: "R1", ISP: "I1", City: "C1"}
+	}
+	listenerUnfragmentedGeoIPWrongRegion := func(string) GeoIPData {
+		return GeoIPData{Country: "R2", ISP: "I1", City: "C1"}
+	}
+	listenerUnfragmentedGeoIPWrongISP := func(string) GeoIPData {
+		return GeoIPData{Country: "R1", ISP: "I2", City: "C1"}
+	}
+	listenerUnfragmentedGeoIPWrongCity := func(string) GeoIPData {
+		return GeoIPData{Country: "R1", ISP: "I1", City: "C2"}
+	}
+
+	listenerTestCases := []struct {
+		description      string
+		geoIPLookup      func(string) GeoIPData
+		expectFragmentor bool
+	}{
+		{
+			"fragmented",
+			listenerFragmentedGeoIP,
+			true,
+		},
+		{
+			"unfragmented-region",
+			listenerUnfragmentedGeoIPWrongRegion,
+			false,
+		},
+		{
+			"unfragmented-ISP",
+			listenerUnfragmentedGeoIPWrongISP,
+			false,
+		},
+		{
+			"unfragmented-city",
+			listenerUnfragmentedGeoIPWrongCity,
+			false,
+		},
+	}
+
+	for _, testCase := range listenerTestCases {
+		t.Run(testCase.description, func(t *testing.T) {
+
+			tcpListener, err := net.Listen("tcp", ":0")
+			if err != nil {
+				t.Fatalf(" net.Listen failed: %s", err)
+			}
+
+			support := &SupportServices{
+				TacticsServer: tacticsServer,
+			}
+			support.ReplayCache = NewReplayCache(support)
+
+			tacticsListener := NewTacticsListener(
+				support,
+				tcpListener,
+				tunnelProtocol,
+				testCase.geoIPLookup)
+
+			clientConn, err := net.Dial("tcp", tacticsListener.Addr().String())
+			if err != nil {
+				t.Fatalf(" net.Dial failed: %s", err)
+				return
+			}
+
+			result := make(chan net.Conn, 1)
+
+			go func() {
+				serverConn, err := tacticsListener.Accept()
+				if err == nil {
+					result <- serverConn
+				}
+			}()
+
+			timer := time.NewTimer(3 * time.Second)
+			defer timer.Stop()
+
+			select {
+			case serverConn := <-result:
+				_, isFragmentor := serverConn.(*fragmentor.Conn)
+				if testCase.expectFragmentor && !isFragmentor {
+					t.Fatalf("unexpected non-fragmentor: %T", serverConn)
+				} else if !testCase.expectFragmentor && isFragmentor {
+					t.Fatalf("unexpected fragmentor:  %T", serverConn)
+				}
+				serverConn.Close()
+			case <-timer.C:
+				t.Fatalf("timeout before expected accepted connection")
+			}
+
+			clientConn.Close()
+			tacticsListener.Close()
+		})
+	}
+}

+ 11 - 0
psiphon/server/log.go

@@ -48,6 +48,17 @@ type TraceLogger struct {
 // package.
 type LogFields logrus.Fields
 
+// Add copies log fields from b to a, skipping fields which already exist,
+// regardless of value, in a.
+func (a LogFields) Add(b LogFields) {
+	for name, value := range b {
+		_, ok := a[name]
+		if !ok {
+			a[name] = value
+		}
+	}
+}
+
 // WithTrace adds a "trace" field containing the caller's function name
 // and source file line number; and "host_id" and "build_rev" fields
 // identifying this server and build. Use this function when the log has no

+ 184 - 102
psiphon/server/meek.go

@@ -74,16 +74,16 @@ const (
 	// when retrying a request for a partially downloaded response payload.
 	MEEK_PROTOCOL_VERSION_3 = 3
 
-	MEEK_MAX_REQUEST_PAYLOAD_LENGTH     = 65536
-	MEEK_TURN_AROUND_TIMEOUT            = 20 * time.Millisecond
-	MEEK_EXTENDED_TURN_AROUND_TIMEOUT   = 100 * time.Millisecond
-	MEEK_MAX_SESSION_STALENESS          = 45 * time.Second
-	MEEK_HTTP_CLIENT_IO_TIMEOUT         = 45 * time.Second
-	MEEK_MIN_SESSION_ID_LENGTH          = 8
-	MEEK_MAX_SESSION_ID_LENGTH          = 20
-	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
-	MEEK_DEFAULT_POOL_BUFFER_LENGTH     = 65536
-	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 2048
+	MEEK_MAX_REQUEST_PAYLOAD_LENGTH           = 65536
+	MEEK_MIN_SESSION_ID_LENGTH                = 8
+	MEEK_MAX_SESSION_ID_LENGTH                = 20
+	MEEK_DEFAULT_TURN_AROUND_TIMEOUT          = 20 * time.Millisecond
+	MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT = 100 * time.Millisecond
+	MEEK_DEFAULT_MAX_SESSION_STALENESS        = 45 * time.Second
+	MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT       = 45 * time.Second
+	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH       = 65536
+	MEEK_DEFAULT_POOL_BUFFER_LENGTH           = 65536
+	MEEK_DEFAULT_POOL_BUFFER_COUNT            = 2048
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -98,24 +98,28 @@ const (
 // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
 // the meekConn struct.
 type MeekServer struct {
-	support                *SupportServices
-	listener               net.Listener
-	listenerTunnelProtocol string
-	listenerPort           int
-	passthroughAddress     string
-	tlsConfig              *tris.Config
-	obfuscatorSeedHistory  *obfuscator.SeedHistory
-	clientHandler          func(clientTunnelProtocol string, clientConn net.Conn)
-	openConns              *common.Conns
-	stopBroadcast          <-chan struct{}
-	sessionsLock           sync.RWMutex
-	sessions               map[string]*meekSession
-	checksumTable          *crc64.Table
-	bufferPool             *CachedResponseBufferPool
-	rateLimitLock          sync.Mutex
-	rateLimitHistory       map[string][]time.Time
-	rateLimitCount         int
-	rateLimitSignalGC      chan struct{}
+	support                   *SupportServices
+	listener                  net.Listener
+	listenerTunnelProtocol    string
+	listenerPort              int
+	passthroughAddress        string
+	turnAroundTimeout         time.Duration
+	extendedTurnAroundTimeout time.Duration
+	maxSessionStaleness       time.Duration
+	httpClientIOTimeout       time.Duration
+	tlsConfig                 *tris.Config
+	obfuscatorSeedHistory     *obfuscator.SeedHistory
+	clientHandler             func(clientTunnelProtocol string, clientConn net.Conn)
+	openConns                 *common.Conns
+	stopBroadcast             <-chan struct{}
+	sessionsLock              sync.RWMutex
+	sessions                  map[string]*meekSession
+	checksumTable             *crc64.Table
+	bufferPool                *CachedResponseBufferPool
+	rateLimitLock             sync.Mutex
+	rateLimitHistory          map[string][]time.Time
+	rateLimitCount            int
+	rateLimitSignalGC         chan struct{}
 }
 
 // NewMeekServer initializes a new meek server.
@@ -128,6 +132,32 @@ func NewMeekServer(
 	clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
 	stopBroadcast <-chan struct{}) (*MeekServer, error) {
 
+	passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
+
+	turnAroundTimeout := MEEK_DEFAULT_TURN_AROUND_TIMEOUT
+	if support.Config.MeekTurnAroundTimeoutMilliseconds != nil {
+		turnAroundTimeout = time.Duration(
+			*support.Config.MeekTurnAroundTimeoutMilliseconds) * time.Millisecond
+	}
+
+	extendedTurnAroundTimeout := MEEK_DEFAULT_EXTENDED_TURN_AROUND_TIMEOUT
+	if support.Config.MeekExtendedTurnAroundTimeoutMilliseconds != nil {
+		extendedTurnAroundTimeout = time.Duration(
+			*support.Config.MeekExtendedTurnAroundTimeoutMilliseconds) * time.Millisecond
+	}
+
+	maxSessionStaleness := MEEK_DEFAULT_MAX_SESSION_STALENESS
+	if support.Config.MeekMaxSessionStalenessMilliseconds != nil {
+		maxSessionStaleness = time.Duration(
+			*support.Config.MeekMaxSessionStalenessMilliseconds) * time.Millisecond
+	}
+
+	httpClientIOTimeout := MEEK_DEFAULT_HTTP_CLIENT_IO_TIMEOUT
+	if support.Config.MeekHTTPClientIOTimeoutMilliseconds != nil {
+		httpClientIOTimeout = time.Duration(
+			*support.Config.MeekHTTPClientIOTimeoutMilliseconds) * time.Millisecond
+	}
+
 	checksumTable := crc64.MakeTable(crc64.ECMA)
 
 	bufferLength := MEEK_DEFAULT_POOL_BUFFER_LENGTH
@@ -142,23 +172,25 @@ func NewMeekServer(
 
 	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
 
-	passthroughAddress := support.Config.TunnelProtocolPassthroughAddresses[listenerTunnelProtocol]
-
 	meekServer := &MeekServer{
-		support:                support,
-		listener:               listener,
-		listenerTunnelProtocol: listenerTunnelProtocol,
-		listenerPort:           listenerPort,
-		passthroughAddress:     passthroughAddress,
-		obfuscatorSeedHistory:  obfuscator.NewSeedHistory(nil),
-		clientHandler:          clientHandler,
-		openConns:              common.NewConns(),
-		stopBroadcast:          stopBroadcast,
-		sessions:               make(map[string]*meekSession),
-		checksumTable:          checksumTable,
-		bufferPool:             bufferPool,
-		rateLimitHistory:       make(map[string][]time.Time),
-		rateLimitSignalGC:      make(chan struct{}, 1),
+		support:                   support,
+		listener:                  listener,
+		listenerTunnelProtocol:    listenerTunnelProtocol,
+		listenerPort:              listenerPort,
+		passthroughAddress:        passthroughAddress,
+		turnAroundTimeout:         turnAroundTimeout,
+		extendedTurnAroundTimeout: extendedTurnAroundTimeout,
+		maxSessionStaleness:       maxSessionStaleness,
+		httpClientIOTimeout:       httpClientIOTimeout,
+		obfuscatorSeedHistory:     obfuscator.NewSeedHistory(nil),
+		clientHandler:             clientHandler,
+		openConns:                 common.NewConns(),
+		stopBroadcast:             stopBroadcast,
+		sessions:                  make(map[string]*meekSession),
+		checksumTable:             checksumTable,
+		bufferPool:                bufferPool,
+		rateLimitHistory:          make(map[string][]time.Time),
+		rateLimitSignalGC:         make(chan struct{}, 1),
 	}
 
 	if useTLS {
@@ -192,7 +224,7 @@ func (server *MeekServer) Run() error {
 	waitGroup.Add(1)
 	go func() {
 		defer waitGroup.Done()
-		ticker := time.NewTicker(MEEK_MAX_SESSION_STALENESS / 2)
+		ticker := time.NewTicker(server.maxSessionStaleness / 2)
 		defer ticker.Stop()
 		for {
 			select {
@@ -220,8 +252,8 @@ func (server *MeekServer) Run() error {
 	//   now be sufficient.
 
 	httpServer := &http.Server{
-		ReadTimeout:  MEEK_HTTP_CLIENT_IO_TIMEOUT,
-		WriteTimeout: MEEK_HTTP_CLIENT_IO_TIMEOUT,
+		ReadTimeout:  server.httpClientIOTimeout,
+		WriteTimeout: server.httpClientIOTimeout,
 		Handler:      server,
 		ConnState:    server.httpConnStateCallback,
 		ConnContext: func(ctx context.Context, conn net.Conn) context.Context {
@@ -644,8 +676,7 @@ func (server *MeekServer) getSessionOrEndpoint(
 	clientConn := newMeekConn(
 		server,
 		session,
-		underlyingConn.LocalAddr(),
-		underlyingConn.RemoteAddr(),
+		underlyingConn,
 		&net.TCPAddr{
 			IP:   net.ParseIP(clientIP),
 			Port: 0,
@@ -1130,8 +1161,14 @@ func (session *meekSession) touch() {
 }
 
 func (session *meekSession) expired() bool {
+	if session.clientConn == nil {
+		// Not fully initialized. meekSession.clientConn will be set before adding
+		// the session to MeekServer.sessions.
+		return false
+	}
 	lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
-	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
+	return monotime.Since(lastActivity) >
+		session.clientConn.meekServer.maxSessionStaleness
 }
 
 // delete releases all resources allocated by a session.
@@ -1209,50 +1246,51 @@ func makeMeekSessionID() (string, error) {
 // connection by the tunnel server (being passed to sshServer.handleClient).
 // meekConn bridges net/http request/response payload readers and writers
 // and goroutines calling Read()s and Write()s.
-//
-// meekConn implements the UnderlyingTCPAddrSource, returning the TCP
-// addresses for the _first_ underlying TCP connection in the meek tunnel.
 type meekConn struct {
-	meekServer           *MeekServer
-	meekSession          *meekSession
-	remoteAddr           net.Addr
-	underlyingLocalAddr  net.Addr
-	underlyingRemoteAddr net.Addr
-	protocolVersion      int
-	closeBroadcast       chan struct{}
-	closed               int32
-	lastReadChecksum     *uint64
-	readLock             sync.Mutex
-	emptyReadBuffer      chan *bytes.Buffer
-	partialReadBuffer    chan *bytes.Buffer
-	fullReadBuffer       chan *bytes.Buffer
-	writeLock            sync.Mutex
-	nextWriteBuffer      chan []byte
-	writeResult          chan error
+	meekServer          *MeekServer
+	meekSession         *meekSession
+	firstUnderlyingConn net.Conn
+	remoteAddr          net.Addr
+	protocolVersion     int
+	closeBroadcast      chan struct{}
+	closed              int32
+	lastReadChecksum    *uint64
+	readLock            sync.Mutex
+	emptyReadBuffer     chan *bytes.Buffer
+	partialReadBuffer   chan *bytes.Buffer
+	fullReadBuffer      chan *bytes.Buffer
+	writeLock           sync.Mutex
+	nextWriteBuffer     chan []byte
+	writeResult         chan error
 }
 
 func newMeekConn(
 	meekServer *MeekServer,
 	meekSession *meekSession,
-	underlyingLocalAddr net.Addr,
-	underlyingRemoteAddr net.Addr,
+	underlyingConn net.Conn,
 	remoteAddr net.Addr,
 	protocolVersion int) *meekConn {
 
+	// In order to inspect its properties, meekConn will hold a reference to
+	// firstUnderlyingConn, the _first_ underlying TCP conn, for the full
+	// lifetime of meekConn, which may exceed the lifetime of firstUnderlyingConn
+	// and include subsequent underlying TCP conns. In this case, it is expected
+	// that firstUnderlyingConn will be closed by "net/http", so no OS resources
+	// (e.g., a socket) are retained longer than necessary.
+
 	conn := &meekConn{
-		meekServer:           meekServer,
-		meekSession:          meekSession,
-		underlyingLocalAddr:  underlyingLocalAddr,
-		underlyingRemoteAddr: underlyingRemoteAddr,
-		remoteAddr:           remoteAddr,
-		protocolVersion:      protocolVersion,
-		closeBroadcast:       make(chan struct{}),
-		closed:               0,
-		emptyReadBuffer:      make(chan *bytes.Buffer, 1),
-		partialReadBuffer:    make(chan *bytes.Buffer, 1),
-		fullReadBuffer:       make(chan *bytes.Buffer, 1),
-		nextWriteBuffer:      make(chan []byte, 1),
-		writeResult:          make(chan error, 1),
+		meekServer:          meekServer,
+		meekSession:         meekSession,
+		firstUnderlyingConn: underlyingConn,
+		remoteAddr:          remoteAddr,
+		protocolVersion:     protocolVersion,
+		closeBroadcast:      make(chan struct{}),
+		closed:              0,
+		emptyReadBuffer:     make(chan *bytes.Buffer, 1),
+		partialReadBuffer:   make(chan *bytes.Buffer, 1),
+		fullReadBuffer:      make(chan *bytes.Buffer, 1),
+		nextWriteBuffer:     make(chan []byte, 1),
+		writeResult:         make(chan error, 1),
 	}
 	// Read() calls and pumpReads() are synchronized by exchanging control
 	// of a single readBuffer. This is the same scheme used in and described
@@ -1261,18 +1299,70 @@ func newMeekConn(
 	return conn
 }
 
+// GetMetrics implements the common.MetricsSource interface. The metrics are
+// maintained in the meek session type; but logTunnel, which calls
+// MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
+// through to the session.
+func (conn *meekConn) GetMetrics() common.LogFields {
+
+	logFields := conn.meekSession.GetMetrics()
+
+	if conn.meekServer.passthroughAddress != "" {
+		logFields["passthrough_address"] = conn.meekServer.passthroughAddress
+	}
+
+	// Include metrics, such as fragmentor metrics, from the _first_ underlying
+	// TCP conn. Properties of subsequent underlying TCP conns are not reflected
+	// in these metrics; we assume that the first TCP conn, which most likely
+	// transits the various protocol handshakes, is most significant.
+	underlyingMetrics, ok := conn.firstUnderlyingConn.(common.MetricsSource)
+	if ok {
+		logFields.Add(underlyingMetrics.GetMetrics())
+	}
+
+	return logFields
+}
+
+// GetUnderlyingTCPAddrs implements the common.UnderlyingTCPAddrSource
+// interface, returning the TCP addresses for the _first_ underlying TCP
+// connection in the meek tunnel.
 func (conn *meekConn) GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool) {
-	localAddr, ok := conn.underlyingLocalAddr.(*net.TCPAddr)
+	localAddr, ok := conn.firstUnderlyingConn.LocalAddr().(*net.TCPAddr)
 	if !ok {
 		return nil, nil, false
 	}
-	remoteAddr, ok := conn.underlyingRemoteAddr.(*net.TCPAddr)
+	remoteAddr, ok := conn.firstUnderlyingConn.RemoteAddr().(*net.TCPAddr)
 	if !ok {
 		return nil, nil, false
 	}
 	return localAddr, remoteAddr, true
 }
 
+// SetReplay implements the common.FragmentorReplayAccessor interface, applying
+// the inputs to the _first_ underlying TCP connection in the meek tunnel. If
+// the underlying connection is closed, the SetSeed call will have no effect.
+func (conn *meekConn) SetReplay(PRNG *prng.PRNG) {
+	fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorReplayAccessor)
+	if ok {
+		fragmentor.SetReplay(PRNG)
+	}
+}
+
+// GetReplay implements the FragmentorReplayAccessor interface, getting the
+// outputs from the _first_ underlying TCP connection in the meek tunnel.
+//
+// We assume that the first TCP conn is most significant: the initial TCP
+// connection most likely fragments protocol handshakes; and, in the case the
+// packet manipulation, any selected packet manipulation spec would have been
+// successful.
+func (conn *meekConn) GetReplay() (*prng.Seed, bool) {
+	fragmentor, ok := conn.firstUnderlyingConn.(common.FragmentorReplayAccessor)
+	if ok {
+		return fragmentor.GetReplay()
+	}
+	return nil, false
+}
+
 // pumpReads causes goroutines blocking on meekConn.Read() to read
 // from the specified reader. This function blocks until the reader
 // is fully consumed or the meekConn is closed. A read buffer allows
@@ -1387,7 +1477,7 @@ func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
 func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
 
 	startTime := time.Now()
-	timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
+	timeout := time.NewTimer(conn.meekServer.turnAroundTimeout)
 	defer timeout.Stop()
 
 	n := 0
@@ -1411,10 +1501,10 @@ func (conn *meekConn) pumpWrites(writer io.Writer) (int, error) {
 				return n, nil
 			}
 			totalElapsedTime := time.Since(startTime) / time.Millisecond
-			if totalElapsedTime >= MEEK_EXTENDED_TURN_AROUND_TIMEOUT {
+			if totalElapsedTime >= conn.meekServer.extendedTurnAroundTimeout {
 				return n, nil
 			}
-			timeout.Reset(MEEK_TURN_AROUND_TIMEOUT)
+			timeout.Reset(conn.meekServer.turnAroundTimeout)
 		case <-timeout.C:
 			return n, nil
 		case <-conn.closeBroadcast:
@@ -1483,6 +1573,10 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
 func (conn *meekConn) Close() error {
 	if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
 		close(conn.closeBroadcast)
+
+		// In general, we reply on "net/http" to close underlying TCP conns. In this
+		// case, we can directly close the first once, if it's still open.
+		conn.firstUnderlyingConn.Close()
 	}
 	return nil
 }
@@ -1509,7 +1603,7 @@ func (conn *meekConn) RemoteAddr() net.Addr {
 // timing out on idle on or before the requested deadline.
 func (conn *meekConn) SetDeadline(t time.Time) error {
 	// Overhead: nanoseconds (https://blog.cloudflare.com/its-go-time-on-linux/)
-	if time.Now().Add(MEEK_MAX_SESSION_STALENESS).Before(t) {
+	if time.Now().Add(conn.meekServer.maxSessionStaleness).Before(t) {
 		return nil
 	}
 	return errors.TraceNew("not supported")
@@ -1524,15 +1618,3 @@ func (conn *meekConn) SetReadDeadline(t time.Time) error {
 func (conn *meekConn) SetWriteDeadline(t time.Time) error {
 	return errors.TraceNew("not supported")
 }
-
-// GetMetrics implements the common.MetricsSource interface. The metrics are
-// maintained in the meek session type; but logTunnel, which calls
-// MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
-// through to the session.
-func (conn *meekConn) GetMetrics() common.LogFields {
-	logFields := conn.meekSession.GetMetrics()
-	if conn.meekServer.passthroughAddress != "" {
-		logFields["passthrough_address"] = conn.meekServer.passthroughAddress
-	}
-	return logFields
-}

+ 0 - 4
psiphon/server/net.go

@@ -77,7 +77,3 @@ func (server *HTTPSServer) ServeTLS(listener net.Listener, config *tris.Config)
 	tlsListener := tris.NewListener(listener, config)
 	return server.Serve(tlsListener)
 }
-
-type UnderlyingTCPAddrSource interface {
-	GetUnderlyingTCPAddrs() (*net.TCPAddr, *net.TCPAddr, bool)
-}

+ 49 - 62
psiphon/server/packetman.go

@@ -22,7 +22,6 @@ package server
 import (
 	"net"
 
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/packetman"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
@@ -44,17 +43,18 @@ func makePacketManipulatorConfig(
 		}
 	}
 
-	selectSpecName := func(protocolPort int, clientIP net.IP) string {
+	selectSpecName := func(protocolPort int, clientIP net.IP) (string, interface{}) {
 
-		specName, err := selectPacketManipulationSpec(support, protocolPort, clientIP)
+		specName, extraData, err := selectPacketManipulationSpec(
+			support, protocolPort, clientIP)
 		if err != nil {
 			log.WithTraceFields(
 				LogFields{"error": err}).Warning(
 				"failed to get tactics for packet manipulation")
-			return ""
+			return "", nil
 		}
 
-		return specName
+		return specName, extraData
 	}
 
 	specs, err := getPacketManipulationSpecs(support)
@@ -75,29 +75,19 @@ func makePacketManipulatorConfig(
 func getPacketManipulationSpecs(support *SupportServices) ([]*packetman.Spec, error) {
 
 	// By convention, parameters.ServerPacketManipulationSpecs should be in
-	// DefaultTactics, not FilteredTactics; and Tactics.Probability is ignored.
+	// DefaultTactics, not FilteredTactics; and GetServerTacticsParameters
+	// ignores Tactics.Probability.
 
-	tactics, err := support.TacticsServer.GetTactics(
-		true, common.GeoIPData(NewGeoIPData()), make(common.APIParameters))
+	p, err := GetServerTacticsParameters(support, NewGeoIPData())
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	if tactics == nil {
-		// This server isn't configured with tactics.
-		return []*packetman.Spec{}, nil
+	if p.IsNil() {
+		// No tactics are configured; return an empty spec list.
+		return nil, nil
 	}
 
-	clientParameters, err := parameters.NewClientParameters(nil)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	_, err = clientParameters.Set("", false, tactics.Parameters)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	p := clientParameters.Get()
-
 	paramSpecs := p.PacketManipulationSpecs(parameters.ServerPacketManipulationSpecs)
 
 	specs := make([]*packetman.Spec, len(paramSpecs))
@@ -125,70 +115,67 @@ func reloadPacketManipulationSpecs(support *SupportServices) error {
 }
 
 func selectPacketManipulationSpec(
-	support *SupportServices, protocolPort int, clientIP net.IP) (string, error) {
+	support *SupportServices,
+	protocolPort int,
+	clientIP net.IP) (string, interface{}, error) {
 
-	geoIPData := support.GeoIPService.Lookup(clientIP.String())
+	// First check for reply, then check tactics.
 
-	tactics, err := support.TacticsServer.GetTactics(
-		true, common.GeoIPData(geoIPData), make(common.APIParameters))
-	if err != nil {
-		return "", errors.Trace(err)
-	}
+	// The intercepted packet source/protocol port is used to determine the
+	// tunnel protocol name, which is used to lookup first replay and then
+	// enabled packet manipulation specs in ServerProtocolPacketManipulations.
 
-	if tactics == nil {
-		// This server isn't configured with tactics.
-		return "", nil
+	targetTunnelProtocol := ""
+	for tunnelProtocol, port := range support.Config.TunnelProtocolPorts {
+		if port == protocolPort {
+			targetTunnelProtocol = tunnelProtocol
+			break
+		}
 	}
-
-	if !prng.FlipWeightedCoin(tactics.Probability) {
-		// Skip tactics with the configured probability.
-		return "", nil
+	if targetTunnelProtocol == "" {
+		return "", nil, errors.Tracef(
+			"packet manipulation protocol port not found: %d", protocolPort)
 	}
 
-	clientParameters, err := parameters.NewClientParameters(nil)
-	if err != nil {
-		return "", errors.Trace(err)
-	}
-	_, err = clientParameters.Set("", false, tactics.Parameters)
-	if err != nil {
-		return "", errors.Trace(err)
+	geoIPData := support.GeoIPService.Lookup(clientIP.String())
+
+	specName, doReplay := support.ReplayCache.GetReplayPacketManipulation(
+		targetTunnelProtocol, geoIPData)
+
+	// extraData records the is_server_replay metric.
+	extraData := doReplay
+
+	if doReplay {
+		return specName, extraData, nil
 	}
-	p := clientParameters.Get()
 
-	// GeoIP tactics filtering is applied before getting
+	// GeoIP tactics filtering is applied when getting
 	// ServerPacketManipulationProbability and ServerProtocolPacketManipulations.
 	//
-	// The intercepted packet source/protocol port is used to determine the
-	// tunnel protocol name, which is used to lookup enabled packet manipulation
-	// specs in ServerProtocolPacketManipulations.
-	//
 	// When there are multiple enabled specs, one is selected at random.
 	//
 	// Specs under the key "All" apply to all protocols. Duplicate specs per
 	// entry are allowed, enabling weighted selection. If a spec appears in both
 	// "All" and a specific protocol, the duplicate(s) are retained.
 
-	if !p.WeightedCoinFlip(parameters.ServerPacketManipulationProbability) {
-		return "", nil
+	p, err := GetServerTacticsParameters(support, geoIPData)
+	if err != nil {
+		return "", nil, errors.Trace(err)
 	}
 
-	targetTunnelProtocol := ""
-	for tunnelProtocol, port := range support.Config.TunnelProtocolPorts {
-		if port == protocolPort {
-			targetTunnelProtocol = tunnelProtocol
-			break
-		}
+	if p.IsNil() {
+		// No tactics are configured; select no spec.
+		return "", extraData, nil
 	}
-	if targetTunnelProtocol == "" {
-		return "", errors.Tracef(
-			"packet manipulation protocol port not found: %d", protocolPort)
+
+	if !p.WeightedCoinFlip(parameters.ServerPacketManipulationProbability) {
+		return "", extraData, nil
 	}
 
 	protocolSpecs := p.ProtocolPacketManipulations(
 		parameters.ServerProtocolPacketManipulations)
 
 	// TODO: cache merged per-protocol + "All" lists?
-
 	specNames, ok := protocolSpecs[targetTunnelProtocol]
 	if !ok {
 		specNames = []string{}
@@ -201,8 +188,8 @@ func selectPacketManipulationSpec(
 
 	if len(specNames) < 1 {
 		// Tactics contains no candidate specs for this protocol.
-		return "", nil
+		return "", extraData, nil
 	}
 
-	return specNames[prng.Range(0, len(specNames)-1)], nil
+	return specNames[prng.Range(0, len(specNames)-1)], extraData, nil
 }

+ 396 - 0
psiphon/server/replay.go

@@ -0,0 +1,396 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	lrucache "github.com/cognusion/go-cache-lru"
+)
+
+const (
+	REPLAY_CACHE_MAX_ENTRIES      = 100000
+	REPLAY_CACHE_CLEANUP_INTERVAL = 1 * time.Minute
+)
+
+// ReplayCache is a cache of recently used and successful network obfuscation
+// parameters that may be replayed -- reused -- for subsequent tunnel
+// connections.
+//
+// Server-side replay is analogous to client-side replay, with one key
+// difference: server-side replay can be applied across multiple clients in
+// the same GeoIP scope.
+//
+// Replay is enabled with tactics, and tactics determine the tunnel quality
+// targets for establishing and clearing replay parameters.
+//
+// ReplayCache has a maximum capacity with an LRU strategy to cap memory
+// overhead.
+type ReplayCache struct {
+	support    *SupportServices
+	cacheMutex sync.Mutex
+	cache      *lrucache.Cache
+	metrics    *replayCacheMetrics
+}
+
+type replayCacheMetrics struct {
+	MaxCacheEntries    int64
+	SetReplayCount     int64
+	GetReplayHitCount  int64
+	GetReplayMissCount int64
+	FailedReplayCount  int64
+	DeleteReplayCount  int64
+}
+
+// NewReplayCache creates a new ReplayCache.
+func NewReplayCache(support *SupportServices) *ReplayCache {
+	return &ReplayCache{
+		support: support,
+		cache: lrucache.NewWithLRU(
+			0, REPLAY_CACHE_CLEANUP_INTERVAL, REPLAY_CACHE_MAX_ENTRIES),
+		metrics: &replayCacheMetrics{},
+	}
+}
+
+// Flush clears all entries in the ReplayCache. Flush should be called when
+// tactics hot reload and change to clear any cached replay parameters that
+// may be based on stale tactics.
+func (r *ReplayCache) Flush() {
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	r.cache.Flush()
+}
+
+// GetMetrics returns a snapshop of current ReplayCache event counters and
+// resets all counters to zero.
+func (r *ReplayCache) GetMetrics() LogFields {
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	logFields := LogFields{
+		"replay_max_cache_entries":     r.metrics.MaxCacheEntries,
+		"replay_set_replay_count":      r.metrics.SetReplayCount,
+		"replay_get_replay_hit_count":  r.metrics.GetReplayHitCount,
+		"replay_get_replay_miss_count": r.metrics.GetReplayMissCount,
+		"replay_failed_replay_count":   r.metrics.FailedReplayCount,
+		"replay_delete_replay_count":   r.metrics.DeleteReplayCount,
+	}
+
+	r.metrics = &replayCacheMetrics{}
+
+	return logFields
+}
+
+// GetReplayTargetDuration returns the tactics replay target tunnel duration
+// for the specified GeoIP data. Tunnels which are active for the specified
+// duration are candidates for setting or extending replay parameters. Wait
+// for the returned wait duration before evaluating the tunnel duration. Once
+// this target is met, call SetReplayParameters, which will check additional
+// targets and conditionally set replay parameters.
+func (r *ReplayCache) GetReplayTargetDuration(
+	geoIPData GeoIPData) (bool, time.Duration, time.Duration) {
+
+	p, err := GetServerTacticsParameters(r.support, geoIPData)
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
+			"GetServerTacticsParameters failed")
+		return false, 0, 0
+	}
+
+	if p.IsNil() {
+		// No tactics are configured; replay is disabled.
+		return false, 0, 0
+	}
+
+	TTL := p.Duration(parameters.ServerReplayTTL)
+
+	if TTL == 0 {
+		// Server replay is disabled when TTL is 0.
+		return false, 0, 0
+	}
+
+	return true,
+		p.Duration(parameters.ServerReplayTargetWaitDuration),
+		p.Duration(parameters.ServerReplayTargetTunnelDuration)
+}
+
+// SetReplayParameters sets replay parameters, packetManipulationSpecName and
+// fragmentorSeed, for the specified tunnel protocol and GeoIP scope.
+// Once set, replay parameters are active for a tactics-configurable TTL.
+//
+// The specified tunneledBytesUp/Down must meet tactics replay bytes
+// transferred targets. SetReplayParameters should be called only after first
+// calling ReplayTargetDuration and ensuring the tunnel meets the active
+// tunnel duration target. When cached replay parameters exist, their TTL is
+// extended and any failure counts are reset to zero.
+//
+// SetReplayParameters must be called only once per tunnel. Extending replay
+// parameters TTL should only be done only immediately after a successful
+// tunnel dial and target achievement, as this is the part of a tunnel
+// lifecycle at highest risk of blocking.
+//
+// The value pointed to by fragmentorSeed must not be mutated after calling
+// SetReplayParameters.
+func (r *ReplayCache) SetReplayParameters(
+	tunnelProtocol string,
+	geoIPData GeoIPData,
+	packetManipulationSpecName string,
+	fragmentorSeed *prng.Seed,
+	tunneledBytesUp int64,
+	tunneledBytesDown int64) {
+
+	p, err := GetServerTacticsParameters(r.support, geoIPData)
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
+			"GetServerTacticsParameters failed")
+		return
+	}
+
+	if p.IsNil() {
+		// No tactics are configured; replay is disabled.
+		return
+	}
+
+	TTL := p.Duration(parameters.ServerReplayTTL)
+
+	if TTL == 0 {
+		return
+	}
+
+	targetUpstreamBytes := p.Int(parameters.ServerReplayTargetUpstreamBytes)
+	targetDownstreamBytes := p.Int(parameters.ServerReplayTargetDownstreamBytes)
+
+	if tunneledBytesUp < int64(targetUpstreamBytes) {
+		return
+	}
+	if tunneledBytesDown < int64(targetDownstreamBytes) {
+		return
+	}
+
+	key := r.makeKey(tunnelProtocol, geoIPData)
+
+	value := &replayParameters{}
+
+	if p.Bool(parameters.ServerReplayPacketManipulation) {
+		value.replayPacketManipulation = true
+		value.packetManipulationSpecName = packetManipulationSpecName
+	}
+
+	if p.Bool(parameters.ServerReplayFragmentor) {
+		value.replayFragmentor = (fragmentorSeed != nil)
+		value.fragmentorSeed = fragmentorSeed
+	}
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	r.cache.Add(key, value, TTL)
+
+	// go-cache-lru is typically safe for concurrent access but explicit
+	// synchronization is required when accessing Items. Items may include
+	// entries that are expired but not yet purged.
+	cacheSize := int64(len(r.cache.Items()))
+
+	if cacheSize > r.metrics.MaxCacheEntries {
+		r.metrics.MaxCacheEntries = cacheSize
+	}
+	r.metrics.SetReplayCount += 1
+}
+
+// GetReplayPacketManipulation returns an active replay packet manipulation
+// spec for the specified tunnel protocol and GeoIP scope.
+//
+// While Flush should be called to clear parameters based on stale tactics,
+// it's still possible for GetReplayPacketManipulation to return a spec name
+// that's no longer in the current list of known specs.
+func (r *ReplayCache) GetReplayPacketManipulation(
+	tunnelProtocol string,
+	geoIPData GeoIPData) (string, bool) {
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	parameters, ok := r.getReplayParameters(
+		tunnelProtocol, geoIPData)
+	if !ok {
+		return "", false
+	}
+
+	if !parameters.replayPacketManipulation {
+		return "", false
+	}
+
+	return parameters.packetManipulationSpecName, true
+}
+
+// GetReplayFragmentor returns an active replay fragmentor seed for the
+// specified tunnel protocol and GeoIP scope.
+func (r *ReplayCache) GetReplayFragmentor(
+	tunnelProtocol string,
+	geoIPData GeoIPData) (*prng.Seed, bool) {
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	parameters, ok := r.getReplayParameters(
+		tunnelProtocol, geoIPData)
+	if !ok {
+		return nil, false
+	}
+
+	if !parameters.replayFragmentor {
+		return nil, false
+	}
+
+	return parameters.fragmentorSeed, true
+}
+
+func (r *ReplayCache) getReplayParameters(
+	tunnelProtocol string,
+	geoIPData GeoIPData) (*replayParameters, bool) {
+
+	key := r.makeKey(tunnelProtocol, geoIPData)
+
+	value, ok := r.cache.Get(key)
+
+	if !ok {
+		r.metrics.GetReplayMissCount += 1
+		return nil, false
+	}
+
+	r.metrics.GetReplayHitCount += 1
+
+	parameters, ok := value.(*replayParameters)
+
+	return parameters, ok
+}
+
+// FailedReplayParameters increments the count of tunnels which failed to
+// complete any liveness test and API handshake after using replay parameters.
+// Once a failure threshold is reached, cached replay parameters are cleared.
+// Call this function for tunnels which meet the failure criteria.
+func (r *ReplayCache) FailedReplayParameters(
+	tunnelProtocol string,
+	geoIPData GeoIPData,
+	packetManipulationSpecName string,
+	fragmentorSeed *prng.Seed) {
+
+	p, err := GetServerTacticsParameters(r.support, geoIPData)
+	if err != nil {
+		log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
+			"GetServerTacticsParameters failed")
+		return
+	}
+
+	thresholdFailedCount := p.Int(parameters.ServerReplayFailedCountThreshold)
+
+	key := r.makeKey(tunnelProtocol, geoIPData)
+
+	r.cacheMutex.Lock()
+	defer r.cacheMutex.Unlock()
+
+	parameters, ok := r.getReplayParameters(tunnelProtocol, geoIPData)
+	if !ok {
+		return
+	}
+
+	// Do not count the failure if the replay values for the tunnel protocol and
+	// GeoIP scope are now different; these parameters now reflect a newer,
+	// successful tunnel.
+
+	if (parameters.replayPacketManipulation &&
+		parameters.packetManipulationSpecName != packetManipulationSpecName) ||
+		(parameters.replayFragmentor &&
+			*parameters.fragmentorSeed != *fragmentorSeed) {
+		return
+	}
+
+	parameters.failedCount += 1
+	r.metrics.FailedReplayCount += 1
+
+	if thresholdFailedCount == 0 {
+		// No failure limit; the entry will not be deleted.
+		return
+	}
+
+	if parameters.failedCount >= thresholdFailedCount {
+		r.cache.Delete(key)
+		r.metrics.DeleteReplayCount += 1
+	}
+}
+
+func (r *ReplayCache) makeKey(
+	tunnelProtocol string, geoIPData GeoIPData) string {
+	return fmt.Sprintf(
+		"%s-%s-%s",
+		tunnelProtocol, geoIPData.Country, geoIPData.ISP)
+}
+
+type replayParameters struct {
+	replayPacketManipulation   bool
+	packetManipulationSpecName string
+	replayFragmentor           bool
+	fragmentorSeed             *prng.Seed
+	failedCount                int
+}
+
+// GetServerTacticsParameters returns server-side tactics parameters for the
+// specified GeoIP scope. GetServerTacticsParameters is designed to be called
+// before the API handshake and does not filter by API parameters. IsNil
+// guards must be used when accessing the  returned ClientParametersAccessor.
+func GetServerTacticsParameters(
+	support *SupportServices,
+	geoIPData GeoIPData) (parameters.ClientParametersAccessor, error) {
+
+	nilAccessor := parameters.MakeNilClientParametersAccessor()
+
+	tactics, err := support.TacticsServer.GetTactics(
+		true, common.GeoIPData(geoIPData), make(common.APIParameters))
+	if err != nil {
+		return nilAccessor, errors.Trace(err)
+	}
+
+	if tactics == nil {
+		// This server isn't configured with tactics.
+		return nilAccessor, nil
+	}
+
+	// Tactics.Probability is ignored for server-side tactics.
+
+	clientParameters, err := parameters.NewClientParameters(nil)
+	if err != nil {
+		return nilAccessor, errors.Trace(err)
+	}
+	_, err = clientParameters.Set("", false, tactics.Parameters)
+	if err != nil {
+		return nilAccessor, errors.Trace(err)
+	}
+
+	return clientParameters.Get(), nil
+}

+ 346 - 0
psiphon/server/replay_test.go

@@ -0,0 +1,346 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package server
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
+)
+
+func TestServerFragmentorReplay(t *testing.T) {
+	runServerReplayTests(t, false)
+}
+
+func runServerReplayTests(t *testing.T, runPacketManipulation bool) {
+
+	// Do not use OSSH, which has a different fragmentor replay mechanism. Meek
+	// has a unique code path for passing around replay parameters and metrics.
+	testCases := protocol.TunnelProtocols{
+		protocol.TUNNEL_PROTOCOL_SSH,
+		protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
+	}
+
+	for _, tunnelProtocol := range testCases {
+		t.Run(tunnelProtocol, func(t *testing.T) {
+			runServerReplayTest(t, runPacketManipulation, tunnelProtocol)
+		})
+	}
+}
+
+func runServerReplayTest(
+	t *testing.T,
+	runPacketManipulation bool,
+	tunnelProtocol string) {
+
+	psiphon.SetEmitDiagnosticNotices(true, true)
+
+	// Configure tactics
+
+	tacticsConfigJSONFormat := `
+    {
+      "RequestPublicKey" : "%s",
+      "RequestPrivateKey" : "%s",
+      "RequestObfuscatedKey" : "%s",
+      "DefaultTactics" : {
+        "TTL" : "60s",
+        "Probability" : 1.0,
+        "Parameters" : {
+          "LimitTunnelProtocols" : ["%s"],
+          "FragmentorDownstreamLimitProtocols" : ["%s"],
+          "FragmentorDownstreamProbability" : 1.0,
+          "FragmentorDownstreamMinTotalBytes" : 10,
+          "FragmentorDownstreamMaxTotalBytes" : 10,
+          "FragmentorDownstreamMinWriteBytes" : 1,
+          "FragmentorDownstreamMaxWriteBytes" : 1,
+          "FragmentorDownstreamMinDelay" : "1ms",
+          "FragmentorDownstreamMaxDelay" : "1ms",
+          "ServerPacketManipulationSpecs" : [{"Name": "test-packetman-spec", "PacketSpecs": [[]]}],
+          "ServerPacketManipulationProbability" : 1.0,
+          "ServerProtocolPacketManipulations": {"%s" : ["test-packetman-spec"]},
+          "ServerReplayPacketManipulation" : true,
+          "ServerReplayFragmentor" : true,
+          "ServerReplayTTL" : "3s",
+          "ServerReplayTargetWaitDuration" : "200ms",
+          "ServerReplayTargetTunnelDuration" : "100ms",
+          "ServerReplayTargetUpstreamBytes" : 0,
+          "ServerReplayTargetDownstreamBytes" : 0,
+          "ServerReplayFailedCountThreshold" : 1,
+          "ServerReplayFailedCountThreshold" : 1
+        }
+      }
+    }
+    `
+
+	tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
+		tactics.GenerateKeys()
+	if err != nil {
+		t.Fatalf("error generating tactics keys: %s", err)
+	}
+
+	tacticsConfigJSON := fmt.Sprintf(
+		tacticsConfigJSONFormat,
+		tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
+		tunnelProtocol, tunnelProtocol, tunnelProtocol)
+
+	tacticsConfigFilename := filepath.Join(testDataDirName, "tactics_config.json")
+
+	err = ioutil.WriteFile(tacticsConfigFilename, []byte(tacticsConfigJSON), 0600)
+	if err != nil {
+		t.Fatalf("error paving tactics config file: %s", err)
+	}
+
+	// Run Psiphon server
+
+	generateConfigParams := &GenerateConfigParams{
+		ServerIPAddress:      "127.0.0.1",
+		EnableSSHAPIRequests: true,
+		WebServerPort:        8000,
+		TunnelProtocolPorts:  map[string]int{tunnelProtocol: 4000},
+	}
+
+	serverConfigJSON, _, _, _, encodedServerEntry, err := GenerateConfig(generateConfigParams)
+	if err != nil {
+		t.Fatalf("error generating server config: %s", err)
+	}
+
+	var serverConfig map[string]interface{}
+	json.Unmarshal(serverConfigJSON, &serverConfig)
+
+	serverConfig["LogFilename"] = filepath.Join(testDataDirName, "psiphond.log")
+	serverConfig["LogLevel"] = "debug"
+	serverConfig["TacticsConfigFilename"] = tacticsConfigFilename
+
+	// Ensure server_tunnels emit quickly.
+	serverConfig["MeekMaxSessionStalenessMilliseconds"] = 50
+
+	if runPacketManipulation {
+		serverConfig["RunPacketManipulator"] = true
+	}
+
+	serverConfigJSON, _ = json.Marshal(serverConfig)
+
+	serverTunnelLog := make(chan map[string]interface{}, 1)
+
+	setLogCallback(func(log []byte) {
+		logFields := make(map[string]interface{})
+		err := json.Unmarshal(log, &logFields)
+		if err != nil {
+			return
+		}
+		if logFields["event_name"] == nil {
+			return
+		}
+		if logFields["event_name"].(string) == "server_tunnel" {
+			select {
+			case serverTunnelLog <- logFields:
+			default:
+			}
+		}
+	})
+
+	serverWaitGroup := new(sync.WaitGroup)
+	serverWaitGroup.Add(1)
+	go func() {
+		defer serverWaitGroup.Done()
+		err := RunServices(serverConfigJSON)
+		if err != nil {
+			t.Errorf("error running server: %s", err)
+		}
+	}()
+
+	defer func() {
+		p, _ := os.FindProcess(os.Getpid())
+		p.Signal(os.Interrupt)
+		serverWaitGroup.Wait()
+	}()
+
+	// TODO: monitor logs for more robust wait-until-loaded.
+	time.Sleep(1 * time.Second)
+
+	checkServerTunnelLog := func(expectReplay bool) {
+
+		// Numbers are float64 due to JSON decoding.
+		expectedServerTunnelFields := map[string]interface{}{
+			"downstream_bytes_fragmented":       float64(10),
+			"downstream_min_bytes_written":      float64(1),
+			"downstream_max_bytes_written":      float64(1),
+			"downstream_min_delayed":            float64(1000),
+			"downstream_max_delayed":            float64(1000),
+			"server_replay_fragmentation":       expectReplay,
+			"server_replay_packet_manipulation": expectReplay && runPacketManipulation,
+		}
+		if runPacketManipulation {
+			expectedServerTunnelFields["server_packet_manipulation"] = "test-packetman-spec"
+		}
+
+		logFields := <-serverTunnelLog
+
+		for name, value := range expectedServerTunnelFields {
+			logValue, ok := logFields[name]
+			if !ok {
+				t.Fatalf("Missing expected server_tunnel field: %s", name)
+			}
+			if !reflect.DeepEqual(logValue, value) {
+				t.Fatalf(
+					"Unexpected server_tunnel %s value: got %T(%v); expected %T(%v)",
+					name, logValue, logValue, value, value)
+			}
+		}
+	}
+
+	t.Log("first client run; no replay")
+
+	runServerReplayClient(t, encodedServerEntry, true)
+	checkServerTunnelLog(false)
+
+	t.Log("second client run; is replay")
+
+	runServerReplayClient(t, encodedServerEntry, true)
+	checkServerTunnelLog(true)
+
+	t.Log("TTL expires; no replay")
+
+	// Wait until TTL expires.
+	time.Sleep(3001 * time.Millisecond)
+
+	runServerReplayClient(t, encodedServerEntry, true)
+	checkServerTunnelLog(false)
+
+	t.Log("failure clears replay; no replay")
+
+	runServerReplayClient(t, encodedServerEntry, true)
+	checkServerTunnelLog(true)
+
+	runServerReplayClient(t, encodedServerEntry, false)
+	// No server_tunnel for SSH handshake failure.
+
+	// Wait for session to be retired, which will trigger replay failure.
+	if protocol.TunnelProtocolUsesMeek(tunnelProtocol) {
+		time.Sleep(251 * time.Millisecond)
+	}
+
+	runServerReplayClient(t, encodedServerEntry, true)
+	checkServerTunnelLog(false)
+}
+
+func runServerReplayClient(
+	t *testing.T,
+	encodedServerEntry []byte,
+	handshakeSuccess bool) {
+
+	if !handshakeSuccess {
+		serverEntry, err := protocol.DecodeServerEntry(string(encodedServerEntry), "", "")
+		if err != nil {
+			t.Fatalf("error decoding server entry: %s", err)
+		}
+		serverEntry.SshPassword = ""
+		encodedServerEntryStr, err := protocol.EncodeServerEntry(serverEntry)
+		if err != nil {
+			t.Fatalf("error encoding server entry: %s", err)
+		}
+		encodedServerEntry = []byte(encodedServerEntryStr)
+	}
+
+	dataRootDir, err := ioutil.TempDir(testDataDirName, "serverReplayClient")
+	if err != nil {
+		t.Fatalf("error createing temp dir: %s", err)
+	}
+	defer os.RemoveAll(dataRootDir)
+
+	clientConfigJSON := fmt.Sprintf(`
+    {
+        "DataRootDirectory" : "%s",
+        "ClientPlatform" : "Windows",
+        "ClientVersion" : "0",
+        "SponsorId" : "0",
+        "PropagationChannelId" : "0",
+        "TargetServerEntry" : "%s"
+    }`, dataRootDir, string(encodedServerEntry))
+
+	clientConfig, err := psiphon.LoadConfig([]byte(clientConfigJSON))
+	if err != nil {
+		t.Fatalf("error processing configuration file: %s", err)
+	}
+
+	err = clientConfig.Commit(false)
+	if err != nil {
+		t.Fatalf("error committing configuration file: %s", err)
+	}
+
+	err = psiphon.OpenDataStore(clientConfig)
+	if err != nil {
+		t.Fatalf("error initializing client datastore: %s", err)
+	}
+	defer psiphon.CloseDataStore()
+
+	controller, err := psiphon.NewController(clientConfig)
+	if err != nil {
+		t.Fatalf("error creating client controller: %s", err)
+	}
+
+	tunnelEstablished := make(chan struct{}, 1)
+
+	psiphon.SetNoticeWriter(psiphon.NewNoticeReceiver(
+		func(notice []byte) {
+			noticeType, payload, err := psiphon.GetNotice(notice)
+			if err != nil {
+				return
+			}
+			if noticeType == "Tunnels" {
+				count := int(payload["count"].(float64))
+				if count >= 1 {
+					tunnelEstablished <- struct{}{}
+				}
+			}
+		}))
+
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	controllerWaitGroup := new(sync.WaitGroup)
+	controllerWaitGroup.Add(1)
+	go func() {
+		defer controllerWaitGroup.Done()
+		controller.Run(ctx)
+	}()
+
+	if handshakeSuccess {
+		<-tunnelEstablished
+	}
+
+	// Meet tunnel duration critera.
+	for i := 0; i < 10; i++ {
+		time.Sleep(10 * time.Millisecond)
+		_, _ = controller.Dial("127.0.0.1:80", true, nil)
+	}
+
+	cancelFunc()
+	controllerWaitGroup.Wait()
+}

+ 4 - 0
psiphon/server/server_packetman_test.go

@@ -44,3 +44,7 @@ func TestServerPacketManipulation(t *testing.T) {
 			doPacketManipulation: true,
 		})
 }
+
+func TestServerPacketManipulationReplay(t *testing.T) {
+	runServerReplayTests(t, true)
+}

+ 0 - 1
psiphon/server/server_test.go

@@ -1957,7 +1957,6 @@ func paveTacticsConfigFile(
       "RequestPublicKey" : "%s",
       "RequestPrivateKey" : "%s",
       "RequestObfuscatedKey" : "%s",
-      "EnforceServerSide" : true,
       "DefaultTactics" : {
         "TTL" : "60s",
         "Probability" : 1.0,

+ 49 - 34
psiphon/server/services.go

@@ -71,7 +71,7 @@ func RunServices(configJSON []byte) (retErr error) {
 
 	loggingInitialized = true
 
-	supportServices, err := NewSupportServices(config)
+	support, err := NewSupportServices(config)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -84,20 +84,20 @@ func RunServices(configJSON []byte) (retErr error) {
 	shutdownBroadcast := make(chan struct{})
 	errorChannel := make(chan error, 1)
 
-	tunnelServer, err := NewTunnelServer(supportServices, shutdownBroadcast)
+	tunnelServer, err := NewTunnelServer(support, shutdownBroadcast)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	supportServices.TunnelServer = tunnelServer
+	support.TunnelServer = tunnelServer
 
 	if config.RunPacketTunnel {
 
 		packetTunnelServer, err := tun.NewServer(&tun.ServerConfig{
 			Logger:                      CommonLogger(log),
 			SudoNetworkConfigCommands:   config.PacketTunnelSudoNetworkConfigCommands,
-			GetDNSResolverIPv4Addresses: supportServices.DNSResolver.GetAllIPv4,
-			GetDNSResolverIPv6Addresses: supportServices.DNSResolver.GetAllIPv6,
+			GetDNSResolverIPv4Addresses: support.DNSResolver.GetAllIPv4,
+			GetDNSResolverIPv6Addresses: support.DNSResolver.GetAllIPv6,
 			EgressInterface:             config.PacketTunnelEgressInterface,
 			DownstreamPacketQueueSize:   config.PacketTunnelDownstreamPacketQueueSize,
 			SessionIdleExpirySeconds:    config.PacketTunnelSessionIdleExpirySeconds,
@@ -107,12 +107,12 @@ func RunServices(configJSON []byte) (retErr error) {
 			return errors.Trace(err)
 		}
 
-		supportServices.PacketTunnelServer = packetTunnelServer
+		support.PacketTunnelServer = packetTunnelServer
 	}
 
 	if config.RunPacketManipulator {
 
-		packetManipulatorConfig, err := makePacketManipulatorConfig(supportServices)
+		packetManipulatorConfig, err := makePacketManipulatorConfig(support)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -122,7 +122,7 @@ func RunServices(configJSON []byte) (retErr error) {
 			return errors.Trace(err)
 		}
 
-		supportServices.PacketManipulator = packetManipulator
+		support.PacketManipulator = packetManipulator
 	}
 
 	// After this point, errors should be delivered to the errors channel and
@@ -130,17 +130,17 @@ func RunServices(configJSON []byte) (retErr error) {
 	// all workers are synchronously stopped.
 
 	if config.RunPacketTunnel {
-		supportServices.PacketTunnelServer.Start()
+		support.PacketTunnelServer.Start()
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
 			<-shutdownBroadcast
-			supportServices.PacketTunnelServer.Stop()
+			support.PacketTunnelServer.Stop()
 		}()
 	}
 
 	if config.RunPacketManipulator {
-		err := supportServices.PacketManipulator.Start()
+		err := support.PacketManipulator.Start()
 		if err != nil {
 			select {
 			case errorChannel <- err:
@@ -151,7 +151,7 @@ func RunServices(configJSON []byte) (retErr error) {
 			go func() {
 				defer waitGroup.Done()
 				<-shutdownBroadcast
-				supportServices.PacketManipulator.Stop()
+				support.PacketManipulator.Stop()
 			}()
 		}
 	}
@@ -167,7 +167,7 @@ func RunServices(configJSON []byte) (retErr error) {
 				case <-shutdownBroadcast:
 					return
 				case <-ticker.C:
-					logServerLoad(tunnelServer)
+					logServerLoad(support)
 				}
 			}
 		}()
@@ -194,7 +194,7 @@ func RunServices(configJSON []byte) (retErr error) {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
-			err := RunWebServer(supportServices, shutdownBroadcast)
+			err := RunWebServer(support, shutdownBroadcast)
 			select {
 			case errorChannel <- err:
 			default:
@@ -222,7 +222,7 @@ func RunServices(configJSON []byte) (retErr error) {
 		for {
 			select {
 			case <-signalProcessProfiles:
-				outputProcessProfiles(supportServices.Config, "")
+				outputProcessProfiles(support.Config, "")
 			case <-shutdownBroadcast:
 				return
 			}
@@ -269,7 +269,7 @@ loop:
 				// Run the profile dump in a goroutine and don't block this loop. Shutdown
 				// doesn't wait for any running outputProcessProfiles to complete.
 				go func() {
-					outputProcessProfiles(supportServices.Config, "stop_establish_tunnels")
+					outputProcessProfiles(support.Config, "stop_establish_tunnels")
 				}()
 			}
 
@@ -277,7 +277,7 @@ loop:
 			tunnelServer.SetEstablishTunnels(true)
 
 		case <-reloadSupportServicesSignal:
-			supportServices.Reload()
+			support.Reload()
 
 		case <-logServerLoadSignal:
 			// Signal profiles writes first to ensure some diagnostics are
@@ -287,7 +287,7 @@ loop:
 			case signalProcessProfiles <- struct{}{}:
 			default:
 			}
-			logServerLoad(tunnelServer)
+			logServerLoad(support)
 
 		case <-systemStopSignal:
 			log.WithTrace().Info("shutdown by system")
@@ -312,7 +312,7 @@ loop:
 				return
 			case <-ticker.C:
 				filenameSuffix := fmt.Sprintf("delayed_shutdown_%ds", i)
-				outputProcessProfiles(supportServices.Config, filenameSuffix)
+				outputProcessProfiles(support.Config, filenameSuffix)
 			}
 		}
 	}()
@@ -365,18 +365,22 @@ func outputProcessProfiles(config *Config, filenameSuffix string) {
 	}
 }
 
-func logServerLoad(server *TunnelServer) {
-
-	protocolStats, regionStats := server.GetLoadStats()
+func logServerLoad(support *SupportServices) {
 
 	serverLoad := getRuntimeMetrics()
 
 	serverLoad["event_name"] = "server_load"
 
-	establishTunnels, establishLimitedCount := server.GetEstablishTunnelsMetrics()
+	establishTunnels, establishLimitedCount :=
+		support.TunnelServer.GetEstablishTunnelsMetrics()
 	serverLoad["establish_tunnels"] = establishTunnels
 	serverLoad["establish_tunnels_limited_count"] = establishLimitedCount
 
+	serverLoad.Add(support.ReplayCache.GetMetrics())
+
+	protocolStats, regionStats :=
+		support.TunnelServer.GetLoadStats()
+
 	for protocol, stats := range protocolStats {
 		serverLoad[protocol] = stats
 	}
@@ -435,6 +439,7 @@ type SupportServices struct {
 	TacticsServer      *tactics.Server
 	Blocklist          *Blocklist
 	PacketManipulator  *packetman.Manipulator
+	ReplayCache        *ReplayCache
 }
 
 // NewSupportServices initializes a new SupportServices.
@@ -480,7 +485,7 @@ func NewSupportServices(config *Config) (*SupportServices, error) {
 		return nil, errors.Trace(err)
 	}
 
-	return &SupportServices{
+	support := &SupportServices{
 		Config:          config,
 		TrafficRulesSet: trafficRulesSet,
 		OSLConfig:       oslConfig,
@@ -489,7 +494,11 @@ func NewSupportServices(config *Config) (*SupportServices, error) {
 		DNSResolver:     dnsResolver,
 		TacticsServer:   tacticsServer,
 		Blocklist:       blocklist,
-	}, nil
+	}
+
+	support.ReplayCache = NewReplayCache(support)
+
+	return support, nil
 }
 
 // Reload reinitializes traffic rules, psinet database, and geo IP database
@@ -510,15 +519,12 @@ func (support *SupportServices) Reload() {
 	// reload; new tactics will be obtained on the next client handshake or
 	// tactics request.
 
-	// Take these actions only after the corresponding Reloader has reloaded.
-	// In both the traffic rules and OSL cases, there is some impact from state
-	// reset, so the reset should be avoided where possible.
-	reloadPostActions := map[common.Reloader]func(){
-		support.TrafficRulesSet: func() { support.TunnelServer.ResetAllClientTrafficRules() },
-		support.OSLConfig:       func() { support.TunnelServer.ResetAllClientOSLConfigs() },
-	}
-	if support.Config.RunPacketManipulator {
-		reloadPostActions[support.TacticsServer] = func() {
+	reloadTactics := func() {
+
+		// Don't replay using stale tactics.
+		support.ReplayCache.Flush()
+
+		if support.Config.RunPacketManipulator {
 			err := reloadPacketManipulationSpecs(support)
 			if err != nil {
 				log.WithTraceFields(
@@ -528,6 +534,15 @@ func (support *SupportServices) Reload() {
 		}
 	}
 
+	// Take these actions only after the corresponding Reloader has reloaded.
+	// In both the traffic rules and OSL cases, there is some impact from state
+	// reset, so the reset should be avoided where possible.
+	reloadPostActions := map[common.Reloader]func(){
+		support.TrafficRulesSet: func() { support.TunnelServer.ResetAllClientTrafficRules() },
+		support.OSLConfig:       func() { support.TunnelServer.ResetAllClientOSLConfigs() },
+		support.TacticsServer:   reloadTactics,
+	}
+
 	for _, reloader := range reloaders {
 
 		if !reloader.WillReload() {

+ 136 - 35
psiphon/server/tunnelServer.go

@@ -41,7 +41,6 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
@@ -189,13 +188,11 @@ func (server *TunnelServer) Run() error {
 			return errors.Trace(err)
 		}
 
-		tacticsListener := tactics.NewListener(
+		tacticsListener := NewTacticsListener(
+			support,
 			listener,
-			support.TacticsServer,
 			tunnelProtocol,
-			func(IPAddress string) common.GeoIPData {
-				return common.GeoIPData(support.GeoIPService.Lookup(IPAddress))
-			})
+			func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP) })
 
 		log.WithTraceFields(
 			LogFields{
@@ -1057,7 +1054,12 @@ func (sshServer *sshServer) handleClient(
 		}
 	}
 
+	// Get any packet manipulation values from GetAppliedSpecName as soon as
+	// possible due to the expiring TTL.
+
 	serverPacketManipulation := ""
+	replayedServerPacketManipulation := false
+
 	if sshServer.support.Config.RunPacketManipulator &&
 		protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
 
@@ -1072,7 +1074,7 @@ func (sshServer *sshServer) handleClient(
 
 		var localAddr, remoteAddr *net.TCPAddr
 		var ok bool
-		underlying, ok := clientConn.(UnderlyingTCPAddrSource)
+		underlying, ok := clientConn.(common.UnderlyingTCPAddrSource)
 		if ok {
 			localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
 		} else {
@@ -1083,10 +1085,11 @@ func (sshServer *sshServer) handleClient(
 		}
 
 		if ok {
-			specName, err := sshServer.support.PacketManipulator.
+			specName, extraData, err := sshServer.support.PacketManipulator.
 				GetAppliedSpecName(localAddr, remoteAddr)
 			if err == nil {
 				serverPacketManipulation = specName
+				replayedServerPacketManipulation, _ = extraData.(bool)
 			}
 		}
 	}
@@ -1147,6 +1150,7 @@ func (sshServer *sshServer) handleClient(
 		sshListener,
 		tunnelProtocol,
 		serverPacketManipulation,
+		replayedServerPacketManipulation,
 		geoIPData)
 
 	// sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
@@ -1192,6 +1196,7 @@ type sshClient struct {
 	activityConn                         *common.ActivityMonitoredConn
 	throttledConn                        *common.ThrottledConn
 	serverPacketManipulation             string
+	replayedServerPacketManipulation     bool
 	geoIPData                            GeoIPData
 	sessionID                            string
 	isFirstTunnelInSession               bool
@@ -1283,6 +1288,7 @@ func newSshClient(
 	sshListener *sshListener,
 	tunnelProtocol string,
 	serverPacketManipulation string,
+	replayedServerPacketManipulation bool,
 	geoIPData GeoIPData) *sshClient {
 
 	runCtx, stopRunning := context.WithCancel(context.Background())
@@ -1292,19 +1298,20 @@ func newSshClient(
 	// unthrottled bytes during the initial protocol negotiation.
 
 	client := &sshClient{
-		sshServer:                sshServer,
-		sshListener:              sshListener,
-		tunnelProtocol:           tunnelProtocol,
-		serverPacketManipulation: serverPacketManipulation,
-		geoIPData:                geoIPData,
-		isFirstTunnelInSession:   true,
-		tcpPortForwardLRU:        common.NewLRUConns(),
-		signalIssueSLOKs:         make(chan struct{}, 1),
-		runCtx:                   runCtx,
-		stopRunning:              stopRunning,
-		stopped:                  make(chan struct{}),
-		sendAlertRequests:        make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
-		sentAlertRequests:        make(map[protocol.AlertRequest]bool),
+		sshServer:                        sshServer,
+		sshListener:                      sshListener,
+		tunnelProtocol:                   tunnelProtocol,
+		serverPacketManipulation:         serverPacketManipulation,
+		replayedServerPacketManipulation: replayedServerPacketManipulation,
+		geoIPData:                        geoIPData,
+		isFirstTunnelInSession:           true,
+		tcpPortForwardLRU:                common.NewLRUConns(),
+		signalIssueSLOKs:                 make(chan struct{}, 1),
+		runCtx:                           runCtx,
+		stopRunning:                      stopRunning,
+		stopped:                          make(chan struct{}),
+		sendAlertRequests:                make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
+		sentAlertRequests:                make(map[protocol.AlertRequest]bool),
 	}
 
 	client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
@@ -1359,6 +1366,90 @@ func (sshClient *sshClient) run(
 	throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())
 	conn = throttledConn
 
+	// Replay of server-side parameters is set or extended after a new tunnel
+	// meets duration and bytes transferred targets. Set a timer now that expires
+	// shortly after the target duration. When the timer fires, check the time of
+	// last byte read (a read indicating a live connection with the client),
+	// along with total bytes transferred and set or extend replay if the targets
+	// are met.
+	//
+	// Both target checks are conservative: the tunnel may be healthy, but a byte
+	// may not have been read in the last second when the timer fires. Or bytes
+	// may be transferring, but not at the target level. Only clients that meet
+	// the strict targets at the single check time will trigger replay; however,
+	// this replay will impact all clients with similar GeoIP data.
+	//
+	// A deferred function cancels the timer and also increments the replay
+	// failure counter, which will ultimately clear replay parameters, when the
+	// tunnel fails before the API handshake is completed (this includes any
+	// liveness test).
+	//
+	// A tunnel which fails to meet the targets but successfully completes any
+	// liveness test and the API handshake is ignored in terms of replay scoring.
+
+	isReplayCandidate, replayWaitDuration, replayTargetDuration :=
+		sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.geoIPData)
+
+	if isReplayCandidate {
+
+		getFragmentorSeed := func() *prng.Seed {
+			fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
+			if ok {
+				fragmentorSeed, _ := fragmentor.GetReplay()
+				return fragmentorSeed
+			}
+			return nil
+		}
+
+		setReplayAfterFunc := time.AfterFunc(
+			replayWaitDuration,
+			func() {
+				if activityConn.GetActiveDuration() >= replayTargetDuration {
+
+					sshClient.Lock()
+					bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
+					bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
+					sshClient.Unlock()
+
+					sshClient.sshServer.support.ReplayCache.SetReplayParameters(
+						sshClient.tunnelProtocol,
+						sshClient.geoIPData,
+						sshClient.serverPacketManipulation,
+						getFragmentorSeed(),
+						bytesUp,
+						bytesDown)
+				}
+			})
+
+		defer func() {
+			setReplayAfterFunc.Stop()
+			completed, _ := sshClient.getHandshaked()
+			if !completed {
+
+				// Count a replay failure case when a tunnel used replay parameters
+				// (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
+				// failed to complete the API handshake.
+
+				replayedFragmentation := false
+				if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
+					fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
+					if ok {
+						_, replayedFragmentation = fragmentor.GetReplay()
+					}
+				}
+				usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation
+
+				if usedReplay {
+					sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
+						sshClient.tunnelProtocol,
+						sshClient.geoIPData,
+						sshClient.serverPacketManipulation,
+						getFragmentorSeed())
+				}
+			}
+		}()
+	}
+
 	// Run the initial [obfuscated] SSH handshake in a goroutine so we can both
 	// respect shutdownBroadcast and implement a specific handshake timeout.
 	// The timeout is to reclaim network resources in case the handshake takes
@@ -1374,9 +1465,9 @@ func (sshClient *sshClient) run(
 
 	resultChannel := make(chan *sshNewServerConnResult, 2)
 
-	var afterFunc *time.Timer
+	var sshHandshakeAfterFunc *time.Timer
 	if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
-		afterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
+		sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
 			resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
 		})
 	}
@@ -1440,18 +1531,19 @@ func (sshClient *sshClient) run(
 				conn = result.obfuscatedSSHConn
 			}
 
-			// Now seed fragmentor, when present, with seed derived from
-			// initial obfuscator message. See tactics.Listener.Accept.
-			// This must preceed ssh.NewServerConn to ensure fragmentor
-			// is seeded before downstream bytes are written.
+			// Seed the fragmentor, when present, with seed derived from initial
+			// obfuscator message. See tactics.Listener.Accept. This must preceed
+			// ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
+			// are written.
 			if err == nil && sshClient.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
-				if fragmentorConn, ok := baseConn.(*fragmentor.Conn); ok {
+				fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
+				if ok {
 					var fragmentorPRNG *prng.PRNG
 					fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
 					if err != nil {
 						err = errors.Trace(err)
 					} else {
-						fragmentorConn.SetPRNG(fragmentorPRNG)
+						fragmentor.SetReplay(fragmentorPRNG)
 					}
 				}
 			}
@@ -1481,8 +1573,8 @@ func (sshClient *sshClient) run(
 		return
 	}
 
-	if afterFunc != nil {
-		afterFunc.Stop()
+	if sshHandshakeAfterFunc != nil {
+		sshHandshakeAfterFunc.Stop()
 	}
 
 	if result.err != nil {
@@ -1490,7 +1582,7 @@ func (sshClient *sshClient) run(
 		// This is a Debug log due to noise. The handshake often fails due to I/O
 		// errors as clients frequently interrupt connections in progress when
 		// client-side load balancing completes a connection to a different server.
-		log.WithTraceFields(LogFields{"error": result.err}).Debug("handshake failed")
+		log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
 		return
 	}
 
@@ -1522,9 +1614,6 @@ func (sshClient *sshClient) run(
 
 	// Some conns report additional metrics. Meek conns report resiliency
 	// metrics and fragmentor.Conns report fragmentor configs.
-	//
-	// Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
-	// should be called in order to log fragmentor metrics for meek sessions.
 
 	var additionalMetrics []LogFields
 	if metricsSource, ok := baseConn.(common.MetricsSource); ok {
@@ -1536,6 +1625,18 @@ func (sshClient *sshClient) run(
 			additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
 	}
 
+	// Record server-replay metrics.
+
+	replayMetrics := make(LogFields)
+	replayedFragmentation := false
+	fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
+	if ok {
+		_, replayedFragmentation = fragmentor.GetReplay()
+	}
+	replayMetrics["server_replay_fragmentation"] = replayedFragmentation
+	replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
+	additionalMetrics = append(additionalMetrics, replayMetrics)
+
 	sshClient.logTunnel(additionalMetrics)
 
 	// Transfer OSL seed state -- the OSL progress -- from the closing