Explorar o código

Add in-proxy WebRTC media stream mode

Rod Hynes hai 1 ano
pai
achega
a5f3343a8e
Modificáronse 35 ficheiros con 1373 adicións e 779 borrados
  1. 18 0
      psiphon/common/errors/errors.go
  2. 149 77
      psiphon/common/inproxy/api.go
  3. 49 22
      psiphon/common/inproxy/broker.go
  4. 18 13
      psiphon/common/inproxy/client.go
  5. 9 5
      psiphon/common/inproxy/coordinator.go
  6. 71 13
      psiphon/common/inproxy/coordinator_test.go
  7. 0 1
      psiphon/common/inproxy/discovery.go
  8. 6 3
      psiphon/common/inproxy/discoverySTUN.go
  9. 8 5
      psiphon/common/inproxy/inproxy_disabled.go
  10. 122 58
      psiphon/common/inproxy/inproxy_test.go
  11. 38 14
      psiphon/common/inproxy/matcher.go
  12. 105 26
      psiphon/common/inproxy/matcher_test.go
  13. 1 1
      psiphon/common/inproxy/portmapper_other.go
  14. 25 19
      psiphon/common/inproxy/proxy.go
  15. 5 5
      psiphon/common/inproxy/session_test.go
  16. 433 429
      psiphon/common/inproxy/webrtc.go
  17. 4 4
      psiphon/common/parameters/inproxy.go
  18. 24 16
      psiphon/common/parameters/parameters.go
  19. 2 2
      psiphon/common/parameters/parameters_test.go
  20. 2 1
      psiphon/common/protocol/packed.go
  21. 1 0
      psiphon/common/quic/obfuscator_test.go
  22. 45 16
      psiphon/common/quic/quic.go
  23. 4 0
      psiphon/common/quic/quic_disabled.go
  24. 2 0
      psiphon/common/quic/quic_test.go
  25. 50 20
      psiphon/config.go
  26. 11 2
      psiphon/controller.go
  27. 26 0
      psiphon/dialParameters.go
  28. 43 20
      psiphon/inproxy.go
  29. 1 1
      psiphon/net.go
  30. 5 1
      psiphon/net_darwin.go
  31. 1 0
      psiphon/server/api.go
  32. 13 1
      psiphon/server/meek.go
  33. 51 3
      psiphon/server/server_test.go
  34. 30 1
      psiphon/server/tunnelServer.go
  35. 1 0
      psiphon/tunnel.go

+ 18 - 0
psiphon/common/errors/errors.go

@@ -25,6 +25,7 @@ package errors
 
 import (
 	"fmt"
+	"io"
 	"runtime"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
@@ -72,6 +73,23 @@ func Trace(err error) error {
 	return fmt.Errorf("%s#%d: %w", stacktrace.GetFunctionName(pc), line, err)
 }
 
+// TraceReader wraps the given error with the caller stack frame information,
+// except in the case of io.EOF, which is returned unwrapped. This is used to
+// preserve io.Reader.Read io.EOF error returns.
+func TraceReader(err error) error {
+	if err == nil {
+		return nil
+	}
+	if err == io.EOF {
+		return io.EOF
+	}
+	pc, _, line, ok := runtime.Caller(1)
+	if !ok {
+		return fmt.Errorf("[unknown]: %w", err)
+	}
+	return fmt.Errorf("%s#%d: %w", stacktrace.GetFunctionName(pc), line, err)
+}
+
 // TraceMsg wraps the given error with the caller stack frame information
 // and the given message.
 func TraceMsg(err error, message string) error {

+ 149 - 77
psiphon/common/inproxy/api.go

@@ -31,19 +31,81 @@ import (
 
 const (
 
-	// ProxyProtocolVersion1 represents protocol version 1.
-	ProxyProtocolVersion1 = int32(1)
+	// ProtocolVersion1 represents protocol version 1, the initial in-proxy
+	// protocol version number.
+	ProtocolVersion1 = int32(1)
 
-	// MinimumProxyProtocolVersion is the minimum supported version number.
-	MinimumProxyProtocolVersion = ProxyProtocolVersion1
+	// ProtocolVersion2 represents protocol version 2, which adds support for
+	// proxying over WebRTC media streams.
+	ProtocolVersion2 = int32(2)
+
+	// LatestProtocolVersion is the current, default protocol version number.
+	LatestProtocolVersion = ProtocolVersion2
+
+	// MinimumProxyProtocolVersion is the minimum required protocol version
+	// number for proxies.
+	MinimumProxyProtocolVersion = ProtocolVersion1
+
+	// MinimumClientProtocolVersion is the minimum supported protocol version
+	// number for clients.
+	MinimumClientProtocolVersion = ProtocolVersion1
 
 	MaxCompartmentIDs = 10
 )
 
-// proxyProtocolVersion is the current protocol version number.
-// proxyProtocolVersion is variable, to enable overriding the value in tests.
-// This value should not be overridden outside of test cases.
-var proxyProtocolVersion = ProxyProtocolVersion1
+// minimumProxyProtocolVersion and minimumClientProtocolVersion are variable
+// to enable overriding the values in tests. These value should not be
+// overridden outside of test cases.
+var (
+	minimumProxyProtocolVersion  = MinimumProxyProtocolVersion
+	minimumClientProtocolVersion = MinimumClientProtocolVersion
+)
+
+// negotiateProtocolVersion selects the in-proxy protocol version for a new
+// proxy/client match, based on the client's and proxy's reported protocol
+// versions, and the client's selected protocol options. Returns false if no
+// protocol version selection is possible.
+//
+// The broker performs the negotiation on behalf of the proxy and client. Both
+// the proxy and client initially specify the latest protocol version they
+// support. The client specifies the protocol options to use, based on
+// tactics and replay.
+//
+// negotiateProtocolVersion is used by the matcher when searching for
+// potential matches; for this reason, the failure case is expected and
+// returns a simple boolean intead of formating an error message.
+//
+// Existing, legacy proxies have the equivalent of an "if
+// announceResponse.SelectedProtocolVersion != ProtocolVersion1" check, so
+// the SelectedProtocolVersion must be downgraded in that case, if a match is
+// possible.
+func negotiateProtocolVersion(
+	proxyProtocolVersion int32,
+	clientProtocolVersion int32,
+	useMediaStreams bool) (int32, bool) {
+
+	// When not using WebRTC media streams, introduced in ProtocolVersion2,
+	// potentially downgrade if either the proxy or client supports only
+	// ProtocolVersion1.
+
+	if (proxyProtocolVersion == ProtocolVersion1 ||
+		clientProtocolVersion == ProtocolVersion1) &&
+		!useMediaStreams {
+		return ProtocolVersion1, true
+	}
+
+	// Select the client's protocol version.
+
+	if proxyProtocolVersion >= clientProtocolVersion {
+		return clientProtocolVersion, true
+	}
+
+	// No selection is possible. This includes the case where the proxy
+	// supports up to ProtocolVersion1 and the client has specified media
+	// streams.
+
+	return 0, false
+}
 
 // ID is a unique identifier used to identify inproxy connections and actors.
 type ID [32]byte
@@ -192,7 +254,7 @@ func (p NetworkProtocol) IsStream() bool {
 // and clients.
 type ProxyMetrics struct {
 	BaseAPIParameters             protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
-	ProxyProtocolVersion          int32                        `cbor:"2,keyasint,omitempty"`
+	ProtocolVersion               int32                        `cbor:"2,keyasint,omitempty"`
 	NATType                       NATType                      `cbor:"3,keyasint,omitempty"`
 	PortMappingTypes              PortMappingTypes             `cbor:"4,keyasint,omitempty"`
 	MaxClients                    int32                        `cbor:"6,keyasint,omitempty"`
@@ -208,10 +270,10 @@ type ProxyMetrics struct {
 // broker. The broker uses this information when matching proxies and
 // clients.
 type ClientMetrics struct {
-	BaseAPIParameters    protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
-	ProxyProtocolVersion int32                        `cbor:"2,keyasint,omitempty"`
-	NATType              NATType                      `cbor:"3,keyasint,omitempty"`
-	PortMappingTypes     PortMappingTypes             `cbor:"4,keyasint,omitempty"`
+	BaseAPIParameters protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
+	ProtocolVersion   int32                        `cbor:"2,keyasint,omitempty"`
+	NATType           NATType                      `cbor:"3,keyasint,omitempty"`
+	PortMappingTypes  PortMappingTypes             `cbor:"4,keyasint,omitempty"`
 }
 
 // ProxyAnnounceRequest is an API request sent from a proxy to a broker,
@@ -264,24 +326,26 @@ type WebRTCSessionDescription struct {
 // corresponds to a valid Psiphon server.
 //
 // MustUpgrade is an optional flag that is set by the broker, based on the
-// submitted ProxyProtocolVersion, when the proxy app must be upgraded in
-// order to function properly. Potential must-upgrade scenarios include
-// changes to the personal pairing broker rendezvous algorithm, where no
-// protocol backwards compatibility accommodations can ensure a rendezvous
-// and match. When MustUpgrade is set, NoMatch is implied.
+// submitted ProtocolVersion, when the proxy app must be upgraded in order to
+// function properly. Potential must-upgrade scenarios include changes to the
+// personal pairing broker rendezvous algorithm, where no protocol backwards
+// compatibility accommodations can ensure a rendezvous and match. When
+// MustUpgrade is set, NoMatch is implied.
+
 type ProxyAnnounceResponse struct {
-	TacticsPayload              []byte                               `cbor:"2,keyasint,omitempty"`
-	Limited                     bool                                 `cbor:"3,keyasint,omitempty"`
-	NoMatch                     bool                                 `cbor:"4,keyasint,omitempty"`
-	MustUpgrade                 bool                                 `cbor:"13,keyasint,omitempty"`
-	ConnectionID                ID                                   `cbor:"5,keyasint,omitempty"`
-	ClientProxyProtocolVersion  int32                                `cbor:"6,keyasint,omitempty"`
-	ClientOfferSDP              WebRTCSessionDescription             `cbor:"7,keyasint,omitempty"`
-	ClientRootObfuscationSecret ObfuscationSecret                    `cbor:"8,keyasint,omitempty"`
-	DoDTLSRandomization         bool                                 `cbor:"9,keyasint,omitempty"`
-	TrafficShapingParameters    *DataChannelTrafficShapingParameters `cbor:"10,keyasint,omitempty"`
-	NetworkProtocol             NetworkProtocol                      `cbor:"11,keyasint,omitempty"`
-	DestinationAddress          string                               `cbor:"12,keyasint,omitempty"`
+	TacticsPayload              []byte                    `cbor:"2,keyasint,omitempty"`
+	Limited                     bool                      `cbor:"3,keyasint,omitempty"`
+	NoMatch                     bool                      `cbor:"4,keyasint,omitempty"`
+	MustUpgrade                 bool                      `cbor:"13,keyasint,omitempty"`
+	ConnectionID                ID                        `cbor:"5,keyasint,omitempty"`
+	SelectedProtocolVersion     int32                     `cbor:"6,keyasint,omitempty"`
+	ClientOfferSDP              WebRTCSessionDescription  `cbor:"7,keyasint,omitempty"`
+	ClientRootObfuscationSecret ObfuscationSecret         `cbor:"8,keyasint,omitempty"`
+	DoDTLSRandomization         bool                      `cbor:"9,keyasint,omitempty"`
+	UseMediaStreams             bool                      `cbor:"14,keyasint,omitempty"`
+	TrafficShapingParameters    *TrafficShapingParameters `cbor:"10,keyasint,omitempty"`
+	NetworkProtocol             NetworkProtocol           `cbor:"11,keyasint,omitempty"`
+	DestinationAddress          string                    `cbor:"12,keyasint,omitempty"`
 }
 
 // ClientOfferRequest is an API request sent from a client to a broker,
@@ -305,24 +369,25 @@ type ProxyAnnounceResponse struct {
 // domain, and destination port for a valid Psiphon tunnel protocol run by
 // the specified server entry.
 type ClientOfferRequest struct {
-	Metrics                      *ClientMetrics                       `cbor:"1,keyasint,omitempty"`
-	CommonCompartmentIDs         []ID                                 `cbor:"2,keyasint,omitempty"`
-	PersonalCompartmentIDs       []ID                                 `cbor:"3,keyasint,omitempty"`
-	ClientOfferSDP               WebRTCSessionDescription             `cbor:"4,keyasint,omitempty"`
-	ICECandidateTypes            ICECandidateTypes                    `cbor:"5,keyasint,omitempty"`
-	ClientRootObfuscationSecret  ObfuscationSecret                    `cbor:"6,keyasint,omitempty"`
-	DoDTLSRandomization          bool                                 `cbor:"7,keyasint,omitempty"`
-	TrafficShapingParameters     *DataChannelTrafficShapingParameters `cbor:"8,keyasint,omitempty"`
-	PackedDestinationServerEntry []byte                               `cbor:"9,keyasint,omitempty"`
-	NetworkProtocol              NetworkProtocol                      `cbor:"10,keyasint,omitempty"`
-	DestinationAddress           string                               `cbor:"11,keyasint,omitempty"`
-}
-
-// DataChannelTrafficShapingParameters specifies a data channel traffic
-// shaping configuration, including random padding and decoy messages.
-// Clients determine their own traffic shaping configuration, and generate
-// and send a configuration for the peer proxy to use.
-type DataChannelTrafficShapingParameters struct {
+	Metrics                      *ClientMetrics            `cbor:"1,keyasint,omitempty"`
+	CommonCompartmentIDs         []ID                      `cbor:"2,keyasint,omitempty"`
+	PersonalCompartmentIDs       []ID                      `cbor:"3,keyasint,omitempty"`
+	ClientOfferSDP               WebRTCSessionDescription  `cbor:"4,keyasint,omitempty"`
+	ICECandidateTypes            ICECandidateTypes         `cbor:"5,keyasint,omitempty"`
+	ClientRootObfuscationSecret  ObfuscationSecret         `cbor:"6,keyasint,omitempty"`
+	DoDTLSRandomization          bool                      `cbor:"7,keyasint,omitempty"`
+	UseMediaStreams              bool                      `cbor:"12,keyasint,omitempty"`
+	TrafficShapingParameters     *TrafficShapingParameters `cbor:"8,keyasint,omitempty"`
+	PackedDestinationServerEntry []byte                    `cbor:"9,keyasint,omitempty"`
+	NetworkProtocol              NetworkProtocol           `cbor:"10,keyasint,omitempty"`
+	DestinationAddress           string                    `cbor:"11,keyasint,omitempty"`
+}
+
+// TrafficShapingParameters specifies data channel or media stream traffic
+// shaping configuration, including random padding and decoy messages (or
+// packets). Clients determine their own traffic shaping configuration, and
+// generate and send a configuration for the peer proxy to use.
+type TrafficShapingParameters struct {
 	MinPaddedMessages       int     `cbor:"1,keyasint,omitempty"`
 	MaxPaddedMessages       int     `cbor:"2,keyasint,omitempty"`
 	MinPaddingSize          int     `cbor:"3,keyasint,omitempty"`
@@ -347,19 +412,19 @@ type DataChannelTrafficShapingParameters struct {
 // connection and its relayed BrokerServerReport.
 //
 // MustUpgrade is an optional flag that is set by the broker, based on the
-// submitted ProxyProtocolVersion, when the client app must be upgraded in
-// order to function properly. Potential must-upgrade scenarios include
-// changes to the personal pairing broker rendezvous algorithm, where no
-// protocol backwards compatibility accommodations can ensure a rendezvous
-// and match. When MustUpgrade is set, NoMatch is implied.
+// submitted ProtocolVersion, when the client app must be upgraded in order
+// to function properly. Potential must-upgrade scenarios include changes to
+// the personal pairing broker rendezvous algorithm, where no protocol
+// backwards compatibility accommodations can ensure a rendezvous and match.
+// When MustUpgrade is set, NoMatch is implied.
 type ClientOfferResponse struct {
-	Limited                      bool                     `cbor:"1,keyasint,omitempty"`
-	NoMatch                      bool                     `cbor:"2,keyasint,omitempty"`
-	MustUpgrade                  bool                     `cbor:"7,keyasint,omitempty"`
-	ConnectionID                 ID                       `cbor:"3,keyasint,omitempty"`
-	SelectedProxyProtocolVersion int32                    `cbor:"4,keyasint,omitempty"`
-	ProxyAnswerSDP               WebRTCSessionDescription `cbor:"5,keyasint,omitempty"`
-	RelayPacketToServer          []byte                   `cbor:"6,keyasint,omitempty"`
+	Limited                 bool                     `cbor:"1,keyasint,omitempty"`
+	NoMatch                 bool                     `cbor:"2,keyasint,omitempty"`
+	MustUpgrade             bool                     `cbor:"7,keyasint,omitempty"`
+	ConnectionID            ID                       `cbor:"3,keyasint,omitempty"`
+	SelectedProtocolVersion int32                    `cbor:"4,keyasint,omitempty"`
+	ProxyAnswerSDP          WebRTCSessionDescription `cbor:"5,keyasint,omitempty"`
+	RelayPacketToServer     []byte                   `cbor:"6,keyasint,omitempty"`
 }
 
 // TODO: Encode SDPs using CBOR without field names, simliar to packed metrics?
@@ -373,11 +438,14 @@ type ClientOfferResponse struct {
 // reason, it should still send ProxyAnswerRequest with AnswerError
 // populated; the broker will signal the client to abort this connection.
 type ProxyAnswerRequest struct {
-	ConnectionID                 ID                       `cbor:"1,keyasint,omitempty"`
-	SelectedProxyProtocolVersion int32                    `cbor:"2,keyasint,omitempty"`
-	ProxyAnswerSDP               WebRTCSessionDescription `cbor:"3,keyasint,omitempty"`
-	ICECandidateTypes            ICECandidateTypes        `cbor:"4,keyasint,omitempty"`
-	AnswerError                  string                   `cbor:"5,keyasint,omitempty"`
+	ConnectionID      ID                       `cbor:"1,keyasint,omitempty"`
+	ProxyAnswerSDP    WebRTCSessionDescription `cbor:"3,keyasint,omitempty"`
+	ICECandidateTypes ICECandidateTypes        `cbor:"4,keyasint,omitempty"`
+	AnswerError       string                   `cbor:"5,keyasint,omitempty"`
+
+	// These fields are no longer used.
+	//
+	// SelectedProtocolVersion int32 `cbor:"2,keyasint,omitempty"`
 }
 
 // ProxyAnswerResponse is the acknowledgement for a ProxyAnswerRequest.
@@ -496,8 +564,8 @@ func (metrics *ProxyMetrics) ValidateAndGetParametersAndLogFields(
 		return nil, nil, errors.Trace(err)
 	}
 
-	if metrics.ProxyProtocolVersion < 0 || metrics.ProxyProtocolVersion > proxyProtocolVersion {
-		return nil, nil, errors.Tracef("invalid proxy protocol version: %v", metrics.ProxyProtocolVersion)
+	if metrics.ProtocolVersion < ProtocolVersion1 || metrics.ProtocolVersion > LatestProtocolVersion {
+		return nil, nil, errors.Tracef("invalid protocol version: %v", metrics.ProtocolVersion)
 	}
 
 	if !metrics.NATType.IsValid() {
@@ -514,7 +582,7 @@ func (metrics *ProxyMetrics) ValidateAndGetParametersAndLogFields(
 
 	logFields := formatter(logFieldPrefix, geoIPData, baseParams)
 
-	logFields[logFieldPrefix+"proxy_protocol_version"] = metrics.ProxyProtocolVersion
+	logFields[logFieldPrefix+"protocol_version"] = metrics.ProtocolVersion
 	logFields[logFieldPrefix+"nat_type"] = metrics.NATType
 	logFields[logFieldPrefix+"port_mapping_types"] = metrics.PortMappingTypes
 	logFields[logFieldPrefix+"max_clients"] = metrics.MaxClients
@@ -549,8 +617,8 @@ func (metrics *ClientMetrics) ValidateAndGetLogFields(
 		return nil, errors.Trace(err)
 	}
 
-	if metrics.ProxyProtocolVersion < 0 || metrics.ProxyProtocolVersion > proxyProtocolVersion {
-		return nil, errors.Tracef("invalid proxy protocol version: %v", metrics.ProxyProtocolVersion)
+	if metrics.ProtocolVersion < ProtocolVersion1 || metrics.ProtocolVersion > LatestProtocolVersion {
+		return nil, errors.Tracef("invalid protocol version: %v", metrics.ProtocolVersion)
 	}
 
 	if !metrics.NATType.IsValid() {
@@ -567,7 +635,7 @@ func (metrics *ClientMetrics) ValidateAndGetLogFields(
 
 	logFields := formatter("", geoIPData, baseParams)
 
-	logFields["proxy_protocol_version"] = metrics.ProxyProtocolVersion
+	logFields["protocol_version"] = metrics.ProtocolVersion
 	logFields["nat_type"] = metrics.NATType
 	logFields["port_mapping_types"] = metrics.PortMappingTypes
 
@@ -620,6 +688,14 @@ func (request *ClientOfferRequest) ValidateAndGetLogFields(
 	formatter common.APIParameterLogFieldFormatter,
 	geoIPData common.GeoIPData) ([]byte, common.LogFields, error) {
 
+	// UseMediaStreams requires at least ProtocolVersion2.
+	if request.UseMediaStreams &&
+		request.Metrics.ProtocolVersion < ProtocolVersion2 {
+
+		return nil, nil, errors.Tracef(
+			"invalid protocol version: %d", request.Metrics.ProtocolVersion)
+	}
+
 	if len(request.CommonCompartmentIDs) > maxCompartmentIDs {
 		return nil, nil, errors.Tracef(
 			"invalid compartment IDs length: %d", len(request.CommonCompartmentIDs))
@@ -700,13 +776,14 @@ func (request *ClientOfferRequest) ValidateAndGetLogFields(
 	logFields["has_IPv6"] = sdpMetrics.hasIPv6
 	logFields["has_private_IP"] = sdpMetrics.hasPrivateIP
 	logFields["filtered_ice_candidates"] = sdpMetrics.filteredICECandidates
+	logFields["use_media_streams"] = request.UseMediaStreams
 
 	return filteredSDP, logFields, nil
 }
 
 // Validate validates the that client has not specified excess traffic shaping
 // padding or decoy traffic.
-func (params *DataChannelTrafficShapingParameters) Validate() error {
+func (params *TrafficShapingParameters) Validate() error {
 
 	if params.MinPaddedMessages < 0 ||
 		params.MinPaddedMessages > params.MaxPaddedMessages ||
@@ -777,11 +854,6 @@ func (request *ProxyAnswerRequest) ValidateAndGetLogFields(
 			"invalid ICE candidate types: %v", request.ICECandidateTypes)
 	}
 
-	if request.SelectedProxyProtocolVersion != ProxyProtocolVersion1 {
-		return nil, nil, errors.Tracef(
-			"invalid select proxy protocol version: %v", request.SelectedProxyProtocolVersion)
-	}
-
 	logFields := formatter("", geoIPData, common.APIParameters{})
 
 	logFields["connection_id"] = request.ConnectionID

+ 49 - 22
psiphon/common/inproxy/broker.go

@@ -123,13 +123,13 @@ type BrokerConfig struct {
 	AllowProxy func(common.GeoIPData) bool
 
 	// PrioritizeProxy is a callback which can indicate whether proxy
-	// announcements from proxies with the specified GeoIPData and
-	// APIParameters should be prioritized in the matcher queue. Priority
-	// proxy announcements match ahead of other proxy announcements,
-	// regardless of announcement age/deadline. Priority status takes
-	// precedence over preferred NAT matching. Prioritization applies only to
-	// common compartment IDs and not personal pairing mode.
-	PrioritizeProxy func(common.GeoIPData, common.APIParameters) bool
+	// announcements from proxies with the specified in-proxy protocol
+	// version, GeoIPData, and APIParameters should be prioritized in the
+	// matcher queue. Priority proxy announcements match ahead of other proxy
+	// announcements, regardless of announcement age/deadline. Priority
+	// status takes precedence over preferred NAT matching. Prioritization
+	// applies only to common compartment IDs and not personal pairing mode.
+	PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool
 
 	// AllowClient is a callback which can indicate whether a client with the
 	// given GeoIP data is allowed to match with common compartment ID
@@ -483,6 +483,7 @@ func (b *Broker) handleProxyAnnounce(
 	startTime := time.Now()
 
 	var logFields common.LogFields
+	var isPriority bool
 	var newTacticsTag string
 	var clientOffer *MatchOffer
 	var matchMetrics *MatchMetrics
@@ -529,6 +530,7 @@ func (b *Broker) handleProxyAnnounce(
 		logFields["broker_event"] = "proxy-announce"
 		logFields["broker_id"] = b.brokerID
 		logFields["proxy_id"] = proxyID
+		logFields["is_priority"] = isPriority
 		logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
 		logFields["connection_id"] = connectionID
 		if newTacticsTag != "" {
@@ -538,6 +540,7 @@ func (b *Broker) handleProxyAnnounce(
 			// Log the target Psiphon server ID (diagnostic ID). The presence
 			// of this field indicates that a match was made.
 			logFields["destination_server_id"] = clientOffer.DestinationServerID
+			logFields["use_media_streams"] = clientOffer.UseMediaStreams
 		}
 		if timedOut {
 			logFields["timed_out"] = true
@@ -571,7 +574,7 @@ func (b *Broker) handleProxyAnnounce(
 
 	// Return MustUpgrade when the proxy's protocol version is less than the
 	// minimum required.
-	if announceRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
+	if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
 		responsePayload, err := MarshalProxyAnnounceResponse(
 			&ProxyAnnounceResponse{
 				NoMatch:     true,
@@ -643,7 +646,6 @@ func (b *Broker) handleProxyAnnounce(
 	// In the common compartment ID case, invoke the callback to check if the
 	// announcement should be prioritized.
 
-	isPriority := false
 	if b.config.PrioritizeProxy != nil && !hasPersonalCompartmentIDs {
 
 		// Limitation: Of the two return values from
@@ -659,7 +661,8 @@ func (b *Broker) handleProxyAnnounce(
 		// calls for range filtering, which is not yet supported in the
 		// psiphon/server.MeekServer PrioritizeProxy provider.
 
-		isPriority = b.config.PrioritizeProxy(geoIPData, apiParams)
+		isPriority = b.config.PrioritizeProxy(
+			int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
 	}
 
 	// Await client offer.
@@ -685,6 +688,7 @@ func (b *Broker) handleProxyAnnounce(
 		&MatchAnnouncement{
 			Properties: MatchProperties{
 				IsPriority:             isPriority,
+				ProtocolVersion:        announceRequest.Metrics.ProtocolVersion,
 				CommonCompartmentIDs:   commonCompartmentIDs,
 				PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
 				GeoIPData:              geoIPData,
@@ -746,6 +750,17 @@ func (b *Broker) handleProxyAnnounce(
 		return responsePayload, nil
 	}
 
+	// Select the protocol version. The matcher has already checked
+	// negotiateProtocolVersion, so failure is not expected.
+
+	negotiatedProtocolVersion, ok := negotiateProtocolVersion(
+		announceRequest.Metrics.ProtocolVersion,
+		clientOffer.Properties.ProtocolVersion,
+		clientOffer.UseMediaStreams)
+	if !ok {
+		return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
+	}
+
 	// Respond with the client offer. The proxy will follow up with an answer
 	// request, which is relayed to the client, and then the WebRTC dial begins.
 
@@ -760,10 +775,11 @@ func (b *Broker) handleProxyAnnounce(
 		&ProxyAnnounceResponse{
 			TacticsPayload:              tacticsPayload,
 			ConnectionID:                connectionID,
-			ClientProxyProtocolVersion:  clientOffer.ClientProxyProtocolVersion,
+			SelectedProtocolVersion:     negotiatedProtocolVersion,
 			ClientOfferSDP:              clientOffer.ClientOfferSDP,
 			ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
 			DoDTLSRandomization:         clientOffer.DoDTLSRandomization,
+			UseMediaStreams:             clientOffer.UseMediaStreams,
 			TrafficShapingParameters:    clientOffer.TrafficShapingParameters,
 			NetworkProtocol:             clientOffer.NetworkProtocol,
 			DestinationAddress:          clientOffer.DestinationAddress,
@@ -907,7 +923,7 @@ func (b *Broker) handleClientOffer(
 
 	// Return MustUpgrade when the client's protocol version is less than the
 	// minimum required.
-	if offerRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
+	if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
 		responsePayload, err := MarshalClientOfferResponse(
 			&ClientOfferResponse{
 				NoMatch:     true,
@@ -944,6 +960,7 @@ func (b *Broker) handleClientOffer(
 
 	clientMatchOffer = &MatchOffer{
 		Properties: MatchProperties{
+			ProtocolVersion:        offerRequest.Metrics.ProtocolVersion,
 			CommonCompartmentIDs:   offerRequest.CommonCompartmentIDs,
 			PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
 			GeoIPData:              geoIPData,
@@ -951,10 +968,10 @@ func (b *Broker) handleClientOffer(
 			NATType:                offerRequest.Metrics.NATType,
 			PortMappingTypes:       offerRequest.Metrics.PortMappingTypes,
 		},
-		ClientProxyProtocolVersion:  offerRequest.Metrics.ProxyProtocolVersion,
 		ClientOfferSDP:              offerSDP,
 		ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
 		DoDTLSRandomization:         offerRequest.DoDTLSRandomization,
+		UseMediaStreams:             offerRequest.UseMediaStreams,
 		TrafficShapingParameters:    offerRequest.TrafficShapingParameters,
 		NetworkProtocol:             offerRequest.NetworkProtocol,
 		DestinationAddress:          offerRequest.DestinationAddress,
@@ -1069,14 +1086,25 @@ func (b *Broker) handleClientOffer(
 		return nil, errors.Trace(err)
 	}
 
+	// Select the protocol version. The matcher has already checked
+	// negotiateProtocolVersion, so failure is not expected.
+
+	negotiatedProtocolVersion, ok := negotiateProtocolVersion(
+		proxyMatchAnnouncement.Properties.ProtocolVersion,
+		offerRequest.Metrics.ProtocolVersion,
+		offerRequest.UseMediaStreams)
+	if !ok {
+		return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
+	}
+
 	// Respond with the proxy answer and initial broker/server session packet.
 
 	responsePayload, err := MarshalClientOfferResponse(
 		&ClientOfferResponse{
-			ConnectionID:                 proxyAnswer.ConnectionID,
-			SelectedProxyProtocolVersion: proxyAnswer.SelectedProxyProtocolVersion,
-			ProxyAnswerSDP:               proxyAnswer.ProxyAnswerSDP,
-			RelayPacketToServer:          relayPacket,
+			ConnectionID:            proxyAnswer.ConnectionID,
+			SelectedProtocolVersion: negotiatedProtocolVersion,
+			ProxyAnswerSDP:          proxyAnswer.ProxyAnswerSDP,
+			RelayPacketToServer:     relayPacket,
 		})
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -1180,11 +1208,10 @@ func (b *Broker) handleProxyAnswer(
 		// These fields are used internally in the matcher.
 
 		proxyAnswer = &MatchAnswer{
-			ProxyIP:                      proxyIP,
-			ProxyID:                      initiatorID,
-			ConnectionID:                 answerRequest.ConnectionID,
-			SelectedProxyProtocolVersion: answerRequest.SelectedProxyProtocolVersion,
-			ProxyAnswerSDP:               answerSDP,
+			ProxyIP:        proxyIP,
+			ProxyID:        initiatorID,
+			ConnectionID:   answerRequest.ConnectionID,
+			ProxyAnswerSDP: answerSDP,
 		}
 
 		err = b.matcher.Answer(proxyAnswer)

+ 18 - 13
psiphon/common/inproxy/client.go

@@ -359,7 +359,8 @@ func dialClientWebRTCConn(
 	// Initialize the WebRTC offer
 
 	doTLSRandomization := config.WebRTCDialCoordinator.DoDTLSRandomization()
-	trafficShapingParameters := config.WebRTCDialCoordinator.DataChannelTrafficShapingParameters()
+	useMediaStreams := config.WebRTCDialCoordinator.UseMediaStreams()
+	trafficShapingParameters := config.WebRTCDialCoordinator.TrafficShapingParameters()
 	clientRootObfuscationSecret := config.WebRTCDialCoordinator.ClientRootObfuscationSecret()
 
 	webRTCConn, SDP, SDPMetrics, err := newWebRTCConnForOffer(
@@ -369,6 +370,7 @@ func dialClientWebRTCConn(
 			WebRTCDialCoordinator:       config.WebRTCDialCoordinator,
 			ClientRootObfuscationSecret: clientRootObfuscationSecret,
 			DoDTLSRandomization:         doTLSRandomization,
+			UseMediaStreams:             useMediaStreams,
 			TrafficShapingParameters:    trafficShapingParameters,
 			ReliableTransport:           config.ReliableTransport,
 		},
@@ -411,10 +413,10 @@ func dialClientWebRTCConn(
 		ctx,
 		&ClientOfferRequest{
 			Metrics: &ClientMetrics{
-				BaseAPIParameters:    packedParams,
-				ProxyProtocolVersion: proxyProtocolVersion,
-				NATType:              config.WebRTCDialCoordinator.NATType(),
-				PortMappingTypes:     config.WebRTCDialCoordinator.PortMappingTypes(),
+				BaseAPIParameters: packedParams,
+				ProtocolVersion:   LatestProtocolVersion,
+				NATType:           config.WebRTCDialCoordinator.NATType(),
+				PortMappingTypes:  config.WebRTCDialCoordinator.PortMappingTypes(),
 			},
 			CommonCompartmentIDs:         brokerCoordinator.CommonCompartmentIDs(),
 			PersonalCompartmentIDs:       personalCompartmentIDs,
@@ -422,6 +424,7 @@ func dialClientWebRTCConn(
 			ICECandidateTypes:            SDPMetrics.iceCandidateTypes,
 			ClientRootObfuscationSecret:  clientRootObfuscationSecret,
 			DoDTLSRandomization:          doTLSRandomization,
+			UseMediaStreams:              useMediaStreams,
 			TrafficShapingParameters:     trafficShapingParameters,
 			PackedDestinationServerEntry: config.PackedDestinationServerEntry,
 			NetworkProtocol:              config.DialNetworkProtocol,
@@ -458,11 +461,13 @@ func dialClientWebRTCConn(
 		return nil, true, errors.TraceNew("no match")
 	}
 
-	if offerResponse.SelectedProxyProtocolVersion < MinimumProxyProtocolVersion ||
-		offerResponse.SelectedProxyProtocolVersion > proxyProtocolVersion {
+	if offerResponse.SelectedProtocolVersion < ProtocolVersion1 ||
+		(useMediaStreams &&
+			offerResponse.SelectedProtocolVersion < ProtocolVersion2) ||
+		offerResponse.SelectedProtocolVersion > LatestProtocolVersion {
 		return nil, false, errors.Tracef(
-			"Unsupported proxy protocol version: %d",
-			offerResponse.SelectedProxyProtocolVersion)
+			"Unsupported protocol version: %d",
+			offerResponse.SelectedProtocolVersion)
 	}
 
 	// Establish the WebRTC DataChannel connection
@@ -473,13 +478,13 @@ func dialClientWebRTCConn(
 		return nil, true, errors.Trace(err)
 	}
 
-	awaitDataChannelCtx, awaitDataChannelCancelFunc := context.WithTimeout(
+	awaitReadyToProxyCtx, awaitReadyToProxyCancelFunc := context.WithTimeout(
 		ctx,
 		common.ValueOrDefault(
-			config.WebRTCDialCoordinator.WebRTCAwaitDataChannelTimeout(), dataChannelAwaitTimeout))
-	defer awaitDataChannelCancelFunc()
+			config.WebRTCDialCoordinator.WebRTCAwaitReadyToProxyTimeout(), readyToProxyAwaitTimeout))
+	defer awaitReadyToProxyCancelFunc()
 
-	err = webRTCConn.AwaitInitialDataChannel(awaitDataChannelCtx)
+	err = webRTCConn.AwaitReadyToProxy(awaitReadyToProxyCtx, offerResponse.ConnectionID)
 	if err != nil {
 		return nil, true, errors.Trace(err)
 	}

+ 9 - 5
psiphon/common/inproxy/coordinator.go

@@ -235,10 +235,14 @@ type WebRTCDialCoordinator interface {
 	// the value.
 	DoDTLSRandomization() bool
 
-	// DataChannelTrafficShapingParameters returns parameters specifying how
-	// to perform data channel traffic shapping -- random padding and decoy
-	// message. Returns nil when no traffic shaping is to be performed.
-	DataChannelTrafficShapingParameters() *DataChannelTrafficShapingParameters
+	// UseMediaStreams indicates whether to use WebRTC media streams to tunnel
+	// traffic. When false, a WebRTC data channel is used to tunnel traffic.
+	UseMediaStreams() bool
+
+	// TrafficShapingParameters returns parameters specifying how to perform
+	// data channel or media stream traffic shapping -- random padding and
+	// decoy messages. Returns nil when no traffic shaping is to be performed.
+	TrafficShapingParameters() *TrafficShapingParameters
 
 	// STUNServerAddress selects a STUN server to use for this dial. When
 	// RFC5780 is true, the STUN server must support RFC5780 NAT discovery;
@@ -363,7 +367,7 @@ type WebRTCDialCoordinator interface {
 	DiscoverNATTimeout() time.Duration
 	WebRTCAnswerTimeout() time.Duration
 	WebRTCAwaitPortMappingTimeout() time.Duration
-	WebRTCAwaitDataChannelTimeout() time.Duration
+	WebRTCAwaitReadyToProxyTimeout() time.Duration
 	ProxyDestinationDialTimeout() time.Duration
 	ProxyRelayInactivityTimeout() time.Duration
 }

+ 71 - 13
psiphon/common/inproxy/coordinator_test.go

@@ -30,6 +30,7 @@ import (
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
 )
 
@@ -204,7 +205,8 @@ type testWebRTCDialCoordinator struct {
 	networkType                     NetworkType
 	clientRootObfuscationSecret     ObfuscationSecret
 	doDTLSRandomization             bool
-	trafficShapingParameters        *DataChannelTrafficShapingParameters
+	useMediaStreams                 bool
+	trafficShapingParameters        *TrafficShapingParameters
 	stunServerAddress               string
 	stunServerAddressRFC5780        string
 	stunServerAddressSucceeded      func(RFC5780 bool, address string)
@@ -223,7 +225,7 @@ type testWebRTCDialCoordinator struct {
 	discoverNATTimeout              time.Duration
 	webRTCAnswerTimeout             time.Duration
 	webRTCAwaitPortMappingTimeout   time.Duration
-	webRTCAwaitDataChannelTimeout   time.Duration
+	webRTCAwaitReadyToProxyTimeout  time.Duration
 	proxyDestinationDialTimeout     time.Duration
 	proxyRelayInactivityTimeout     time.Duration
 }
@@ -252,7 +254,13 @@ func (t *testWebRTCDialCoordinator) DoDTLSRandomization() bool {
 	return t.doDTLSRandomization
 }
 
-func (t *testWebRTCDialCoordinator) DataChannelTrafficShapingParameters() *DataChannelTrafficShapingParameters {
+func (t *testWebRTCDialCoordinator) UseMediaStreams() bool {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	return t.useMediaStreams
+}
+
+func (t *testWebRTCDialCoordinator) TrafficShapingParameters() *TrafficShapingParameters {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
 	return t.trafficShapingParameters
@@ -365,6 +373,35 @@ func (t *testWebRTCDialCoordinator) ResolveAddress(ctx context.Context, network,
 	return net.JoinHostPort(IPs[0].String(), port), nil
 }
 
+// lossyConn randomly drops 1% of packets sent or received.
+type lossyConn struct {
+	net.PacketConn
+}
+
+func (conn *lossyConn) ReadFrom(p []byte) (int, net.Addr, error) {
+	for {
+		n, addr, err := conn.PacketConn.ReadFrom(p)
+		if err != nil {
+			return n, addr, err
+		}
+		if prng.FlipWeightedCoin(0.01) {
+			// Drop packet
+			continue
+		}
+		return n, addr, err
+	}
+}
+
+func (conn *lossyConn) WriteTo(p []byte, addr net.Addr) (int, error) {
+	if prng.FlipWeightedCoin(0.01) {
+		// Drop packet
+		return len(p), nil
+	}
+	return conn.PacketConn.WriteTo(p, addr)
+}
+
+// UDPListen wraps the returned net.PacketConn in lossyConn to simulate packet
+// loss.
 func (t *testWebRTCDialCoordinator) UDPListen(_ context.Context) (net.PacketConn, error) {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
@@ -372,9 +409,11 @@ func (t *testWebRTCDialCoordinator) UDPListen(_ context.Context) (net.PacketConn
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return conn, nil
+	return &lossyConn{conn}, nil
 }
 
+// UDPConn wraps the returned net.PacketConn in lossyConn to simulate packet
+// loss.
 func (t *testWebRTCDialCoordinator) UDPConn(_ context.Context, network, remoteAddress string) (net.PacketConn, error) {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
@@ -387,7 +426,7 @@ func (t *testWebRTCDialCoordinator) UDPConn(_ context.Context, network, remoteAd
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return conn.(*net.UDPConn), nil
+	return &lossyConn{conn.(*net.UDPConn)}, nil
 }
 
 func (t *testWebRTCDialCoordinator) BindToDevice(fileDescriptor int) error {
@@ -423,10 +462,10 @@ func (t *testWebRTCDialCoordinator) WebRTCAwaitPortMappingTimeout() time.Duratio
 	return t.webRTCAwaitPortMappingTimeout
 }
 
-func (t *testWebRTCDialCoordinator) WebRTCAwaitDataChannelTimeout() time.Duration {
+func (t *testWebRTCDialCoordinator) WebRTCAwaitReadyToProxyTimeout() time.Duration {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
-	return t.webRTCAwaitDataChannelTimeout
+	return t.webRTCAwaitReadyToProxyTimeout
 }
 
 func (t *testWebRTCDialCoordinator) ProxyDestinationDialTimeout() time.Duration {
@@ -442,11 +481,21 @@ func (t *testWebRTCDialCoordinator) ProxyRelayInactivityTimeout() time.Duration
 }
 
 type testLogger struct {
+	component     string
 	logLevelDebug int32
 }
 
 func newTestLogger() *testLogger {
-	return &testLogger{logLevelDebug: 1}
+	return &testLogger{
+		logLevelDebug: 0,
+	}
+}
+
+func newTestLoggerWithComponent(component string) *testLogger {
+	return &testLogger{
+		component:     component,
+		logLevelDebug: 0,
+	}
 }
 
 func (logger *testLogger) WithTrace() common.LogTrace {
@@ -466,9 +515,14 @@ func (logger *testLogger) WithTraceFields(fields common.LogFields) common.LogTra
 
 func (logger *testLogger) LogMetric(metric string, fields common.LogFields) {
 	jsonFields, _ := json.Marshal(fields)
+	var component string
+	if len(logger.component) > 0 {
+		component = fmt.Sprintf("[%s]", logger.component)
+	}
 	fmt.Printf(
-		"[%s] METRIC: %s: %s\n",
+		"[%s]%s METRIC: %s: %s\n",
 		time.Now().UTC().Format(time.RFC3339),
+		component,
 		metric,
 		string(jsonFields))
 }
@@ -493,10 +547,14 @@ type testLoggerTrace struct {
 
 func (logger *testLoggerTrace) log(priority, message string) {
 	now := time.Now().UTC().Format(time.RFC3339)
+	var component string
+	if len(logger.logger.component) > 0 {
+		component = fmt.Sprintf("[%s]", logger.logger.component)
+	}
 	if len(logger.fields) == 0 {
 		fmt.Printf(
-			"[%s] %s: %s: %s\n",
-			now, priority, logger.trace, message)
+			"[%s]%s %s: %s: %s\n",
+			now, component, priority, logger.trace, message)
 	} else {
 		fields := common.LogFields{}
 		for k, v := range logger.fields {
@@ -510,8 +568,8 @@ func (logger *testLoggerTrace) log(priority, message string) {
 		}
 		jsonFields, _ := json.Marshal(fields)
 		fmt.Printf(
-			"[%s] %s: %s: %s %s\n",
-			now, priority, logger.trace, message, string(jsonFields))
+			"[%s]%s %s: %s: %s %s\n",
+			now, component, priority, logger.trace, message, string(jsonFields))
 	}
 }
 

+ 0 - 1
psiphon/common/inproxy/discovery.go

@@ -225,7 +225,6 @@ func discoverNATType(
 		}
 
 		resultChannel <- result{NATType: MakeNATType(mapping, filtering)}
-		return
 	}()
 
 	var r result

+ 6 - 3
psiphon/common/inproxy/discoverySTUN.go

@@ -142,7 +142,7 @@ func discoverNATFiltering(
 	request = stun.MustBuild(stun.TransactionID, stun.BindingRequest)
 	request.Add(stun.AttrChangeRequest, []byte{0x00, 0x00, 0x00, 0x06})
 
-	response, responseTimeout, err := doSTUNRoundTrip(request, conn, serverAddress)
+	_, responseTimeout, err := doSTUNRoundTrip(request, conn, serverAddress)
 	if err == nil {
 		return NATFilteringEndpointIndependent, nil
 	} else if !responseTimeout {
@@ -154,7 +154,7 @@ func discoverNATFiltering(
 	request = stun.MustBuild(stun.TransactionID, stun.BindingRequest)
 	request.Add(stun.AttrChangeRequest, []byte{0x00, 0x00, 0x00, 0x02})
 
-	response, responseTimeout, err = doSTUNRoundTrip(request, conn, serverAddress)
+	_, responseTimeout, err = doSTUNRoundTrip(request, conn, serverAddress)
 	if err == nil {
 		return NATFilteringAddressDependent, nil
 	} else if !responseTimeout {
@@ -212,7 +212,10 @@ func doSTUNRoundTrip(
 		return nil, false, errors.Trace(err)
 	}
 
-	conn.SetReadDeadline(time.Now().Add(discoverNATRoundTripTimeout))
+	err = conn.SetReadDeadline(time.Now().Add(discoverNATRoundTripTimeout))
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
 
 	var buffer [1500]byte
 	n, _, err := conn.ReadFrom(buffer[:])

+ 8 - 5
psiphon/common/inproxy/inproxy_disabled.go

@@ -52,7 +52,7 @@ func Enabled() bool {
 var errNotEnabled = std_errors.New("operation not enabled")
 
 const (
-	dataChannelAwaitTimeout = time.Duration(0)
+	readyToProxyAwaitTimeout = time.Duration(0)
 )
 
 type webRTCConn struct {
@@ -64,7 +64,8 @@ type webRTCConfig struct {
 	WebRTCDialCoordinator       WebRTCDialCoordinator
 	ClientRootObfuscationSecret ObfuscationSecret
 	DoDTLSRandomization         bool
-	TrafficShapingParameters    *DataChannelTrafficShapingParameters
+	UseMediaStreams             bool
+	TrafficShapingParameters    *TrafficShapingParameters
 	ReliableTransport           bool
 }
 
@@ -75,9 +76,7 @@ func (conn *webRTCConn) SetRemoteSDP(
 	return errors.Trace(errNotEnabled)
 }
 
-// AwaitInitialDataChannel returns when the data channel is established, or
-// when an error has occured.
-func (conn *webRTCConn) AwaitInitialDataChannel(ctx context.Context) error {
+func (conn *webRTCConn) AwaitReadyToProxy(ctx context.Context, connectionID ID) error {
 	return errors.Trace(errNotEnabled)
 }
 
@@ -121,6 +120,10 @@ func (conn *webRTCConn) GetMetrics() common.LogFields {
 	return nil
 }
 
+func GetQUICMaxPacketSizeAdjustment(isIPv6 bool) int {
+	return 0
+}
+
 type webRTCSDPMetrics struct {
 	iceCandidateTypes     []ICECandidateType
 	hasIPv6               bool

+ 122 - 58
psiphon/common/inproxy/inproxy_test.go

@@ -86,6 +86,7 @@ func runTestInproxy(doMustUpgrade bool) error {
 	testNATType := NATTypeUnknown
 	testSTUNServerAddress := "stun.nextcloud.com:443"
 	testDisableSTUN := false
+	testDisablePortMapping := false
 
 	testNewTacticsPayload := []byte(prng.HexString(100))
 	testNewTacticsTag := "new-tactics-tag"
@@ -115,13 +116,15 @@ func runTestInproxy(doMustUpgrade bool) error {
 		receivedClientMustUpgrade = make(chan struct{})
 
 		// trigger MustUpgrade
-		proxyProtocolVersion = 0
+		minimumProxyProtocolVersion = LatestProtocolVersion + 1
+		minimumClientProtocolVersion = LatestProtocolVersion + 1
 
 		// Minimize test parameters for MustUpgrade case
 		numProxies = 1
 		proxyMaxClients = 1
 		numClients = 1
 		testDisableSTUN = true
+		testDisablePortMapping = true
 	}
 
 	testCtx, stopTest := context.WithCancel(context.Background())
@@ -225,7 +228,9 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 	apiParameterLogFieldFormatter := func(
 		_ string, _ common.GeoIPData, params common.APIParameters) common.LogFields {
-		return common.LogFields(params)
+		logFields := common.LogFields{}
+		logFields.Add(common.LogFields(params))
+		return logFields
 	}
 
 	// Start broker
@@ -406,6 +411,12 @@ func runTestInproxy(doMustUpgrade bool) error {
 				brokerListener.Addr().String(), "proxy"),
 			brokerClientRoundTripperSucceeded: roundTripperSucceded,
 			brokerClientRoundTripperFailed:    roundTripperFailed,
+
+			// Minimize the delay before proxies reannounce after dial
+			// failures, which may occur.
+			announceDelay:           0,
+			announceMaxBackoffDelay: 0,
+			announceDelayJitter:     0.0,
 		}
 
 		webRTCCoordinator := &testWebRTCDialCoordinator{
@@ -413,6 +424,7 @@ func runTestInproxy(doMustUpgrade bool) error {
 			networkType:                testNetworkType,
 			natType:                    testNATType,
 			disableSTUN:                testDisableSTUN,
+			disablePortMapping:         testDisablePortMapping,
 			stunServerAddress:          testSTUNServerAddress,
 			stunServerAddressRFC5780:   testSTUNServerAddress,
 			stunServerAddressSucceeded: stunServerAddressSucceeded,
@@ -420,6 +432,11 @@ func runTestInproxy(doMustUpgrade bool) error {
 			setNATType:                 func(NATType) {},
 			setPortMappingTypes:        func(PortMappingTypes) {},
 			bindToDevice:               func(int) error { return nil },
+
+			// Minimize the delay before proxies reannounce after failed
+			// connections, which may occur.
+			webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
+			proxyRelayInactivityTimeout:    5 * time.Second,
 		}
 
 		// Each proxy has its own broker client
@@ -433,9 +450,11 @@ func runTestInproxy(doMustUpgrade bool) error {
 		runCtx, cancelRun := context.WithCancel(testCtx)
 		// No deferred cancelRun due to testGroup.Go below
 
+		name := fmt.Sprintf("proxy-%d", i)
+
 		proxy, err := NewProxy(&ProxyConfig{
 
-			Logger: logger,
+			Logger: newTestLoggerWithComponent(name),
 
 			WaitForNetworkConnectivity: func() bool {
 				return true
@@ -466,8 +485,8 @@ func runTestInproxy(doMustUpgrade bool) error {
 			ActivityUpdater: func(connectingClients int32, connectedClients int32,
 				bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
 
-				fmt.Printf("[%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
-					time.Now().UTC().Format(time.RFC3339),
+				fmt.Printf("[%s][%s] ACTIVITY: %d connecting, %d connected, %d up, %d down\n",
+					time.Now().UTC().Format(time.RFC3339), name,
 					connectingClients, connectedClients, bytesUp, bytesDown)
 			},
 
@@ -510,13 +529,15 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 	// Start clients
 
+	var completedClientCount atomic.Int64
+
 	logger.WithTrace().Info("START CLIENTS")
 
 	clientsGroup := new(errgroup.Group)
 
 	makeClientFunc := func(
+		clientNum int,
 		isTCP bool,
-		isMobile bool,
 		brokerClient *BrokerClient,
 		webRTCCoordinator WebRTCDialCoordinator) func() error {
 
@@ -534,13 +555,15 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 		return func() error {
 
+			name := fmt.Sprintf("client-%d", clientNum)
+
 			dialCtx, cancelDial := context.WithTimeout(testCtx, 60*time.Second)
 			defer cancelDial()
 
 			conn, err := DialClient(
 				dialCtx,
 				&ClientConfig{
-					Logger:                       logger,
+					Logger:                       newTestLoggerWithComponent(name),
 					BaseAPIParameters:            baseAPIParameters,
 					BrokerClient:                 brokerClient,
 					WebRTCDialCoordinator:        webRTCCoordinator,
@@ -561,12 +584,22 @@ func runTestInproxy(doMustUpgrade bool) error {
 			relayConn = conn
 
 			if wrapWithQUIC {
+				disablePathMTUDiscovery := true
 				quicConn, err := quic.Dial(
 					dialCtx,
 					conn,
 					&net.UDPAddr{Port: 1}, // This address is ignored, but the zero value is not allowed
-					"test", "QUICv1", nil, quicEchoServer.ObfuscationKey(), nil, nil, true,
-					false, false, common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
+					"test",
+					"QUICv1",
+					nil,
+					quicEchoServer.ObfuscationKey(),
+					nil,
+					nil,
+					disablePathMTUDiscovery,
+					GetQUICMaxPacketSizeAdjustment(false),
+					false,
+					false,
+					common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), ""),
 				)
 				if err != nil {
 					return errors.Trace(err)
@@ -620,7 +653,8 @@ func runTestInproxy(doMustUpgrade bool) error {
 					}
 					n += m
 				}
-				fmt.Printf("%d bytes sent\n", bytesToSend)
+				fmt.Printf("[%s][%s] %d bytes sent\n",
+					time.Now().UTC().Format(time.RFC3339), name, bytesToSend)
 				return nil
 			})
 
@@ -639,13 +673,21 @@ func runTestInproxy(doMustUpgrade bool) error {
 					}
 					n += m
 				}
-				fmt.Printf("%d bytes received\n", bytesToSend)
+
+				completed := completedClientCount.Add(1)
+
+				fmt.Printf("[%s][%s] %d bytes received; relay complete (%d/%d)\n",
+					time.Now().UTC().Format(time.RFC3339), name,
+					bytesToSend, completed, numClients)
 
 				select {
 				case <-signalRelayComplete:
 				case <-testCtx.Done():
 				}
 
+				fmt.Printf("[%s][%s] closing\n",
+					time.Now().UTC().Format(time.RFC3339), name)
+
 				relayConn.Close()
 				conn.Close()
 
@@ -656,16 +698,11 @@ func runTestInproxy(doMustUpgrade bool) error {
 		}
 	}
 
-	newClientParams := func(isMobile bool) (*BrokerClient, *testWebRTCDialCoordinator, error) {
+	newClientBrokerClient := func() (*BrokerClient, error) {
 
 		clientPrivateKey, err := GenerateSessionPrivateKey()
 		if err != nil {
-			return nil, nil, errors.Trace(err)
-		}
-
-		clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
-		if err != nil {
-			return nil, nil, errors.Trace(err)
+			return nil, errors.Trace(err)
 		}
 
 		brokerCoordinator := &testBrokerDialCoordinator{
@@ -684,6 +721,50 @@ func runTestInproxy(doMustUpgrade bool) error {
 			brokerClientNoMatch:               noMatch,
 		}
 
+		brokerClient, err := NewBrokerClient(brokerCoordinator)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		return brokerClient, nil
+	}
+
+	newClientWebRTCDialCoordinator := func(
+		isMobile bool,
+		useMediaStreams bool) (*testWebRTCDialCoordinator, error) {
+
+		clientRootObfuscationSecret, err := GenerateRootObfuscationSecret()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+
+		var trafficShapingParameters *TrafficShapingParameters
+		if useMediaStreams {
+			trafficShapingParameters = &TrafficShapingParameters{
+				MinPaddedMessages:       0,
+				MaxPaddedMessages:       10,
+				MinPaddingSize:          0,
+				MaxPaddingSize:          254,
+				MinDecoyMessages:        0,
+				MaxDecoyMessages:        10,
+				MinDecoySize:            1,
+				MaxDecoySize:            1200,
+				DecoyMessageProbability: 0.5,
+			}
+		} else {
+			trafficShapingParameters = &TrafficShapingParameters{
+				MinPaddedMessages:       0,
+				MaxPaddedMessages:       10,
+				MinPaddingSize:          0,
+				MaxPaddingSize:          1500,
+				MinDecoyMessages:        0,
+				MaxDecoyMessages:        10,
+				MinDecoySize:            1,
+				MaxDecoySize:            1500,
+				DecoyMessageProbability: 0.5,
+			}
+		}
+
 		webRTCCoordinator := &testWebRTCDialCoordinator{
 			networkID:   testNetworkID,
 			networkType: testNetworkType,
@@ -697,27 +778,18 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 			clientRootObfuscationSecret: clientRootObfuscationSecret,
 			doDTLSRandomization:         prng.FlipCoin(),
-			trafficShapingParameters: &DataChannelTrafficShapingParameters{
-				MinPaddedMessages:       0,
-				MaxPaddedMessages:       10,
-				MinPaddingSize:          0,
-				MaxPaddingSize:          1500,
-				MinDecoyMessages:        0,
-				MaxDecoyMessages:        10,
-				MinDecoySize:            1,
-				MaxDecoySize:            1500,
-				DecoyMessageProbability: 0.5,
-			},
+			useMediaStreams:             useMediaStreams,
+			trafficShapingParameters:    trafficShapingParameters,
 
 			setNATType:          func(NATType) {},
 			setPortMappingTypes: func(PortMappingTypes) {},
 			bindToDevice:        func(int) error { return nil },
 
 			// With STUN enabled (testDisableSTUN = false), there are cases
-			// where the WebRTC Data Channel is not successfully established.
-			// With a short enough timeout here, clients will redial and
-			// eventually succceed.
-			webRTCAwaitDataChannelTimeout: 5 * time.Second,
+			// where the WebRTC peer connection is not successfully
+			// established. With a short enough timeout here, clients will
+			// redial and eventually succceed.
+			webRTCAwaitReadyToProxyTimeout: 5 * time.Second,
 		}
 
 		if isMobile {
@@ -725,20 +797,10 @@ func runTestInproxy(doMustUpgrade bool) error {
 			webRTCCoordinator.disableInboundForMobileNetworks = true
 		}
 
-		brokerClient, err := NewBrokerClient(brokerCoordinator)
-		if err != nil {
-			return nil, nil, errors.Trace(err)
-		}
-
-		return brokerClient, webRTCCoordinator, nil
+		return webRTCCoordinator, nil
 	}
 
-	clientBrokerClient, clientWebRTCCoordinator, err := newClientParams(false)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	clientMobileBrokerClient, clientMobileWebRTCCoordinator, err := newClientParams(true)
+	sharedBrokerClient, err := newClientBrokerClient()
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -750,29 +812,30 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 		isTCP := i%2 == 0
 		isMobile := i%4 == 0
+		useMediaStreams := i%4 < 2
 
 		// Exercise BrokerClients shared by multiple clients, but also create
 		// several broker clients.
-		if i%8 == 0 {
-			clientBrokerClient, clientWebRTCCoordinator, err = newClientParams(false)
-			if err != nil {
-				return errors.Trace(err)
-			}
-
-			clientMobileBrokerClient, clientMobileWebRTCCoordinator, err = newClientParams(true)
+		brokerClient := sharedBrokerClient
+		if i%2 == 0 {
+			brokerClient, err = newClientBrokerClient()
 			if err != nil {
 				return errors.Trace(err)
 			}
 		}
 
-		brokerClient := clientBrokerClient
-		webRTCCoordinator := clientWebRTCCoordinator
-		if isMobile {
-			brokerClient = clientMobileBrokerClient
-			webRTCCoordinator = clientMobileWebRTCCoordinator
+		webRTCCoordinator, err := newClientWebRTCDialCoordinator(
+			isMobile, useMediaStreams)
+		if err != nil {
+			return errors.Trace(err)
 		}
 
-		clientsGroup.Go(makeClientFunc(isTCP, isMobile, brokerClient, webRTCCoordinator))
+		clientsGroup.Go(
+			makeClientFunc(
+				i,
+				isTCP,
+				brokerClient,
+				webRTCCoordinator))
 	}
 
 	if doMustUpgrade {
@@ -1014,6 +1077,7 @@ func newQuicEchoServer() (*quicEchoServer, error) {
 		nil,
 		nil,
 		"127.0.0.1:0",
+		GetQUICMaxPacketSizeAdjustment(false),
 		obfuscationKey,
 		false)
 	if err != nil {

+ 38 - 14
psiphon/common/inproxy/matcher.go

@@ -41,6 +41,7 @@ const (
 	matcherPendingAnswersTTL        = 30 * time.Second
 	matcherPendingAnswersMaxSize    = 100000
 	matcherMaxPreferredNATProbe     = 100
+	matcherMaxProbe                 = 1000
 
 	matcherRateLimiterReapHistoryFrequencySeconds = 300
 	matcherRateLimiterMaxCacheEntries             = 1000000
@@ -112,9 +113,10 @@ type Matcher struct {
 }
 
 // MatchProperties specifies the compartment, GeoIP, and network topology
-// matching roperties of clients and proxies.
+// matching properties of clients and proxies.
 type MatchProperties struct {
 	IsPriority             bool
+	ProtocolVersion        int32
 	CommonCompartmentIDs   []ID
 	PersonalCompartmentIDs []ID
 	GeoIPData              common.GeoIPData
@@ -173,11 +175,11 @@ type MatchAnnouncement struct {
 // MatchOffer is a client offer to be queued for matching.
 type MatchOffer struct {
 	Properties                  MatchProperties
-	ClientProxyProtocolVersion  int32
 	ClientOfferSDP              WebRTCSessionDescription
 	ClientRootObfuscationSecret ObfuscationSecret
 	DoDTLSRandomization         bool
-	TrafficShapingParameters    *DataChannelTrafficShapingParameters
+	UseMediaStreams             bool
+	TrafficShapingParameters    *TrafficShapingParameters
 	NetworkProtocol             NetworkProtocol
 	DestinationAddress          string
 	DestinationServerID         string
@@ -186,11 +188,10 @@ type MatchOffer struct {
 // MatchAnswer is a proxy answer, the proxy's follow up to a matched
 // announcement, to be routed to the awaiting client offer.
 type MatchAnswer struct {
-	ProxyIP                      string
-	ProxyID                      ID
-	ConnectionID                 ID
-	SelectedProxyProtocolVersion int32
-	ProxyAnswerSDP               WebRTCSessionDescription
+	ProxyIP        string
+	ProxyID        ID
+	ConnectionID   ID
+	ProxyAnswerSDP WebRTCSessionDescription
 }
 
 // MatchMetrics records statistics about the match queue state at the time a
@@ -740,7 +741,7 @@ func (m *Matcher) matchAllOffers() {
 
 func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
-	// Assumes the caller has the queue mutexed locked.
+	// Assumes the caller has the queue mutexes locked.
 
 	// Check each candidate announcement in turn, and select a match. There is
 	// an implicit preference for older proxy announcements, sooner to
@@ -756,10 +757,6 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 	// rules, such as a configuration encoding knowledge of an ASN's NAT
 	// type, or preferred client/proxy country/ASN matches.
 
-	// TODO: match supported protocol versions. Currently, all announces and
-	// offers must specify ProxyProtocolVersion1, so there's no protocol
-	// version match logic.
-
 	offerProperties := &offerEntry.offer.Properties
 
 	// Assumes the caller checks that offer specifies either personal
@@ -789,13 +786,28 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 		partiallyLimitedNATCount > 0,
 		strictlyLimitedNATCount > 0)
 
+	// TODO: add an ExistsCompatibleProtocolVersionMatch check?
+	//
+	// Currently, searching for protocol version support that doesn't exist
+	// may be mitigated by limiting, through tactics, client protocol options
+	// selection; using the proxy protocol version in PrioritizeProxy; and,
+	// ultimately, increasing MinimumProxyProtocolVersion.
+
 	var bestMatch *announcementEntry
 	bestMatchIndex := -1
 	bestMatchIsPriority := false
 	bestMatchNAT := false
 
+	// matcherMaxProbe limits the linear search through the announcement queue
+	// to find a match. Currently, the queue implementation provides
+	// constant-time lookup for matching compartment IDs. Other matching
+	// aspects may require iterating over the queue items, including the
+	// strict same-country and ASN constraint and protocol version
+	// compatibility constraint. Best NAT match is not a strict constraint
+	// and uses a shorter search limit, matcherMaxPreferredNATProbe.
+
 	candidateIndex := -1
-	for {
+	for candidateIndex <= matcherMaxProbe {
 
 		announcementEntry, isPriority := matchIterator.getNext()
 		if announcementEntry == nil {
@@ -825,6 +837,18 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (*announcementEntry, int) {
 
 		announcementProperties := &announcementEntry.announcement.Properties
 
+		// Don't match unless the proxy announcement, client offer, and the
+		// client's selected protocol options are compatible. UseMediaStreams
+		// requires at least ProtocolVersion2.
+
+		_, ok := negotiateProtocolVersion(
+			announcementProperties.ProtocolVersion,
+			offerProperties.ProtocolVersion,
+			offerEntry.offer.UseMediaStreams)
+		if !ok {
+			continue
+		}
+
 		// Disallow matching the same country and ASN, except for personal
 		// compartment ID matches.
 		//

+ 105 - 26
psiphon/common/inproxy/matcher_test.go

@@ -83,10 +83,10 @@ func runTestMatcher() error {
 		}
 	}
 
-	makeOffer := func(properties *MatchProperties) *MatchOffer {
+	makeOffer := func(properties *MatchProperties, useMediaStreams bool) *MatchOffer {
 		return &MatchOffer{
-			Properties:                 *properties,
-			ClientProxyProtocolVersion: ProxyProtocolVersion1,
+			Properties:      *properties,
+			UseMediaStreams: useMediaStreams,
 		}
 	}
 
@@ -115,12 +115,19 @@ func runTestMatcher() error {
 		if err != nil {
 			resultChan <- errors.Trace(err)
 			return
-		} else {
-			err := checkMatchMetrics(matchMetrics)
-			if err != nil {
-				resultChan <- errors.Trace(err)
-				return
-			}
+		}
+		err = checkMatchMetrics(matchMetrics)
+		if err != nil {
+			resultChan <- errors.Trace(err)
+			return
+		}
+		_, ok := negotiateProtocolVersion(
+			matchProperties.ProtocolVersion,
+			offer.Properties.ProtocolVersion,
+			offer.UseMediaStreams)
+		if !ok {
+			resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
+			return
 		}
 
 		if waitBeforeAnswer != nil {
@@ -130,9 +137,8 @@ func runTestMatcher() error {
 		if answerSuccess {
 			err = m.Answer(
 				&MatchAnswer{
-					ProxyID:                      announcement.ProxyID,
-					ConnectionID:                 announcement.ConnectionID,
-					SelectedProxyProtocolVersion: offer.ClientProxyProtocolVersion,
+					ProxyID:      announcement.ProxyID,
+					ConnectionID: announcement.ConnectionID,
 				})
 		} else {
 			m.AnswerError(announcement.ProxyID, announcement.ConnectionID)
@@ -142,39 +148,55 @@ func runTestMatcher() error {
 
 	clientIP := randomIPAddress()
 
-	clientFunc := func(
+	baseClientFunc := func(
 		resultChan chan error,
 		clientIP string,
 		matchProperties *MatchProperties,
+		useMediaStreams bool,
 		timeout time.Duration) {
 
 		ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
 		defer cancelFunc()
 
-		offer := makeOffer(matchProperties)
-		answer, _, matchMetrics, err := m.Offer(ctx, clientIP, offer)
+		offer := makeOffer(matchProperties, useMediaStreams)
+		_, matchAnnouncement, matchMetrics, err := m.Offer(ctx, clientIP, offer)
 		if err != nil {
 			resultChan <- errors.Trace(err)
 			return
 		}
-		if answer.SelectedProxyProtocolVersion != offer.ClientProxyProtocolVersion {
-			resultChan <- errors.TraceNew("unexpected selected proxy protocol version")
+		err = checkMatchMetrics(matchMetrics)
+		if err != nil {
+			resultChan <- errors.Trace(err)
 			return
-		} else {
-			err := checkMatchMetrics(matchMetrics)
-			if err != nil {
-				resultChan <- errors.Trace(err)
-				return
-			}
 		}
+		_, ok := negotiateProtocolVersion(
+			matchAnnouncement.Properties.ProtocolVersion,
+			offer.Properties.ProtocolVersion,
+			offer.UseMediaStreams)
+		if !ok {
+			resultChan <- errors.TraceNew("unexpected negotiateProtocolVersion failure")
+			return
+		}
+
 		resultChan <- nil
 	}
 
+	clientFunc := func(resultChan chan error, clientIP string,
+		matchProperties *MatchProperties, timeout time.Duration) {
+		baseClientFunc(resultChan, clientIP, matchProperties, false, timeout)
+	}
+
+	clientUsingMediaStreamsFunc := func(resultChan chan error, clientIP string,
+		matchProperties *MatchProperties, timeout time.Duration) {
+		baseClientFunc(resultChan, clientIP, matchProperties, true, timeout)
+	}
+
 	// Test: announce timeout
 
 	proxyResultChan := make(chan error)
 
 	matchProperties := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		CommonCompartmentIDs: []ID{makeID()},
 	}
 
@@ -373,11 +395,13 @@ func runTestMatcher() error {
 	commonCompartmentIDs := []ID{makeID()}
 
 	geoIPData1 := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C1", ASN: "A1"},
 		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	geoIPData2 := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C2", ASN: "A2"},
 		CommonCompartmentIDs: commonCompartmentIDs,
 	}
@@ -432,21 +456,25 @@ func runTestMatcher() error {
 	// Test: no compartment match
 
 	compartment1 := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            geoIPData1.GeoIPData,
 		CommonCompartmentIDs: []ID{makeID()},
 	}
 
 	compartment2 := &MatchProperties{
+		ProtocolVersion:        LatestProtocolVersion,
 		GeoIPData:              geoIPData2.GeoIPData,
 		PersonalCompartmentIDs: []ID{makeID()},
 	}
 
 	compartment3 := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            geoIPData2.GeoIPData,
 		CommonCompartmentIDs: []ID{makeID()},
 	}
 
 	compartment4 := &MatchProperties{
+		ProtocolVersion:        LatestProtocolVersion,
 		GeoIPData:              geoIPData2.GeoIPData,
 		PersonalCompartmentIDs: []ID{makeID()},
 	}
@@ -484,7 +512,8 @@ func runTestMatcher() error {
 	// Test: common compartment match
 
 	compartment1And3 := &MatchProperties{
-		GeoIPData: geoIPData2.GeoIPData,
+		ProtocolVersion: LatestProtocolVersion,
+		GeoIPData:       geoIPData2.GeoIPData,
 		CommonCompartmentIDs: []ID{
 			compartment1.CommonCompartmentIDs[0],
 			compartment3.CommonCompartmentIDs[0]},
@@ -506,7 +535,8 @@ func runTestMatcher() error {
 	// Test: personal compartment match
 
 	compartment2And4 := &MatchProperties{
-		GeoIPData: geoIPData2.GeoIPData,
+		ProtocolVersion: LatestProtocolVersion,
+		GeoIPData:       geoIPData2.GeoIPData,
 		PersonalCompartmentIDs: []ID{
 			compartment2.PersonalCompartmentIDs[0],
 			compartment4.PersonalCompartmentIDs[0]},
@@ -540,27 +570,75 @@ func runTestMatcher() error {
 		return errors.Tracef("unexpected result: %v", err)
 	}
 
+	// Test: downgrade-compatible protocol version match
+
+	protocolVersion1 := &MatchProperties{
+		ProtocolVersion:      ProtocolVersion1,
+		GeoIPData:            common.GeoIPData{Country: "C1", ASN: "A1"},
+		CommonCompartmentIDs: commonCompartmentIDs,
+	}
+
+	protocolVersion2 := &MatchProperties{
+		ProtocolVersion:      ProtocolVersion2,
+		GeoIPData:            common.GeoIPData{Country: "C2", ASN: "A2"},
+		CommonCompartmentIDs: commonCompartmentIDs,
+	}
+
+	go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
+	go clientFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
+
+	err = <-proxyResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = <-clientResultChan
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Test: no incompatible protocol version match
+
+	go proxyFunc(proxyResultChan, proxyIP, protocolVersion1, 10*time.Millisecond, nil, true)
+	go clientUsingMediaStreamsFunc(clientResultChan, clientIP, protocolVersion2, 10*time.Millisecond)
+
+	err = <-proxyResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
+	}
+
+	err = <-clientResultChan
+	if err == nil || !strings.HasSuffix(err.Error(), "context deadline exceeded") {
+		return errors.Tracef("unexpected result: %v", err)
+	}
+
+	// Test: downgrade-compatible protocol version match
+
 	// Test: proxy preferred NAT match
 
 	client1Properties := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C1", ASN: "A1"},
 		NATType:              NATTypeFullCone,
 		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	client2Properties := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C2", ASN: "A2"},
 		NATType:              NATTypeSymmetric,
 		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	proxy1Properties := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C3", ASN: "A3"},
 		NATType:              NATTypeNone,
 		CommonCompartmentIDs: commonCompartmentIDs,
 	}
 
 	proxy2Properties := &MatchProperties{
+		ProtocolVersion:      LatestProtocolVersion,
 		GeoIPData:            common.GeoIPData{Country: "C4", ASN: "A4"},
 		NATType:              NATTypeSymmetric,
 		CommonCompartmentIDs: commonCompartmentIDs,
@@ -1095,6 +1173,7 @@ func BenchmarkMatcherQueue(b *testing.B) {
 						limitIP: "127.0.0.1",
 						announcement: &MatchAnnouncement{
 							Properties: MatchProperties{
+								ProtocolVersion:        LatestProtocolVersion,
 								PersonalCompartmentIDs: []ID{personalCompartmentID},
 								GeoIPData:              common.GeoIPData{},
 								NetworkType:            NetworkTypeWiFi,
@@ -1111,13 +1190,13 @@ func BenchmarkMatcherQueue(b *testing.B) {
 					limitIP: "127.0.0.1",
 					offer: &MatchOffer{
 						Properties: MatchProperties{
+							ProtocolVersion:        LatestProtocolVersion,
 							PersonalCompartmentIDs: []ID{personalCompartmentID},
 							GeoIPData:              common.GeoIPData{},
 							NetworkType:            NetworkTypeWiFi,
 							NATType:                NATTypePortRestrictedCone,
 							PortMappingTypes:       []PortMappingType{},
 						},
-						ClientProxyProtocolVersion: ProxyProtocolVersion1,
 					},
 					answerChan: make(chan *answerInfo, 1),
 				}

+ 1 - 1
psiphon/common/inproxy/portmapper_other.go

@@ -23,6 +23,6 @@ package inproxy
 
 func setPortMapperBindToDevice(_ WebRTCDialCoordinator) {
 	// BindToDevice is not applied on iOS as tailscale.com/net/netns does not
-	// have an equivilent to SetAndroidProtectFunc for iOS. At this time,
+	// have an equivalent to SetAndroidProtectFunc for iOS. At this time,
 	// BindToDevice operations on iOS are legacy code and not required.
 }

+ 25 - 19
psiphon/common/inproxy/proxy.go

@@ -720,13 +720,15 @@ func (p *Proxy) proxyOneClient(
 
 	}
 
-	if announceResponse.ClientProxyProtocolVersion != ProxyProtocolVersion1 {
-		// This case is currently unexpected, as all clients and proxies use
-		// ProxyProtocolVersion1.
+	if announceResponse.SelectedProtocolVersion < ProtocolVersion1 ||
+		(announceResponse.UseMediaStreams &&
+			announceResponse.SelectedProtocolVersion < ProtocolVersion2) ||
+		announceResponse.SelectedProtocolVersion > LatestProtocolVersion {
+
 		backOff = true
 		return backOff, errors.Tracef(
-			"Unsupported proxy protocol version: %d",
-			announceResponse.ClientProxyProtocolVersion)
+			"unsupported protocol version: %d",
+			announceResponse.SelectedProtocolVersion)
 	}
 
 	// Trigger back-off if the following WebRTC operations fail to establish a
@@ -767,7 +769,16 @@ func (p *Proxy) proxyOneClient(
 			WebRTCDialCoordinator:       webRTCCoordinator,
 			ClientRootObfuscationSecret: announceResponse.ClientRootObfuscationSecret,
 			DoDTLSRandomization:         announceResponse.DoDTLSRandomization,
+			UseMediaStreams:             announceResponse.UseMediaStreams,
 			TrafficShapingParameters:    announceResponse.TrafficShapingParameters,
+
+			// In media stream mode, this flag indicates to the proxy that it
+			// should add the QUIC-based reliability layer wrapping to media
+			// streams. In data channel mode, this flag is ignored, since the
+			// client configures the data channel using
+			// webrtc.DataChannelInit.Ordered, and this configuration is sent
+			// to the proxy in the client's SDP.
+			ReliableTransport: announceResponse.NetworkProtocol == NetworkProtocolTCP,
 		},
 		announceResponse.ClientOfferSDP,
 		hasPersonalCompartmentIDs)
@@ -788,11 +799,10 @@ func (p *Proxy) proxyOneClient(
 	_, err = brokerClient.ProxyAnswer(
 		ctx,
 		&ProxyAnswerRequest{
-			ConnectionID:                 announceResponse.ConnectionID,
-			SelectedProxyProtocolVersion: announceResponse.ClientProxyProtocolVersion,
-			ProxyAnswerSDP:               SDP,
-			ICECandidateTypes:            sdpMetrics.iceCandidateTypes,
-			AnswerError:                  webRTCRequestErr,
+			ConnectionID:      announceResponse.ConnectionID,
+			ProxyAnswerSDP:    SDP,
+			ICECandidateTypes: sdpMetrics.iceCandidateTypes,
+			AnswerError:       webRTCRequestErr,
 		})
 	if err != nil {
 		if webRTCErr != nil {
@@ -818,21 +828,17 @@ func (p *Proxy) proxyOneClient(
 	// create wasted load on destination Psiphon servers, particularly when
 	// WebRTC connections fail.
 
-	awaitDataChannelCtx, awaitDataChannelCancelFunc := context.WithTimeout(
+	awaitReadyToProxyCtx, awaitReadyToProxyCancelFunc := context.WithTimeout(
 		ctx,
 		common.ValueOrDefault(
-			webRTCCoordinator.WebRTCAwaitDataChannelTimeout(), dataChannelAwaitTimeout))
-	defer awaitDataChannelCancelFunc()
+			webRTCCoordinator.WebRTCAwaitReadyToProxyTimeout(), readyToProxyAwaitTimeout))
+	defer awaitReadyToProxyCancelFunc()
 
-	err = webRTCConn.AwaitInitialDataChannel(awaitDataChannelCtx)
+	err = webRTCConn.AwaitReadyToProxy(awaitReadyToProxyCtx, announceResponse.ConnectionID)
 	if err != nil {
 		return backOff, errors.Trace(err)
 	}
 
-	p.config.Logger.WithTraceFields(common.LogFields{
-		"connectionID": announceResponse.ConnectionID,
-	}).Info("WebRTC data channel established")
-
 	// Dial the destination, a Psiphon server. The broker validates that the
 	// dial destination is a Psiphon server.
 
@@ -1028,7 +1034,7 @@ func (p *Proxy) getMetrics(
 
 	return &ProxyMetrics{
 		BaseAPIParameters:             packedParams,
-		ProxyProtocolVersion:          proxyProtocolVersion,
+		ProtocolVersion:               LatestProtocolVersion,
 		NATType:                       webRTCCoordinator.NATType(),
 		PortMappingTypes:              webRTCCoordinator.PortMappingTypes(),
 		MaxClients:                    int32(p.config.MaxClients),

+ 5 - 5
psiphon/common/inproxy/session_test.go

@@ -712,11 +712,11 @@ func runTestNoise() error {
 	if receivedPayload == nil {
 		return errors.TraceNew("missing payload")
 	}
-	if bytes.Compare(sendPayload, receivedPayload) != 0 {
+	if !bytes.Equal(sendPayload, receivedPayload) {
 		return errors.TraceNew("incorrect payload")
 	}
 
-	if bytes.Compare(responderHandshake.PeerStatic(), initiatorKeys.Public) != 0 {
+	if !bytes.Equal(responderHandshake.PeerStatic(), initiatorKeys.Public) {
 		return errors.TraceNew("unexpected initiator static public key")
 	}
 
@@ -738,7 +738,7 @@ func runTestNoise() error {
 	if !initiatorReplay.ValidateCounter(nonce, math.MaxUint64) {
 		return errors.TraceNew("replay detected")
 	}
-	if bytes.Compare(sendPayload, receivedPayload) != 0 {
+	if !bytes.Equal(sendPayload, receivedPayload) {
 		return errors.TraceNew("incorrect payload")
 	}
 
@@ -764,7 +764,7 @@ func runTestNoise() error {
 		if !responderReplay.ValidateCounter(nonce, math.MaxUint64) {
 			return errors.TraceNew("replay detected")
 		}
-		if bytes.Compare(sendPayload, receivedPayload) != 0 {
+		if !bytes.Equal(sendPayload, receivedPayload) {
 			return errors.TraceNew("incorrect payload")
 		}
 
@@ -786,7 +786,7 @@ func runTestNoise() error {
 		if !initiatorReplay.ValidateCounter(nonce, math.MaxUint64) {
 			return errors.TraceNew("replay detected")
 		}
-		if bytes.Compare(sendPayload, receivedPayload) != 0 {
+		if !bytes.Equal(sendPayload, receivedPayload) {
 			return errors.TraceNew("incorrect payload")
 		}
 	}

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 433 - 429
psiphon/common/inproxy/webrtc.go


+ 4 - 4
psiphon/common/parameters/inproxy.go

@@ -82,9 +82,9 @@ func (IDs InproxyCompartmentIDsValue) Validate(checkCompartmentIDList *[]string)
 	return nil
 }
 
-// InproxyDataChannelTrafficShapingParameters is type-compatible with
-// common/inproxy.DataChannelTrafficShapingParameters.
-type InproxyDataChannelTrafficShapingParametersValue struct {
+// InproxyTrafficShapingParametersValue is type-compatible with
+// common/inproxy.TrafficShapingParameters.
+type InproxyTrafficShapingParametersValue struct {
 	MinPaddedMessages       int
 	MaxPaddedMessages       int
 	MinPaddingSize          int
@@ -96,7 +96,7 @@ type InproxyDataChannelTrafficShapingParametersValue struct {
 	DecoyMessageProbability float64
 }
 
-func (p *InproxyDataChannelTrafficShapingParametersValue) Validate() error {
+func (p *InproxyTrafficShapingParametersValue) Validate() error {
 	if p.MinPaddedMessages < 0 ||
 		p.MaxPaddedMessages < 0 ||
 		p.MinPaddingSize < 0 ||

+ 24 - 16
psiphon/common/parameters/parameters.go

@@ -421,6 +421,7 @@ const (
 	InproxyBrokerMatcherOfferRateLimitInterval         = "InproxyBrokerMatcherOfferRateLimitInterval"
 	InproxyBrokerMatcherPrioritizeProxiesProbability   = "InproxyBrokerMatcherPrioritizeProxiesProbability"
 	InproxyBrokerMatcherPrioritizeProxiesFilter        = "InproxyBrokerMatcherPrioritizeProxiesFilter"
+	InproxyBrokerMatcherPrioritizeProxiesMinVersion    = "InproxyBrokerMatcherPrioritizeProxiesMinVersion"
 	InproxyBrokerProxyAnnounceTimeout                  = "InproxyBrokerProxyAnnounceTimeout"
 	InproxyBrokerClientOfferTimeout                    = "InproxyBrokerClientOfferTimeout"
 	InproxyBrokerClientOfferPersonalTimeout            = "InproxyBrokerClientOfferPersonalTimeout"
@@ -438,8 +439,11 @@ const (
 	InproxyClientRelayedPacketRequestTimeout           = "InproxyCloientRelayedPacketRequestTimeout"
 	InproxyBrokerRoundTripStatusCodeFailureThreshold   = "InproxyBrokerRoundTripStatusCodeFailureThreshold"
 	InproxyDTLSRandomizationProbability                = "InproxyDTLSRandomizationProbability"
-	InproxyDataChannelTrafficShapingProbability        = "InproxyDataChannelTrafficShapingProbability"
-	InproxyDataChannelTrafficShapingParameters         = "InproxyDataChannelTrafficShapingParameters"
+	InproxyWebRTCMediaStreamsProbability               = "InproxyWebRTCMediaStreamsProbability"
+	InproxyWebRTCDataChannelTrafficShapingProbability  = "InproxyWebRTCDataChannelTrafficShapingProbability"
+	InproxyWebRTCDataChannelTrafficShapingParameters   = "InproxyWebRTCDataChannelTrafficShapingParameters"
+	InproxyWebRTCMediaStreamsTrafficShapingProbability = "InproxyWebRTCMediaStreamsTrafficShapingProbability"
+	InproxyWebRTCMediaStreamsTrafficShapingParameters  = "InproxyWebRTCMediaStreamsTrafficShapingParameters"
 	InproxySTUNServerAddresses                         = "InproxySTUNServerAddresses"
 	InproxySTUNServerAddressesRFC5780                  = "InproxySTUNServerAddressesRFC5780"
 	InproxyProxySTUNServerAddresses                    = "InproxyProxySTUNServerAddresses"
@@ -463,8 +467,8 @@ const (
 	InproxyClientDiscoverNATTimeout                    = "InproxyClientDiscoverNATTimeout"
 	InproxyWebRTCAnswerTimeout                         = "InproxyWebRTCAnswerTimeout"
 	InproxyWebRTCAwaitPortMappingTimeout               = "InproxyWebRTCAwaitPortMappingTimeout"
-	InproxyProxyWebRTCAwaitDataChannelTimeout          = "InproxyProxyWebRTCAwaitDataChannelTimeout"
-	InproxyClientWebRTCAwaitDataChannelTimeout         = "InproxyClientWebRTCAwaitDataChannelTimeout"
+	InproxyProxyWebRTCAwaitReadyToProxyTimeout         = "InproxyProxyWebRTCAwaitReadyToProxyTimeout"
+	InproxyClientWebRTCAwaitReadyToProxyTimeout        = "InproxyClientWebRTCAwaitReadyToProxyTimeout"
 	InproxyProxyDestinationDialTimeout                 = "InproxyProxyDestinationDialTimeout"
 	InproxyProxyRelayInactivityTimeout                 = "InproxyProxyRelayInactivityTimeout"
 	InproxyPsiphonAPIRequestTimeout                    = "InproxyPsiphonAPIRequestTimeout"
@@ -962,8 +966,9 @@ var defaultParameters = map[string]struct {
 	InproxyBrokerMatcherOfferLimitEntryCount:           {value: 10, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitQuantity:         {value: 50, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitInterval:         {value: 1 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
-	InproxyBrokerMatcherPrioritizeProxiesProbability:   {value: 1.0, minimum: 0.0},
-	InproxyBrokerMatcherPrioritizeProxiesFilter:        {value: KeyStrings{}},
+	InproxyBrokerMatcherPrioritizeProxiesProbability:   {value: 1.0, minimum: 0.0, flags: serverSideOnly},
+	InproxyBrokerMatcherPrioritizeProxiesFilter:        {value: KeyStrings{}, flags: serverSideOnly},
+	InproxyBrokerMatcherPrioritizeProxiesMinVersion:    {value: 0, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerProxyAnnounceTimeout:                  {value: 2 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferPersonalTimeout:            {value: 5 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},
@@ -981,8 +986,11 @@ var defaultParameters = map[string]struct {
 	InproxyClientRelayedPacketRequestTimeout:           {value: 10 * time.Second, minimum: time.Duration(0)},
 	InproxyBrokerRoundTripStatusCodeFailureThreshold:   {value: 2 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyDTLSRandomizationProbability:                {value: 0.5, minimum: 0.0},
-	InproxyDataChannelTrafficShapingProbability:        {value: 0.5, minimum: 0.0},
-	InproxyDataChannelTrafficShapingParameters:         {value: InproxyDataChannelTrafficShapingParametersValue{0, 10, 0, 1500, 0, 10, 1, 1500, 0.5}},
+	InproxyWebRTCMediaStreamsProbability:               {value: 0.0, minimum: 0.0},
+	InproxyWebRTCDataChannelTrafficShapingProbability:  {value: 0.5, minimum: 0.0},
+	InproxyWebRTCDataChannelTrafficShapingParameters:   {value: InproxyTrafficShapingParametersValue{0, 10, 0, 1500, 0, 10, 1, 1500, 0.5}},
+	InproxyWebRTCMediaStreamsTrafficShapingProbability: {value: 0.5, minimum: 0.0},
+	InproxyWebRTCMediaStreamsTrafficShapingParameters:  {value: InproxyTrafficShapingParametersValue{0, 10, 0, 254, 0, 10, 1, 1200, 0.5}},
 	InproxySTUNServerAddresses:                         {value: []string{}},
 	InproxySTUNServerAddressesRFC5780:                  {value: []string{}},
 	InproxyProxySTUNServerAddresses:                    {value: []string{}},
@@ -1006,8 +1014,8 @@ var defaultParameters = map[string]struct {
 	InproxyClientDiscoverNATTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyWebRTCAnswerTimeout:                         {value: 20 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyWebRTCAwaitPortMappingTimeout:               {value: 2 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
-	InproxyProxyWebRTCAwaitDataChannelTimeout:          {value: 30 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
-	InproxyClientWebRTCAwaitDataChannelTimeout:         {value: 20 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
+	InproxyProxyWebRTCAwaitReadyToProxyTimeout:         {value: 30 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
+	InproxyClientWebRTCAwaitReadyToProxyTimeout:        {value: 20 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyProxyDestinationDialTimeout:                 {value: 20 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyProxyRelayInactivityTimeout:                 {value: 5 * time.Minute, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyPsiphonAPIRequestTimeout:                    {value: 10 * time.Second, minimum: 1 * time.Second, flags: useNetworkLatencyMultiplier},
@@ -1566,7 +1574,7 @@ func (p *Parameters) Set(
 					}
 					return nil, errors.Trace(err)
 				}
-			case InproxyDataChannelTrafficShapingParametersValue:
+			case InproxyTrafficShapingParametersValue:
 				err := v.Validate()
 				if err != nil {
 					if skipOnError {
@@ -2184,12 +2192,12 @@ func (p ParametersAccessor) InproxyCompartmentIDs(name string) InproxyCompartmen
 	return value
 }
 
-// InproxyDataChannelTrafficShapingParameters returns a
-// InproxyDataChannelTrafficShapingParameters parameter value.
-func (p ParametersAccessor) InproxyDataChannelTrafficShapingParameters(
-	name string) InproxyDataChannelTrafficShapingParametersValue {
+// InproxyTrafficShapingParameters returns a InproxyTrafficShapingParameters
+// parameter value.
+func (p ParametersAccessor) InproxyTrafficShapingParameters(
+	name string) InproxyTrafficShapingParametersValue {
 
-	value := InproxyDataChannelTrafficShapingParametersValue{}
+	value := InproxyTrafficShapingParametersValue{}
 	p.snapshot.getValue(name, &value)
 	return value
 }

+ 2 - 2
psiphon/common/parameters/parameters_test.go

@@ -220,8 +220,8 @@ func TestGetDefaultParameters(t *testing.T) {
 			if !reflect.DeepEqual(v, g) {
 				t.Fatalf("ConjureTransports returned %+v expected %+v", g, v)
 			}
-		case InproxyDataChannelTrafficShapingParametersValue:
-			g := p.Get().InproxyDataChannelTrafficShapingParameters(name)
+		case InproxyTrafficShapingParametersValue:
+			g := p.Get().InproxyTrafficShapingParameters(name)
 			if !reflect.DeepEqual(v, g) {
 				t.Fatalf("ConjureTransports returned %+v expected %+v", g, v)
 			}

+ 2 - 1
psiphon/common/protocol/packed.go

@@ -825,8 +825,9 @@ func init() {
 		{163, "inproxy_dial_broker_offer_duration", intConverter},
 		{164, "inproxy_dial_webrtc_connection_duration", intConverter},
 		{165, "inproxy_broker_is_reuse", intConverter},
+		{166, "inproxy_webrtc_use_media_streams", intConverter},
 
-		// Next key value = 166
+		// Next key value = 167
 	}
 
 	for _, spec := range packedAPIParameterSpecs {

+ 1 - 0
psiphon/common/quic/obfuscator_test.go

@@ -135,6 +135,7 @@ func runNonceTransformer(t *testing.T, quicVersion string) {
 				TransformSpec: transforms.Spec{{"^.{24}", "ffff00000000000000000000"}},
 			},
 			false,
+			0,
 			false,
 			false, // Disable obfuscated PSK
 			common.WrapClientSessionCache(tls.NewLRUClientSessionCache(0), "test"),

+ 45 - 16
psiphon/common/quic/quic.go

@@ -144,6 +144,7 @@ func Listen(
 	logger common.Logger,
 	irregularTunnelLogger func(string, error, common.LogFields),
 	address string,
+	additionalMaxPacketSizeAdjustment int,
 	obfuscationKey string,
 	enableGQUIC bool) (net.Listener, error) {
 
@@ -249,7 +250,11 @@ func Listen(
 		// pumping read packets though mux channels.
 
 		tlsConfig, ietfQUICConfig, err := makeServerIETFConfig(
-			obfuscatedPacketConn, verifyClientHelloRandom, tlsCertificate, obfuscationKey)
+			obfuscatedPacketConn,
+			additionalMaxPacketSizeAdjustment,
+			verifyClientHelloRandom,
+			tlsCertificate,
+			obfuscationKey)
 
 		if err != nil {
 			obfuscatedPacketConn.Close()
@@ -275,7 +280,12 @@ func Listen(
 		// return and caller calls Accept.
 
 		muxListener, err := newMuxListener(
-			logger, verifyClientHelloRandom, obfuscatedPacketConn, tlsCertificate, obfuscationKey)
+			logger,
+			obfuscatedPacketConn,
+			additionalMaxPacketSizeAdjustment,
+			verifyClientHelloRandom,
+			tlsCertificate,
+			obfuscationKey)
 		if err != nil {
 			obfuscatedPacketConn.Close()
 			return nil, errors.Trace(err)
@@ -293,6 +303,7 @@ func Listen(
 
 func makeServerIETFConfig(
 	conn *ObfuscatedPacketConn,
+	additionalMaxPacketSizeAdjustment int,
 	verifyClientHelloRandom func(net.Addr, []byte) bool,
 	tlsCertificate tls.Certificate,
 	sharedSecret string) (*tls.Config, *ietf_quic.Config, error) {
@@ -325,6 +336,14 @@ func makeServerIETFConfig(
 		})
 	}
 
+	serverMaxPacketSizeAdjustment := conn.serverMaxPacketSizeAdjustment
+	if additionalMaxPacketSizeAdjustment != 0 {
+		serverMaxPacketSizeAdjustment = func(addr net.Addr) int {
+			return conn.serverMaxPacketSizeAdjustment(addr) +
+				additionalMaxPacketSizeAdjustment
+		}
+	}
+
 	ietfQUICConfig := &ietf_quic.Config{
 		Allow0RTT:             true,
 		HandshakeIdleTimeout:  SERVER_HANDSHAKE_TIMEOUT,
@@ -335,7 +354,7 @@ func makeServerIETFConfig(
 		KeepAlivePeriod: CLIENT_IDLE_TIMEOUT / 2,
 
 		VerifyClientHelloRandom:       verifyClientHelloRandom,
-		ServerMaxPacketSizeAdjustment: conn.serverMaxPacketSizeAdjustment,
+		ServerMaxPacketSizeAdjustment: serverMaxPacketSizeAdjustment,
 	}
 
 	return tlsConfig, ietfQUICConfig, nil
@@ -405,6 +424,7 @@ func Dial(
 	obfuscationPaddingSeed *prng.Seed,
 	obfuscationNonceTransformerParameters *transforms.ObfuscatorSeedTransformerParameters,
 	disablePathMTUDiscovery bool,
+	additionalMaxPacketSizeAdjustment int,
 	dialEarly bool,
 	useObfuscatedPSK bool,
 	tlsClientSessionCache *common.TLSClientSessionCacheWrapper) (net.Conn, error) {
@@ -472,7 +492,7 @@ func Dial(
 		}
 	}
 
-	maxPacketSizeAdjustment := 0
+	maxPacketSizeAdjustment := additionalMaxPacketSizeAdjustment
 
 	if isObfuscated(quicVersion) {
 		obfuscatedPacketConn, err := NewClientObfuscatedPacketConn(
@@ -488,9 +508,9 @@ func Dial(
 		}
 		packetConn = obfuscatedPacketConn
 
-		// Reserve space for packet obfuscation overhead so that quic-go will
-		// continue to produce packets of max size 1280.
-		maxPacketSizeAdjustment = OBFUSCATED_MAX_PACKET_SIZE_ADJUSTMENT
+		// Reserve additional space for packet obfuscation overhead so that
+		// quic-go will continue to produce packets of max size 1280.
+		maxPacketSizeAdjustment += OBFUSCATED_MAX_PACKET_SIZE_ADJUSTMENT
 	}
 
 	// As an anti-probing measure, QUIC clients must prove knowledge of the
@@ -525,7 +545,6 @@ func Dial(
 	connection, err := dialQUIC(
 		ctx,
 		packetConn,
-		false,
 		remoteAddr,
 		quicSNIAddress,
 		versionNumber,
@@ -971,7 +990,6 @@ func (t *QUICTransporter) dialQUIC() (retConnection quicConnection, retErr error
 	connection, err := dialQUIC(
 		ctx,
 		packetConn,
-		true,
 		remoteAddr,
 		t.quicSNIAddress,
 		versionNumber,
@@ -1111,6 +1129,13 @@ func (c *ietfQUICConnection) Close() error {
 }
 
 func (c *ietfQUICConnection) isErrorIndicatingClosed(err error) bool {
+	if err == nil {
+		return false
+	}
+	return IsIETFErrorIndicatingClosed(err)
+}
+
+func IsIETFErrorIndicatingClosed(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -1138,13 +1163,12 @@ func (c *ietfQUICConnection) getClientConnMetrics() quicClientConnMetrics {
 func dialQUIC(
 	ctx context.Context,
 	packetConn net.PacketConn,
-	expectNetUDPConn bool,
 	remoteAddr *net.UDPAddr,
 	quicSNIAddress string,
 	versionNumber uint32,
 	clientHelloSeed *prng.Seed,
 	getClientHelloRandom func() ([]byte, error),
-	clientMaxPacketSizeAdjustment int,
+	maxPacketSizeAdjustment int,
 	disablePathMTUDiscovery bool,
 	dialEarly bool,
 	obfuscatedPSKKey string,
@@ -1164,7 +1188,7 @@ func dialQUIC(
 				ietf_quic.VersionNumber(versionNumber)},
 			ClientHelloSeed:               clientHelloSeed,
 			GetClientHelloRandom:          getClientHelloRandom,
-			ClientMaxPacketSizeAdjustment: clientMaxPacketSizeAdjustment,
+			ClientMaxPacketSizeAdjustment: maxPacketSizeAdjustment,
 			DisablePathMTUDiscovery:       disablePathMTUDiscovery,
 		}
 
@@ -1207,7 +1231,7 @@ func dialQUIC(
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
-			ss := tls.MakeClientSessionState(
+			sessionState := tls.MakeClientSessionState(
 				obfuscatedSessionState.SessionTicket,
 				obfuscatedSessionState.Vers,
 				obfuscatedSessionState.CipherSuite,
@@ -1216,7 +1240,7 @@ func dialQUIC(
 				obfuscatedSessionState.AgeAdd,
 				obfuscatedSessionState.UseBy,
 			)
-			tlsClientSessionCache.Put("", ss)
+			tlsClientSessionCache.Put("", sessionState)
 		}
 
 		if dialEarly {
@@ -1400,8 +1424,9 @@ type muxListener struct {
 
 func newMuxListener(
 	logger common.Logger,
-	verifyClientHelloRandom func(net.Addr, []byte) bool,
 	conn *ObfuscatedPacketConn,
+	additionalMaxPacketSizeAdjustment int,
+	verifyClientHelloRandom func(net.Addr, []byte) bool,
 	tlsCertificate tls.Certificate,
 	sharedSecret string) (*muxListener, error) {
 
@@ -1422,7 +1447,11 @@ func newMuxListener(
 	listener.ietfQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
 
 	tlsConfig, ietfQUICConfig, err := makeServerIETFConfig(
-		conn, verifyClientHelloRandom, tlsCertificate, sharedSecret)
+		conn,
+		additionalMaxPacketSizeAdjustment,
+		verifyClientHelloRandom,
+		tlsCertificate,
+		sharedSecret)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

+ 4 - 0
psiphon/common/quic/quic_disabled.go

@@ -96,3 +96,7 @@ func NewQUICTransporter(
 
 	return nil, errors.TraceNew("operation is not enabled")
 }
+
+func IsIETFErrorIndicatingClosed(_ error) bool {
+	return false
+}

+ 2 - 0
psiphon/common/quic/quic_test.go

@@ -106,6 +106,7 @@ func runQUIC(
 		nil,
 		irregularTunnelLogger,
 		"127.0.0.1:0",
+		0,
 		obfuscationKey,
 		enableGQUIC)
 	if err != nil {
@@ -216,6 +217,7 @@ func runQUIC(
 				obfuscationPaddingSeed,
 				nil,
 				disablePathMTUDiscovery,
+				0,
 				true,
 				useObfuscatedPSK,
 				clientSessionCache)

+ 50 - 20
psiphon/config.go

@@ -1028,8 +1028,11 @@ type Config struct {
 	InproxyClientOfferRetryJitter                           *float64
 	InproxyClientRelayedPacketRequestTimeoutMilliseconds    *int
 	InproxyDTLSRandomizationProbability                     *float64
-	InproxyDataChannelTrafficShapingProbability             *float64
-	InproxyDataChannelTrafficShapingParameters              *parameters.InproxyDataChannelTrafficShapingParametersValue
+	InproxyWebRTCMediaStreamsProbability                    *float64
+	InproxyWebRTCDataChannelTrafficShapingProbability       *float64
+	InproxyWebRTCDataChannelTrafficShapingParameters        *parameters.InproxyTrafficShapingParametersValue
+	InproxyWebRTCMediaStreamsTrafficShapingProbability      *float64
+	InproxyWebRTCMediaStreamsTrafficShapingParameters       *parameters.InproxyTrafficShapingParametersValue
 	InproxySTUNServerAddresses                              []string
 	InproxySTUNServerAddressesRFC5780                       []string
 	InproxyProxySTUNServerAddresses                         []string
@@ -1052,8 +1055,8 @@ type Config struct {
 	InproxyProxyDiscoverNATTimeoutMilliseconds              *int
 	InproxyClientDiscoverNATTimeoutMilliseconds             *int
 	InproxyWebRTCAnswerTimeoutMilliseconds                  *int
-	InproxyProxyWebRTCAwaitDataChannelTimeoutMilliseconds   *int
-	InproxyClientWebRTCAwaitDataChannelTimeoutMilliseconds  *int
+	InproxyProxyWebRTCAwaitReadyToProxyTimeoutMilliseconds  *int
+	InproxyClientWebRTCAwaitReadyToProxyTimeoutMilliseconds *int
 	InproxyProxyDestinationDialTimeoutMilliseconds          *int
 	InproxyPsiphonAPIRequestTimeoutMilliseconds             *int
 	InproxyProxyTotalActivityNoticePeriodMilliseconds       *int
@@ -1274,11 +1277,14 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 	}
 
 	if config.UseNoticeFiles != nil {
-		setNoticeFiles(
+		err := setNoticeFiles(
 			homepageFilePath,
 			noticesFilePath,
 			config.UseNoticeFiles.RotatingFileSize,
 			config.UseNoticeFiles.RotatingSyncFrequency)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
 
 	// Emit notices now that notice files are set if configured
@@ -1713,7 +1719,7 @@ func (config *Config) SetParameters(tag string, skipOnError bool, applyParameter
 	for _, receiver := range config.GetTacticsAppliedReceivers() {
 		err := receiver.TacticsApplied()
 		if err != nil {
-			NoticeError("TacticsApplied failed: %v", err)
+			NoticeError("TacticsApplied failed: %v", errors.Trace(err))
 			// Log and continue running.
 		}
 	}
@@ -2650,12 +2656,24 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.InproxyDTLSRandomizationProbability] = *config.InproxyDTLSRandomizationProbability
 	}
 
-	if config.InproxyDataChannelTrafficShapingProbability != nil {
-		applyParameters[parameters.InproxyDataChannelTrafficShapingProbability] = *config.InproxyDataChannelTrafficShapingProbability
+	if config.InproxyWebRTCMediaStreamsProbability != nil {
+		applyParameters[parameters.InproxyWebRTCMediaStreamsProbability] = *config.InproxyWebRTCMediaStreamsProbability
+	}
+
+	if config.InproxyWebRTCDataChannelTrafficShapingProbability != nil {
+		applyParameters[parameters.InproxyWebRTCDataChannelTrafficShapingProbability] = *config.InproxyWebRTCDataChannelTrafficShapingProbability
+	}
+
+	if config.InproxyWebRTCDataChannelTrafficShapingParameters != nil {
+		applyParameters[parameters.InproxyWebRTCDataChannelTrafficShapingParameters] = *config.InproxyWebRTCDataChannelTrafficShapingParameters
+	}
+
+	if config.InproxyWebRTCMediaStreamsTrafficShapingProbability != nil {
+		applyParameters[parameters.InproxyWebRTCMediaStreamsTrafficShapingProbability] = *config.InproxyWebRTCMediaStreamsTrafficShapingProbability
 	}
 
-	if config.InproxyDataChannelTrafficShapingParameters != nil {
-		applyParameters[parameters.InproxyDataChannelTrafficShapingParameters] = *config.InproxyDataChannelTrafficShapingParameters
+	if config.InproxyWebRTCMediaStreamsTrafficShapingParameters != nil {
+		applyParameters[parameters.InproxyWebRTCMediaStreamsTrafficShapingParameters] = *config.InproxyWebRTCMediaStreamsTrafficShapingParameters
 	}
 
 	if len(config.InproxySTUNServerAddresses) > 0 {
@@ -2746,12 +2764,12 @@ func (config *Config) makeConfigParameters() map[string]interface{} {
 		applyParameters[parameters.InproxyWebRTCAnswerTimeout] = fmt.Sprintf("%dms", *config.InproxyWebRTCAnswerTimeoutMilliseconds)
 	}
 
-	if config.InproxyProxyWebRTCAwaitDataChannelTimeoutMilliseconds != nil {
-		applyParameters[parameters.InproxyProxyWebRTCAwaitDataChannelTimeout] = fmt.Sprintf("%dms", *config.InproxyProxyWebRTCAwaitDataChannelTimeoutMilliseconds)
+	if config.InproxyProxyWebRTCAwaitReadyToProxyTimeoutMilliseconds != nil {
+		applyParameters[parameters.InproxyProxyWebRTCAwaitReadyToProxyTimeout] = fmt.Sprintf("%dms", *config.InproxyProxyWebRTCAwaitReadyToProxyTimeoutMilliseconds)
 	}
 
-	if config.InproxyClientWebRTCAwaitDataChannelTimeoutMilliseconds != nil {
-		applyParameters[parameters.InproxyClientWebRTCAwaitDataChannelTimeout] = fmt.Sprintf("%dms", *config.InproxyClientWebRTCAwaitDataChannelTimeoutMilliseconds)
+	if config.InproxyClientWebRTCAwaitReadyToProxyTimeoutMilliseconds != nil {
+		applyParameters[parameters.InproxyClientWebRTCAwaitReadyToProxyTimeout] = fmt.Sprintf("%dms", *config.InproxyClientWebRTCAwaitReadyToProxyTimeoutMilliseconds)
 	}
 
 	if config.InproxyProxyDestinationDialTimeoutMilliseconds != nil {
@@ -3538,13 +3556,25 @@ func (config *Config) setDialParametersHash() {
 		hash.Write([]byte("InproxyDTLSRandomizationProbability"))
 		binary.Write(hash, binary.LittleEndian, *config.InproxyDTLSRandomizationProbability)
 	}
-	if config.InproxyDataChannelTrafficShapingProbability != nil {
-		hash.Write([]byte("InproxyDataChannelTrafficShapingProbability"))
-		binary.Write(hash, binary.LittleEndian, *config.InproxyDataChannelTrafficShapingProbability)
+	if config.InproxyWebRTCMediaStreamsProbability != nil {
+		hash.Write([]byte("InproxyWebRTCMediaStreamsProbability"))
+		binary.Write(hash, binary.LittleEndian, *config.InproxyWebRTCMediaStreamsProbability)
+	}
+	if config.InproxyWebRTCDataChannelTrafficShapingProbability != nil {
+		hash.Write([]byte("InproxyWebRTCDataChannelTrafficShapingProbability"))
+		binary.Write(hash, binary.LittleEndian, *config.InproxyWebRTCDataChannelTrafficShapingProbability)
+	}
+	if config.InproxyWebRTCDataChannelTrafficShapingParameters != nil {
+		hash.Write([]byte("InproxyWebRTCDataChannelTrafficShapingParameters"))
+		hash.Write([]byte(fmt.Sprintf("%+v", config.InproxyWebRTCDataChannelTrafficShapingParameters)))
+	}
+	if config.InproxyWebRTCMediaStreamsTrafficShapingProbability != nil {
+		hash.Write([]byte("InproxyWebRTCMediaStreamsTrafficShapingProbability"))
+		binary.Write(hash, binary.LittleEndian, *config.InproxyWebRTCMediaStreamsTrafficShapingProbability)
 	}
-	if config.InproxyDataChannelTrafficShapingParameters != nil {
-		hash.Write([]byte("InproxyDataChannelTrafficShapingParameters"))
-		hash.Write([]byte(fmt.Sprintf("%+v", config.InproxyDataChannelTrafficShapingParameters)))
+	if config.InproxyWebRTCMediaStreamsTrafficShapingParameters != nil {
+		hash.Write([]byte("InproxyWebRTCMediaStreamsTrafficShapingParameters"))
+		hash.Write([]byte(fmt.Sprintf("%+v", config.InproxyWebRTCMediaStreamsTrafficShapingParameters)))
 	}
 	if config.InproxySTUNServerAddresses != nil {
 		hash.Write([]byte("InproxySTUNServerAddresses"))

+ 11 - 2
psiphon/controller.go

@@ -466,9 +466,18 @@ func (controller *Controller) NetworkChanged() {
 	controller.TerminateNextActiveTunnel()
 
 	if controller.inproxyProxyBrokerClientManager != nil {
-		controller.inproxyProxyBrokerClientManager.NetworkChanged()
+		err := controller.inproxyProxyBrokerClientManager.NetworkChanged()
+		if err != nil {
+			NoticeError("NetworkChanged failed: %v", errors.Trace(err))
+			// Log and continue running.
+		}
+
+	}
+	err := controller.inproxyClientBrokerClientManager.NetworkChanged()
+	if err != nil {
+		NoticeError("NetworkChanged failed: %v", errors.Trace(err))
+		// Log and continue running.
 	}
-	controller.inproxyClientBrokerClientManager.NetworkChanged()
 
 	controller.config.networkIDGetter.FlushCache()
 

+ 26 - 0
psiphon/dialParameters.go

@@ -135,6 +135,7 @@ type DialParameters struct {
 	QUICDialEarly                            bool
 	QUICUseObfuscatedPSK                     bool
 	QUICDisablePathMTUDiscovery              bool
+	QUICMaxPacketSizeAdjustment              int
 
 	ConjureCachedRegistrationTTL        time.Duration
 	ConjureAPIRegistration              bool
@@ -1267,6 +1268,31 @@ func MakeDialParameters(
 			}
 		}
 
+		if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) &&
+			dialParams.InproxyWebRTCDialParameters.UseMediaStreams {
+
+			// In the in-proxy WebRTC media stream mode, QUIC packets are
+			// encapsulated in SRTP packet payloads, and the maximum QUIC
+			// packet size must be adjusted to fit. In addition, QUIC path
+			// MTU discovery is disabled, to avoid sending oversized packets.
+
+			// isIPv6 indicates whether quic-go will use a max initial packet
+			// size appropriate for IPv6 or IPv4;
+			// GetQUICMaxPacketSizeAdjustment modifies the adjustment
+			// accordingly. quic-go selects based on the RemoteAddr of the
+			// net.PacketConn passed to quic.Dial. In the in-proxy case, that
+			// RemoteAddr, inproxy.ClientConn.RemoteAddr, is synthetic and
+			// can reflect inproxy.ClientConfig.RemoteAddrOverride, which, in
+			// turn, is currently based on serverEntry.IpAddress; see
+			// dialInproxy. Limitation: not compatible with FRONTED-QUIC.
+
+			IPAddress := net.ParseIP(serverEntry.IpAddress)
+			isIPv6 := IPAddress != nil && IPAddress.To4() == nil
+
+			dialParams.QUICMaxPacketSizeAdjustment = inproxy.GetQUICMaxPacketSizeAdjustment(isIPv6)
+			dialParams.QUICDisablePathMTUDiscovery = true
+		}
+
 		// dialParams.inproxyConn is left uninitialized until after the dial,
 		// and until then Load will return nil.
 	}

+ 43 - 20
psiphon/inproxy.go

@@ -1548,7 +1548,7 @@ type InproxyWebRTCDialInstance struct {
 	discoverNATTimeout              time.Duration
 	webRTCAnswerTimeout             time.Duration
 	webRTCAwaitPortMappingTimeout   time.Duration
-	awaitDataChannelTimeout         time.Duration
+	awaitReadyToProxyTimeout        time.Duration
 	proxyDestinationDialTimeout     time.Duration
 	proxyRelayInactivityTimeout     time.Duration
 }
@@ -1593,7 +1593,7 @@ func NewInproxyWebRTCDialInstance(
 	disableInboundForMobileNetworks := p.Bool(parameters.InproxyDisableInboundForMobileNetworks)
 	disableIPv6ICECandidates := p.Bool(parameters.InproxyDisableIPv6ICECandidates)
 
-	var discoverNATTimeout, awaitDataChannelTimeout time.Duration
+	var discoverNATTimeout, awaitReadyToProxyTimeout time.Duration
 
 	if isProxy {
 
@@ -1609,7 +1609,7 @@ func NewInproxyWebRTCDialInstance(
 
 		discoverNATTimeout = p.Duration(parameters.InproxyProxyDiscoverNATTimeout)
 
-		awaitDataChannelTimeout = p.Duration(parameters.InproxyProxyWebRTCAwaitDataChannelTimeout)
+		awaitReadyToProxyTimeout = p.Duration(parameters.InproxyProxyWebRTCAwaitReadyToProxyTimeout)
 
 	} else {
 
@@ -1625,7 +1625,7 @@ func NewInproxyWebRTCDialInstance(
 
 		discoverNATTimeout = p.Duration(parameters.InproxyClientDiscoverNATTimeout)
 
-		awaitDataChannelTimeout = p.Duration(parameters.InproxyClientWebRTCAwaitDataChannelTimeout)
+		awaitReadyToProxyTimeout = p.Duration(parameters.InproxyClientWebRTCAwaitReadyToProxyTimeout)
 	}
 
 	// Parameters such as disabling certain operations and operation timeouts
@@ -1652,7 +1652,7 @@ func NewInproxyWebRTCDialInstance(
 		discoverNATTimeout:              discoverNATTimeout,
 		webRTCAnswerTimeout:             p.Duration(parameters.InproxyWebRTCAnswerTimeout),
 		webRTCAwaitPortMappingTimeout:   p.Duration(parameters.InproxyWebRTCAwaitPortMappingTimeout),
-		awaitDataChannelTimeout:         awaitDataChannelTimeout,
+		awaitReadyToProxyTimeout:        awaitReadyToProxyTimeout,
 		proxyDestinationDialTimeout:     p.Duration(parameters.InproxyProxyDestinationDialTimeout),
 		proxyRelayInactivityTimeout:     p.Duration(parameters.InproxyProxyRelayInactivityTimeout),
 	}, nil
@@ -1679,8 +1679,13 @@ func (w *InproxyWebRTCDialInstance) DoDTLSRandomization() bool {
 }
 
 // Implements the inproxy.WebRTCDialCoordinator interface.
-func (w *InproxyWebRTCDialInstance) DataChannelTrafficShapingParameters() *inproxy.DataChannelTrafficShapingParameters {
-	return w.webRTCDialParameters.DataChannelTrafficShapingParameters
+func (w *InproxyWebRTCDialInstance) UseMediaStreams() bool {
+	return w.webRTCDialParameters.UseMediaStreams
+}
+
+// Implements the inproxy.WebRTCDialCoordinator interface.
+func (w *InproxyWebRTCDialInstance) TrafficShapingParameters() *inproxy.TrafficShapingParameters {
+	return w.webRTCDialParameters.TrafficShapingParameters
 }
 
 // Implements the inproxy.WebRTCDialCoordinator interface.
@@ -1961,8 +1966,8 @@ func (w *InproxyWebRTCDialInstance) WebRTCAwaitPortMappingTimeout() time.Duratio
 }
 
 // Implements the inproxy.WebRTCDialCoordinator interface.
-func (w *InproxyWebRTCDialInstance) WebRTCAwaitDataChannelTimeout() time.Duration {
-	return w.awaitDataChannelTimeout
+func (w *InproxyWebRTCDialInstance) WebRTCAwaitReadyToProxyTimeout() time.Duration {
+	return w.awaitReadyToProxyTimeout
 }
 
 // Implements the inproxy.WebRTCDialCoordinator interface.
@@ -2156,9 +2161,10 @@ func (dialParams *InproxySTUNDialParameters) GetMetrics() common.LogFields {
 // marshaling. For client in-proxy tunnel dials, DialParameters will manage
 // WebRTC dial parameter selection and replay.
 type InproxyWebRTCDialParameters struct {
-	RootObfuscationSecret               inproxy.ObfuscationSecret
-	DataChannelTrafficShapingParameters *inproxy.DataChannelTrafficShapingParameters
-	DoDTLSRandomization                 bool
+	RootObfuscationSecret    inproxy.ObfuscationSecret
+	UseMediaStreams          bool
+	TrafficShapingParameters *inproxy.TrafficShapingParameters
+	DoDTLSRandomization      bool
 }
 
 // MakeInproxyWebRTCDialParameters generates new InproxyWebRTCDialParameters.
@@ -2170,19 +2176,36 @@ func MakeInproxyWebRTCDialParameters(
 		return nil, errors.Trace(err)
 	}
 
-	var trafficSharingParams inproxy.DataChannelTrafficShapingParameters
-	if p.WeightedCoinFlip(parameters.InproxyDataChannelTrafficShapingProbability) {
-		trafficSharingParams = inproxy.DataChannelTrafficShapingParameters(
-			p.InproxyDataChannelTrafficShapingParameters(
-				parameters.InproxyDataChannelTrafficShapingParameters))
+	useMediaStreams := p.WeightedCoinFlip(parameters.InproxyWebRTCMediaStreamsProbability)
+
+	var trafficSharingParams *inproxy.TrafficShapingParameters
+
+	if useMediaStreams {
+
+		if p.WeightedCoinFlip(parameters.InproxyWebRTCMediaStreamsTrafficShapingProbability) {
+			t := inproxy.TrafficShapingParameters(
+				p.InproxyTrafficShapingParameters(
+					parameters.InproxyWebRTCMediaStreamsTrafficShapingParameters))
+			trafficSharingParams = &t
+		}
+
+	} else {
+
+		if p.WeightedCoinFlip(parameters.InproxyWebRTCDataChannelTrafficShapingProbability) {
+			t := inproxy.TrafficShapingParameters(
+				p.InproxyTrafficShapingParameters(
+					parameters.InproxyWebRTCDataChannelTrafficShapingParameters))
+			trafficSharingParams = &t
+		}
 	}
 
 	doDTLSRandomization := p.WeightedCoinFlip(parameters.InproxyDTLSRandomizationProbability)
 
 	return &InproxyWebRTCDialParameters{
-		RootObfuscationSecret:               rootObfuscationSecret,
-		DataChannelTrafficShapingParameters: &trafficSharingParams,
-		DoDTLSRandomization:                 doDTLSRandomization,
+		RootObfuscationSecret:    rootObfuscationSecret,
+		UseMediaStreams:          useMediaStreams,
+		TrafficShapingParameters: trafficSharingParams,
+		DoDTLSRandomization:      doDTLSRandomization,
 	}, nil
 }
 

+ 1 - 1
psiphon/net.go

@@ -835,7 +835,7 @@ func ResumeDownload(
 
 	// Not making failure to write ETag file fatal, in case the entire download
 	// succeeds in this one request.
-	ioutil.WriteFile(partialETagFilename, []byte(responseETag), 0600)
+	_ = ioutil.WriteFile(partialETagFilename, []byte(responseETag), 0600)
 
 	// A partial download occurs when this copy is interrupted. The io.Copy
 	// will fail, leaving a partial download in place (.part and .part.etag).

+ 5 - 1
psiphon/net_darwin.go

@@ -37,7 +37,11 @@ func setSocketBPF(_ []bpf.RawInstruction, _ int) error {
 }
 
 func setAdditionalSocketOptions(socketFd int) {
-	syscall.SetsockoptInt(socketFd, syscall.SOL_SOCKET, syscall.SO_NOSIGPIPE, 1)
+	// TODO: return error
+	err := syscall.SetsockoptInt(socketFd, syscall.SOL_SOCKET, syscall.SO_NOSIGPIPE, 1)
+	if err != nil {
+		NoticeError("SetsockoptInt failed: %v", errors.Trace(err))
+	}
 }
 
 func makeLocalProxyListener(listenIP string, port int) (net.Listener, bool, error) {

+ 1 - 0
psiphon/server/api.go

@@ -1198,6 +1198,7 @@ var inproxyDialParams = []requestParamSpec{
 	{"inproxy_dial_broker_offer_duration", isIntString, requestParamOptional | requestParamLogStringAsInt},
 	{"inproxy_dial_webrtc_connection_duration", isIntString, requestParamOptional | requestParamLogStringAsInt},
 	{"inproxy_broker_is_reuse", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
+	{"inproxy_webrtc_use_media_streams", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
 }
 
 // baseAndDialParams adds baseDialParams and inproxyDialParams to baseParams.

+ 13 - 1
psiphon/server/meek.go

@@ -1896,7 +1896,9 @@ func (server *MeekServer) inproxyBrokerAllowDomainFrontedDestinations(clientGeoI
 }
 
 func (server *MeekServer) inproxyBrokerPrioritizeProxy(
-	proxyGeoIPData common.GeoIPData, proxyAPIParams common.APIParameters) bool {
+	proxyInproxyProtocolVersion int,
+	proxyGeoIPData common.GeoIPData,
+	proxyAPIParams common.APIParameters) bool {
 
 	// Fallback to not-prioritized on failure or nil tactics.
 	p, err := server.support.ServerTacticsParametersCache.Get(GeoIPData(proxyGeoIPData))
@@ -1908,6 +1910,14 @@ func (server *MeekServer) inproxyBrokerPrioritizeProxy(
 	if p.IsNil() {
 		return false
 	}
+
+	// As API parameter filtering currently does not support range matching, the minimum version
+	// constraint is specified in a seperate parameter.
+	minProtocolVersion := p.Int(parameters.InproxyBrokerMatcherPrioritizeProxiesMinVersion)
+	if proxyInproxyProtocolVersion < minProtocolVersion {
+		return false
+	}
+
 	filter := p.KeyStringsValue(parameters.InproxyBrokerMatcherPrioritizeProxiesFilter)
 	if len(filter) == 0 {
 		return false
@@ -1918,9 +1928,11 @@ func (server *MeekServer) inproxyBrokerPrioritizeProxy(
 			return false
 		}
 	}
+
 	if !p.WeightedCoinFlip(parameters.InproxyBrokerMatcherPrioritizeProxiesProbability) {
 		return false
 	}
+
 	return true
 }
 

+ 51 - 3
psiphon/server/server_test.go

@@ -444,6 +444,39 @@ func TestInproxyPersonalPairing(t *testing.T) {
 		})
 }
 
+func TestInproxyOSSHMediaStreams(t *testing.T) {
+	if !inproxy.Enabled() {
+		t.Skip("inproxy is not enabled")
+	}
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:         "INPROXY-WEBRTC-OSSH",
+			requireAuthorization:   true,
+			doTunneledWebRequest:   true,
+			doTunneledNTPRequest:   true,
+			doDanglingTCPConn:      true,
+			doLogHostProvider:      true,
+			doTargetBrokerSpecs:    true,
+			useInproxyMediaStreams: true,
+		})
+}
+
+func TestInproxyQUICOSSHMediaStreams(t *testing.T) {
+	if !inproxy.Enabled() {
+		t.Skip("inproxy is not enabled")
+	}
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:         "INPROXY-WEBRTC-QUIC-OSSH",
+			requireAuthorization:   true,
+			doTunneledWebRequest:   true,
+			doTunneledNTPRequest:   true,
+			doLogHostProvider:      true,
+			doTargetBrokerSpecs:    true,
+			useInproxyMediaStreams: true,
+		})
+}
+
 func TestHotReload(t *testing.T) {
 	runServer(t,
 		&runServerConfig{
@@ -702,6 +735,7 @@ type runServerConfig struct {
 	useLegacyAPIEncoding     bool
 	doPersonalPairing        bool
 	doRestrictInproxy        bool
+	useInproxyMediaStreams   bool
 }
 
 var (
@@ -758,7 +792,8 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			runConfig.doTargetBrokerSpecs,
 			brokerIPAddress,
 			brokerPort,
-			serverEntrySignaturePublicKey)
+			serverEntrySignaturePublicKey,
+			runConfig.useInproxyMediaStreams)
 		if err != nil {
 			t.Fatalf("error generating inproxy test config: %s", err)
 		}
@@ -2521,7 +2556,7 @@ func checkExpectedServerTunnelLogFields(
 			"inproxy_proxy_device_region",
 			"inproxy_proxy_device_location",
 			"inproxy_proxy_network_type",
-			"inproxy_proxy_proxy_protocol_version",
+			"inproxy_proxy_protocol_version",
 			"inproxy_proxy_nat_type",
 			"inproxy_proxy_max_clients",
 			"inproxy_proxy_connecting_clients",
@@ -2544,6 +2579,7 @@ func checkExpectedServerTunnelLogFields(
 			"inproxy_broker_dial_address",
 			"inproxy_broker_resolved_ip_address",
 			"inproxy_webrtc_randomize_dtls",
+			"inproxy_webrtc_use_media_streams",
 			"inproxy_webrtc_padded_messages_sent",
 			"inproxy_webrtc_padded_messages_received",
 			"inproxy_webrtc_decoy_messages_sent",
@@ -2619,6 +2655,10 @@ func checkExpectedServerTunnelLogFields(
 		if fields["inproxy_proxy_network_type"].(string) != testNetworkType {
 			return fmt.Errorf("unexpected inproxy_proxy_network_type '%s'", fields["inproxy_proxy_network_type"])
 		}
+
+		if fields["inproxy_webrtc_use_media_streams"].(bool) != runConfig.useInproxyMediaStreams {
+			return fmt.Errorf("unexpected inproxy_webrtc_use_media_streams '%v'", fields["inproxy_webrtc_use_media_streams"])
+		}
 	}
 
 	if runConfig.applyPrefix {
@@ -3690,7 +3730,8 @@ func generateInproxyTestConfig(
 	doTargetBrokerSpecs bool,
 	brokerIPAddress string,
 	brokerPort int,
-	serverEntrySignaturePublicKey string) (*inproxyTestConfig, error) {
+	serverEntrySignaturePublicKey string,
+	useInproxyMediaStreams bool) (*inproxyTestConfig, error) {
 
 	// Generate in-proxy configuration.
 	//
@@ -3869,9 +3910,15 @@ func generateInproxyTestConfig(
             "InproxyDisableSTUN": true,
             "InproxyDisablePortMapping": true,
             "InproxyDisableIPv6ICECandidates": true,
+            "InproxyWebRTCMediaStreamsProbability": %s,
             %s
     `
 
+	mediaStreamsProbability := "0.0"
+	if useInproxyMediaStreams {
+		mediaStreamsProbability = "1.0"
+	}
+
 	tacticsParametersJSON := fmt.Sprintf(
 		tacticsParametersJSONFormat,
 		brokerSessionPublicKeyStr,
@@ -3881,6 +3928,7 @@ func generateInproxyTestConfig(
 		clientBrokerSpecsJSON,
 		commonCompartmentIDStr,
 		commonCompartmentIDStr,
+		mediaStreamsProbability,
 		maxRequestTimeoutsJSON)
 
 	config := &inproxyTestConfig{

+ 30 - 1
psiphon/server/tunnelServer.go

@@ -168,8 +168,36 @@ func (server *TunnelServer) Run() error {
 
 		} else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
 
+			usesInproxy := protocol.TunnelProtocolUsesInproxy(tunnelProtocol)
+
 			// in-proxy QUIC tunnel protocols don't support gQUIC.
-			enableGQUIC := support.Config.EnableGQUIC && !protocol.TunnelProtocolUsesInproxy(tunnelProtocol)
+			enableGQUIC := support.Config.EnableGQUIC && !usesInproxy
+
+			maxPacketSizeAdjustment := 0
+			if usesInproxy {
+
+				// In the in-proxy WebRTC media stream mode, QUIC packets sent
+				// back to the client, via the proxy, are encapsulated in
+				// SRTP packet payloads, and the maximum QUIC packet size
+				// must be adjusted to fit.
+				//
+				// Limitation: the WebRTC data channel mode does not have the
+				// same QUIC packet size constraint, since data channel
+				// messages can be far larger (up to 65536 bytes). However,
+				// the server, at this point, does not know whether
+				// individual connections are using WebRTC media streams or
+				// data channels on the first hop, and will no know until API
+				// handshake information is delivered after the QUIC, OSSH,
+				// and SSH handshakes are completed. Currently the max packet
+				// size adjustment is set unconditionally. For data channels,
+				// this will result in suboptimal packet sizes (10s of bytes)
+				// and a corresponding different traffic shape on the 2nd hop.
+
+				IPAddress := net.ParseIP(support.Config.ServerIPAddress)
+				isIPv6 := IPAddress != nil && IPAddress.To4() == nil
+
+				maxPacketSizeAdjustment = inproxy.GetQUICMaxPacketSizeAdjustment(isIPv6)
+			}
 
 			logTunnelProtocol := tunnelProtocol
 			listener, err = quic.Listen(
@@ -180,6 +208,7 @@ func (server *TunnelServer) Run() error {
 						errors.Trace(err), LogFields(logFields))
 				},
 				localAddress,
+				maxPacketSizeAdjustment,
 				support.Config.ObfuscatedSSHKey,
 				enableGQUIC)
 

+ 1 - 0
psiphon/tunnel.go

@@ -945,6 +945,7 @@ func dialTunnel(
 			dialParams.ObfuscatedQUICPaddingSeed,
 			dialParams.ObfuscatedQUICNonceTransformerParameters,
 			dialParams.QUICDisablePathMTUDiscovery,
+			dialParams.QUICMaxPacketSizeAdjustment,
 			dialParams.QUICDialEarly,
 			dialParams.QUICUseObfuscatedPSK,
 			dialParams.quicTLSClientSessionCache)

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio