mirokuratczyk 2 лет назад
Родитель
Сommit
52a9c9fd0d

+ 6 - 1
psiphon/common/protocol/protocol.go

@@ -194,6 +194,10 @@ func TunnelProtocolUsesMeekHTTP(protocol string) bool {
 		protocol == TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP
 }
 
+func TunnelProtocolUsesMeekHTTPNormalizer(protocol string) bool {
+	return protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK
+}
+
 func TunnelProtocolUsesMeekHTTPS(protocol string) bool {
 	return protocol == TUNNEL_PROTOCOL_FRONTED_MEEK ||
 		protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS ||
@@ -249,7 +253,8 @@ func TunnelProtocolRequiresTLS12SessionTickets(protocol string) bool {
 
 func TunnelProtocolSupportsPassthrough(protocol string) bool {
 	return protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS ||
-		protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET
+		protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET ||
+		protocol == TUNNEL_PROTOCOL_UNFRONTED_MEEK
 }
 
 func TunnelProtocolSupportsUpstreamProxy(protocol string) bool {

+ 679 - 0
psiphon/common/transforms/httpNormalizer.go

@@ -0,0 +1,679 @@
+/*
+ * Copyright (c) 2023, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package transforms
+
+import (
+	"bytes"
+	stderrors "errors"
+	"io"
+	"net"
+	"net/textproto"
+	"strconv"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+)
+
+const (
+	// httpNormalizerReadReqLineAndHeader HTTPNormalizer is waiting to finish
+	// reading the Request-Line, and headers, of the next request from the
+	// underlying net.Conn.
+	httpNormalizerReadReqLineAndHeader = 0
+	// httpNormalizerReadBody HTTPNormalizer is waiting to finish reading the
+	// current request body from the underlying net.Conn.
+	httpNormalizerReadBody = 1
+
+	// httpNormalizerRequestLine is a valid Request-Line used by the normalizer.
+	httpNormalizerRequestLine = "POST / HTTP/1.1"
+	hostHeader                = "Host"
+	contentLengthHeader       = "Content-Length"
+	cookieHeader              = "Cookie"
+	rangeHeader               = "Range"
+)
+
+var ErrPassthroughActive = stderrors.New("passthrough")
+
+// HTTPNormalizer wraps a net.Conn, intercepting Read calls, and normalizes any
+// HTTP requests that are read. The HTTP request components preceding the body
+// are normalized; i.e. the Request-Line and headers.
+//
+// Each HTTP request read from the underlying net.Conn is normalized and then
+// returned over subsequent Read calls.
+//
+// HTTPNormalizer is not safe for concurrent use.
+type HTTPNormalizer struct {
+	// state is the HTTPNormalizer state. Possible values are
+	// httpNormalizerReadReqLineAndHeader and httpNormalizerReadBody.
+	state int64
+	// b is used to buffer the accumulated bytes of the current request
+	// until the Request-Line, and headers, are read from the underlying
+	// net.Conn, normalized, and returned in one, or more, Read calls. May
+	// contain bytes of the current request body and subsequent requests until
+	// they are processed.
+	b bytes.Buffer
+	// maxReqLineAndHeadersSize is the maximum number of bytes the normalizer
+	// will read before establishing a passthrough session, or rejecting the
+	// connection, if the request body of the current request has not been
+	// reached.
+	// No limit is applied if the value is 0.
+	maxReqLineAndHeadersSize int
+	// scanIndex is the index that the bytes in b have been processed up to.
+	// Bytes before this index in b will not contain the RequestLine, or
+	// headers, of the current request after a Read call. Applies when state is
+	// httpNormalizerReadReqLineAndHeader.
+	scanIndex int
+	// readRequestLine is set to true when the Request-Line of the current
+	// request has been read. Applies when state is httpNormalizerReadReqLineAndHeader.
+	readRequestLine bool
+	// reqLineAndHeadersBuf is the buffer used to stage the next normalized
+	// Request-Line, and headers, before outputting them in Read.
+	reqLineAndHeadersBuf bytes.Buffer
+	// headers is the staging area for preserved headers and is reset after the
+	// Request-Line, and headers, of the current request are processed.
+	headers map[string][]byte
+	// contentLength of the current request. Reset after the Request-Line, and
+	// headers, of the current request are processed
+	contentLength *uint64
+	// preserveHeaders are the headers to preserve during request normalization.
+	preserveHeaders []string
+	// prohibitedHeaders is a list of HTTP headers to check for in the
+	// request. If one of these headers is found, then a passthrough is
+	// performed. This is used to defend against abuse.
+	// Limitation: prohibited headers are only logged when passthrough is
+	// configured and passthroughLogPassthrough is set.
+	prohibitedHeaders []string
+	// headerWriteOrder is the order in which headers are written if set. Used
+	// for testing.
+	headerWriteOrder []string
+	// readRemain is the number of remaining request body bytes of the current
+	// request to read from the underlying net.Conn.
+	readRemain uint64
+	// copyRemain is the number of remaining bytes of the current request to
+	// return over one, or more, Read calls.
+	copyRemain uint64
+	// validateMeekCookie is called with the cookie header value of the current
+	// request when it is received and a passthrough session is established if
+	// false is returned.
+	// Note: if there are multiple cookie headers, even though prohibited by
+	// rfc6265, then validateMeekCookie will only be invoked once with the
+	// first one received.
+	validateMeekCookie func(rawCookies []byte) ([]byte, error)
+	// ValidateMeekCookieResult stores the result from calling
+	// validateMeekCookie.
+	ValidateMeekCookieResult []byte
+	// passthrough is set if the normalizer has established a passthrough
+	// session.
+	passthrough bool
+	// passthroughDialer is used to establish any passthrough sessions.
+	passthroughDialer func(network, address string) (net.Conn, error)
+	// passthroughAddress is the passthrough address that will be used for any
+	// passthrough sessions.
+	passthroughAddress string
+	// passthroughLogPassthrough is called when a passthrough session is
+	// initiated.
+	passthroughLogPassthrough func(clientIP string, tunnelError error, logFields map[string]interface{})
+
+	net.Conn
+}
+
+func NewHTTPNormalizer(conn net.Conn) *HTTPNormalizer {
+	t := HTTPNormalizer{
+		Conn: conn,
+	}
+
+	// TODO/perf: could pre-alloc n.b, and n.reqLineAndHeadersBuf,
+	// with (*bytes.Buffer).Grow().
+
+	t.reqLineAndHeadersBuf.WriteString(httpNormalizerRequestLine)
+
+	t.preserveHeaders = []string{
+		hostHeader,
+		contentLengthHeader,
+		cookieHeader,
+		rangeHeader,
+	}
+
+	return &t
+}
+
+// Read implements the net.Conn interface.
+//
+// Note: it is assumed that the underlying transport, net.Conn, is a reliable
+// stream transport, i.e. TCP, therefore it is required that the caller stop
+// calling Read() on an instance of HTTPNormalizer after an error is returned
+// because, following this assumption, the connection will have failed when a
+// Read() call to the underlying net.Conn fails; a new connection must be
+// established, net.Conn, and wrapped with a new HTTPNormalizer.
+//
+// Warning: Does not handle chunked encoding. Must be called synchronously.
+func (t *HTTPNormalizer) Read(buffer []byte) (int, error) {
+
+	if t.passthrough {
+		return 0, io.EOF
+	}
+
+	// TODO/perf: allocate on-demand
+	if t.headers == nil {
+		t.headers = make(map[string][]byte)
+	}
+
+	if t.state == httpNormalizerReadReqLineAndHeader {
+
+		// perf: read into caller's buffer instead of allocating a new one.
+		// perf: theoretically it could be more performant to read directly
+		// into t.b, but there is no mechanism to do so with bytes.Buffer.
+		n, err := t.Conn.Read(buffer)
+
+		if n > 0 {
+			// Do not need to check return value. Applies to all subsequent
+			// calls to t.b.Write() and this comment will not be repeated for
+			// each. See https://github.com/golang/go/blob/1e9ff255a130200fcc4ec5e911d28181fce947d5/src/bytes/buffer.go#L164.
+			t.b.Write(buffer[:n])
+		}
+
+		crlf := []byte("\r\n")
+		doublecrlf := []byte("\r\n\r\n")
+
+		// Check if the maximum number of bytes to read before the request body
+		// has been exceeded first.
+		// Note: could check if max header size will be exceeded before Read
+		// call or ensure the buffer passed into Read is no larger than
+		// t.maxReqLineAndHeadersSize-t.b.Len().
+		if t.maxReqLineAndHeadersSize > 0 && t.b.Len() > t.maxReqLineAndHeadersSize && !bytes.Contains(t.b.Bytes()[:t.maxReqLineAndHeadersSize], doublecrlf) {
+
+			if t.passthroughConfigured() {
+
+				t.startPassthrough(errors.TraceNew("maxReqLineAndHeadersSize exceeded before request body received"), nil)
+
+				return 0, nil
+			}
+
+			return 0, errors.Tracef("%d exceeds maxReqLineAndHeadersSize %d", t.b.Len(), t.maxReqLineAndHeadersSize)
+		}
+
+		if err != nil {
+			// Do not wrap any I/O err returned by Conn
+			return 0, err
+		}
+
+		// preserve headers
+		//
+		// TODO/perf: instead of storing headers in a map they could be
+		// processed and written as they are parsed, but benchmarking this
+		// change shows no measurable change in performance.
+		//
+		// TODO/perf: skip Request-Line, e.g. "GET /foo HTTP/1.1"
+
+		reachedBody := false
+
+		for {
+
+			// NOTE: could add guard here for t.scanIndex < t.b.Len(),
+			// but should never happen.
+			i := bytes.Index(t.b.Bytes()[t.scanIndex:], crlf)
+
+			var header []byte
+			if i == -1 {
+				break // no more CRLF separated headers in t.b
+			} else {
+				header = t.b.Bytes()[t.scanIndex : t.scanIndex+i]
+			}
+
+			if len(header) == 0 && t.readRequestLine {
+				// Zero-length header line means the end of the request headers
+				// has been reached.
+				reachedBody = true
+				break
+			}
+
+			if !t.readRequestLine {
+				t.readRequestLine = true
+			}
+
+			if len(t.headers) >= len(t.preserveHeaders) {
+				t.scanIndex += i + len(crlf)
+				continue // found all headers, continue until final CRLF
+			}
+
+			colon := bytes.Index(header, []byte(":"))
+			if colon == -1 {
+				t.scanIndex += i + len(crlf)
+				continue // not a header, skip
+			}
+
+			// Allow for space before header and trim whitespace around
+			// value.
+			k := textproto.TrimBytes(header[:colon])
+			v := textproto.TrimBytes(header[colon+1:]) // skip over ":"
+
+			err = nil
+			var logFields map[string]interface{}
+
+			if t.validateMeekCookie != nil && t.ValidateMeekCookieResult == nil && bytes.Equal(k, []byte(cookieHeader)) {
+				t.ValidateMeekCookieResult, err = t.validateMeekCookie(v)
+				if err != nil {
+					err = errors.TraceMsg(err, "invalid meek cookie")
+				}
+			}
+
+			if err == nil {
+				if bytes.Equal(k, []byte(contentLengthHeader)) {
+					var cl uint64
+					cl, err = strconv.ParseUint(string(v), 10, 63)
+					if err != nil {
+						err = errors.TraceMsg(err, "invalid Content-Length")
+					} else {
+						t.contentLength = &cl
+					}
+				}
+			}
+
+			if err == nil {
+				// Do passthrough if a prohibited header is found
+				for _, h := range t.prohibitedHeaders {
+
+					// TODO/perf: consider using map, but array may be faster
+					// and use less mem.
+					if bytes.Equal(k, []byte(h)) {
+
+						err = errors.TraceNew("prohibited header")
+						logFields = map[string]interface{}{
+							"header": h,
+							"value":  v,
+						}
+
+						break
+					}
+				}
+			}
+
+			if err != nil {
+				if t.passthroughConfigured() {
+					t.startPassthrough(err, logFields)
+					return 0, nil
+				} else {
+					return 0, errors.Trace(err)
+				}
+			}
+
+			for _, h := range t.preserveHeaders {
+				// TODO/perf: consider using map, but array may be faster and
+				// use less mem.
+				if bytes.Equal(k, []byte(h)) {
+					// TODO: if there are multiple preserved headers with the
+					// same key, then the last header parsed will be the
+					// preserved value. Consider if this is the desired
+					// functionality.
+					t.headers[h] = v
+					break
+				}
+			}
+
+			t.scanIndex += i + len(crlf)
+		}
+
+		if !reachedBody {
+			return 0, nil
+		} // else: Request-Line and all headers have been read.
+
+		bodyOffset := t.scanIndex + len(crlf)
+
+		// reset for next request
+		defer func() {
+			t.scanIndex = 0
+			t.readRequestLine = false
+			t.headers = nil
+			t.contentLength = nil
+		}()
+
+		err = nil
+
+		if t.contentLength == nil {
+			// Irrecoverable error because either Content-Length header
+			// is missing, or Content-Length header value is empty, e.g.
+			// "Content-Length: ", and request body length cannot be
+			// determined.
+			err = errors.TraceNew("Content-Length missing")
+		}
+
+		if err == nil {
+			if t.validateMeekCookie != nil {
+				// NOTE: could check t.ValidateMeekCookieResult == nil instead
+				// if it is guaranteed to return a non-nil result if no error is
+				// returned.
+				if _, ok := t.headers[cookieHeader]; !ok {
+					err = errors.TraceNew("cookie missing")
+				}
+			}
+		}
+
+		if err != nil {
+			if t.passthroughConfigured() {
+				t.startPassthrough(err, nil)
+				return 0, nil
+			} else {
+				return 0, errors.Trace(err)
+			}
+		}
+
+		// No passthrough will be performed. Discard buffered bytes because
+		// they are no longer needed to perform a passthrough.
+		t.b.Next(bodyOffset)
+
+		// TODO: technically at this point we could start copying bytes into the
+		// caller's buffer which would remove the need to copy len(buffer) bytes
+		// twice; first into the internal buffer and second into the caller's
+		// buffer.
+		t.reqLineAndHeadersBuf.Truncate(len(httpNormalizerRequestLine))
+
+		if _, ok := t.headers[hostHeader]; !ok {
+			// net/http expects the host header
+			t.reqLineAndHeadersBuf.WriteString("\r\nHost: example.com")
+		}
+
+		// Write headers
+
+		if t.headerWriteOrder != nil {
+			// Re-add headers in specified order (for testing)
+			for _, k := range t.headerWriteOrder {
+				if v, ok := t.headers[k]; ok {
+					t.reqLineAndHeadersBuf.WriteString("\r\n" + k + ": ")
+					t.reqLineAndHeadersBuf.Write(v)
+				}
+			}
+		} else {
+			for k, v := range t.headers {
+				t.reqLineAndHeadersBuf.WriteString("\r\n" + k + ": ")
+				t.reqLineAndHeadersBuf.Write(v)
+			}
+		}
+		t.reqLineAndHeadersBuf.Write(doublecrlf)
+
+		// TODO/perf: could eliminate copy of header by copying it direct into
+		// the caller's buffer instead of copying the bytes over to t.b first.
+		header := t.reqLineAndHeadersBuf.Bytes()
+
+		// Copy any request body bytes received before resetting the
+		// buffer.
+		var reqBody []byte
+		reqBodyLen := t.b.Len() // number of request body bytes received
+		if reqBodyLen > 0 {
+			reqBody = make([]byte, reqBodyLen)
+			copy(reqBody, t.b.Bytes())
+		}
+
+		t.b.Reset()
+		t.b.Write(header)
+		if len(reqBody) > 0 {
+			t.b.Write(reqBody)
+		}
+
+		// Calculate number of bytes remaining to:
+		// - read from the underlying net.Conn
+		// - return to the caller
+
+		t.state = httpNormalizerReadBody
+
+		totalReqBytes := len(header) + int(*t.contentLength)
+		t.copyRemain = uint64(totalReqBytes)
+
+		bytesOfBodyRead := t.b.Len() - len(header)
+
+		if bytesOfBodyRead > totalReqBytes-len(header) {
+			t.readRemain = 0
+		} else {
+			t.readRemain = *t.contentLength - uint64(bytesOfBodyRead)
+		}
+
+		return t.copy(buffer), nil
+	}
+
+	// Request-Line, and headers, have been normalized. Return any remaining
+	// bytes of these and then read, and return, the bytes of the request body
+	// from the underlying net.Conn.
+
+	var n int
+	var err error
+
+	// Read more bytes from the underlying net.Conn once all the remaining
+	// bytes in t.b have been copied into the caller's buffer in previous Read
+	// calls.
+	if t.b.Len() == 0 {
+
+		// perf: read bytes directly into the caller's buffer.
+
+		bufferLen := len(buffer)
+		if uint64(bufferLen) > t.readRemain {
+			bufferLen = int(t.readRemain)
+		}
+
+		// TODO: could attempt to read more bytes and only copy bufferLen bytes
+		// into buffer but this adds an extra copy.
+		n, err = t.Conn.Read(buffer[:bufferLen])
+
+		if uint64(n) >= t.readRemain {
+			t.readRemain = 0
+			// Do not reset t.b because it may contain bytes of subsequent
+			// requests.
+			t.state = httpNormalizerReadReqLineAndHeader
+		} else {
+			t.readRemain -= uint64(n)
+		}
+
+		// Do not wrap any I/O err returned by Conn
+		return n, err
+	}
+
+	// Copy remaining bytes in t.b into the caller's buffer.
+	return t.copy(buffer), nil
+}
+
// copy copies buffered bytes of the current request from t.b into the
// caller's buffer, up to t.copyRemain bytes, advances the internal read
// cursor, and returns the number of bytes copied. When all bytes of the
// current request have been both read from the underlying net.Conn
// (t.readRemain == 0) and returned to the caller (t.copyRemain == 0), the
// normalizer transitions back to httpNormalizerReadReqLineAndHeader so the
// next request's Request-Line and headers are normalized.
func (t *HTTPNormalizer) copy(buffer []byte) int {
	// Do not return any bytes from subsequent requests which have been
	// buffered internally because they need to be normalized first.
	bytesToCopy := t.copyRemain
	if uint64(t.b.Len()) < t.copyRemain {
		bytesToCopy = uint64(t.b.Len())
	}

	// Copy bytes to caller's buffer
	n := copy(buffer, t.b.Bytes()[:bytesToCopy])

	// Remove returned bytes from internal buffer and update number of bytes
	// remaining to return to the caller.
	t.b.Next(n) // perf: advance read cursor instead of copying bytes to front of buffer
	t.copyRemain -= uint64(n)

	if t.copyRemain == 0 && t.readRemain == 0 {

		// Shift buffer back to 0 copying any remaining bytes to the start of
		// the buffer.
		// TODO/perf: technically bytes.Buffer takes a similar, and more
		// efficient, approach internally so this should not be necessary.
		nextBytes := t.b.Bytes()
		t.b.Reset()
		if len(nextBytes) > 0 {
			t.b.Write(nextBytes)
		}

		// All bytes of the current request have been read and returned to the
		// caller. Start normalizing the header of the next request.
		// NOTE: if t.b contains CRLF separated lines, of the next request and
		// there is remaining space in the buffer supplied by the caller, then
		// technically we could start processing the next request instead of
		// returning here.

		// Do not reset t.b because it may contain bytes of subsequent requests.
		t.state = httpNormalizerReadReqLineAndHeader
	}

	return n
}
+
+func (t *HTTPNormalizer) passthroughConfigured() bool {
+	return t.passthroughDialer != nil && t.passthroughAddress != ""
+}
+
+func (t *HTTPNormalizer) startPassthrough(tunnelError error, logFields map[string]interface{}) {
+
+	if t.passthroughLogPassthrough != nil {
+
+		clientAddr := t.Conn.RemoteAddr().String()
+		clientIP, _, _ := net.SplitHostPort(clientAddr)
+
+		t.passthroughLogPassthrough(clientIP, errors.TraceMsg(tunnelError, "passthrough"), logFields)
+	}
+
+	go passthrough(t.Conn, t.passthroughAddress, t.passthroughDialer, t.b.Bytes())
+
+	t.passthrough = true
+}
+
+func passthrough(conn net.Conn, address string, dialer func(network, address string) (net.Conn, error), buf []byte) {
+
+	// Perform the passthrough relay.
+	//
+	// Limitations:
+	//
+	// - The local TCP stack may differ from passthrough target in a
+	//   detectable way.
+	//
+	// - There may be detectable timing characteristics due to the network hop
+	//   to the passthrough target.
+	//
+	// - Application-level socket operations may produce detectable
+	//   differences (e.g., CloseWrite/FIN).
+	//
+	// - The dial to the passthrough, or other upstream network operations,
+	//   may fail. These errors are not logged.
+	//
+	// - There's no timeout on the passthrough dial and no time limit on the
+	//   passthrough relay so that the invalid client can't detect a timeout
+	//   shorter than the passthrough target; this may cause additional load.
+
+	defer conn.Close()
+
+	passthroughConn, err := dialer("tcp", address)
+	if err != nil {
+		return
+	}
+	_, err = passthroughConn.Write(buf)
+	if err != nil {
+		return
+	}
+
+	go func() {
+		_, _ = io.Copy(passthroughConn, conn)
+		passthroughConn.Close()
+	}()
+
+	_, _ = io.Copy(conn, passthroughConn)
+}
+
+func (t *HTTPNormalizer) Write(b []byte) (n int, err error) {
+	if t.passthrough {
+		return 0, ErrPassthroughActive
+	}
+	return t.Conn.Write(b)
+}
+
+func (t *HTTPNormalizer) Close() error {
+	if t.passthrough {
+		return nil
+	}
+	return t.Conn.Close()
+}
+
+func (t *HTTPNormalizer) SetDeadline(tt time.Time) error {
+	if t.passthrough {
+		return nil
+	}
+	return t.Conn.SetDeadline(tt)
+}
+
+func (t *HTTPNormalizer) SetReadDeadline(tt time.Time) error {
+	if t.passthrough {
+		return nil
+	}
+	return t.Conn.SetReadDeadline(tt)
+}
+
+func (t *HTTPNormalizer) SetWriteDeadline(tt time.Time) error {
+	if t.passthrough {
+		return nil
+	}
+	return t.Conn.SetReadDeadline(tt)
+}
+
// HTTPNormalizerListener wraps a net.Listener, wrapping each accepted
// net.Conn in an HTTPNormalizer configured from the fields below (see
// Accept).
//
// Note: all config fields must be set before calling Accept.
type HTTPNormalizerListener struct {
	// HeaderWriteOrder, when set, fixes the order in which preserved
	// headers are written by each normalizer. Used for testing.
	HeaderWriteOrder          []string
	// MaxReqLineAndHeadersSize is the maximum number of bytes each
	// normalizer will read before the request body is reached; 0 means
	// no limit.
	MaxReqLineAndHeadersSize  int
	// ProhibitedHeaders is the list of HTTP headers which trigger a
	// passthrough when found in a request.
	ProhibitedHeaders         []string
	// PassthroughAddress is the address used for passthrough sessions.
	PassthroughAddress        string
	// PassthroughDialer is used to establish passthrough sessions.
	PassthroughDialer         func(network, address string) (net.Conn, error)
	// PassthroughLogPassthrough is called when a passthrough session is
	// initiated.
	PassthroughLogPassthrough func(clientIP string, tunnelError error, logFields map[string]interface{})
	// ValidateMeekCookie, when set, is called with the client IP and the
	// cookie header value of each request; on error a passthrough is
	// performed.
	ValidateMeekCookie        func(clientIP string, rawCookies []byte) ([]byte, error)

	net.Listener
}
+
+func (t *HTTPNormalizerListener) Accept() (net.Conn, error) {
+	conn, err := t.Listener.Accept()
+	if err != nil {
+		// Do not wrap any err returned by Listener
+		return nil, err
+	}
+
+	normalizer := NewHTTPNormalizer(conn)
+
+	normalizer.headerWriteOrder = t.HeaderWriteOrder // for testing
+	normalizer.maxReqLineAndHeadersSize = t.MaxReqLineAndHeadersSize
+	normalizer.prohibitedHeaders = t.ProhibitedHeaders
+	normalizer.passthroughAddress = t.PassthroughAddress
+	normalizer.passthroughDialer = t.PassthroughDialer
+	normalizer.passthroughLogPassthrough = t.PassthroughLogPassthrough
+
+	if t.ValidateMeekCookie != nil {
+
+		clientIP, _, err := net.SplitHostPort(conn.RemoteAddr().String())
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		normalizer.validateMeekCookie = func(cookie []byte) ([]byte, error) {
+
+			b, err := t.ValidateMeekCookie(clientIP, cookie)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			return b, nil
+		}
+	}
+
+	return normalizer, nil
+}
+
+func WrapListenerWithHTTPNormalizer(listener net.Listener) *HTTPNormalizerListener {
+	return &HTTPNormalizerListener{
+		Listener: listener,
+	}
+}

+ 592 - 0
psiphon/common/transforms/httpNormalizer_test.go

@@ -0,0 +1,592 @@
+/*
+ * Copyright (c) 2023, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package transforms
+
+import (
+	"bytes"
+	stderrors "errors"
+	"io"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+)
+
// httpNormalizerTest describes a single HTTPNormalizer test case executed
// by runHTTPNormalizerTest.
type httpNormalizerTest struct {
	// name identifies the test case.
	name               string
	// input is the byte stream served by the underlying test conn.
	input              string
	// maxHeaderSize is assigned to the normalizer's maxReqLineAndHeadersSize.
	maxHeaderSize      int
	// prohibitedHeaders is assigned to the normalizer's prohibitedHeaders.
	prohibitedHeaders  []string
	// headerOrder is assigned to the normalizer's headerWriteOrder.
	headerOrder        []string
	// chunkSize is the size of the buffer passed to each Read call.
	chunkSize          int
	// connReadErrs are errors returned by the underlying test conn's Read
	// calls — presumably interleaved with data; see testConn.
	connReadErrs       []error
	// validateMeekCookie is assigned to the normalizer's
	// validateMeekCookie; when set, passthrough is also configured.
	validateMeekCookie func([]byte) ([]byte, error)
	// wantOutput is the expected accumulated output read from the
	// normalizer (or, in passthrough cases, written to the passthrough
	// conn).
	wantOutput         string
	// wantError, when set, is a substring expected in the returned error.
	wantError          error
}
+
+func runHTTPNormalizerTest(tt *httpNormalizerTest, useNormalizer bool) error {
+
+	conn := testConn{
+		readErrs:   tt.connReadErrs,
+		readBuffer: []byte(tt.input),
+	}
+
+	passthroughMessage := "passthrough"
+
+	passthroughConn := testConn{
+		readBuffer: []byte(passthroughMessage),
+	}
+
+	var normalizer net.Conn
+	if useNormalizer {
+		n := NewHTTPNormalizer(&conn)
+		n.maxReqLineAndHeadersSize = tt.maxHeaderSize
+		n.headerWriteOrder = tt.headerOrder
+		n.prohibitedHeaders = tt.prohibitedHeaders
+		n.validateMeekCookie = tt.validateMeekCookie
+
+		if n.validateMeekCookie != nil {
+
+			n.passthroughAddress = "127.0.0.1:0"
+			n.passthroughDialer = func(network, address string) (net.Conn, error) {
+
+				if network != "tcp" {
+					return nil, errors.Tracef("expected network tcp but got \"%s\"", network)
+				}
+
+				if address != n.passthroughAddress {
+					return nil, errors.Tracef("expected address \"%s\" but got \"%s\"", n.passthroughAddress, address)
+				}
+
+				return &passthroughConn, nil // return underlying conn
+			}
+		}
+		normalizer = n
+	} else {
+		normalizer = &conn
+	}
+	defer normalizer.Close()
+
+	remain := len(tt.wantOutput)
+	var acc []byte
+	var err error
+
+	// Write input bytes to normalizer in chunks and then check
+	// output.
+	for {
+		if remain <= 0 {
+			break
+		}
+
+		b := make([]byte, tt.chunkSize)
+
+		expectedErr := len(conn.readErrs) > 0
+
+		var n int
+		n, err = normalizer.Read(b)
+		if err != nil && !expectedErr {
+			// err checked outside loop
+			break
+		}
+
+		if n > 0 {
+			remain -= n
+			acc = append(acc, b[:n]...)
+		}
+	}
+
+	if tt.validateMeekCookie != nil && err == io.EOF {
+
+		// wait for passthrough to complete
+
+		timeout := time.After(time.Second)
+
+		for len(passthroughConn.readBuffer) != 0 || len(conn.readBuffer) != 0 {
+
+			select {
+			case <-timeout:
+				return errors.TraceNew("timed out waiting for passthrough to complete")
+			case <-time.After(10 * time.Millisecond):
+			}
+		}
+
+		// Subsequent reads should return EOF
+
+		b := make([]byte, 1)
+		_, err := normalizer.Read(b)
+		if err != io.EOF {
+			return errors.TraceNew("expected EOF")
+		}
+
+		// Subsequent writes should not impact conn or passthroughConn
+
+		_, err = normalizer.Write([]byte("ignored"))
+		if !stderrors.Is(err, ErrPassthroughActive) {
+			return errors.Tracef("expected error io.EOF but got %v", err)
+		}
+
+		if string(acc) != "" {
+			return errors.TraceNew("expected to read no bytes")
+		}
+
+		if string(passthroughConn.readBuffer) != "" {
+			return errors.TraceNew("expected read buffer to be emptied")
+		}
+
+		if string(passthroughConn.writeBuffer) != tt.wantOutput {
+			return errors.Tracef("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(tt.wantOutput), len(tt.wantOutput), escapeNewlines(string(passthroughConn.writeBuffer)), len(passthroughConn.writeBuffer))
+		}
+
+		if string(conn.readBuffer) != "" {
+			return errors.TraceNew("expected read buffer to be emptied")
+		}
+
+		if string(conn.writeBuffer) != passthroughMessage {
+			return errors.Tracef("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(passthroughMessage), len(passthroughMessage), escapeNewlines(string(conn.writeBuffer)), len(conn.writeBuffer))
+		}
+	}
+
+	if tt.wantError == nil {
+		if err != nil {
+			return errors.TraceMsg(err, "unexpected error")
+		}
+	} else {
+		// tt.wantError != nil
+		if err == nil {
+			return errors.Tracef("expected error %v", tt.wantError)
+		} else if !strings.Contains(err.Error(), tt.wantError.Error()) {
+			return errors.Tracef("expected error %v got %v", tt.wantError, err)
+		}
+	}
+	if tt.wantError == nil && string(acc) != tt.wantOutput {
+		return errors.Tracef("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(tt.wantOutput), len(tt.wantOutput), escapeNewlines(string(acc)), len(acc))
+	}
+
+	return nil
+}
+
+func TestHTTPNormalizerHTTPRequest(t *testing.T) {
+
+	tests := []httpNormalizerTest{
+		{
+			name:       "no cookie in chunks",
+			input:      "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			wantOutput: "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:  1,
+		},
+		{
+			name:        "no cookie in single read",
+			input:       "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   999,
+		},
+		{
+			name:        "no cookie, first read lands in body",
+			input:       "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   40, // first read goes up to and including "b"
+		},
+		{
+			name:        "no cookie with spaces",
+			input:       "POST / HTTP/1.1\r\n      Content-Length:   4  \r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   1,
+		},
+		{
+			name:        "cookie and range",
+			input:       "POST / HTTP/1.1\r\nContent-Length: 4\r\n    Cookie: X\r\nRange: 1234 \r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length", "Cookie", "Range"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\nCookie: X\r\nRange: 1234\r\n\r\nabcd",
+			chunkSize:   1,
+		},
+		{
+			name:         "partial write and errors",
+			input:        "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder:  []string{"Host", "Content-Length"},
+			wantOutput:   "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:    1,
+			connReadErrs: []error{stderrors.New("err1"), stderrors.New("err2")},
+		},
+		{
+			name:       "Content-Length missing",
+			input:      "POST / HTTP/1.1\r\n\r\nabcd",
+			wantOutput: "POST / HTTP/1.1\r\n\r\nabcd", // set to ensure all bytes are read
+			wantError:  stderrors.New("Content-Length missing"),
+			chunkSize:  1,
+		},
+		{
+			name:       "invalid Content-Length header value",
+			input:      "POST / HTTP/1.1\r\nContent-Length: X\r\n\r\nabcd",
+			wantOutput: "POST / HTTP/1.1\r\nContent-Length: X\r\nHost: example.com\r\n\r\nabcd", // set to ensure all bytes are read
+			wantError:  stderrors.New("strconv.ParseUint: parsing \"X\": invalid syntax"),
+			chunkSize:  1,
+		},
+		{
+			name:       "incorrect Content-Length header value",
+			input:      "POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabcd",
+			wantOutput: "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 3\r\n\r\nabc",
+			chunkSize:  1,
+		},
+		{
+			name:        "single HTTP request written in a single write",
+			input:       "POST / HTTP/1.1\r\nRemoved: removed\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   999,
+		},
+		{
+			name:        "multiple HTTP requests written in a single write",
+			input:       "POST / HTTP/1.1\r\nRemoved: removed\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nContent-Length: 2\r\n\r\n12",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 2\r\n\r\n12",
+			chunkSize:   999,
+		},
+		{
+			name:        "multiple HTTP requests written in chunks",
+			input:       "POST / HTTP/1.1\r\nRemoved: removed\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nContent-Length: 2\r\n\r\n12",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 2\r\n\r\n12",
+			chunkSize:   3,
+		},
+		{
+			name:        "multiple HTTP requests first read lands in middle of last request",
+			input:       "POST / HTTP/1.1\r\nRemoved: removed\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nContent-Length: 2\r\n\r\n12POST / HTTP/1.1\r\nContent-Length: 2\r\n\r\nxyx",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcdPOST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 2\r\n\r\n12",
+			chunkSize:   109,
+		},
+		{
+			name:        "longer",
+			input:       "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   1,
+		},
+		{
+			name:        "shorter",
+			input:       "POST / HTTP/1.1111111111111111111\r\nContent-Length: 4\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Content-Length"},
+			wantOutput:  "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:   1,
+		},
+		{
+			name:  "missing cookie",
+			input: "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			validateMeekCookie: func([]byte) ([]byte, error) {
+				return nil, errors.TraceNew("invalid cookie")
+			},
+			wantOutput: "POST / HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:  1,
+			wantError:  io.EOF,
+		},
+		{
+			name:  "invalid cookie",
+			input: "POST / HTTP/1.1\r\nCookie: invalid\r\nContent-Length: 4\r\n\r\nabcd",
+			validateMeekCookie: func([]byte) ([]byte, error) {
+				return nil, errors.TraceNew("invalid cookie")
+			},
+			wantOutput: "POST / HTTP/1.1\r\nCookie: invalid\r\nContent-Length: 4\r\n\r\nabcd",
+			chunkSize:  1,
+			wantError:  io.EOF,
+		},
+		{
+			name:        "valid cookie",
+			input:       "POST / HTTP/1.1\r\nHost: example.com\r\nCookie: valid\r\nContent-Length: 4\r\nRange: unused\r\nSkipped: skipped\r\n\r\nabcd",
+			headerOrder: []string{"Host", "Cookie", "Content-Length", "Range"},
+			validateMeekCookie: func([]byte) ([]byte, error) {
+				return nil, nil
+			},
+			wantOutput: "POST / HTTP/1.1\r\nHost: example.com\r\nCookie: valid\r\nContent-Length: 4\r\nRange: unused\r\n\r\nabcd",
+			chunkSize:  1,
+		},
+		{
+			name:          "exceeds max Request-Line, and headers, size",
+			input:         "POST / HTTP/1.1\r\nContent-Length: 4\r\nCookie: X\r\nRange: 1234 \r\n\r\nabcd",
+			maxHeaderSize: 47, // up to end of Cookie header
+			wantOutput:    "POST / HTTP/1.1\r\nContent-Length: 4\r\nCookie: X\r\nRange: 1234 \r\n\r\nabcd",
+			chunkSize:     1,
+			wantError:     stderrors.New("exceeds maxReqLineAndHeadersSize"),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+
+			err := runHTTPNormalizerTest(&tt, true)
+			if err != nil {
+				t.Fatalf("runHTTPNormalizerTest failed: %v", err)
+			}
+		})
+	}
+}
+
+// Caveats:
+//   - Does not test or handle multiple requests in a single connection
+//   - Does not test the scenario where the first request in a connection
+//     passes validation and then a subsequent request fails which triggers
+//     a passthrough - in this scenario both the listener and passthrough
+//     listener will receive bytes.
+func TestHTTPNormalizerHTTPServer(t *testing.T) {
+
+	type test struct {
+		name              string
+		request           string
+		maxHeaderSize     int
+		prohibitedHeaders []string
+		wantPassthrough   bool
+		wantRecv          string
+	}
+
+	tests := []test{
+		{
+			name:     "valid cookie",
+			request:  "POST / HTTP/1.1\r\nCookie: valid\r\nContent-Length: 4\r\n\r\nabcd",
+			wantRecv: "POST / HTTP/1.1\r\nHost: example.com\r\nCookie: valid\r\nContent-Length: 4\r\n\r\nabcd",
+		},
+		{
+			name:            "missing cookie",
+			request:         "POST HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+			wantPassthrough: true,
+			wantRecv:        "POST HTTP/1.1\r\nContent-Length: 4\r\n\r\nabcd",
+		},
+		{
+			name:            "invalid cookie",
+			request:         "POST HTTP/1.1\r\nCookie: invalid\r\nContent-Length: 4\r\n\r\nabcd",
+			wantPassthrough: true,
+			wantRecv:        "POST HTTP/1.1\r\nCookie: invalid\r\nContent-Length: 4\r\n\r\nabcd",
+		},
+		{
+			name:              "valid cookie with prohibited headers",
+			request:           "POST / HTTP/1.1\r\nCookie: valid\r\nProhibited: prohibited\r\nContent-Length: 4\r\n\r\nabcd",
+			prohibitedHeaders: []string{"Prohibited"},
+			wantPassthrough:   true,
+			wantRecv:          "POST / HTTP/1.1\r\nCookie: valid\r\nProhibited: prohibited\r\nContent-Length: 4\r\n\r\nabcd",
+		},
+		{
+			name:            "valid cookie but exceeds max header size",
+			request:         "POST / HTTP/1.1\r\nCookie: valid\r\nContent-Length: 4\r\n\r\nabcd",
+			wantPassthrough: true,
+			maxHeaderSize:   32, // up to end of Cookie header
+			wantRecv:        "POST / HTTP/1.1\r\nCookie: valid\r\nContent-Length: 4\r\n\r\nabcd",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+
+			listener, err := net.Listen("tcp", "127.0.0.1:0")
+			if err != nil {
+				t.Fatalf("net.Listen failed %v", err)
+			}
+			defer listener.Close()
+
+			passthrough, err := net.Listen("tcp", "127.0.0.1:0")
+			if err != nil {
+				t.Fatalf("net.Listen failed %v", err)
+			}
+			defer passthrough.Close()
+
+			listener = WrapListenerWithHTTPNormalizer(listener)
+			normalizer := listener.(*HTTPNormalizerListener)
+			normalizer.PassthroughAddress = passthrough.Addr().String()
+			normalizer.PassthroughDialer = net.Dial
+			normalizer.MaxReqLineAndHeadersSize = tt.maxHeaderSize
+			normalizer.ProhibitedHeaders = tt.prohibitedHeaders
+			normalizer.PassthroughLogPassthrough = func(clientIP string, tunnelError error, logFields map[string]interface{}) {}
+
+			validateMeekCookieResult := "payload"
+			normalizer.ValidateMeekCookie = func(clientIP string, cookie []byte) ([]byte, error) {
+				if string(cookie) == "valid" {
+					return []byte(validateMeekCookieResult), nil
+				}
+				return nil, stderrors.New("invalid cookie")
+			}
+			normalizer.HeaderWriteOrder = []string{"Host", "Cookie", "Content-Length"}
+
+			type listenerState struct {
+				lType                    string // listener type, "listener" or "passthrough"
+				err                      error
+				recv                     []byte
+				validateMeekCookieResult []byte // set if listener is "passthrough"
+			}
+
+			runListener := func(listener net.Listener, listenerType string, recv chan *listenerState) {
+
+				conn, err := listener.Accept()
+				if err != nil {
+					recv <- &listenerState{
+						lType: listenerType,
+						err:   errors.TraceMsg(err, "listener.Accept failed"),
+					}
+					return
+				}
+
+				defer conn.Close()
+
+				b := make([]byte, len(tt.wantRecv))
+
+				// A single Read should be sufficient because multiple requests
+				// in a single connection are not supported by this test.
+				n, err := conn.Read(b)
+				if err != nil {
+					recv <- &listenerState{
+						lType: listenerType,
+						err:   errors.TraceMsg(err, "conn.Read failed"),
+					}
+					return
+				}
+				b = b[:n]
+
+				// Use a distinct name for the type-asserted conn so the outer
+				// byte-count n is not shadowed.
+				var validateMeekCookieResult []byte
+				if hn, ok := conn.(*HTTPNormalizer); ok {
+					validateMeekCookieResult = hn.ValidateMeekCookieResult
+				}
+
+				_, err = conn.Write([]byte(listenerType))
+				if err != nil {
+					if stderrors.Is(err, ErrPassthroughActive) {
+						return
+					}
+					recv <- &listenerState{
+						lType:                    listenerType,
+						err:                      errors.TraceMsg(err, "conn.Write failed"),
+						validateMeekCookieResult: validateMeekCookieResult,
+					}
+					return
+				}
+
+				recv <- &listenerState{
+					lType:                    listenerType,
+					recv:                     b,
+					err:                      nil,
+					validateMeekCookieResult: validateMeekCookieResult,
+				}
+			}
+
+			recv := make(chan *listenerState)
+
+			listenerType := "listener"
+			passthroughType := "passthrough"
+
+			go runListener(listener, listenerType, recv)
+			go runListener(passthrough, passthroughType, recv)
+
+			conn, err := net.Dial("tcp", listener.Addr().String())
+			if err != nil {
+				t.Fatalf("net.Dial failed %v", err)
+			}
+			defer conn.Close()
+
+			n, err := conn.Write([]byte(tt.request))
+			if err != nil {
+				t.Fatalf("conn.Write failed %v", err)
+			}
+			if n != len(tt.request) {
+				t.Fatalf("expected to write %d bytes but wrote %d", len(tt.request), n)
+			}
+
+			// read response
+
+			b := make([]byte, 512)
+			n, err = conn.Read(b)
+			if err != nil {
+				t.Fatalf("conn.Read failed %v", err)
+			}
+			b = b[:n]
+
+			if tt.wantPassthrough && string(b) != passthroughType {
+				t.Fatalf("expected passthrough but got response from listener")
+			} else if !tt.wantPassthrough && string(b) != listenerType {
+				t.Fatalf("expected no passthrough but got response from passthrough")
+			}
+
+			r := <-recv
+
+			if r.err != nil {
+				t.Fatalf("listener failed %v", r.err)
+			}
+
+			if !bytes.Equal(r.recv, []byte(tt.wantRecv)) {
+				t.Fatalf("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(string(tt.wantRecv)), len(tt.wantRecv), escapeNewlines(string(r.recv)), len(r.recv))
+			}
+
+			// Use the passthroughType variable, declared above, instead of a
+			// duplicated string literal.
+			if r.lType != passthroughType && string(r.validateMeekCookieResult) != validateMeekCookieResult {
+				t.Fatalf("expected validateMeekCookieResult value \"%s\" but got \"%s\"", validateMeekCookieResult, string(r.validateMeekCookieResult))
+			}
+
+			// Check that other listener did not get a connection
+
+			n, err = conn.Read(b)
+			if err != nil && err != io.EOF {
+				t.Fatalf("conn.Read failed %v", err)
+			}
+			if n != 0 {
+				t.Fatalf("expected to read 0 bytes")
+			}
+
+			select {
+			case r := <-recv:
+				t.Fatalf("unexpected response from %s: %v \"%s\"", r.lType, r.err, string(r.recv))
+			case <-time.After(10 * time.Millisecond):
+			}
+		})
+	}
+}
+
+// BenchmarkHTTPNormalizer measures the cost of running many HTTP requests,
+// on a single connection, through the HTTP normalizer.
+func BenchmarkHTTPNormalizer(b *testing.B) {
+
+	// Build the 400 byte request body programmatically instead of using a
+	// 400 character "x..." literal, which is unreadable and prone to
+	// drifting out of sync with the Content-Length header value.
+	reqBody := make([]byte, 400)
+	for i := range reqBody {
+		reqBody[i] = 'x'
+	}
+
+	inReq := "POST / HTTP/1.1\r\nContent-Length: 400\r\n\r\n" + string(reqBody)
+	outReq := "POST / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 400\r\n\r\n" + string(reqBody)
+
+	input := ""
+	output := ""
+
+	// Concatenate many requests to simulate a single connection running over
+	// the normalizer.
+	for i := 0; i < 100; i++ {
+		input += inReq
+		output += outReq
+	}
+
+	// TODO: test different chunk sizes
+	test := &httpNormalizerTest{
+		name:       "no cookie in chunks",
+		input:      input,
+		wantOutput: output,
+		chunkSize:  1,
+	}
+
+	for n := 0; n < b.N; n++ {
+
+		// TODO: does test setup and teardown in runHTTPNormalizerTest skew
+		// the benchmark?
+		err := runHTTPNormalizerTest(test, true)
+		if err != nil {
+			b.Fatalf("runHTTPNormalizerTest failed: %v", err)
+		}
+	}
+}

+ 59 - 53
psiphon/common/transforms/httpTransformer.go

@@ -51,11 +51,12 @@ type HTTPTransformerParameters struct {
 }
 
 const (
-	// httpTransformerReadWriteHeader HTTPTransformer is waiting to finish
-	// reading and writing the next HTTP request header.
-	httpTransformerReadWriteHeader = 0
-	// httpTransformerReadWriteBody HTTPTransformer is waiting to finish reading
-	// and writing the current HTTP request body.
+	// httpTransformerReadWriteReqLineAndHeaders HTTPTransformer is waiting to
+	// finish reading and writing the Request-Line, and headers, of the next
+	// request.
+	httpTransformerReadWriteReqLineAndHeaders = 0
+	// httpTransformerReadWriteBody HTTPTransformer is waiting to finish
+	// reading, and writing, the current request body.
 	httpTransformerReadWriteBody = 1
 )
 
@@ -72,15 +73,16 @@ type HTTPTransformer struct {
 	seed      *prng.Seed
 
 	// state is the HTTPTransformer state. Possible values are
-	// httpTransformerReadWriteHeader and httpTransformerReadWriteBody.
+	// httpTransformerReadWriteReqLineAndHeaders and
+	// httpTransformerReadWriteBody.
 	state int64
-	// b is used to buffer the accumulated bytes of the current HTTP request
-	// header until the entire header is received and written.
+	// b is used to buffer the accumulated bytes of the current request until
+	// the Request-Line and all headers are received and written.
 	b bytes.Buffer
-	// remain is the number of remaining HTTP request bytes to write to the
-	// underlying net.Conn. Set to the value of Content-Length (HTTP request
-	// body bytes) plus the length of the transformed HTTP header once the
-	// current request header is received.
+	// remain is the number of remaining request bytes to write to the
+	// underlying net.Conn. Set to the value of Content-Length (request body
+	// bytes) plus the length of the transformed Request-Line, and headers,
+	// once the Request-Line, and headers, of the current request are received.
 	remain uint64
 
 	net.Conn
@@ -102,25 +104,28 @@ type HTTPTransformer struct {
 // in a single Write(). Must be called synchronously.
 func (t *HTTPTransformer) Write(b []byte) (int, error) {
 
-	if t.state == httpTransformerReadWriteHeader {
+	if t.state == httpTransformerReadWriteReqLineAndHeaders {
 
-		// Do not need to check return value https://github.com/golang/go/blob/1e9ff255a130200fcc4ec5e911d28181fce947d5/src/bytes/buffer.go#L164
+		// Do not need to check return value. Applies to all subsequent
+		// calls to t.b.Write() and this comment will not be repeated for
+		// each. See https://github.com/golang/go/blob/1e9ff255a130200fcc4ec5e911d28181fce947d5/src/bytes/buffer.go#L164.
 		t.b.Write(b)
 
-		// Wait until the entire HTTP request header has been read. Must check
-		// all accumulated bytes incase the "\r\n\r\n" separator is written over
-		// multiple Write() calls; from reading the go1.19.5 net/http code the
-		// entire HTTP request is written in a single Write() call.
+		// Wait until the Request-Line, and all headers, have been read. Must
+		// check all accumulated bytes in case the "\r\n\r\n" separator is
+		// written over multiple Write() calls; from reading the go1.19.5
+		// net/http code the entire HTTP request is written in a single Write()
+		// call.
 
 		sep := []byte("\r\n\r\n")
 
-		headerBodyLines := bytes.SplitN(t.b.Bytes(), sep, 2) // split header and body
+		headerBodyLines := bytes.SplitN(t.b.Bytes(), sep, 2) // split Request-Line, and headers, from body
 
 		if len(headerBodyLines) <= 1 {
-			// b buffered in t.b and the entire HTTP request header has not been
-			// recieved so another Write() call is expected.
+			// b buffered in t.b and the Request-Line, and all headers, have not
+			// been received so another Write() call is expected.
 			return len(b), nil
-		} // else: HTTP request header has been read
+		} // else: Request-Line, and all headers, have been read
 
 		// read Content-Length before applying transform
 
@@ -128,7 +133,7 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 
 		lines := bytes.Split(headerBodyLines[0], []byte("\r\n"))
 		if len(lines) > 1 {
-			// skip request line, e.g. "GET /foo HTTP/1.1"
+			// skip Request-Line, e.g. "GET /foo HTTP/1.1"
 			headerLines = lines[1:]
 		}
 
@@ -145,7 +150,7 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 		}
 		if len(cl) == 0 {
 			// Irrecoverable error because either Content-Length header
-			// missing, or Content-Length header value is empty, e.g.
+			// is missing, or Content-Length header value is empty, e.g.
 			// "Content-Length: ", and request body length cannot be
 			// determined.
 			return len(b), errors.TraceNew("Content-Length missing")
@@ -160,13 +165,13 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 
 		t.remain = contentLength
 
-		// transform and write header
+		// transform, and write, Request-Line and headers.
 
-		headerLen := len(headerBodyLines[0]) + len(sep)
-		header := t.b.Bytes()[:headerLen]
+		reqLineAndHeadersLen := len(headerBodyLines[0]) + len(sep)
+		reqLineAndHeaders := t.b.Bytes()[:reqLineAndHeadersLen]
 
 		if t.transform != nil {
-			newHeader, err := t.transform.Apply(t.seed, header)
+			newReqLineAndHeaders, err := t.transform.Apply(t.seed, reqLineAndHeaders)
 			if err != nil {
				// TODO: consider logging an error and skipping transform
 				// instead of returning an error, if the transform is broken
@@ -174,42 +179,42 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 				return len(b), errors.Trace(err)
 			}
 
-			// only allocate new slice if header length changed
-			if len(newHeader) == len(header) {
+			// perf: only allocate new slice if length changed, otherwise the
+			// transformed data can be copied directly over the original in t.b.
+			if len(newReqLineAndHeaders) == len(reqLineAndHeaders) {
 				// Do not need to check return value. It is guaranteed that
-				// n == len(newHeader) because t.b.Len() >= n if the header
-				// size has not changed.
-				copy(t.b.Bytes()[:headerLen], newHeader)
+				// n == len(newReqLineAndHeaders) because t.b.Len() >= n if the
+				// transformed data is the same size as the original data.
+				copy(t.b.Bytes()[:reqLineAndHeadersLen], newReqLineAndHeaders)
 			} else {
 
 				// Copy any request body bytes received before resetting the
 				// buffer.
 				var reqBody []byte
-				reqBodyLen := t.b.Len() - headerLen // number of request body bytes received
+				reqBodyLen := t.b.Len() - reqLineAndHeadersLen // number of request body bytes received
 				if reqBodyLen > 0 {
 					reqBody = make([]byte, reqBodyLen)
-					copy(reqBody, t.b.Bytes()[headerLen:])
+					copy(reqBody, t.b.Bytes()[reqLineAndHeadersLen:])
 				}
 
-				// Reset the buffer and write transformed header and any
-				// request body bytes received into it.
+				// Reset the buffer and write transformed Request-Line, and
+				// headers, and any request body bytes received into it.
 				t.b.Reset()
-				// Do not need to check return value of bytes.Buffer.Write() https://github.com/golang/go/blob/1e9ff255a130200fcc4ec5e911d28181fce947d5/src/bytes/buffer.go#L164
-				t.b.Write(newHeader)
+				t.b.Write(newReqLineAndHeaders)
 				if len(reqBody) > 0 {
 					t.b.Write(reqBody)
 				}
 			}
 
-			header = newHeader
+			reqLineAndHeaders = newReqLineAndHeaders
 		}
 
-		if math.MaxUint64-t.remain < uint64(len(header)) {
+		if math.MaxUint64-t.remain < uint64(len(reqLineAndHeaders)) {
 			// Irrecoverable error because request is malformed:
-			// Content-Length + len(header) > math.MaxUint64.
+			// Content-Length + len(reqLineAndHeaders) > math.MaxUint64.
 			return len(b), errors.TraceNew("t.remain + uint64(len(header)) overflows")
 		}
-		t.remain += uint64(len(header))
+		t.remain += uint64(len(reqLineAndHeaders))
 
 		if uint64(t.b.Len()) > t.remain {
 			// Should never happen, multiple requests written in a single
@@ -228,21 +233,22 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 		return len(b), err
 	}
 
-	// HTTP request header has been transformed. Write any remaining bytes of
-	// HTTP request header and then write HTTP request body.
+	// Request-Line, and headers, have been transformed. Write any remaining
+	// bytes of these and then write request body.
 
 	// Must write buffered bytes first, in-order, to write bytes to underlying
 	// net.Conn in the same order they were received in.
 	//
 	// Already checked that t.b does not contain bytes of a subsequent HTTP
-	// request when the header is parsed, i.e. at this point it is guaranteed
-	// that t.b.Len() <= t.remain.
+	// request when the Request-Line, and headers, are parsed, i.e. at this
+	// point it is guaranteed that t.b.Len() <= t.remain.
 	//
 	// In practice the buffer will be empty by this point because its entire
 	// contents will have been written in the first call to t.b.WriteTo(t.Conn)
-	// when the header is received, parsed, and transformed; otherwise the
-	// underlying transport will have failed and the caller will not invoke
-	// Write() again on this instance. See HTTPTransformer.Write() comment.
+	// when the Request-Line, and headers, are received, parsed, and
+	// transformed; otherwise the underlying transport will have failed and the
+	// caller will not invoke Write() again on this instance. See
+	// HTTPTransformer.Write() comment.
 	wrote, err := t.b.WriteTo(t.Conn)
 	t.remain -= uint64(wrote)
 	if err != nil {
@@ -260,9 +266,9 @@ func (t *HTTPTransformer) Write(b []byte) (int, error) {
 	t.remain -= uint64(n)
 
 	if t.remain <= 0 {
-		// Entire request, header and body, has been written. Return to
-		// waiting for next HTTP request header to arrive.
-		t.state = httpTransformerReadWriteHeader
+		// Entire request has been written. Return to waiting for next HTTP
+		// request to arrive.
+		t.state = httpTransformerReadWriteReqLineAndHeaders
 		t.remain = 0
 	}
 

+ 42 - 10
psiphon/common/transforms/httpTransformer_test.go

@@ -271,8 +271,8 @@ func TestHTTPTransformerHTTPRequest(t *testing.T) {
 				if err != nil {
 					t.Fatalf("unexpected error %v", err)
 				}
-				if string(conn.b) != tt.wantOutput {
-					t.Fatalf("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(tt.wantOutput), len(tt.wantOutput), escapeNewlines(string(conn.b)), len(conn.b))
+				if string(conn.writeBuffer) != tt.wantOutput {
+					t.Fatalf("expected \"%s\" of len %d but got \"%s\" of len %d", escapeNewlines(tt.wantOutput), len(tt.wantOutput), escapeNewlines(string(conn.writeBuffer)), len(conn.writeBuffer))
 				}
 			} else {
 				// tt.wantError != nil
@@ -461,8 +461,10 @@ func escapeNewlines(s string) string {
 }
 
 type testConn struct {
-	// b is the accumulated bytes from Write() calls.
-	b []byte
+	// writeBuffer are the accumulated bytes from Write() calls.
+	writeBuffer []byte
+	// readBuffer are the bytes to return from Read() calls.
+	readBuffer []byte
 	// writeLimit is the max number of bytes that will be written in a Write()
 	// call.
 	writeLimit int
@@ -473,12 +475,38 @@ type testConn struct {
 	// writeErrs are returned from Write() calls in order. If empty, then a nil
 	// error is returned.
 	writeErrs []error
+	// readErrs are returned from Read() calls in order. If empty, then a nil
+	// error is returned.
+	readErrs []error
 
 	net.Conn
 }
 
 func (c *testConn) Read(b []byte) (n int, err error) {
-	return c.Conn.Read(b)
+
+	if len(c.readErrs) > 0 {
+		err = c.readErrs[0]
+		c.readErrs = c.readErrs[1:]
+	}
+
+	// If Conn set, then read from it directly and do not use readBuffer.
+	if c.Conn != nil {
+		return c.Conn.Read(b)
+	}
+
+	if len(c.readBuffer) == 0 {
+		n = 0
+		return
+	}
+
+	n = copy(b, c.readBuffer)
+	if n == len(c.readBuffer) {
+		c.readBuffer = nil
+	} else {
+		c.readBuffer = c.readBuffer[n:]
+	}
+
+	return
 }
 
 func (c *testConn) Write(b []byte) (n int, err error) {
@@ -492,16 +520,16 @@ func (c *testConn) Write(b []byte) (n int, err error) {
 		n = c.writeLens[0]
 		c.writeLens = c.writeLens[1:]
 		if len(b) <= n {
-			c.b = append(c.b, b...)
+			c.writeBuffer = append(c.writeBuffer, b...)
 			n = len(b)
 		} else {
-			c.b = append(c.b, b[:n]...)
+			c.writeBuffer = append(c.writeBuffer, b[:n]...)
 		}
 	} else if c.writeLimit != 0 && c.writeLimit < len(b) {
-		c.b = append(c.b, b[:c.writeLimit]...)
+		c.writeBuffer = append(c.writeBuffer, b[:c.writeLimit]...)
 		n = c.writeLimit
 	} else {
-		c.b = append(c.b, b...)
+		c.writeBuffer = append(c.writeBuffer, b...)
 		n = len(b)
 	}
 
@@ -514,7 +542,11 @@ func (c *testConn) Write(b []byte) (n int, err error) {
 }
 
 func (c *testConn) Close() error {
-	return c.Conn.Close()
+	if c.Conn != nil {
+		return c.Conn.Close()
+	}
+
+	return nil
 }
 
 func (c *testConn) LocalAddr() net.Addr {

+ 6 - 3
psiphon/server/config.go

@@ -163,12 +163,13 @@ type Config struct {
 	TunnelProtocolPorts map[string]int
 
 	// TunnelProtocolPassthroughAddresses specifies passthrough addresses to be
-	// used for tunnel protocols configured in  TunnelProtocolPorts. Passthrough
+	// used for tunnel protocols configured in TunnelProtocolPorts. Passthrough
 	// is a probing defense which relays all network traffic between a client and
 	// the passthrough target when the client fails anti-probing tests.
 	//
 	// TunnelProtocolPassthroughAddresses is supported for:
-	// "UNFRONTED-MEEK-HTTPS-OSSH", "UNFRONTED-MEEK-SESSION-TICKET-OSSH".
+	// "UNFRONTED-MEEK-HTTPS-OSSH", "UNFRONTED-MEEK-SESSION-TICKET-OSSH",
+	// "UNFRONTED-MEEK-OSSH".
 	TunnelProtocolPassthroughAddresses map[string]string
 
 	// LegacyPassthrough indicates whether to expect legacy passthrough messages
@@ -1046,7 +1047,9 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, []byte, []byt
 
 		capability := protocol.GetCapability(tunnelProtocol)
 
-		if params.Passthrough && protocol.TunnelProtocolSupportsPassthrough(tunnelProtocol) {
+		// Note: do not add passthrough annotation if HTTP unfronted meek
+		// because it would result in an invalid capability.
+		if params.Passthrough && protocol.TunnelProtocolSupportsPassthrough(tunnelProtocol) && tunnelProtocol != protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK {
 			if !params.LegacyPassthrough {
 				capability += "-PASSTHROUGH-v2"
 			} else {

+ 115 - 4
psiphon/server/meek.go

@@ -46,6 +46,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
 	tris "github.com/Psiphon-Labs/tls-tris"
 	lrucache "github.com/cognusion/go-cache-lru"
@@ -127,6 +128,7 @@ type MeekServer struct {
 	rateLimitHistory                *lrucache.Cache
 	rateLimitCount                  int
 	rateLimitSignalGC               chan struct{}
+	normalizer                      *transforms.HTTPNormalizerListener
 }
 
 // NewMeekServer initializes a new meek server.
@@ -135,7 +137,7 @@ func NewMeekServer(
 	listener net.Listener,
 	listenerTunnelProtocol string,
 	listenerPort int,
-	useTLS, isFronted, useObfuscatedSessionTickets bool,
+	useTLS, isFronted, useObfuscatedSessionTickets, useHTTPNormalizer bool,
 	clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
 	stopBroadcast <-chan struct{}) (*MeekServer, error) {
 
@@ -238,6 +240,13 @@ func NewMeekServer(
 		meekServer.tlsConfig = tlsConfig
 	}
 
+	if useHTTPNormalizer && protocol.TunnelProtocolUsesMeekHTTPNormalizer(listenerTunnelProtocol) {
+
+		normalizer := meekServer.makeMeekHTTPNormalizerListener()
+		meekServer.normalizer = normalizer
+		meekServer.listener = normalizer
+	}
+
 	return meekServer, nil
 }
 
@@ -749,9 +758,22 @@ func (server *MeekServer) getSessionOrEndpoint(
 	// bytes -- assuming that MEEK_MAX_SESSION_ID_LENGTH is too short to be a
 	// valid meek cookie.
 
-	payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookie.Value)
-	if err != nil {
-		return "", nil, nil, "", nil, errors.Trace(err)
+	var payloadJSON []byte
+
+	if server.normalizer != nil {
+
+		// NOTE: operates on the assumption that the normalizer is not wrapped
+		// with a further conn.
+		underlyingConn := request.Context().Value(meekNetConnContextKey).(net.Conn)
+		normalizedConn := underlyingConn.(*transforms.HTTPNormalizer)
+		payloadJSON = normalizedConn.ValidateMeekCookieResult
+
+	} else {
+
+		payloadJSON, err = server.getMeekCookiePayload(clientIP, meekCookie.Value)
+		if err != nil {
+			return "", nil, nil, "", nil, errors.Trace(err)
+		}
 	}
 
 	// Note: this meek server ignores legacy values PsiphonClientSessionId
@@ -1323,6 +1345,95 @@ func (server *MeekServer) makeMeekTLSConfig(
 	return config, nil
 }
 
+// makeMeekHTTPNormalizerListener returns the meek server listener wrapped in
+// an HTTP normalizer.
+func (server *MeekServer) makeMeekHTTPNormalizerListener() *transforms.HTTPNormalizerListener {
+
+	normalizer := transforms.WrapListenerWithHTTPNormalizer(server.listener)
+
+	normalizer.ProhibitedHeaders = server.support.Config.MeekProhibitedHeaders
+
+	normalizer.MaxReqLineAndHeadersSize = 8192 // max number of header bytes common web servers will read before returning an error
+
+	if server.passthroughAddress != "" {
+		normalizer.PassthroughAddress = server.passthroughAddress
+		normalizer.PassthroughDialer = net.Dial
+	}
+	normalizer.PassthroughLogPassthrough = func(
+		clientIP string, tunnelError error, logFields map[string]interface{}) {
+
+		logIrregularTunnel(
+			server.support,
+			server.listenerTunnelProtocol,
+			server.listenerPort,
+			clientIP,
+			errors.Trace(tunnelError),
+			logFields)
+	}
+
+	// ValidateMeekCookie is invoked by the normalizer with the value of the
+	// cookie header (if present), before ServeHTTP gets the request and calls
+	// getSessionOrEndpoint; and then any valid meek cookie payload, or meek
+	// session ID, extracted in this callback is stored to be fetched by
+	// getSessionOrEndpoint.
+	// Note: if there are multiple cookie headers, even though prohibited by
+	// rfc6265, then ValidateMeekCookie will only be invoked once with the
+	// first one received.
+	normalizer.ValidateMeekCookie = func(clientIP string, rawCookies []byte) ([]byte, error) {
+
+		// Parse cookie.
+
+		if len(rawCookies) == 0 {
+			return nil, errors.TraceNew("no cookies")
+		}
+
+		// TODO/perf: readCookies in net/http is not exported, use a local
+		// implementation which does not require allocating an http.header
+		// each time.
+		request := http.Request{
+			Header: http.Header{
+				"Cookie": []string{string(rawCookies)},
+			},
+		}
+		cookies := request.Cookies()
+		if len(rawCookies) == 0 {
+			return nil, errors.Tracef("invalid cookies: %s", string(rawCookies))
+		}
+
+		// Use value of the first cookie.
+		meekCookieValue := cookies[0].Value
+
+		// Check for an existing session.
+
+		server.sessionsLock.RLock()
+		existingSessionID := meekCookieValue
+		_, ok := server.sessions[existingSessionID]
+		server.sessionsLock.RUnlock()
+		if ok {
+			// The cookie is a session ID for an active (not expired) session.
+			// Return it and then it will be stored and later fetched by
+			// getSessionOrEndpoint where it will be mapped to the existing
+			// session.
+			// Note: it's possible for the session to expire between this check
+			// and when getSessionOrEndpoint looks up the session.
+			return rawCookies, nil
+		}
+
+		// The session is new (or expired). Treat the cookie value as a new
+		// meek cookie, extract the payload, and return it; and then it will be
+		// stored and later fetched by getSessionOrEndpoint.
+
+		payloadJSON, err := server.getMeekCookiePayload(clientIP, meekCookieValue)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		return payloadJSON, nil
+	}
+
+	return normalizer
+}
+
 type meekSession struct {
 	// Note: 64-bit ints used with atomic operations are placed
 	// at the start of struct to ensure 64-bit alignment.

+ 31 - 2
psiphon/server/meek_test.go

@@ -42,6 +42,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
 	"golang.org/x/crypto/nacl/box"
 )
 
@@ -173,6 +174,26 @@ func TestCachedResponse(t *testing.T) {
 }
 
 func TestMeekResiliency(t *testing.T) {
+	testMeekResiliency(t, nil, false)
+}
+
+func TestMeekHTTPNormalizerResiliency(t *testing.T) {
+
+	seed, err := prng.NewSeed()
+	if err != nil {
+		t.Fatalf("prng.NewSeed failed %v", err)
+	}
+
+	spec := &transforms.HTTPTransformerParameters{
+		ProtocolTransformName: "spec1",
+		ProtocolTransformSpec: transforms.Spec{{"Host: example.com\r\n", ""}},
+		ProtocolTransformSeed: seed,
+	}
+
+	testMeekResiliency(t, spec, true)
+}
+
+func testMeekResiliency(t *testing.T, spec *transforms.HTTPTransformerParameters, useHTTPNormalizer bool) {
 
 	upstreamData := make([]byte, 5*MB)
 	_, _ = rand.Read(upstreamData)
@@ -246,6 +267,10 @@ func TestMeekResiliency(t *testing.T) {
 		Config: &Config{
 			MeekObfuscatedKey:              meekObfuscatedKey,
 			MeekCookieEncryptionPrivateKey: meekCookieEncryptionPrivateKey,
+			TunnelProtocolPorts: map[string]int{
+				protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK: 0,
+			},
+			runningProtocols: []string{protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK},
 		},
 		TrafficRulesSet: &TrafficRulesSet{},
 	}
@@ -287,11 +312,12 @@ func TestMeekResiliency(t *testing.T) {
 	server, err := NewMeekServer(
 		mockSupport,
 		listener,
-		"",
+		protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
 		0,
 		useTLS,
 		isFronted,
 		useObfuscatedSessionTickets,
+		useHTTPNormalizer,
 		clientHandler,
 		stopBroadcast)
 	if err != nil {
@@ -332,7 +358,6 @@ func TestMeekResiliency(t *testing.T) {
 	if err != nil {
 		t.Fatalf("prng.NewSeed failed: %s", err)
 	}
-
 	meekConfig := &psiphon.MeekConfig{
 		Parameters:                    params,
 		DialAddress:                   serverAddress,
@@ -342,6 +367,8 @@ func TestMeekResiliency(t *testing.T) {
 		MeekCookieEncryptionPublicKey: meekCookieEncryptionPublicKey,
 		MeekObfuscatedKey:             meekObfuscatedKey,
 		MeekObfuscatorPaddingSeed:     meekObfuscatorPaddingSeed,
+		ClientTunnelProtocol:          protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
+		HTTPTransformerParameters:     spec,
 	}
 
 	ctx, cancelFunc := context.WithTimeout(
@@ -545,6 +572,7 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider, missing
 	useTLS := false
 	isFronted := false
 	useObfuscatedSessionTickets := false
+	useHTTPNormalizer := false
 
 	server, err := NewMeekServer(
 		mockSupport,
@@ -554,6 +582,7 @@ func runTestMeekAccessControl(t *testing.T, rateLimit, restrictProvider, missing
 		useTLS,
 		isFronted,
 		useObfuscatedSessionTickets,
+		useHTTPNormalizer,
 		func(_ string, conn net.Conn) {
 			go func() {
 				for {

+ 1 - 0
psiphon/server/tunnelServer.go

@@ -531,6 +531,7 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError
 			protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
 			protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
 			protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
+			true,
 			handleClient,
 			sshServer.shutdownBroadcast)