Просмотр исходного кода

Add packet manipulator package

- Functionally complete with tests passing
- Not yet integrated into psiphond
Rod Hynes 5 лет назад
Родитель
Сommit
922b9e85ec

+ 781 - 0
psiphon/common/packetman/packetman.go

@@ -0,0 +1,781 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+/*
+Package packetman implements low-level manipulation of TCP packets, enabling a
+variety of strategies to evade network censorship.
+
+This implementation is entirely based on and is a subset of Geneva:
+
+  Come as You Are: Helping Unmodified Clients Bypass Censorship with
+  Server-side Evasion
+  Kevin Bock, George Hughey, Louis-Henri Merino, Tania Arya, Daniel Liscinsky,
+  Regina Pogosian, Dave Levin
+  ACM SIGCOMM 2020
+
+  Geneva: Evolving Censorship Evasion Strategies
+  Kevin Bock, George Hughey, Xiao Qiang, Dave Levin
+  ACM CCS 2019 (Conference on Computer and Communications Security)
+
+  https://github.com/Kkevsterrr/geneva
+
+This package implements the equivalent of the Geneva "engine", which can
+execute packet manipulation strategies. It does not implement the genetic
+algorithm component.
+
+Other notable differences:
+
+- We intercept, parse, and transform only server-side outbound SYN-ACK
+  packets. Geneva supports client-side packet manipulation with a more diverse
+  set of trigger packets, but in practice we cannot execute most low-level
+  packet operations on client platforms such as Android and iOS.
+
+- For expediency, we use a simplified strategy syntax (called transformation
+  specs, to avoid confusion with the more general original). As we do not
+  evolve strategies, we do not use a tree representation and some
+  randomization transformations are simplified.
+
+At this time, full functionality is limited to the Linux platform.
+
+Security: external parties can induce the server to emit a SYN-ACK, invoking
+the packet manipulation logic. External parties cannot set the transformation
+specs, and, as the input is the server-side generated SYN-ACK packet, cannot
+influence the packet manipulation with any external input parameters.
+
+*/
+package packetman
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"net"
+	"strings"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+)
+
+// Config specifies a packet manipulation configuration.
+type Config struct {
+
+	// Logger is used for logging events and metrics.
+	Logger common.Logger
+
+	// ProtocolPorts specifies the set of TCP ports to which SYN-ACK packet
+	// interception and manipulation is to be applied. To accommodate hosts with
+	// multiple IP addresses, packet interception is applied to all interfaces.
+	ProtocolPorts []int
+
+	// On Linux, which uses NFQUEUE and raw sockets, QueueNumber is the NFQUEUE
+	// queue-num parameter to be used.
+	QueueNumber int
+
+	// On Linux, which uses NFQUEUE and raw sockets, SocketMark is the SO_MARK
+	// value to be used. When 0, a default value is used.
+	SocketMark int
+
+	// Specs is the list of packet transformation Spec values that are to be
+	// available for packet manipulation. Spec names must be unique and
+	// non-empty.
+	Specs []*Spec
+
+	// GetSpecName is a callback invoked for each intercepted SYN-ACK packet.
+	// GetSpecName must return a name of a Spec, in Specs, to apply that
+	// transformation spec, or "" to send the SYN-ACK packet unmodified.
+	//
+	// The inputs protocolPort and clientIP allow the callback to select a Spec
+	// based on the protocol running at the intercepted packet's port and/or
+	// client GeoIP.
+	GetSpecName func(protocolPort int, clientIP net.IP) string
+
+	// SudoNetworkConfigCommands specifies whether to use "sudo" when executing
+	// network configuration commands. See comment for same parameter in
+	// psiphon/common/tun.
+	SudoNetworkConfigCommands bool
+
+	// AllowNoIPv6NetworkConfiguration indicates that failures while configuring
+	// IPv6 networking are to be tolerated rather than treated as fatal. See
+	// comment for same parameter in psiphon/common/tun.
+	AllowNoIPv6NetworkConfiguration bool
+}
+
+// Spec specifies a set of transformations to be applied to an intercepted
+// SYN-ACK packet to produce zero or more replacement packets to be sent in
+// its place.
+//
+// Each element in PacketSpecs specifies a new outgoing packet. Each element
+// in a packet specification specifies an individual transformation to be
+// applied, in turn, to a copy of the intercepted SYN-ACK packet, producing
+// the outgoing packet.
+//
+// Syntax of individual transformations (the target and the value are
+// separated by exactly one space):
+//
+// "TCP-flags random|<flags>"
+// "TCP-<field> random|<hex>"
+// "TCP-option-<option> random|omit|<hex>"
+// "TCP-payload random|omit|<hex>"
+//
+// flags:   FSRPAUECN
+//
+// fields:  srcport, dstport, seq, ack, dataoffset, window, checksum, urgent
+//
+// options: eol, nop, mss, windowscale, sackpermitted, sack, timestamps,
+//          altchecksum, altchecksumdata, md5header, usertimeout
+//
+//
+// For example, this Geneva strategy:
+//   [TCP:flags:SA]-duplicate(tamper{TCP:flags:replace:R},tamper{TCP:flags:replace:S})-| \/
+//
+// is represented as follows (in JSON encoding):
+//   [["TCP-flags R"], ["TCP-flags S"]]
+//
+//
+// Values are hex-encoded. Field and option values must decode to the
+// expected length (see implementation): seq and ack are 4 bytes, dataoffset
+// is 1 byte, and the remaining fields are 2 bytes.
+//
+// A Spec may produce invalid packets. For example, the total options length
+// can exceed 40 bytes and the DataOffset field may overflow.
+type Spec struct {
+	Name        string
+	PacketSpecs [][]string
+}
+
+// Validate reports whether every transformation in the spec parses. The
+// compiled result is discarded; only the error, if any, is returned.
+func (s *Spec) Validate() error {
+	if _, err := compileSpec(s); err != nil {
+		return errors.Trace(err)
+	}
+	return nil
+}
+
+// compiledSpec is the parsed form of a Spec, as produced by compileSpec.
+// compiledPacketSpecs has one entry per outgoing packet; each entry is the
+// ordered list of transformations to apply to a copy of the intercepted
+// packet.
+type compiledSpec struct {
+	name                string
+	compiledPacketSpecs [][]transformation
+}
+
+// compileSpec parses each transformation string in spec.PacketSpecs and
+// returns the compiled equivalent. Compilation stops at, and returns the error
+// for, the first transformation string that fails to parse.
+func compileSpec(spec *Spec) (*compiledSpec, error) {
+
+	packetSpecs := make([][]transformation, 0, len(spec.PacketSpecs))
+
+	for _, transformationSpecs := range spec.PacketSpecs {
+		transformations := make([]transformation, 0, len(transformationSpecs))
+		for _, transformationSpec := range transformationSpecs {
+			compiled, err := compileTransformation(transformationSpec)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			transformations = append(transformations, compiled)
+		}
+		packetSpecs = append(packetSpecs, transformations)
+	}
+
+	result := &compiledSpec{
+		name:                spec.Name,
+		compiledPacketSpecs: packetSpecs,
+	}
+	return result, nil
+}
+
+// apply produces the replacement packets for interceptedPacket: one
+// serialized IP/TCP packet per entry in compiledPacketSpecs, each built by
+// applying that entry's transformations, in order, to a copy of the
+// intercepted TCP layer.
+//
+// An error is returned when interceptedPacket lacks an IPv4/IPv6 layer or a
+// TCP layer, or when serialization fails.
+func (spec *compiledSpec) apply(interceptedPacket gopacket.Packet) ([][]byte, error) {
+
+	// The intercepted layers are the same for every outgoing packet, so look
+	// them up once. The comma-ok assertions yield nil, instead of panicking,
+	// when a layer is missing.
+
+	var interceptedIPv4 *layers.IPv4
+	var interceptedIPv6 *layers.IPv6
+	if layer := interceptedPacket.Layer(layers.LayerTypeIPv4); layer != nil {
+		interceptedIPv4, _ = layer.(*layers.IPv4)
+	} else {
+		interceptedIPv6, _ = interceptedPacket.Layer(layers.LayerTypeIPv6).(*layers.IPv6)
+	}
+	if interceptedIPv4 == nil && interceptedIPv6 == nil {
+		return nil, errors.TraceNew("missing IP layer")
+	}
+
+	interceptedTCP, _ := interceptedPacket.Layer(layers.LayerTypeTCP).(*layers.TCP)
+	if interceptedTCP == nil {
+		return nil, errors.TraceNew("missing TCP layer")
+	}
+
+	packets := make([][]byte, len(spec.compiledPacketSpecs))
+
+	for i, packetTransformations := range spec.compiledPacketSpecs {
+
+		var networkLayer gopacket.NetworkLayer
+		var serializableNetworkLayer gopacket.SerializableLayer
+
+		// Copy the network layer (IPv4 or IPv6) as modifications may be made to
+		// checksums or lengths in that layer. Note this is not a deep copy of
+		// fields such as the Options slice, as these are not modified.
+
+		if interceptedIPv4 != nil {
+			transformedIPv4 := *interceptedIPv4
+			networkLayer = &transformedIPv4
+			serializableNetworkLayer = &transformedIPv4
+		} else {
+			transformedIPv6 := *interceptedIPv6
+			networkLayer = &transformedIPv6
+			serializableNetworkLayer = &transformedIPv6
+		}
+
+		// Copy the TCP layer before transforming it. Again this is not a deep copy.
+		// If a transformation modifies the Options slice, it will be copied at that
+		// time.
+
+		transformedTCP := *interceptedTCP
+		var payload gopacket.Payload
+		setDataOffset := false
+		setChecksum := false
+
+		for _, transform := range packetTransformations {
+			transform.apply(&transformedTCP, &payload)
+			if !transform.setsCalculatedField() {
+				continue
+			}
+			field, ok := transform.(*transformationTCPField)
+			if !ok {
+				// Unknown transformation type: conservatively treat both
+				// calculated fields as explicitly set.
+				setDataOffset = true
+				setChecksum = true
+				continue
+			}
+			switch field.fieldName {
+			case tcpFieldDataOffset:
+				setDataOffset = true
+			case tcpFieldChecksum:
+				setChecksum = true
+			}
+		}
+
+		// Remember the values set by the spec. Serializing with FixLengths and
+		// ComputeChecksums stores the computed DataOffset and Checksum back into
+		// the TCP layer struct, which would otherwise discard the spec's values.
+		specDataOffset := transformedTCP.DataOffset
+		specChecksum := transformedTCP.Checksum
+
+		err := transformedTCP.SetNetworkLayerForChecksum(networkLayer)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		buffer := gopacket.NewSerializeBuffer()
+		options := gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true}
+
+		err = gopacket.SerializeLayers(
+			buffer,
+			options,
+			serializableNetworkLayer,
+			&transformedTCP,
+			payload)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		// In the first SerializeLayers call, all IP and TCP length and checksums
+		// are recalculated and set to the correct values with transformations
+		// applied.
+		//
+		// If the spec calls for setting the TCP DataOffset or Checksum, the
+		// spec's value is restored and a second SerializeLayers call is
+		// performed, which will repave these values without recalculation; all
+		// other calculated lengths and checksums are retained from the first
+		// round.
+
+		if setDataOffset || setChecksum {
+			if setDataOffset {
+				transformedTCP.DataOffset = specDataOffset
+			}
+			if setChecksum {
+				transformedTCP.Checksum = specChecksum
+			}
+			err = buffer.Clear()
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			err = gopacket.SerializeLayers(
+				buffer,
+				gopacket.SerializeOptions{},
+				serializableNetworkLayer,
+				&transformedTCP,
+				payload)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+
+		packets[i] = buffer.Bytes()
+	}
+
+	return packets, nil
+}
+
+// transformation is a single compiled packet modification.
+//
+// apply mutates the given TCP layer and/or payload in place.
+// setsCalculatedField reports whether the transformation sets a TCP field
+// (DataOffset or Checksum) that serialization would otherwise calculate.
+type transformation interface {
+	apply(tcp *layers.TCP, payload *gopacket.Payload)
+	setsCalculatedField() bool
+}
+
+// Transformation types, selected by the value part of a transformation
+// string: "omit", "random", or any other string, which is treated as an
+// explicit value.
+const (
+	transformationTypeUnknown = iota
+	transformationTypeOmit
+	transformationTypeRandom
+	transformationTypeValue
+)
+
+// compileTransformation parses one transformation string, "<target> <value>",
+// where target is "TCP-flags", "TCP-payload", "TCP-option-<option>", or
+// "TCP-<field>", and value is "omit", "random", or an explicit value. The
+// target-specific constructor decides which value forms are acceptable.
+func compileTransformation(spec string) (transformation, error) {
+
+	specParts := strings.Split(spec, " ")
+	if len(specParts) != 2 {
+		return nil, errors.Tracef("invalid spec: %s", spec)
+	}
+	fieldSpec, valueSpec := specParts[0], specParts[1]
+
+	fieldParts := strings.Split(fieldSpec, "-")
+	if fieldParts[0] != "TCP" || len(fieldParts) < 2 || len(fieldParts) > 3 {
+		return nil, errors.Tracef("invalid field spec: %s", fieldSpec)
+	}
+
+	transformationType := transformationTypeValue
+	switch valueSpec {
+	case "omit":
+		transformationType = transformationTypeOmit
+	case "random":
+		transformationType = transformationTypeRandom
+	}
+
+	var (
+		compiled transformation
+		err      error
+	)
+
+	switch {
+	case len(fieldParts) == 3:
+		// Only "TCP-option-<option>" has three components.
+		if fieldParts[1] != "option" {
+			return nil, errors.Tracef("invalid field spec: %s", fieldSpec)
+		}
+		compiled, err = newTransformationTCPOption(fieldParts[2], transformationType, valueSpec)
+	case fieldParts[1] == "flags":
+		compiled, err = newTransformationTCPFlags(transformationType, valueSpec)
+	case fieldParts[1] == "payload":
+		compiled, err = newTransformationTCPPayload(transformationType, valueSpec)
+	default:
+		compiled, err = newTransformationTCPField(fieldParts[1], transformationType, valueSpec)
+	}
+
+	if err != nil {
+		return nil, errors.Tracef("invalid field spec: %s: %v", fieldSpec, err)
+	}
+	return compiled, nil
+}
+
+// transformationTCPFlags replaces the full set of TCP flags. For the value
+// transformation type, flags holds the validated flag letters (a subset of
+// "FSRPAUECN"); for the random type, flags is empty.
+type transformationTCPFlags struct {
+	transformationType int
+	flags              string
+}
+
+// newTransformationTCPFlags builds a flags transformation. Only the random
+// and value transformation types are accepted; for the value type, every
+// character of valueSpec must be one of the flag letters "FSRPAUECN".
+func newTransformationTCPFlags(
+	transformationType int, valueSpec string) (*transformationTCPFlags, error) {
+
+	const validFlags = "FSRPAUECN"
+
+	result := &transformationTCPFlags{transformationType: transformationType}
+
+	switch transformationType {
+
+	case transformationTypeRandom:
+		// Flags are selected each time the transformation is applied.
+
+	case transformationTypeValue:
+		for _, f := range valueSpec {
+			if !strings.ContainsRune(validFlags, f) {
+				return nil, errors.Tracef("invalid value spec: %s", valueSpec)
+			}
+		}
+		result.flags = valueSpec
+
+	default:
+		return nil, errors.Tracef("invalid transformation type")
+	}
+
+	return result, nil
+}
+
+// apply overwrites all nine TCP flags on tcp. Each flag is set when its
+// letter is present in the selected flag string and cleared otherwise. For
+// the random transformation type, each flag is independently included with a
+// coin flip; otherwise the flags given in the spec are used. The payload is
+// not touched.
+func (t *transformationTCPFlags) apply(tcp *layers.TCP, _ *gopacket.Payload) {
+
+	flags := t.flags
+
+	if t.transformationType == transformationTypeRandom {
+
+		// Differs from Geneva, which often selects real flag combinations,
+		// presumably to focus its search space:
+		// https://github.com/Kkevsterrr/geneva/blob/de6823ba7723582054d2047083262cabffa85f36/layers/tcp_layer.py#L117-L121.
+
+		var randomFlags strings.Builder
+		for _, f := range "FSRPAUECN" {
+			if prng.FlipCoin() {
+				randomFlags.WriteRune(f)
+			}
+		}
+		flags = randomFlags.String()
+	}
+
+	// Use the selected flags, not t.flags, which is empty in the random case.
+	tcp.FIN = strings.Contains(flags, "F")
+	tcp.SYN = strings.Contains(flags, "S")
+	tcp.RST = strings.Contains(flags, "R")
+	tcp.PSH = strings.Contains(flags, "P")
+	tcp.ACK = strings.Contains(flags, "A")
+	tcp.URG = strings.Contains(flags, "U")
+	tcp.ECE = strings.Contains(flags, "E")
+	tcp.CWR = strings.Contains(flags, "C")
+	tcp.NS = strings.Contains(flags, "N")
+}
+
+// setsCalculatedField always returns false: flags are not among the fields
+// (DataOffset, Checksum) that are calculated during serialization.
+func (t *transformationTCPFlags) setsCalculatedField() bool {
+	return false
+}
+
+// transformationTCPField sets one fixed-size TCP header field, named by
+// fieldName. For the value transformation type, value holds the decoded
+// bytes, already checked to be the field's expected length; for the random
+// type, value is nil.
+type transformationTCPField struct {
+	fieldName          string
+	transformationType int
+	value              []byte
+}
+
+// TCP header field names accepted in "TCP-<field>" transformation strings.
+const (
+	tcpFieldSrcPort    = "srcport"
+	tcpFieldDstPort    = "dstport"
+	tcpFieldSeq        = "seq"
+	tcpFieldAck        = "ack"
+	tcpFieldDataOffset = "dataoffset"
+	tcpFieldWindow     = "window"
+	tcpFieldChecksum   = "checksum"
+	tcpFieldUrgent     = "urgent"
+)
+
+// newTransformationTCPField builds a transformation for the named TCP header
+// field. Only the random and value transformation types are accepted. For
+// the value type, valueSpec must be hex that decodes to exactly the field's
+// size in bytes.
+func newTransformationTCPField(
+	fieldName string, transformationType int, valueSpec string) (*transformationTCPField, error) {
+
+	// Expected size, in bytes, of the decoded value for each field.
+	var expectedLength int
+	switch fieldName {
+	case tcpFieldDataOffset:
+		expectedLength = 1
+	case tcpFieldSrcPort, tcpFieldDstPort, tcpFieldWindow, tcpFieldChecksum, tcpFieldUrgent:
+		expectedLength = 2
+	case tcpFieldSeq, tcpFieldAck:
+		expectedLength = 4
+	default:
+		return nil, errors.Tracef("invalid field name: %s", fieldName)
+	}
+
+	result := &transformationTCPField{
+		fieldName:          fieldName,
+		transformationType: transformationType,
+	}
+
+	switch transformationType {
+
+	case transformationTypeRandom:
+		// The value is selected each time the transformation is applied.
+
+	case transformationTypeValue:
+		decoded, err := hex.DecodeString(valueSpec)
+		if err != nil {
+			return nil, errors.Tracef("invalid value spec: %s: %v", valueSpec, err)
+		}
+		if len(decoded) != expectedLength {
+			lengthErr := fmt.Errorf("invalid value length: %d", len(decoded))
+			return nil, errors.Tracef("invalid value spec: %s: %v", valueSpec, lengthErr)
+		}
+		result.value = decoded
+
+	default:
+		return nil, errors.Tracef("invalid transformation type")
+	}
+
+	return result, nil
+}
+
+// apply writes the configured or random value into the TCP header field
+// selected by fieldName. Values are interpreted as big-endian. The payload is
+// not touched.
+func (t *transformationTCPField) apply(tcp *layers.TCP, _ *gopacket.Payload) {
+
+	// A 4-byte scratch buffer covers the largest field. Shorter fields read
+	// only the leading bytes.
+	var raw [4]byte
+
+	if t.transformationType == transformationTypeRandom {
+		_, _ = prng.Read(raw[:])
+	} else {
+		copy(raw[:], t.value)
+	}
+
+	value16 := binary.BigEndian.Uint16(raw[:2])
+	value32 := binary.BigEndian.Uint32(raw[:])
+
+	switch t.fieldName {
+	case tcpFieldSrcPort:
+		tcp.SrcPort = layers.TCPPort(value16)
+	case tcpFieldDstPort:
+		tcp.DstPort = layers.TCPPort(value16)
+	case tcpFieldSeq:
+		tcp.Seq = value32
+	case tcpFieldAck:
+		tcp.Ack = value32
+	case tcpFieldDataOffset:
+		// DataOffset is a 4-bit field; the most significant 4 bits are ignored
+		tcp.DataOffset = raw[0] & 0x0f
+	case tcpFieldWindow:
+		// Differs from Geneva: https://github.com/Kkevsterrr/geneva/blob/de6823ba7723582054d2047083262cabffa85f36/layers/tcp_layer.py#L117-L121
+		tcp.Window = value16
+	case tcpFieldChecksum:
+		tcp.Checksum = value16
+	case tcpFieldUrgent:
+		tcp.Urgent = value16
+	}
+}
+
+// setsCalculatedField reports whether this transformation targets DataOffset
+// or Checksum, the two fields that serialization would otherwise calculate.
+func (t *transformationTCPField) setsCalculatedField() bool {
+	switch t.fieldName {
+	case tcpFieldDataOffset, tcpFieldChecksum:
+		return true
+	}
+	return false
+}
+
+// transformationTCPOption sets, replaces, or omits the TCP option named by
+// optionName. For the value transformation type, value holds the decoded
+// option data, already checked against the option's valid lengths; for the
+// omit and random types, value is nil.
+type transformationTCPOption struct {
+	optionName         string
+	transformationType int
+	value              []byte
+}
+
+// TCP option names accepted in "TCP-option-<option>" transformation strings.
+const (
+	tcpOptionEOL             = "eol"
+	tcpOptionNOP             = "nop"
+	tcpOptionMSS             = "mss"
+	tcpOptionWindowScale     = "windowscale"
+	tcpOptionSACKPermitted   = "sackpermitted"
+	tcpOptionSACK            = "sack"
+	tcpOptionTimestamps      = "timestamps"
+	tcpOptionAltChecksum     = "altchecksum"
+	tcpOptionAltChecksumData = "altchecksumdata"
+	tcpOptionMD5Header       = "md5header"
+	tcpOptionUserTimeout     = "usertimeout"
+)
+
+// tcpOptionInfo maps an option name to its TCP option kind and the list of
+// valid option data lengths, in bytes. A nil list means the option consists
+// of the kind byte only, with no length field. The final return value is
+// false when optionName is not recognized.
+func tcpOptionInfo(optionName string) (layers.TCPOptionKind, []int, bool) {
+
+	switch optionName {
+
+	case tcpOptionEOL:
+		// no option length field
+		return layers.TCPOptionKindEndList, nil, true
+
+	case tcpOptionNOP:
+		return layers.TCPOptionKindNop, nil, true
+
+	case tcpOptionMSS:
+		return layers.TCPOptionKindMSS, []int{2}, true
+
+	case tcpOptionWindowScale:
+		return layers.TCPOptionKindWindowScale, []int{1}, true
+
+	case tcpOptionSACKPermitted:
+		return layers.TCPOptionKindSACKPermitted, []int{0}, true
+
+	case tcpOptionSACK:
+		// https://tools.ietf.org/html/rfc2018
+		return layers.TCPOptionKindSACK, []int{8, 16, 24, 32}, true
+
+	case tcpOptionTimestamps:
+		return layers.TCPOptionKindTimestamps, []int{8}, true
+
+	case tcpOptionAltChecksum:
+		return layers.TCPOptionKindAltChecksum, []int{1}, true
+
+	case tcpOptionAltChecksumData:
+		// https://tools.ietf.org/html/rfc1145:
+		// "this field is used only when the alternate checksum that is negotiated is longer than 16 bits"
+		//
+		// Geneva allows setting length 0.
+		return layers.TCPOptionKindAltChecksumData, []int{0, 4}, true
+
+	case tcpOptionMD5Header:
+		// https://tools.ietf.org/html/rfc2385
+		return layers.TCPOptionKind(19), []int{16}, true
+
+	case tcpOptionUserTimeout:
+		// https://tools.ietf.org/html/rfc5482
+		return layers.TCPOptionKind(28), []int{2}, true
+	}
+
+	var unknownKind layers.TCPOptionKind
+	return unknownKind, nil, false
+}
+
+func newTransformationTCPOption(
+	optionName string, transformationType int, valueSpec string) (*transformationTCPOption, error) {
+
+	_, validLengths, ok := tcpOptionInfo(optionName)
+	if !ok {
+		return nil, errors.Tracef("invalid option name: %s", optionName)
+	}
+
+	var decodedValue []byte
+
+	switch transformationType {
+	case transformationTypeOmit:
+	case transformationTypeRandom:
+	case transformationTypeValue:
+		var err error
+		decodedValue, err = hex.DecodeString(valueSpec)
+		if err == nil {
+			if validLengths == nil {
+				validLengths = []int{0}
+			}
+			if !common.ContainsInt(validLengths, len(decodedValue)) {
+				err = fmt.Errorf("invalid value length: %d", len(decodedValue))
+			}
+		}
+		if err != nil {
+			return nil, errors.Tracef("invalid value spec: %s: %v", valueSpec, err)
+		}
+	default:
+		return nil, errors.Tracef("invalid transformation type")
+	}
+
+	return &transformationTCPOption{
+		optionName:         optionName,
+		transformationType: transformationType,
+		value:              decodedValue,
+	}, nil
+}
+
+// apply rebuilds tcp.Options with this option transformation applied. The
+// payload is not touched.
+//
+// Every existing option is copied, so transformed option slices are not
+// shared between multiple packets. Existing options keep their order.
+// Existing options of the targeted kind are dropped (omit) or replaced in
+// place (random, value). When no existing option was replaced, a random or
+// value option is appended to the end of the list.
+//
+// Total option set size is not tracked or validated and the DataOffset TCP
+// field can overflow.
+//
+// Limitations:
+// - Inserting an option at a specific position is not supported.
+// - OptionLengths cannot be set to arbitrary values.
+// - Each option transformation executes a full copy of the existing option
+//   list, which is not efficient for a long list of option transformations.
+func (t *transformationTCPOption) apply(tcp *layers.TCP, _ *gopacket.Payload) {
+
+	kind, validLengths, _ := tcpOptionInfo(t.optionName)
+
+	// makeOption builds a fresh option for the random and value transformation
+	// types; the bool result is false for any other type, in which case no
+	// option is to be emitted.
+	//
+	// TCP options with validLengths == nil have only the "kind" byte and total
+	// length 1. Options with validLengths have the "kind" byte, the "length"
+	// byte, and 0 or more data bytes; in this case, "length" is 2 + the length
+	// of the data.
+	makeOption := func() (layers.TCPOption, bool) {
+
+		isRandom := t.transformationType == transformationTypeRandom
+		isValue := t.transformationType == transformationTypeValue
+		if !isRandom && !isValue {
+			return layers.TCPOption{}, false
+		}
+
+		if validLengths == nil {
+			return layers.TCPOption{OptionType: kind, OptionLength: 1}, true
+		}
+
+		var length int
+		var data []byte
+		if isRandom {
+			length = validLengths[prng.Range(0, len(validLengths)-1)]
+			if length > 0 {
+				data = prng.Bytes(length)
+			}
+		} else {
+			length = len(t.value)
+			if length > 0 {
+				data = append([]byte(nil), t.value...)
+			}
+		}
+
+		return layers.TCPOption{
+			OptionType:   kind,
+			OptionLength: 2 + uint8(length),
+			OptionData:   data,
+		}, true
+	}
+
+	var options []layers.TCPOption
+
+	// replaced records whether an existing option was overwritten, so that a
+	// new option is not also appended.
+	replaced := false
+
+	for _, existing := range tcp.Options {
+
+		if existing.OptionType != kind {
+			options = append(options, layers.TCPOption{
+				OptionType:   existing.OptionType,
+				OptionLength: existing.OptionLength,
+				OptionData:   append([]byte(nil), existing.OptionData...),
+			})
+			continue
+		}
+
+		// An existing option of the targeted kind is never retained as is: it
+		// is either replaced or dropped.
+		if replacement, ok := makeOption(); ok {
+			options = append(options, replacement)
+			replaced = true
+		}
+	}
+
+	if !replaced {
+		if added, ok := makeOption(); ok {
+			options = append(options, added)
+		}
+	}
+
+	tcp.Options = options
+}
+
+// setsCalculatedField always returns false: option transformations do not
+// directly set the DataOffset or Checksum fields.
+func (t *transformationTCPOption) setsCalculatedField() bool {
+	return false
+}
+
+// transformationTCPPayload sets or removes the TCP payload. For the value
+// transformation type, value holds the decoded payload bytes; for the omit
+// and random types, value is nil.
+type transformationTCPPayload struct {
+	transformationType int
+	value              []byte
+}
+
+func newTransformationTCPPayload(
+	transformationType int, valueSpec string) (*transformationTCPPayload, error) {
+
+	var decodedValue []byte
+
+	switch transformationType {
+	case transformationTypeOmit:
+	case transformationTypeRandom:
+	case transformationTypeValue:
+		var err error
+		decodedValue, err = hex.DecodeString(valueSpec)
+		if err != nil {
+			return nil, errors.Tracef("invalid value spec: %s: %v", valueSpec, err)
+		}
+	default:
+		return nil, errors.Tracef("invalid transformation type")
+	}
+
+	return &transformationTCPPayload{
+		transformationType: transformationType,
+		value:              decodedValue,
+	}, nil
+}
+
+// apply replaces *payload: with between 1 and 200 random bytes for the random
+// transformation type, with a copy of the configured bytes for the value
+// type, and with nil (no payload) otherwise. The TCP layer is not touched.
+func (t *transformationTCPPayload) apply(tcp *layers.TCP, payload *gopacket.Payload) {
+
+	var source []byte
+
+	if t.transformationType == transformationTypeRandom {
+		// Differs from Geneva: https://github.com/Kkevsterrr/geneva/blob/de6823ba7723582054d2047083262cabffa85f36/layers/layer.py#L191-L197
+		source = prng.Bytes(prng.Range(1, 200))
+	} else if t.transformationType == transformationTypeValue {
+		source = t.value
+	}
+
+	if source == nil {
+		// Omit the payload.
+		*payload = nil
+		return
+	}
+
+	// Change the payload, copying so that packets do not share a backing array.
+	*payload = append([]byte(nil), source...)
+}
+
+// setsCalculatedField always returns false: payload transformations do not
+// directly set the DataOffset or Checksum fields.
+func (t *transformationTCPPayload) setsCalculatedField() bool {
+	return false
+}
+
+// stripEOLOption removes a trailing EOL option, if present, from the TCP
+// layer of packet, modifying the layer in place. The packet must have a TCP
+// layer.
+func stripEOLOption(packet gopacket.Packet) {
+
+	// gopacket.NewPacket appears to decode padding (0s) as an explicit EOL
+	// option (value 0) at the end of the option list. This helper strips that
+	// option, allowing append-option transformations to work as expected.
+	// gopacket TCP serialization will re-add padding as required.
+
+	tcp := packet.Layer(layers.LayerTypeTCP).(*layers.TCP)
+
+	last := len(tcp.Options) - 1
+	if last < 0 {
+		return
+	}
+	if tcp.Options[last].OptionType != layers.TCPOptionKindEndList {
+		return
+	}
+	tcp.Options = tcp.Options[:last]
+}

+ 660 - 0
psiphon/common/packetman/packetman_linux.go

@@ -0,0 +1,660 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package packetman
+
+import (
+	"context"
+	"encoding/binary"
+	"log"
+	"net"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/florianl/go-nfqueue"
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+	cache "github.com/patrickmn/go-cache"
+)
+
+// IsSupported indicates whether packet manipulation is supported on this
+// platform. This is the Linux implementation, which always returns true.
+func IsSupported() bool {
+	return true
+}
+
+// netlinkSocketIOTimeout is the NFQUEUE netlink socket read/write timeout; it
+// also determines how long Stop waits for the nfqueue goroutine to exit.
+// defaultSocketMark is the SO_MARK value presumably used when
+// Config.SocketMark is 0 (the selection logic is not in this view).
+// appliedSpecCacheTTL bounds how long unconsumed appliedSpecCache entries are
+// retained.
+const (
+	netlinkSocketIOTimeout = 10 * time.Millisecond
+	defaultSocketMark      = 0x70736970 // ASCII "psip"
+	appliedSpecCacheTTL    = 1 * time.Minute
+)
+
+// Manipulator is a SYN-ACK packet manipulator.
+//
+// NFQUEUE/Netlink is used to intercept SYN-ACK packets, on all local
+// interfaces, with source port equal to one of the ProtocolPorts specified in
+// Config. For each intercepted SYN-ACK packet, the GetSpecName callback in
+// Config is invoked; the callback determines which packet transformation spec
+// to apply, based on, for example, client GeoIP, protocol, or other
+// considerations.
+//
+// Protocol network listeners use GetAppliedSpecName to determine which
+// transformation spec was applied to a given accepted connection.
+//
+// When manipulations are to be applied to a SYN-ACK packet, NFQUEUE is
+// instructed to drop the packet and one or more new packets, created by
+// applying transformations to the original SYN-ACK packet, are injected via
+// raw sockets. Raw sockets are used as NFQUEUE supports only replacing the
+// original packet with one alternative packet.
+//
+// To avoid an intercept loop, injected packets are marked (SO_MARK) and the
+// filter for NFQUEUE excludes packets with this mark.
+//
+// To avoid breaking TCP in unexpected cases, Manipulator fails open --
+// allowing the original packet to proceed -- when packet parsing fails. For
+// the same reason, the queue-bypass NFQUEUE option is set.
+//
+// As an iptables filter ensures only SYN-ACK packets are sent to the
+// NFQUEUEs, the overhead of packet interception, parsing, and injection is
+// incurred no more than once per TCP connection.
+//
+// NFQUEUE with queue-bypass requires Linux kernel 2.6.39; 3.16 or later is
+// validated and recommended.
+//
+// Fields: mutex serializes Start and Stop. runContext and stopRunning are set
+// by a successful Start; a non-nil runContext means the Manipulator is
+// running. injectIPv4FD and injectIPv6FD are the raw sockets used for packet
+// injection. compiledSpecs maps each Spec name to its compiled form.
+// NOTE(review): waitGroup is not referenced in the code visible here --
+// confirm whether it is used elsewhere.
+type Manipulator struct {
+	config           *Config
+	mutex            sync.Mutex
+	runContext       context.Context
+	stopRunning      context.CancelFunc
+	waitGroup        *sync.WaitGroup
+	injectIPv4FD     int
+	injectIPv6FD     int
+	nfqueue          *nfqueue.Nfqueue
+	compiledSpecs    map[string]*compiledSpec
+	appliedSpecCache *cache.Cache
+}
+
+// NewManipulator creates a new Manipulator from config. Every Spec in
+// config.Specs is compiled up front; an error is returned when a Spec has an
+// empty name, repeats the name of an earlier Spec, or fails to compile. The
+// returned Manipulator is not running until Start is called.
+func NewManipulator(config *Config) (*Manipulator, error) {
+
+	specsByName := make(map[string]*compiledSpec)
+
+	for _, spec := range config.Specs {
+
+		name := spec.Name
+		if name == "" {
+			return nil, errors.TraceNew("invalid spec name")
+		}
+
+		_, exists := specsByName[name]
+		if exists {
+			return nil, errors.TraceNew("duplicate spec name")
+		}
+
+		compiled, err := compileSpec(spec)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		specsByName[name] = compiled
+	}
+
+	manipulator := &Manipulator{
+		config:        config,
+		compiledSpecs: specsByName,
+
+		// To avoid memory exhaustion, do not retain unconsumed appliedSpecCache
+		// entries for a longer time than it may reasonably take to complete the
+		// TCP handshake.
+		appliedSpecCache: cache.New(appliedSpecCacheTTL, appliedSpecCacheTTL/2),
+	}
+
+	return manipulator, nil
+}
+
+// Start initializes NFQUEUEs and raw sockets for packet manipulation. Start
+// returns when initialization is complete; once it returns, the caller may
+// assume that any SYN-ACK packets on configured ports will be intercepted. In
+// the case of initialization failure, Start will undo any partial
+// initialization. When Start succeeds, the caller must call Stop to free
+// resources and restore networking state.
+//
+// Start returns an error when the Manipulator is already running. When
+// Config.AllowNoIPv6NetworkConfiguration is set, failure to create the IPv6
+// injection socket is tolerated and injectIPv6FD is left as 0, meaning "no
+// IPv6 socket".
+func (m *Manipulator) Start(ctx context.Context) (retErr error) {
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if m.runContext != nil {
+		return errors.TraceNew("already running")
+	}
+
+	err := m.configureIPTables(true)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		if retErr != nil {
+			m.configureIPTables(false)
+		}
+	}()
+
+	m.injectIPv4FD, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		if retErr != nil {
+			syscall.Close(m.injectIPv4FD)
+		}
+	}()
+
+	err = syscall.SetsockoptInt(m.injectIPv4FD, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = syscall.SetsockoptInt(m.injectIPv4FD, syscall.SOL_SOCKET, syscall.SO_MARK, m.getSocketMark())
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	m.injectIPv6FD, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
+	if err != nil {
+		if !m.config.AllowNoIPv6NetworkConfiguration {
+			return errors.Trace(err)
+		}
+		// On failure, syscall.Socket returns an invalid descriptor (-1), not 0.
+		// Reset to 0, the "no IPv6 socket" sentinel checked below and in Stop,
+		// so that no socket operations are attempted on an invalid descriptor.
+		m.injectIPv6FD = 0
+	}
+	defer func() {
+		if retErr != nil && m.injectIPv6FD != 0 {
+			syscall.Close(m.injectIPv6FD)
+		}
+	}()
+
+	if m.injectIPv6FD != 0 {
+		// NOTE(review): this passes the IPv4 IP_HDRINCL option name at the
+		// IPPROTO_IPV6 level -- confirm this is the intended socket option.
+		err = syscall.SetsockoptInt(m.injectIPv6FD, syscall.IPPROTO_IPV6, syscall.IP_HDRINCL, 1)
+		if err != nil {
+			// There's no AllowNoIPv6NetworkConfiguration in this case: if we can
+			// create an IPv6 socket, we must be able to set its options.
+			return errors.Trace(err)
+		}
+
+		err = syscall.SetsockoptInt(m.injectIPv6FD, syscall.SOL_SOCKET, syscall.SO_MARK, m.getSocketMark())
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	// Use a reasonable buffer size to avoid excess allocation. As we're
+	// intercepting only locally generated SYN-ACK packets, which should have no
+	// payload, this size should be more than sufficient.
+	maxPacketLen := uint32(1500)
+
+	// Use the kernel default of 1024:
+	// https://github.com/torvalds/linux/blob/cd8dead0c39457e58ec1d36db93aedca811d48f1/net/netfilter/nfnetlink_queue.c#L51,
+	// via https://github.com/florianl/go-nfqueue/issues/3.
+	maxQueueLen := uint32(1024)
+
+	// Note: runContext alone is not sufficient to interrupt the
+	// nfqueue.socketCallback goroutine spawned by nfqueue.Register; timeouts
+	// must be set. See comment in Manipulator.Stop.
+
+	m.nfqueue, err = nfqueue.Open(
+		&nfqueue.Config{
+			NfQueue:      uint16(m.config.QueueNumber),
+			MaxPacketLen: maxPacketLen,
+			MaxQueueLen:  maxQueueLen,
+			Copymode:     nfqueue.NfQnlCopyPacket,
+			Logger:       newNfqueueLogger(m.config.Logger),
+			ReadTimeout:  netlinkSocketIOTimeout,
+			WriteTimeout: netlinkSocketIOTimeout,
+		})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		if retErr != nil {
+			m.nfqueue.Close()
+		}
+	}()
+
+	runContext, stopRunning := context.WithCancel(ctx)
+	defer func() {
+		if retErr != nil {
+			stopRunning()
+		}
+	}()
+
+	err = m.nfqueue.Register(runContext, m.handleInterceptedPacket)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	m.runContext = runContext
+	m.stopRunning = stopRunning
+
+	return nil
+}
+
+// Stop halts packet manipulation, frees resources, and restores networking
+// state. Stop is a no-op when the Manipulator is not running, so it is safe
+// to call more than once.
+func (m *Manipulator) Stop() {
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if m.runContext == nil {
+		return
+	}
+
+	m.stopRunning()
+
+	// Clear the run state so that a repeated Stop call returns at the check
+	// above instead of closing the nfqueue handle and the injection sockets a
+	// second time; by then a stale file descriptor number may have been reused
+	// for an unrelated resource.
+	m.runContext = nil
+	m.stopRunning = nil
+
+	// stopRunning will cancel the context passed into nfqueue.Register. The
+	// goroutine spawned by Register, nfqueue.socketCallback, polls the context
+	// after a read timeout:
+	// https://github.com/florianl/go-nfqueue/blob/1e38df738c06deffbac08da8fec4b7c28a69b918/nfqueue_gteq_1.12.go#L138-L146
+	//
+	// There's no stop synchronization exposed by nfqueue. Calling nfqueue.Close
+	// while socketCallback is still running can result in errors such as
+	// "nfqueuenfqueue_gteq_1.12.go:134: Could not unbind from queue: netlink
+	// send: sendmsg: bad file descriptor".
+	//
+	// To avoid invalid file descriptor operations and spurious error messages,
+	// sleep for two polling periods, which should be sufficient, in most cases,
+	// for socketCallback to poll the context and exit.
+
+	time.Sleep(2 * netlinkSocketIOTimeout)
+
+	m.nfqueue.Close()
+
+	syscall.Close(m.injectIPv4FD)
+
+	if m.injectIPv6FD != 0 {
+		syscall.Close(m.injectIPv6FD)
+	}
+
+	// Rule removal is best effort: with addRules false, configureIPTables
+	// ignores command failures and always returns nil.
+	_ = m.configureIPTables(false)
+}
+
+// makeConnectionID creates a unique connection ID, for appliedSpecCache, from
+// the 4-tuple srcIP, srcPort, dstIP, dstPort. In the SYN/ACK context, src is
+// the server and dst is the client.
+//
+// The ID is a fixed 36 byte layout: 16 byte source IP, 2 byte big endian
+// source port, 16 byte destination IP, 2 byte big endian destination port.
+// IPv4 addresses are stored in their IPv4-in-IPv6 form, so the 4 byte and 16
+// byte representations of the same IPv4 address produce the same ID. An IP
+// that is neither 4 nor 16 bytes long is stored as all zeros; the ports
+// always remain at their fixed offsets.
+func makeConnectionID(
+	srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16) string {
+
+	// Limitation: there may be many repeat connections from one dstIP,
+	// especially if many clients are behind the same NAT. Each TCP connection
+	// will have a distinct dstPort. In principle, there remains a race between
+	// populating appliedSpecCache, the TCP connection terminating on the
+	// client-side and the NAT reusing the dstPort, and consuming
+	// appliedSpecCache.
+
+	const uint16Len = 2
+	const endpointLen = net.IPv6len + uint16Len
+
+	var connID [2 * endpointLen]byte
+
+	// putEndpoint writes one IP/port pair into b, which must be endpointLen
+	// bytes. To16 yields the IPv4-in-IPv6 form for 4 byte addresses and nil
+	// for invalid lengths, in which case the IP bytes stay zero.
+	putEndpoint := func(b []byte, ip net.IP, port uint16) {
+		copy(b, ip.To16())
+		binary.BigEndian.PutUint16(b[net.IPv6len:], port)
+	}
+
+	putEndpoint(connID[:endpointLen], srcIP, srcPort)
+	putEndpoint(connID[endpointLen:], dstIP, dstPort)
+
+	return string(connID[:])
+}
+
+// GetAppliedSpecName returns the packet manipulation spec name applied to the
+// TCP connection, represented by its local and remote address components,
+// that was ultimately accepted by a network listener.
+//
+// This allows GetSpecName, the spec selector, to be non-deterministic while
+// also allowing for accurate packet manipulation metrics to be associated
+// with each TCP connection.
+//
+// For a given connection, GetAppliedSpecName must be called before a TTL
+// clears the stored value. Calling GetAppliedSpecName immediately clears the
+// stored value for the given connection.
+//
+// To obtain the correct result GetAppliedSpecName must be called with a
+// RemoteAddr which reflects the true immediate network peer address. In
+// particular, for proxied net.Conns which present a synthetic RemoteAddr with
+// the original address of a proxied client (e.g., armon/go-proxyproto, or
+// psiphon/server.meekConn) the true peer RemoteAddr must instead be
+// provided.
+//
+// An error is returned when either address is nil, when no value is stored
+// for the connection, or when the stored value is not a string.
+func (m *Manipulator) GetAppliedSpecName(
+	localAddr, remoteAddr *net.TCPAddr) (string, error) {
+
+	// Callers commonly obtain these addresses through type assertions on
+	// net.Addr values; report a nil address as an error instead of panicking.
+	if localAddr == nil || remoteAddr == nil {
+		return "", errors.TraceNew("missing address")
+	}
+
+	connID := makeConnectionID(
+		localAddr.IP,
+		uint16(localAddr.Port),
+		remoteAddr.IP,
+		uint16(remoteAddr.Port))
+
+	value, found := m.appliedSpecCache.Get(connID)
+	if !found {
+		return "", errors.TraceNew("connection not found")
+	}
+
+	// The stored value is consumed by the first successful lookup.
+	m.appliedSpecCache.Delete(connID)
+
+	specName, ok := value.(string)
+	if !ok {
+		return "", errors.TraceNew("unexpected cache value type")
+	}
+
+	return specName, nil
+}
+
+// setAppliedSpecName stores specName in appliedSpecCache, with the cache's
+// default expiration, under the connection ID derived from the source and
+// destination IP addresses and TCP ports of interceptedPacket.
+// interceptedPacket must contain a TCP layer.
+func (m *Manipulator) setAppliedSpecName(
+	interceptedPacket gopacket.Packet, specName string) {
+
+	fromIP, toIP, _, _ := m.getPacketAddressInfo(interceptedPacket)
+
+	tcp := interceptedPacket.Layer(layers.LayerTypeTCP).(*layers.TCP)
+	fromPort, toPort := uint16(tcp.SrcPort), uint16(tcp.DstPort)
+
+	m.appliedSpecCache.Set(
+		makeConnectionID(fromIP, fromPort, toIP, toPort),
+		specName,
+		cache.DefaultExpiration)
+}
+
+// getSocketMark returns the configured SocketMark, or defaultSocketMark when
+// the configured value is 0.
+func (m *Manipulator) getSocketMark() int {
+	if mark := m.config.SocketMark; mark != 0 {
+		return mark
+	}
+	return defaultSocketMark
+}
+
+// handleInterceptedPacket is the nfqueue callback invoked for each
+// intercepted packet. It selects a spec for the packet, records the applied
+// spec name, issues the nfqueue verdict and, when a spec applies, injects the
+// manipulated packets in place of the dropped original. It always returns 0.
+func (m *Manipulator) handleInterceptedPacket(attr nfqueue.Attribute) int {
+
+	logger := m.config.Logger
+
+	if attr.PacketID == nil || attr.Payload == nil {
+		logger.WithTrace().Warning("missing nfqueue data")
+		return 0
+	}
+
+	packetID := *attr.PacketID
+
+	// Manipulation is triggered only by a SYN-ACK packet that has no payload
+	// (which a transformation _may_ discard). The iptables filter for NFQUEUE
+	// should already ensure that only SYN-ACK packets reach the queue. To avoid
+	// breaking all TCP connections in an unanticipated case, fail open --
+	// accept the packet -- when these conditions are not met or when parsing
+	// the packet fails.
+
+	packet, err := m.parseInterceptedPacket(*attr.Payload)
+	if err != nil {
+
+		// Fail open in this case.
+		m.nfqueue.SetVerdict(packetID, nfqueue.NfAccept)
+
+		logger.WithTraceFields(
+			common.LogFields{"error": err}).Warning("unexpected packet")
+		return 0
+	}
+
+	spec, err := m.getCompiledSpec(packet)
+	if err != nil {
+
+		// Fail open in this case.
+		m.nfqueue.SetVerdict(packetID, nfqueue.NfAccept)
+
+		logger.WithTraceFields(
+			common.LogFields{"error": err}).Warning("get strategy failed")
+		return 0
+	}
+
+	// The applied spec name is cached _before_ the verdict is issued or any
+	// manipulated packets are injected. Otherwise the TCP handshake could
+	// complete, and GetAppliedSpecName be called, before the cache is
+	// populated.
+
+	// With no spec there is no packet manipulation: an empty name is recorded
+	// and the original packet is accepted.
+	appliedName, verdict := "", nfqueue.NfAccept
+	if spec != nil {
+		appliedName, verdict = spec.name, nfqueue.NfDrop
+	}
+
+	m.setAppliedSpecName(packet, appliedName)
+	m.nfqueue.SetVerdict(packetID, verdict)
+
+	if spec == nil {
+		return 0
+	}
+
+	if err := m.injectPackets(packet, spec); err != nil {
+		logger.WithTraceFields(
+			common.LogFields{"error": err}).Warning("inject packets failed")
+	}
+
+	return 0
+}
+
+// parseInterceptedPacket decodes packetData, first as IPv4 and then as IPv6,
+// and returns the decoded packet only when it holds a TCP layer with no
+// payload layer and with exactly the SYN and ACK flags set. Any other input
+// results in an error. Before returning, stripEOLOption is applied to the
+// packet.
+func (m *Manipulator) parseInterceptedPacket(packetData []byte) (gopacket.Packet, error) {
+
+	// NFQUEUE doesn't send an Ethernet layer, so the first layer is either
+	// IPv4 or IPv6.
+	//
+	// Only one packet is parsed per TCP connection, so neither the faster
+	// DecodingLayerParser API,
+	// https://godoc.org/github.com/google/gopacket#hdr-Fast_Decoding_With_DecodingLayerParser,
+	// nor zero-copy approaches are used.
+	//
+	// TODO: use a stub gopacket.Decoder as the first layer to avoid the extra
+	// NewPacket call? Use distinct NFQUEUE queue numbers and nfqueue instances
+	// for IPv4 and IPv6?
+
+	parsed := gopacket.NewPacket(packetData, layers.LayerTypeIPv4, gopacket.Default)
+	if parsed.ErrorLayer() != nil {
+		parsed = gopacket.NewPacket(packetData, layers.LayerTypeIPv6, gopacket.Default)
+	}
+
+	if failure := parsed.ErrorLayer(); failure != nil {
+		return nil, errors.Trace(failure.Error())
+	}
+
+	// Once the checks below pass, Layer([IPv4,IPv6]/TCP) return values are
+	// assumed to be non-nil and unchecked layer type assertions are assumed
+	// safe.
+
+	tcpLayer := parsed.Layer(layers.LayerTypeTCP)
+	switch {
+	case tcpLayer == nil:
+		return nil, errors.TraceNew("missing TCP layer")
+	case parsed.Layer(gopacket.LayerTypePayload) != nil:
+		return nil, errors.TraceNew("unexpected payload layer")
+	}
+
+	tcp := tcpLayer.(*layers.TCP)
+
+	otherFlags := tcp.FIN || tcp.RST || tcp.PSH || tcp.URG ||
+		tcp.ECE || tcp.CWR || tcp.NS
+	if !(tcp.SYN && tcp.ACK) || otherFlags {
+		return nil, errors.TraceNew("unexpected TCP flags")
+	}
+
+	stripEOLOption(parsed)
+
+	return parsed, nil
+}
+
+// getCompiledSpec returns the compiled spec selected for interceptedPacket.
+// The spec name is chosen by config.GetSpecName, which is called with the
+// packet's TCP source port and destination IP. A nil spec with a nil error
+// means that no spec was selected; an error is returned when the selected
+// name has no compiled spec.
+func (m *Manipulator) getCompiledSpec(interceptedPacket gopacket.Packet) (*compiledSpec, error) {
+
+	_, clientIP, _, _ := m.getPacketAddressInfo(interceptedPacket)
+
+	tcp := interceptedPacket.Layer(layers.LayerTypeTCP).(*layers.TCP)
+
+	specName := m.config.GetSpecName(int(tcp.SrcPort), clientIP)
+	if specName == "" {
+		return nil, nil
+	}
+
+	if spec, ok := m.compiledSpecs[specName]; ok {
+		return spec, nil
+	}
+
+	return nil, errors.Tracef("invalid spec name: %s", specName)
+}
+
+// injectPackets applies spec to interceptedPacket and sends each resulting
+// packet, in order, through the raw injection socket matching the packet's IP
+// version. It stops at, and returns, the first apply or send error.
+func (m *Manipulator) injectPackets(interceptedPacket gopacket.Packet, spec *compiledSpec) error {
+
+	// Even with the IP_HDRINCL socket option, a sockAddr parameter with dstIP
+	// (but not port) set appears to be required.
+
+	_, _, fd, destination := m.getPacketAddressInfo(interceptedPacket)
+
+	outbound, err := spec.apply(interceptedPacket)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	for i := range outbound {
+		if err := syscall.Sendto(fd, outbound[i], 0, destination); err != nil {
+			return errors.Trace(err)
+		}
+	}
+
+	return nil
+}
+
+// getPacketAddressInfo returns, for interceptedPacket, the source IP, the
+// destination IP, the injection socket file descriptor for the packet's IP
+// version, and a socket address holding the destination IP with port 0.
+// interceptedPacket must contain an IPv4 or an IPv6 layer.
+func (m *Manipulator) getPacketAddressInfo(interceptedPacket gopacket.Packet) (net.IP, net.IP, int, syscall.Sockaddr) {
+
+	if ipv4Layer := interceptedPacket.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
+		header := ipv4Layer.(*layers.IPv4)
+		destination := &syscall.SockaddrInet4{Port: 0}
+		copy(destination.Addr[:], header.DstIP.To4())
+		return header.SrcIP, header.DstIP, m.injectIPv4FD, destination
+	}
+
+	// Without an IPv4 layer, the packet is taken to be IPv6.
+	header := interceptedPacket.Layer(layers.LayerTypeIPv6).(*layers.IPv6)
+	destination := &syscall.SockaddrInet6{Port: 0}
+	copy(destination.Addr[:], header.DstIP.To16())
+	return header.SrcIP, header.DstIP, m.injectIPv6FD, destination
+}
+
+// configureIPTables removes and, when addRules is true, re-adds the iptables
+// and ip6tables OUTPUT rules that send TCP packets with only the SYN and ACK
+// flags set, sent from the configured protocol ports, to the NFQUEUE queue
+// config.QueueNumber. Packets carrying the socket mark returned by
+// getSocketMark are excluded from the rule.
+//
+// Rule removal is best effort and its failures are ignored. When addRules is
+// true, a failure to add a rule is returned after an attempt to remove any
+// partially added rules. When addRules is false, nil is always returned.
+func (m *Manipulator) configureIPTables(addRules bool) error {
+
+	// execCommands runs the same rule command, in the given mode ("-A" to
+	// append, "-D" to delete), for both iptables and ip6tables.
+	execCommands := func(mode string) error {
+
+		ports := make([]string, len(m.config.ProtocolPorts))
+		for i, port := range m.config.ProtocolPorts {
+			ports[i] = strconv.Itoa(port)
+		}
+
+		socketMark := strconv.Itoa(m.getSocketMark())
+
+		args := []string{
+			mode, "OUTPUT",
+			"--protocol", "tcp",
+			"--match", "multiport",
+			"--source-ports", strings.Join(ports, ","),
+			"--match", "mark",
+			"!", "--mark", socketMark,
+			"--tcp-flags", "ALL", "SYN,ACK",
+			"-j", "NFQUEUE",
+			"--queue-bypass",
+			"--queue-num", strconv.Itoa(m.config.QueueNumber),
+		}
+
+		err := common.RunNetworkConfigCommand(
+			m.config.Logger,
+			m.config.SudoNetworkConfigCommands,
+			"iptables",
+			args...)
+		// Delete failures are ignored; the rule may simply not exist.
+		if mode != "-D" && err != nil {
+			return errors.Trace(err)
+		}
+
+		err = common.RunNetworkConfigCommand(
+			m.config.Logger,
+			m.config.SudoNetworkConfigCommands,
+			"ip6tables",
+			args...)
+		if mode != "-D" && err != nil {
+			// An IPv6 failure is only logged when the configuration allows
+			// running without IPv6 network configuration.
+			if m.config.AllowNoIPv6NetworkConfiguration {
+				m.config.Logger.WithTraceFields(
+					common.LogFields{
+						"error": err}).Warning(
+					"configure IPv6 NFQUEUE failed")
+			} else {
+				return errors.Trace(err)
+			}
+		}
+
+		return nil
+	}
+
+	// To avoid duplicates, first try to drop existing rules, then add. Also try
+	// to revert any partial configuration in the case of an error.
+
+	_ = execCommands("-D")
+
+	if addRules {
+		err := execCommands("-A")
+		if err != nil {
+			_ = execCommands("-D")
+		}
+		return errors.Trace(err)
+	}
+
+	return nil
+}
+
+// newNfqueueLogger returns a standard library logger, with the "nfqueue"
+// prefix and short file names, whose output is forwarded to logger through
+// an nfqueueLoggerWriter.
+func newNfqueueLogger(logger common.Logger) *log.Logger {
+	writer := &nfqueueLoggerWriter{logger: logger}
+	return log.New(writer, "nfqueue", log.Lshortfile)
+}
+
+// nfqueueLoggerWriter is the output writer used by newNfqueueLogger; it
+// forwards written log output to a common.Logger.
+type nfqueueLoggerWriter struct {
+	// logger receives the forwarded log output.
+	logger common.Logger
+}
+
+// Write logs p, as the "log" field of a warning, and reports all of p as
+// written. It never returns an error.
+func (n *nfqueueLoggerWriter) Write(p []byte) (int, error) {
+	fields := common.LogFields{"log": string(p)}
+	n.logger.WithTraceFields(fields).Warning("nfqueue log")
+	return len(p), nil
+}

+ 203 - 0
psiphon/common/packetman/packetman_linux_test.go

@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package packetman
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
+)
+
+// TestPacketManipulatorIPv4 runs the packet manipulator test against a
+// listener on an IPv4 address.
+func TestPacketManipulatorIPv4(t *testing.T) {
+	testPacketManipulator(false, t)
+}
+
+// TestPacketManipulatorIPv6 runs the packet manipulator test against a
+// listener on an IPv6 address.
+func TestPacketManipulatorIPv6(t *testing.T) {
+	testPacketManipulator(true, t)
+}
+
+// testPacketManipulator runs a Manipulator in front of a local web server,
+// listening on an IPv6 address when useIPv6 is set and on an IPv4 address
+// otherwise, and makes a single HTTP request to that server.
+func testPacketManipulator(useIPv6 bool, t *testing.T) {
+
+	// Test: run a Manipulator in front of a web server; make an HTTP request;
+	// the expected transformation spec should be executed (as reported by
+	// GetAppliedSpecName) and the request must succeed.
+
+	ipv4, ipv6, err := common.GetRoutableInterfaceIPAddresses()
+	if err != nil {
+		t.Fatalf("GetRoutableInterfaceIPAddresses failed: %v", err)
+	}
+
+	network := "tcp4"
+	address := net.JoinHostPort(ipv4.String(), "0")
+	if useIPv6 {
+		network = "tcp6"
+		address = net.JoinHostPort(ipv6.String(), "0")
+	}
+
+	listener, err := net.Listen(network, address)
+	if err != nil {
+		t.Fatalf("net.Listen failed: %v", err)
+	}
+	defer listener.Close()
+
+	hostStr, portStr, err := net.SplitHostPort(listener.Addr().String())
+	if err != nil {
+		t.Fatalf("net.SplitHostPort failed: %s", err.Error())
+	}
+	listenerPort, err := strconv.Atoi(portStr)
+	if err != nil {
+		t.Fatalf("strconv.Atoi failed: %v", err)
+	}
+
+	// [["TCP-flags S"]] replaces the original SYN-ACK packet with a single
+	// SYN packet, implementing TCP simultaneous open.
+
+	testSpecName := "test-spec"
+	config := &Config{
+		Logger:        newTestLogger(),
+		ProtocolPorts: []int{listenerPort},
+		Specs:         []*Spec{&Spec{Name: testSpecName, PacketSpecs: [][]string{[]string{"TCP-flags S"}}}},
+		GetSpecName: func(protocolPort int, _ net.IP) string {
+			if protocolPort == listenerPort {
+				return testSpecName
+			}
+			return ""
+		},
+		QueueNumber: 1,
+	}
+
+	m, err := NewManipulator(config)
+	if err != nil {
+		t.Fatalf("NewManipulator failed: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	err = m.Start(ctx)
+	if err != nil {
+		t.Fatalf("Manipulator.Start failed: %v", err)
+	}
+	defer m.Stop()
+
+	go func() {
+		serveMux := http.NewServeMux()
+		serveMux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
+			io.WriteString(w, "test-response\n")
+		})
+
+		server := &http.Server{
+			Handler: serveMux,
+			ConnState: func(conn net.Conn, state http.ConnState) {
+				if state == http.StateNew {
+					localAddr := conn.LocalAddr().(*net.TCPAddr)
+					remoteAddr := conn.RemoteAddr().(*net.TCPAddr)
+					specName, err := m.GetAppliedSpecName(localAddr, remoteAddr)
+
+					// This callback runs on a server goroutine. FailNow, and so
+					// Fatalf, must only be called from the goroutine running the
+					// test; report failures with Errorf instead.
+					if err != nil {
+						t.Errorf("GetAppliedSpecName failed: %v", err)
+						return
+					}
+					if specName != testSpecName {
+						t.Errorf("unexpected spec name: %s", specName)
+					}
+				}
+			},
+		}
+
+		server.Serve(listener)
+	}()
+
+	httpClient := &http.Client{
+		Timeout: 30 * time.Second,
+	}
+
+	response, err := httpClient.Get(fmt.Sprintf("http://%s:%s", hostStr, portStr))
+	if err != nil {
+		t.Fatalf("http.Get failed: %v", err)
+	}
+	defer response.Body.Close()
+	_, err = ioutil.ReadAll(response.Body)
+	if err != nil {
+		t.Fatalf("ioutil.ReadAll failed: %v", err)
+	}
+
+	if response.StatusCode != http.StatusOK {
+		t.Fatalf("unexpected response code: %d", response.StatusCode)
+	}
+}
+
+// newTestLogger returns a common.Logger for use in tests.
+func newTestLogger() common.Logger {
+	return &testLogger{}
+}
+
+// testLogger is a stateless common.Logger implementation for tests.
+type testLogger struct {
+}
+
+// WithTrace returns a log trace tagged with the name reported by
+// stacktrace.GetParentFunctionName and with no fields.
+func (logger *testLogger) WithTrace() common.LogTrace {
+	return &testLogTrace{
+		trace: stacktrace.GetParentFunctionName(),
+	}
+}
+
+// WithTraceFields returns a log trace tagged with the name reported by
+// stacktrace.GetParentFunctionName and carrying the given fields.
+func (logger *testLogger) WithTraceFields(fields common.LogFields) common.LogTrace {
+	return &testLogTrace{
+		trace:  stacktrace.GetParentFunctionName(),
+		fields: fields,
+	}
+}
+
+// LogMetric discards the metric.
+func (logger *testLogger) LogMetric(metric string, fields common.LogFields) {
+}
+
+// testLogTrace is the common.LogTrace returned by testLogger; it prints log
+// messages to standard output.
+type testLogTrace struct {
+	// trace is the function name that each message is tagged with.
+	trace string
+	// fields are printed with each message.
+	fields common.LogFields
+}
+
+// log prints one line holding noticeType, the trace, the fields, and args
+// formatted with fmt.Sprint.
+func (entry *testLogTrace) log(
+	noticeType string, args ...interface{}) {
+
+	message := fmt.Sprint(args...)
+
+	fmt.Printf(
+		"[%s] %s: %+v: %s\n",
+		noticeType, entry.trace, entry.fields, message)
+}
+
+// Debug prints args with the DEBUG label.
+func (entry *testLogTrace) Debug(args ...interface{}) {
+	entry.log("DEBUG", args...)
+}
+
+// Info prints args with the INFO label.
+func (entry *testLogTrace) Info(args ...interface{}) {
+	entry.log("INFO", args...)
+}
+
+// Warning prints args with the ALERT label.
+func (entry *testLogTrace) Warning(args ...interface{}) {
+	entry.log("ALERT", args...)
+}
+
+// Error prints args with the ERROR label.
+func (entry *testLogTrace) Error(args ...interface{}) {
+	entry.log("ERROR", args...)
+}

+ 277 - 0
psiphon/common/packetman/packetman_test.go

@@ -0,0 +1,277 @@
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package packetman
+
+import (
+	"bytes"
+	"encoding/json"
+	"net"
+	"testing"
+
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+)
+
+// TestTransformations compiles a two packet spec, applies it repeatedly to a
+// synthetic SYN-ACK packet, and checks the flags, fields, options, and
+// payload of the resulting packets.
+func TestTransformations(t *testing.T) {
+
+	// Test: apply various transformations to an original packet, then parse the
+	// resulting packets and check that flags/fields/options are as expected.
+
+	// Limitation: gopacket, used here in the test to verify transformations,
+	// will fail to parse some or all of certain packets that can be created by
+	// certain transformations. gopacket will fail to parse packets with too many
+	// option bytes or invalid DataOffset values. gopacket will stop
+	// deserializing TCP options as soon as it encounters the EOL option, even if
+	// the packet actually contains more options. Etc.
+
+	specJSON := []byte(`
+    {
+        "Name": "test-spec",
+        "PacketSpecs": [
+            ["TCP-flags SA",
+             "TCP-flags S",
+             "TCP-srcport ffff",
+             "TCP-dstport ffff",
+             "TCP-seq ffffffff",
+             "TCP-ack ffffffff",
+             "TCP-dataoffset 0f",
+             "TCP-window ffff",
+             "TCP-checksum ffff",
+             "TCP-urgent ffff",
+             "TCP-option-nop omit",
+             "TCP-option-mss ffff",
+             "TCP-option-windowscale ff",
+             "TCP-option-sackpermitted ",
+	         "TCP-option-sack ffffffffffffffff",
+	         "TCP-option-timestamps ffffffffffffffff",
+             "TCP-payload eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
+             "TCP-payload ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"],
+
+            ["TCP-flags random",
+             "TCP-srcport random",
+             "TCP-dstport random",
+             "TCP-seq random",
+             "TCP-ack random",
+             "TCP-dataoffset random",
+             "TCP-window random",
+             "TCP-checksum random",
+             "TCP-urgent random",
+             "TCP-option-mss random",
+             "TCP-option-windowscale random",
+             "TCP-option-sackpermitted random",
+             "TCP-option-timestamps random",
+             "TCP-payload random"]
+        ]
+    }
+	`)
+
+	var spec *Spec
+	err := json.Unmarshal(specJSON, &spec)
+	if err != nil {
+		t.Fatalf("json.Unmarshal failed: %v", err)
+	}
+
+	c, err := compileSpec(spec)
+	if err != nil {
+		t.Fatalf("compileSpec failed: %v", err)
+	}
+
+	if c.name != spec.Name {
+		t.Fatalf("unexpected compiled spec name: %s", c.name)
+	}
+
+	originalIPv4 := &layers.IPv4{
+		Version:  0x04,
+		IHL:      0x05,
+		Protocol: 0x06,
+		SrcIP:    net.IPv4(192, 168, 0, 1),
+		DstIP:    net.IPv4(192, 168, 0, 2),
+	}
+
+	originalTCP := &layers.TCP{
+		SYN: true,
+		ACK: true,
+		Options: []layers.TCPOption{
+			layers.TCPOption{OptionType: layers.TCPOptionKindNop, OptionLength: 1},
+			layers.TCPOption{OptionType: layers.TCPOptionKindSACKPermitted, OptionLength: 2},
+			layers.TCPOption{OptionType: layers.TCPOptionKindSACK, OptionLength: 10, OptionData: bytes.Repeat([]byte{0}, 8)},
+			layers.TCPOption{OptionType: layers.TCPOptionKindTimestamps, OptionLength: 10, OptionData: bytes.Repeat([]byte{0}, 8)},
+		},
+	}
+
+	originalTCP.SetNetworkLayerForChecksum(originalIPv4)
+
+	originalPayload := gopacket.Payload([]byte{0, 0, 0, 0})
+
+	// Fail immediately if the original packet cannot be serialized; otherwise
+	// the checks below would run against an empty or partial buffer.
+	buffer := gopacket.NewSerializeBuffer()
+	err = gopacket.SerializeLayers(
+		buffer,
+		gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true},
+		originalIPv4,
+		originalTCP,
+		originalPayload)
+	if err != nil {
+		t.Fatalf("gopacket.SerializeLayers failed: %v", err)
+	}
+	originalPacketData := buffer.Bytes()
+
+	originalPacket := gopacket.NewPacket(originalPacketData, layers.LayerTypeIPv4, gopacket.Default)
+	errLayer := originalPacket.ErrorLayer()
+	if errLayer != nil {
+		t.Fatalf("gopacket.NewPacket failed: %v", errLayer.Error())
+	}
+
+	stripEOLOption(originalPacket)
+
+	// optionsEqual reports whether two TCP options have the same type, length,
+	// and data.
+	optionsEqual := func(a, b layers.TCPOption) bool {
+		if a.OptionType != b.OptionType ||
+			a.OptionLength != b.OptionLength ||
+			!bytes.Equal(a.OptionData, b.OptionData) {
+			return false
+		}
+		return true
+	}
+
+	// optionsListEqual reports whether two option lists have the same length
+	// and pairwise equal options.
+	optionsListEqual := func(a, b []layers.TCPOption) bool {
+		if len(a) != len(b) {
+			return false
+		}
+		for i, o := range a {
+			if !optionsEqual(o, b[i]) {
+				return false
+			}
+		}
+		return true
+	}
+
+	// The second packet spec is randomized, so its checks may not all hold in
+	// any one application. The spec is applied repeatedly, until the checks
+	// hold once; the test fails only when they still don't hold in the last
+	// repeat.
+
+	repeats := 1000
+repeatLoop:
+	for i := 0; i < repeats; i++ {
+
+		lastRepeat := i == repeats-1
+
+		injectPackets, err := c.apply(originalPacket)
+		if err != nil {
+			t.Fatalf("apply failed: %v", err)
+		}
+
+		if len(injectPackets) != 2 {
+			t.Fatalf("unexpected injectPackets count: %d", len(injectPackets))
+		}
+
+		for packetNum, packetData := range injectPackets {
+
+			packet := gopacket.NewPacket(packetData, layers.LayerTypeIPv4, gopacket.Default)
+
+			errLayer := packet.ErrorLayer()
+			if errLayer != nil {
+				t.Fatalf("gopacket.NewPacket failed: %v", errLayer.Error())
+			}
+
+			tcpLayer := packet.Layer(layers.LayerTypeTCP)
+			if tcpLayer == nil {
+				t.Fatalf("missing TCP layer")
+			}
+
+			tcp := tcpLayer.(*layers.TCP)
+
+			payloadLayer := packet.Layer(gopacket.LayerTypePayload)
+			if payloadLayer == nil {
+				t.Fatalf("missing payload layer")
+			}
+
+			payload := payloadLayer.(*gopacket.Payload)
+
+			if packetNum == 0 {
+
+				// With multiple, redundant value specs (TCP-flags in the test case) the
+				// _last_ value spec should be applied. Values should be truncated to
+				// protocol lengths. The NOP option in the original packet should be
+				// omitted.
+
+				expectedOptions := []layers.TCPOption{
+					layers.TCPOption{OptionType: layers.TCPOptionKindSACKPermitted, OptionLength: 2},
+					layers.TCPOption{OptionType: layers.TCPOptionKindSACK, OptionLength: 10, OptionData: bytes.Repeat([]byte{0xff}, 8)},
+					layers.TCPOption{OptionType: layers.TCPOptionKindTimestamps, OptionLength: 10, OptionData: bytes.Repeat([]byte{0xff}, 8)},
+					layers.TCPOption{OptionType: layers.TCPOptionKindMSS, OptionLength: 4, OptionData: bytes.Repeat([]byte{0xff}, 2)},
+					layers.TCPOption{OptionType: layers.TCPOptionKindWindowScale, OptionLength: 3, OptionData: bytes.Repeat([]byte{0xff}, 1)},
+					layers.TCPOption{OptionType: layers.TCPOptionKindEndList, OptionLength: 1},
+				}
+
+				if tcp.SrcPort != 0xffff ||
+					tcp.DstPort != 0xffff ||
+					tcp.Seq != 0xffffffff ||
+					tcp.Ack != 0xffffffff ||
+					tcp.FIN || !tcp.SYN || tcp.RST || tcp.PSH || tcp.ACK ||
+					tcp.URG || tcp.ECE || tcp.CWR || tcp.NS ||
+					tcp.Window != 0xffff ||
+					tcp.Urgent != 0xffff ||
+					!optionsListEqual(tcp.Options, expectedOptions) {
+					t.Fatalf("unexpected TCP layer: %+v", tcp)
+				}
+
+				expectedPayload := bytes.Repeat([]byte{0xff}, 32)
+				if !bytes.Equal(expectedPayload, *payload) {
+					t.Fatalf("unexpected payload: %x", *payload)
+				}
+
+			} else {
+
+				// In at least one repeat, randomized fields fully differ from original,
+				// including zero-values; original NOP and SACK options retained; random
+				// options have correct protocol lengths.
+
+				if tcp.SrcPort == originalTCP.SrcPort ||
+					tcp.DstPort == originalTCP.DstPort ||
+					tcp.Seq == originalTCP.Seq ||
+					tcp.Ack == originalTCP.Ack ||
+					(tcp.FIN == originalTCP.FIN &&
+						tcp.SYN == originalTCP.SYN &&
+						tcp.RST == originalTCP.RST &&
+						tcp.PSH == originalTCP.PSH &&
+						tcp.ACK == originalTCP.ACK &&
+						tcp.URG == originalTCP.URG &&
+						tcp.ECE == originalTCP.ECE &&
+						tcp.CWR == originalTCP.CWR &&
+						tcp.NS == originalTCP.NS) ||
+					tcp.Window == originalTCP.Window ||
+					tcp.Checksum == originalTCP.Checksum ||
+					tcp.Urgent == originalTCP.Urgent ||
+					len(tcp.Options) != 7 ||
+					!optionsEqual(tcp.Options[0], originalTCP.Options[0]) ||
+					!optionsEqual(tcp.Options[1], originalTCP.Options[1]) ||
+					!optionsEqual(tcp.Options[2], originalTCP.Options[2]) ||
+					tcp.Options[3].OptionType != layers.TCPOptionKindTimestamps ||
+					tcp.Options[3].OptionLength != 10 ||
+					optionsEqual(tcp.Options[3], originalTCP.Options[3]) ||
+					tcp.Options[4].OptionType != layers.TCPOptionKindMSS ||
+					tcp.Options[4].OptionLength != 4 ||
+					tcp.Options[5].OptionType != layers.TCPOptionKindWindowScale ||
+					tcp.Options[5].OptionLength != 3 ||
+					tcp.Options[6].OptionType != layers.TCPOptionKindEndList ||
+					bytes.Equal(originalPayload, *payload) {
+
+					if lastRepeat {
+						t.Fatalf("unexpected TCP layer: %+v", tcp)
+					}
+				} else {
+					break repeatLoop
+				}
+			}
+		}
+	}
+}

+ 54 - 0
psiphon/common/packetman/packetman_unsupported.go

@@ -0,0 +1,54 @@
+// +build !linux
+
+/*
+ * Copyright (c) 2020, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package packetman
+
+import (
+	"context"
+	std_errors "errors"
+	"net"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+)
+
+// IsSupported indicates whether packet manipulation is supported on this
+// platform. This build, for non-Linux platforms, always returns false.
+func IsSupported() bool {
+	return false
+}
+
+// errUnsupported is the error reported by every stubbed operation.
+var errUnsupported = std_errors.New("operation unsupported on this platform")
+
+// Manipulator is a stub type for platforms without packet manipulation
+// support.
+type Manipulator struct {
+}
+
+// NewManipulator always fails with errUnsupported on this platform.
+func NewManipulator(_ *Config) (*Manipulator, error) {
+	return nil, errors.Trace(errUnsupported)
+}
+
+// Start always fails with errUnsupported on this platform.
+func (m *Manipulator) Start(_ context.Context) error {
+	return errors.Trace(errUnsupported)
+}
+
+// Stop does nothing on this platform.
+func (m *Manipulator) Stop() {
+}
+
+// GetAppliedSpecName always fails with errUnsupported on this platform.
+func (m *Manipulator) GetAppliedSpecName(_, _ *net.TCPAddr) (string, error) {
+	return "", errors.Trace(errUnsupported)
+}