Просмотр исходного кода

More inproxy personal pairing enhancements

- Allow RFC 1918/4193 private IP addresses ICE candidates in personal pairing
  mode.

- Add MustUpgrade response flag and callbacks; remove ProxyOperatorMessage.

- Add personal pairing mode end-to-end test.
Rod Hynes 1 год назад
Родитель
Сommit
02b5e5be07

+ 9 - 5
MobileLibrary/Android/PsiphonTunnel/PsiphonTunnel.java

@@ -132,10 +132,14 @@ public class PsiphonTunnel {
         default public void onApplicationParameters(Object parameters) {}
         default public void onServerAlert(String reason, String subject, List<String> actionURLs) {}
         /**
-         * Called when tunnel-core emits a message to be displayed to the in-proxy operator.
-         * @param message The operator message received.
+         * Called when tunnel-core reports that a selected in-proxy mode --
+         * including running a proxy; or running a client in personal pairing
+         * mode -- cannot function without an app upgrade. The receiver
+         * should alert the user to upgrade the app and/or disable the
+         * unsupported mode(s). This callback is followed by a tunnel-core
+         * shutdown.
          */
-        default void onInproxyOperatorMessage(String message) {}
+        default void onInproxyMustUpgrade() {}
         /**
          * Called when tunnel-core reports proxy usage statistics.
          * By default onInproxyProxyActivity is disabled. Enable it by setting
@@ -1115,8 +1119,8 @@ public class PsiphonTunnel {
                     notice.getJSONObject("data").getString("reason"),
                     notice.getJSONObject("data").getString("subject"),
                     actionURLsList);
-            } else if (noticeType.equals("InproxyOperatorMessage")) {
-                mHostService.onInproxyOperatorMessage( notice.getJSONObject("data").getString("message"));
+            } else if (noticeType.equals("InproxyMustUpgrade")) {
+                mHostService.onInproxyMustUpgrade();
             } else if (noticeType.equals("InproxyProxyActivity")) {
                 JSONObject data = notice.getJSONObject("data");
                 mHostService.onInproxyProxyActivity(

+ 6 - 3
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.h

@@ -300,10 +300,13 @@ WWAN or vice versa or VPN state changed
 - (void)onApplicationParameters:(NSDictionary * _Nonnull)parameters;
 
 /*!
- Called when tunnel-core emits a message to be displayed to the in-proxy operator
- @param message The operator message received.
+Called when tunnel-core reports that a selected in-proxy mode -- including
+running a proxy; or running a client in personal pairing mode -- cannot
+function without an app upgrade. The receiver should alert the user to
+upgrade the app and/or disable the unsupported mode(s). This callback is
+followed by a tunnel-core shutdown.
  */
-- (void)onInproxyOperatorMessage:(NSString * _Nonnull)message;
+- (void)onInproxyMustUpgrade;
 
 /*!
  Called when tunnel-core reports in-proxy usage statistics

+ 3 - 8
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.m

@@ -1174,15 +1174,10 @@ typedef NS_ERROR_ENUM(PsiphonTunnelErrorDomain, PsiphonTunnelErrorCode) {
             });
         }
     }
-    else if ([noticeType isEqualToString:@"InproxyOperatorMessage"]) {
-        id message = [notice valueForKeyPath:@"data.message"];
-        if (![message isKindOfClass:[NSString class]]) {
-            [self logMessage:[NSString stringWithFormat: @"InproxyOperatorMessage notice missing data.message: %@", noticeJSON]];
-            return;
-        }
-        if ([self.tunneledAppDelegate respondsToSelector:@selector(onInproxyOperatorMessage:)]) {
+    else if ([noticeType isEqualToString:@"InproxyMustUpgrade"]) {
+        if ([self.tunneledAppDelegate respondsToSelector:@selector(onInproxyMustUpgrade)]) {
             dispatch_sync(self->callbackQueue, ^{
-                [self.tunneledAppDelegate onInproxyOperatorMessage:message];
+                [self.tunneledAppDelegate onInproxyMustUpgrade];
             });
         }
     }

+ 51 - 7
psiphon/common/inproxy/api.go

@@ -243,14 +243,17 @@ type WebRTCSessionDescription struct {
 // to relay client traffic with. The broker validates that the dial address
 // corresponds to a valid Psiphon server.
 //
-// OperatorMessageJSON is an optional message bundle to be forwarded to the
-// user interface for display to the user; for example, to alert the proxy
-// operator of configuration issue; the JSON schema is not defined here.
+// MustUpgrade is an optional flag that is set by the broker, based on the
+// submitted ProxyProtocolVersion, when the proxy app must be upgraded in
+// order to function properly. Potential must-upgrade scenarios include
+// changes to the personal pairing broker rendezvous algorithm, where no
+// protocol backwards compatibility accommodations can ensure a rendezvous
+// and match. When MustUpgrade is set, NoMatch is implied.
 type ProxyAnnounceResponse struct {
-	OperatorMessageJSON         string                               `cbor:"1,keyasint,omitempty"`
 	TacticsPayload              []byte                               `cbor:"2,keyasint,omitempty"`
 	Limited                     bool                                 `cbor:"3,keyasint,omitempty"`
 	NoMatch                     bool                                 `cbor:"4,keyasint,omitempty"`
+	MustUpgrade                 bool                                 `cbor:"13,keyasint,omitempty"`
 	ConnectionID                ID                                   `cbor:"5,keyasint,omitempty"`
 	ClientProxyProtocolVersion  int32                                `cbor:"6,keyasint,omitempty"`
 	ClientOfferSDP              WebRTCSessionDescription             `cbor:"7,keyasint,omitempty"`
@@ -322,9 +325,17 @@ type DataChannelTrafficShapingParameters struct {
 // the broker using ClientRelayedPacketRequests and continues to relay using
 // ClientRelayedPacketRequests until complete. ConnectionID identifies this
 // connection and its relayed BrokerServerReport.
+//
+// MustUpgrade is an optional flag that is set by the broker, based on the
+// submitted ProxyProtocolVersion, when the client app must be upgraded in
+// order to function properly. Potential must-upgrade scenarios include
+// changes to the personal pairing broker rendezvous algorithm, where no
+// protocol backwards compatibility accommodations can ensure a rendezvous
+// and match. When MustUpgrade is set, NoMatch is implied.
 type ClientOfferResponse struct {
 	Limited                      bool                     `cbor:"1,keyasint,omitempty"`
 	NoMatch                      bool                     `cbor:"2,keyasint,omitempty"`
+	MustUpgrade                  bool                     `cbor:"7,keyasint,omitempty"`
 	ConnectionID                 ID                       `cbor:"3,keyasint,omitempty"`
 	SelectedProxyProtocolVersion int32                    `cbor:"4,keyasint,omitempty"`
 	ProxyAnswerSDP               WebRTCSessionDescription `cbor:"5,keyasint,omitempty"`
@@ -591,13 +602,31 @@ func (request *ClientOfferRequest) ValidateAndGetLogFields(
 			"invalid compartment IDs length: %d", len(request.PersonalCompartmentIDs))
 	}
 
+	if len(request.CommonCompartmentIDs) > 0 && len(request.PersonalCompartmentIDs) > 0 {
+		return nil, nil, errors.TraceNew("multiple compartment ID types")
+	}
+
 	// The client offer SDP may contain no ICE candidates.
 	errorOnNoCandidates := false
 
+	// The client offer SDP may include RFC 1918/4193 private IP addresses in
+	// personal pairing mode. filterSDPAddresses should not filter out
+	// private IP addresses based on the broker's local interfaces; this
+	// filtering occurs on the proxy that receives the SDP.
+	allowPrivateIPAddressCandidates :=
+		len(request.PersonalCompartmentIDs) > 0 &&
+			len(request.CommonCompartmentIDs) == 0
+	filterPrivateIPAddressCandidates := false
+
 	// Client offer SDP candidate addresses must match the country and ASN of
 	// the client. Don't facilitate connections to arbitrary destinations.
 	filteredSDP, sdpMetrics, err := filterSDPAddresses(
-		[]byte(request.ClientOfferSDP.SDP), errorOnNoCandidates, lookupGeoIP, geoIPData)
+		[]byte(request.ClientOfferSDP.SDP),
+		errorOnNoCandidates,
+		lookupGeoIP,
+		geoIPData,
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -641,6 +670,7 @@ func (request *ClientOfferRequest) ValidateAndGetLogFields(
 	logFields["has_personal_compartment_ids"] = hasPersonalCompartmentIDs
 	logFields["ice_candidate_types"] = request.ICECandidateTypes
 	logFields["has_IPv6"] = sdpMetrics.hasIPv6
+	logFields["has_private_IP"] = sdpMetrics.hasPrivateIP
 	logFields["filtered_ice_candidates"] = sdpMetrics.filteredICECandidates
 
 	return filteredSDP, logFields, nil
@@ -683,15 +713,28 @@ func (request *ProxyAnswerRequest) ValidateAndGetLogFields(
 	lookupGeoIP LookupGeoIP,
 	baseAPIParameterValidator common.APIParameterValidator,
 	formatter common.APIParameterLogFieldFormatter,
-	geoIPData common.GeoIPData) ([]byte, common.LogFields, error) {
+	geoIPData common.GeoIPData,
+	proxyAnnouncementHasPersonalCompartmentIDs bool) ([]byte, common.LogFields, error) {
 
 	// The proxy answer SDP must contain at least one ICE candidate.
 	errorOnNoCandidates := true
 
+	// The proxy answer SDP may include RFC 1918/4193 private IP addresses in
+	// personal pairing mode. filterSDPAddresses should not filter out
+	// private IP addresses based on the broker's local interfaces; this
+	// filtering occurs on the client that receives the SDP.
+	allowPrivateIPAddressCandidates := proxyAnnouncementHasPersonalCompartmentIDs
+	filterPrivateIPAddressCandidates := false
+
 	// Proxy answer SDP candidate addresses must match the country and ASN of
 	// the proxy. Don't facilitate connections to arbitrary destinations.
 	filteredSDP, sdpMetrics, err := filterSDPAddresses(
-		[]byte(request.ProxyAnswerSDP.SDP), errorOnNoCandidates, lookupGeoIP, geoIPData)
+		[]byte(request.ProxyAnswerSDP.SDP),
+		errorOnNoCandidates,
+		lookupGeoIP,
+		geoIPData,
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -716,6 +759,7 @@ func (request *ProxyAnswerRequest) ValidateAndGetLogFields(
 	logFields["connection_id"] = request.ConnectionID
 	logFields["ice_candidate_types"] = request.ICECandidateTypes
 	logFields["has_IPv6"] = sdpMetrics.hasIPv6
+	logFields["has_private_IP"] = sdpMetrics.hasPrivateIP
 	logFields["filtered_ice_candidates"] = sdpMetrics.filteredICECandidates
 	logFields["answer_error"] = request.AnswerError
 

+ 14 - 1
psiphon/common/inproxy/broker.go

@@ -771,6 +771,9 @@ func (b *Broker) handleClientOffer(
 	// processSDPAddresses), so all invalid candidates are removed and the
 	// remaining SDP is used. Filtered candidate information is logged in
 	// logFields.
+	//
+	// In personal pairing mode, RFC 1918/4193 private IP addresses are
+	// permitted in exchanged SDPs and not filtered out.
 
 	var filteredSDP []byte
 	filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
@@ -1023,13 +1026,23 @@ func (b *Broker) handleProxyAnswer(
 	// processSDPAddresses), so all invalid candidates are removed and the
 	// remaining SDP is used. Filtered candidate information is logged in
 	// logFields.
+	//
+	// In personal pairing mode, RFC 1918/4193 private IP addresses are
+	// permitted in exchanged SDPs and not filtered out.
+
+	hasPersonalCompartmentIDs, err := b.matcher.AnnouncementHasPersonalCompartmentIDs(
+		initiatorID, answerRequest.ConnectionID)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 
 	var filteredSDP []byte
 	filteredSDP, logFields, err = answerRequest.ValidateAndGetLogFields(
 		b.config.LookupGeoIP,
 		b.config.APIParameterValidator,
 		b.config.APIParameterLogFieldFormatter,
-		geoIPData)
+		geoIPData,
+		hasPersonalCompartmentIDs)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

+ 31 - 7
psiphon/common/inproxy/client.go

@@ -107,6 +107,16 @@ type ClientConfig struct {
 	// with the caller invoking  ServerEntryFields.RemoveUnsignedFields to
 	// prune local, unnsigned fields before sending.
 	PackedDestinationServerEntry []byte
+
+	// MustUpgrade is a callback that is invoked when a MustUpgrade flag is
+	// received from the broker. When MustUpgrade is received, the client
+	// should be stopped and the user should be prompted to upgrade before
+	// restarting the client.
+	//
+	// In Psiphon, MustUpgrade may be ignored when not running in
+	// in-proxy-only personal pairing mode, as other tunnel protocols remain
+	// available.
+	MustUpgrade func()
 }
 
 // DialClient establishes an in-proxy connection for relaying traffic to the
@@ -314,6 +324,13 @@ func dialClientWebRTCConn(
 	ctx context.Context,
 	config *ClientConfig) (retResult *clientWebRTCDialResult, retRetry bool, retErr error) {
 
+	brokerCoordinator := config.BrokerClient.GetBrokerDialCoordinator()
+	personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
+
+	// In personal pairing mode, RFC 1918/4193 private IP addresses are
+	// included in SDPs.
+	hasPersonalCompartmentIDs := len(personalCompartmentIDs) > 0
+
 	// Initialize the WebRTC offer
 
 	doTLSRandomization := config.WebRTCDialCoordinator.DoDTLSRandomization()
@@ -329,7 +346,8 @@ func dialClientWebRTCConn(
 			DoDTLSRandomization:         doTLSRandomization,
 			TrafficShapingParameters:    trafficShapingParameters,
 			ReliableTransport:           config.ReliableTransport,
-		})
+		},
+		hasPersonalCompartmentIDs)
 	if err != nil {
 		return nil, true, errors.Trace(err)
 	}
@@ -342,8 +360,6 @@ func dialClientWebRTCConn(
 
 	// Send the ClientOffer request to the broker
 
-	brokerCoordinator := config.BrokerClient.GetBrokerDialCoordinator()
-
 	packedBaseParams, err := protocol.EncodePackedAPIParameters(config.BaseAPIParameters)
 	if err != nil {
 		return nil, false, errors.Trace(err)
@@ -366,7 +382,7 @@ func dialClientWebRTCConn(
 				PortMappingTypes:     config.WebRTCDialCoordinator.PortMappingTypes(),
 			},
 			CommonCompartmentIDs:         brokerCoordinator.CommonCompartmentIDs(),
-			PersonalCompartmentIDs:       brokerCoordinator.PersonalCompartmentIDs(),
+			PersonalCompartmentIDs:       personalCompartmentIDs,
 			ClientOfferSDP:               SDP,
 			ICECandidateTypes:            SDPMetrics.iceCandidateTypes,
 			ClientRootObfuscationSecret:  clientRootObfuscationSecret,
@@ -380,8 +396,8 @@ func dialClientWebRTCConn(
 		return nil, false, errors.Trace(err)
 	}
 
-	// No retry when rate/entry limited; do retry on no-match, as a match may
-	// soon appear.
+	// No retry when rate/entry limited or must upgrade; do retry on no-match,
+	// as a match may soon appear.
 
 	if offerResponse.Limited {
 		return nil, false, errors.TraceNew("limited")
@@ -390,6 +406,13 @@ func dialClientWebRTCConn(
 
 		return nil, true, errors.TraceNew("no proxy match")
 
+	} else if offerResponse.MustUpgrade {
+
+		if config.MustUpgrade != nil {
+			config.MustUpgrade()
+		}
+
+		return nil, false, errors.TraceNew("must upgrade")
 	}
 
 	if offerResponse.SelectedProxyProtocolVersion != ProxyProtocolVersion1 {
@@ -402,7 +425,8 @@ func dialClientWebRTCConn(
 
 	// Establish the WebRTC DataChannel connection
 
-	err = webRTCConn.SetRemoteSDP(offerResponse.ProxyAnswerSDP)
+	err = webRTCConn.SetRemoteSDP(
+		offerResponse.ProxyAnswerSDP, hasPersonalCompartmentIDs)
 	if err != nil {
 		return nil, true, errors.Trace(err)
 	}

+ 12 - 4
psiphon/common/inproxy/inproxy_disabled.go

@@ -68,7 +68,10 @@ type webRTCConfig struct {
 	ReliableTransport           bool
 }
 
-func (conn *webRTCConn) SetRemoteSDP(peerSDP WebRTCSessionDescription) error {
+func (conn *webRTCConn) SetRemoteSDP(
+	peerSDP WebRTCSessionDescription,
+	hasPersonalCompartmentIDs bool) error {
+
 	return errors.Trace(errNotEnabled)
 }
 
@@ -121,12 +124,14 @@ func (conn *webRTCConn) GetMetrics() common.LogFields {
 type webRTCSDPMetrics struct {
 	iceCandidateTypes     []ICECandidateType
 	hasIPv6               bool
+	hasPrivateIP          bool
 	filteredICECandidates []string
 }
 
 func newWebRTCConnWithOffer(
 	ctx context.Context,
-	config *webRTCConfig) (
+	config *webRTCConfig,
+	hasPersonalCompartmentIDs bool) (
 	*webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
 	return nil, WebRTCSessionDescription{}, nil, errors.Trace(errNotEnabled)
 }
@@ -134,7 +139,8 @@ func newWebRTCConnWithOffer(
 func newWebRTCConnWithAnswer(
 	ctx context.Context,
 	config *webRTCConfig,
-	peerSDP WebRTCSessionDescription) (
+	peerSDP WebRTCSessionDescription,
+	hasPersonalCompartmentIDs bool) (
 	*webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
 
 	return nil, WebRTCSessionDescription{}, nil, errors.Trace(errNotEnabled)
@@ -144,7 +150,9 @@ func filterSDPAddresses(
 	encodedSDP []byte,
 	errorOnNoCandidates bool,
 	lookupGeoIP LookupGeoIP,
-	expectedGeoIPData common.GeoIPData) ([]byte, *webRTCSDPMetrics, error) {
+	expectedGeoIPData common.GeoIPData,
+	allowPrivateIPAddressCandidates bool,
+	filterPrivateIPAddressCandidates bool) ([]byte, *webRTCSDPMetrics, error) {
 	return nil, nil, errors.Trace(errNotEnabled)
 }
 

+ 33 - 12
psiphon/common/inproxy/matcher.go

@@ -56,8 +56,8 @@ const (
 // The client and proxy must supply matching personal or common compartment
 // IDs. Common compartments are managed by Psiphon and can be obtained via a
 // tactics parameter or via an OSL embedding. Each proxy announcement or
-// client offer may specify only one of either common or personal compartment
-// IDs.
+// client offer may specify only one compartment ID type, either common or
+// personal.
 //
 // Matching prefers to pair proxies and clients in a way that maximizes total
 // possible matches. For a client or proxy with less-limited NAT traversal, a
@@ -161,12 +161,6 @@ func (p *MatchProperties) IsPreferredNATMatch(
 		peerMatchProperties.EffectiveNATType())
 }
 
-// IsPersonalCompartmentalized indicates whether the candidate has personal
-// compartment IDs.
-func (p *MatchProperties) IsPersonalCompartmentalized() bool {
-	return len(p.PersonalCompartmentIDs) > 0
-}
-
 // MatchAnnouncement is a proxy announcement to be queued for matching.
 type MatchAnnouncement struct {
 	Properties           MatchProperties
@@ -424,8 +418,9 @@ func (m *Matcher) Announce(
 	proxyAnnouncement *MatchAnnouncement) (*MatchOffer, *MatchMetrics, error) {
 
 	// An announcement must specify exactly one compartment ID, of one type,
-	// common or personal. This is currently a limitation of the multi-queue
-	// implementation; see comment in announcementMultiQueue.enqueue.
+	// common or personal. The limit of one is currently a limitation of the
+	// multi-queue implementation; see comment in
+	// announcementMultiQueue.enqueue.
 	compartmentIDs := proxyAnnouncement.Properties.CommonCompartmentIDs
 	if len(compartmentIDs) == 0 {
 		compartmentIDs = proxyAnnouncement.Properties.PersonalCompartmentIDs
@@ -543,6 +538,31 @@ func (m *Matcher) Offer(
 		nil
 }
 
+// AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
+// announcement identified by the specified proxy ID and connection ID and
+// returns whether the announcement has personal compartment IDs, indicating
+// personal pairing mode.
+//
+// If no pending answer is found, an error is returned.
+func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
+	proxyID ID, connectionID ID) (bool, error) {
+
+	key := m.pendingAnswerKey(proxyID, connectionID)
+	pendingAnswerValue, ok := m.pendingAnswers.Get(key)
+	if !ok {
+		// The input IDs don't correspond to a pending answer, or the client
+		// is no longer awaiting the response.
+		return false, errors.TraceNew("no pending answer")
+	}
+
+	pendingAnswer := pendingAnswerValue.(*pendingAnswer)
+
+	hasPersonalCompartmentIDs := len(
+		pendingAnswer.announcement.Properties.PersonalCompartmentIDs) > 0
+
+	return hasPersonalCompartmentIDs, nil
+}
+
 // Answer delivers an answer from the proxy for a previously matched offer.
 // The ProxyID and ConnectionID must correspond to the original announcement.
 // The caller must not mutate the answer after calling Answer. Answer does
@@ -556,8 +576,9 @@ func (m *Matcher) Answer(
 	key := m.pendingAnswerKey(proxyAnswer.ProxyID, proxyAnswer.ConnectionID)
 	pendingAnswerValue, ok := m.pendingAnswers.Get(key)
 	if !ok {
-		// The client is no longer awaiting the response.
-		return errors.TraceNew("no client")
+		// The input IDs don't correspond to a pending answer, or the client
+		// is no longer awaiting the response.
+		return errors.TraceNew("no pending answer")
 	}
 
 	m.pendingAnswers.Delete(key)

+ 23 - 13
psiphon/common/inproxy/proxy.go

@@ -114,11 +114,11 @@ type ProxyConfig struct {
 	// controlled by tactics parameters.
 	HandleTacticsPayload func(networkID string, tacticsPayload []byte) bool
 
-	// OperatorMessageHandler is a callback that is invoked with any user
-	// message JSON object that is sent to the Proxy from the Broker. This
-	// facility may be used to alert proxy operators when required. The JSON
-	// object schema is arbitrary and not defined here.
-	OperatorMessageHandler func(messageJSON string)
+	// MustUpgrade is a callback that is invoked when a MustUpgrade flag is
+	// received from the broker. When MustUpgrade is received, the proxy
+	// should be stopped and the user should be prompted to upgrade before
+	// restarting the proxy.
+	MustUpgrade func()
 
 	// MaxClients is the maximum number of clients that are allowed to connect
 	// to the proxy.
@@ -571,11 +571,12 @@ func (p *Proxy) proxyOneClient(
 	// ProxyAnnounce applies an additional request timeout to facilitate
 	// long-polling.
 	announceStartTime := time.Now()
+	personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
 	announceResponse, err := brokerClient.ProxyAnnounce(
 		ctx,
 		requestDelay,
 		&ProxyAnnounceRequest{
-			PersonalCompartmentIDs: brokerCoordinator.PersonalCompartmentIDs(),
+			PersonalCompartmentIDs: personalCompartmentIDs,
 			Metrics:                metrics,
 		})
 
@@ -588,10 +589,6 @@ func (p *Proxy) proxyOneClient(
 		return backOff, errors.Trace(err)
 	}
 
-	if announceResponse.OperatorMessageJSON != "" {
-		p.config.OperatorMessageHandler(announceResponse.OperatorMessageJSON)
-	}
-
 	if len(announceResponse.TacticsPayload) > 0 {
 
 		// The TacticsPayload may include new tactics, or may simply signal,
@@ -613,8 +610,8 @@ func (p *Proxy) proxyOneClient(
 		signalAnnounceDone()
 	}
 
-	// Trigger back-off back off when rate/entry limited; no back-off for
-	// no-match.
+	// Trigger back-off back off when rate/entry limited or must upgrade; no
+	// back-off for no-match.
 
 	if announceResponse.Limited {
 
@@ -625,6 +622,14 @@ func (p *Proxy) proxyOneClient(
 
 		return backOff, errors.TraceNew("no match")
 
+	} else if announceResponse.MustUpgrade {
+
+		if p.config.MustUpgrade != nil {
+			p.config.MustUpgrade()
+		}
+
+		backOff = true
+		return backOff, errors.TraceNew("must upgrade")
 	}
 
 	if announceResponse.ClientProxyProtocolVersion != ProxyProtocolVersion1 {
@@ -662,6 +667,10 @@ func (p *Proxy) proxyOneClient(
 		ctx, common.ValueOrDefault(webRTCCoordinator.WebRTCAnswerTimeout(), proxyWebRTCAnswerTimeout))
 	defer webRTCAnswerCancelFunc()
 
+	// In personal pairing mode, RFC 1918/4193 private IP addresses are
+	// included in SDPs.
+	hasPersonalCompartmentIDs := len(personalCompartmentIDs) > 0
+
 	webRTCConn, SDP, sdpMetrics, webRTCErr := newWebRTCConnWithAnswer(
 		webRTCAnswerCtx,
 		&webRTCConfig{
@@ -672,7 +681,8 @@ func (p *Proxy) proxyOneClient(
 			DoDTLSRandomization:         announceResponse.DoDTLSRandomization,
 			TrafficShapingParameters:    announceResponse.TrafficShapingParameters,
 		},
-		announceResponse.ClientOfferSDP)
+		announceResponse.ClientOfferSDP,
+		hasPersonalCompartmentIDs)
 	var webRTCRequestErr string
 	if webRTCErr != nil {
 		webRTCErr = errors.Trace(webRTCErr)

+ 84 - 7
psiphon/common/inproxy/sdp_test.go

@@ -23,6 +23,7 @@ package inproxy
 
 import (
 	"context"
+	"fmt"
 	"net"
 	"strings"
 	"testing"
@@ -48,13 +49,19 @@ func runTestProcessSDP() error {
 		},
 	}
 
+	hasPersonalCompartmentIDs := false
+	errorOnNoCandidates := true
+	disableIPv6Candidates := false
+	allowPrivateIPAddressCandidates := false
+	filterPrivateIPAddressCandidates := false
+
 	// Create a valid, base SDP, including private network (bogon) candidates.
 
 	SetAllowBogonWebRTCConnections(true)
 	defer SetAllowBogonWebRTCConnections(false)
 
 	conn, webRTCSDP, metrics, err := newWebRTCConnWithOffer(
-		context.Background(), config)
+		context.Background(), config, hasPersonalCompartmentIDs)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -64,9 +71,15 @@ func runTestProcessSDP() error {
 
 	// Test disallow IPv6
 
+	disableIPv6Candidates = true
+
 	if metrics.hasIPv6 {
 		preparedSDP, metrics, err := prepareSDPAddresses(
-			SDP, true, "", true)
+			SDP,
+			errorOnNoCandidates,
+			"",
+			disableIPv6Candidates,
+			allowPrivateIPAddressCandidates)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -87,6 +100,8 @@ func runTestProcessSDP() error {
 		}
 	}
 
+	disableIPv6Candidates = false
+
 	// Test filter unexpected GeoIP
 
 	// This IP must not be a bogon; this address is not dialed.
@@ -101,13 +116,22 @@ func runTestProcessSDP() error {
 
 	// Add the testIP as a port mapping candidate.
 	preparedSDP, metrics, err := prepareSDPAddresses(
-		SDP, true, net.JoinHostPort(testIP, "80"), false)
+		SDP,
+		errorOnNoCandidates,
+		net.JoinHostPort(testIP, "80"),
+		disableIPv6Candidates,
+		allowPrivateIPAddressCandidates)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
 	filteredSDP, metrics, err := filterSDPAddresses(
-		preparedSDP, true, lookupGeoIP, expectedGeoIP)
+		preparedSDP,
+		errorOnNoCandidates,
+		lookupGeoIP,
+		expectedGeoIP,
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -131,9 +155,16 @@ func runTestProcessSDP() error {
 
 	SetAllowBogonWebRTCConnections(false)
 
-	// Allow no candidates (errorOnNoCandidates = false)
+	// Allow no candidates
+	errorOnNoCandidates = false
+
 	filteredSDP, metrics, err = filterSDPAddresses(
-		SDP, false, nil, common.GeoIPData{})
+		SDP,
+		errorOnNoCandidates,
+		nil,
+		common.GeoIPData{},
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -149,7 +180,53 @@ func runTestProcessSDP() error {
 		return errors.TraceNew("unexpected filteredICECandidates")
 	}
 
-	if len(filteredSDP) >= len(preparedSDP) {
+	if len(filteredSDP) >= len(SDP) {
+		return errors.TraceNew("unexpected SDP length")
+	}
+
+	errorOnNoCandidates = true
+
+	// Test private IP addresses
+
+	SetAllowBogonWebRTCConnections(false)
+
+	hasPersonalCompartmentIDs = true
+	allowPrivateIPAddressCandidates = true
+	filterPrivateIPAddressCandidates = true
+
+	conn, webRTCSDP, metrics, err = newWebRTCConnWithOffer(
+		context.Background(), config, hasPersonalCompartmentIDs)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer conn.Close()
+
+	SDP = []byte(webRTCSDP.SDP)
+
+	hasPrivateIP := metrics.hasPrivateIP
+
+	if !hasPrivateIP {
+		// Test may run on host without RFC 1918/4193 private IP address
+		fmt.Printf("No private IP address\n")
+	}
+
+	// Filter should retain any private IP address(es)
+	filteredSDP, metrics, err = filterSDPAddresses(
+		SDP,
+		errorOnNoCandidates,
+		nil,
+		common.GeoIPData{},
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	if hasPrivateIP != metrics.hasPrivateIP {
+		return errors.TraceNew("unexpected metrics.hasPrivateIP")
+	}
+
+	if len(filteredSDP) != len(SDP) {
 		return errors.TraceNew("unexpected SDP length")
 	}
 

+ 165 - 22
psiphon/common/inproxy/webrtc.go

@@ -153,10 +153,11 @@ type webRTCConfig struct {
 // establishment.
 func newWebRTCConnWithOffer(
 	ctx context.Context,
-	config *webRTCConfig) (
+	config *webRTCConfig,
+	hasPersonalCompartmentIDs bool) (
 	*webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
 
-	conn, SDP, metrics, err := newWebRTCConn(ctx, config, nil)
+	conn, SDP, metrics, err := newWebRTCConn(ctx, config, nil, false)
 	if err != nil {
 		return nil, WebRTCSessionDescription{}, nil, errors.Trace(err)
 	}
@@ -170,10 +171,12 @@ func newWebRTCConnWithOffer(
 func newWebRTCConnWithAnswer(
 	ctx context.Context,
 	config *webRTCConfig,
-	peerSDP WebRTCSessionDescription) (
+	peerSDP WebRTCSessionDescription,
+	hasPersonalCompartmentIDs bool) (
 	*webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
 
-	conn, SDP, metrics, err := newWebRTCConn(ctx, config, &peerSDP)
+	conn, SDP, metrics, err := newWebRTCConn(
+		ctx, config, &peerSDP, hasPersonalCompartmentIDs)
 	if err != nil {
 		return nil, WebRTCSessionDescription{}, nil, errors.Trace(err)
 	}
@@ -183,7 +186,8 @@ func newWebRTCConnWithAnswer(
 func newWebRTCConn(
 	ctx context.Context,
 	config *webRTCConfig,
-	peerSDP *WebRTCSessionDescription) (
+	peerSDP *WebRTCSessionDescription,
+	hasPersonalCompartmentIDs bool) (
 	retconn *webRTCConn,
 	retSDP *WebRTCSessionDescription,
 	retMetrics *webRTCSDPMetrics,
@@ -628,9 +632,33 @@ func newWebRTCConn(
 
 	} else {
 
+		SDP := peerSDP.SDP
+		if hasPersonalCompartmentIDs {
+
+			// In personal pairing mode, the peer SDP may include private IP
+			// addresses. To avoid unnecessary network traffic, filter out
+			// any peer private IP addresses for which there is no
+			// corresponding local, active interface.
+
+			errorOnNoCandidates := false
+			allowPrivateIPAddressCandidates := true
+			filterPrivateIPAddressCandidates := true
+			adjustedSDP, _, err := filterSDPAddresses(
+				[]byte(peerSDP.SDP),
+				errorOnNoCandidates,
+				nil,
+				common.GeoIPData{},
+				allowPrivateIPAddressCandidates,
+				filterPrivateIPAddressCandidates)
+			if err != nil {
+				return nil, nil, nil, errors.Trace(err)
+			}
+			SDP = string(adjustedSDP)
+		}
+
 		pionSessionDescription := webrtc.SessionDescription{
 			Type: webrtc.SDPType(peerSDP.Type),
-			SDP:  peerSDP.SDP,
+			SDP:  SDP,
 		}
 
 		err = conn.peerConnection.SetRemoteDescription(pionSessionDescription)
@@ -701,7 +729,8 @@ func newWebRTCConn(
 		[]byte(localDescription.SDP),
 		errorOnNoCandidates,
 		portMappingExternalAddr,
-		config.WebRTCDialCoordinator.DisableIPv6ICECandidates())
+		config.WebRTCDialCoordinator.DisableIPv6ICECandidates(),
+		hasPersonalCompartmentIDs)
 	if err != nil {
 		return nil, nil, nil, errors.Trace(err)
 	}
@@ -760,13 +789,40 @@ func (conn *webRTCConn) setDataChannel(dataChannel *webrtc.DataChannel) {
 // SetRemoteSDP takes the answer SDP that is received in response to an offer
 // SDP. SetRemoteSDP initiates the WebRTC connection establishment on the
 // offer end.
-func (conn *webRTCConn) SetRemoteSDP(peerSDP WebRTCSessionDescription) error {
+func (conn *webRTCConn) SetRemoteSDP(
+	peerSDP WebRTCSessionDescription,
+	hasPersonalCompartmentIDs bool) error {
+
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
 
+	SDP := peerSDP.SDP
+	if hasPersonalCompartmentIDs {
+
+		// In personal pairing mode, the peer SDP may include private IP
+		// addresses. To avoid unnecessary network traffic, filter out any
+		// peer private IP addresses for which there is no corresponding
+		// local, active interface.
+
+		errorOnNoCandidates := false
+		allowPrivateIPAddressCandidates := true
+		filterPrivateIPAddressCandidates := true
+		adjustedSDP, _, err := filterSDPAddresses(
+			[]byte(peerSDP.SDP),
+			errorOnNoCandidates,
+			nil,
+			common.GeoIPData{},
+			allowPrivateIPAddressCandidates,
+			filterPrivateIPAddressCandidates)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		SDP = string(adjustedSDP)
+	}
+
 	pionSessionDescription := webrtc.SessionDescription{
 		Type: webrtc.SDPType(peerSDP.Type),
-		SDP:  peerSDP.SDP,
+		SDP:  SDP,
 	}
 
 	err := conn.peerConnection.SetRemoteDescription(pionSessionDescription)
@@ -919,8 +975,14 @@ func (conn *webRTCConn) recordSelectedICECandidateStats() error {
 		if localIP != nil && localIP.To4() == nil {
 			isIPv6 = "1"
 		}
+		isPrivate := "0"
+		if localIP != nil && localIP.IsPrivate() {
+			isPrivate = "1"
+		}
 		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_is_IPv6"] =
 			isIPv6
+		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_is_private_IP"] =
+			isPrivate
 		conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_port"] =
 			localCandidateStats.Port
 
@@ -931,8 +993,14 @@ func (conn *webRTCConn) recordSelectedICECandidateStats() error {
 		if remoteIP != nil && remoteIP.To4() == nil {
 			isIPv6 = "1"
 		}
+		isPrivate = "0"
+		if remoteIP != nil && remoteIP.IsPrivate() {
+			isPrivate = "1"
+		}
 		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_is_IPv6"] =
 			isIPv6
+		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_is_private_IP"] =
+			isPrivate
 		conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_port"] =
 			remoteCandidateStats.Port
 
@@ -1516,13 +1584,16 @@ func prepareSDPAddresses(
 	encodedSDP []byte,
 	errorOnNoCandidates bool,
 	portMappingExternalAddr string,
-	disableIPv6Candidates bool) ([]byte, *webRTCSDPMetrics, error) {
+	disableIPv6Candidates bool,
+	allowPrivateIPAddressCandidates bool) ([]byte, *webRTCSDPMetrics, error) {
 
 	modifiedSDP, metrics, err := processSDPAddresses(
 		encodedSDP,
+		errorOnNoCandidates,
 		portMappingExternalAddr,
 		disableIPv6Candidates,
-		errorOnNoCandidates,
+		allowPrivateIPAddressCandidates,
+		false,
 		nil,
 		common.GeoIPData{})
 	return modifiedSDP, metrics, errors.Trace(err)
@@ -1536,13 +1607,17 @@ func filterSDPAddresses(
 	encodedSDP []byte,
 	errorOnNoCandidates bool,
 	lookupGeoIP LookupGeoIP,
-	expectedGeoIPData common.GeoIPData) ([]byte, *webRTCSDPMetrics, error) {
+	expectedGeoIPData common.GeoIPData,
+	allowPrivateIPAddressCandidates bool,
+	filterPrivateIPAddressCandidates bool) ([]byte, *webRTCSDPMetrics, error) {
 
 	filteredSDP, metrics, err := processSDPAddresses(
 		encodedSDP,
+		errorOnNoCandidates,
 		"",
 		false,
-		errorOnNoCandidates,
+		allowPrivateIPAddressCandidates,
+		filterPrivateIPAddressCandidates,
 		lookupGeoIP,
 		expectedGeoIPData)
 	return filteredSDP, metrics, errors.Trace(err)
@@ -1552,6 +1627,7 @@ func filterSDPAddresses(
 type webRTCSDPMetrics struct {
 	iceCandidateTypes     []ICECandidateType
 	hasIPv6               bool
+	hasPrivateIP          bool
 	filteredICECandidates []string
 }
 
@@ -1594,9 +1670,11 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 func processSDPAddresses(
 	encodedSDP []byte,
+	errorOnNoCandidates bool,
 	portMappingExternalAddr string,
 	disableIPv6Candidates bool,
-	errorOnNoCandidates bool,
+	allowPrivateIPAddressCandidates bool,
+	filterPrivateIPAddressCandidates bool,
 	lookupGeoIP LookupGeoIP,
 	expectedGeoIPData common.GeoIPData) ([]byte, *webRTCSDPMetrics, error) {
 
@@ -1608,6 +1686,7 @@ func processSDPAddresses(
 
 	candidateTypes := map[ICECandidateType]bool{}
 	hasIPv6 := false
+	hasPrivateIP := true
 	filteredCandidateReasons := make(map[string]int)
 
 	var portMappingICECandidates []sdp.Attribute
@@ -1703,14 +1782,21 @@ func processSDPAddresses(
 					candidateIsIPv6 = true
 				}
 
-				// Strip non-routable bogons, including LAN addresses.
-				// Same-LAN client/proxy hops are not expected to be useful,
-				// and this also avoids unnecessary local network traffic.
+				// Strip non-routable bogons, including RFC 1918/4193 private
+				// IP addresses. Same-LAN client/proxy hops are not expected
+				// to be useful, and this also avoids unnecessary network traffic.
 				//
 				// Well-behaved clients and proxies should strip these values;
 				// the broker enforces this with filtering.
+				//
+				// In personal pairing mode, private IP addresses are allowed,
+				// as connection may be made between devices the same LAN and
+				// not all routers support NAT hairpinning.
+
+				candidateIsPrivateIP := candidateIP.IsPrivate()
 
 				if !GetAllowBogonWebRTCConnections() &&
+					!(candidateIsPrivateIP && allowPrivateIPAddressCandidates) &&
 					common.IsBogon(candidateIP) {
 
 					version := "IPv4"
@@ -1723,6 +1809,18 @@ func processSDPAddresses(
 					continue
 				}
 
+				// In personal pairing mode, filter out any private IP
+				// addresses for which there is no corresponding local,
+				// active interface. This avoids unnecessary network traffic.
+				// This filtering option is applied post-broker exchange,
+				// with the SDP received, via the broker, from the peer.
+
+				if candidateIsPrivateIP && filterPrivateIPAddressCandidates {
+					if !hasInterfaceForPrivateIPAddress(candidateIP) {
+						continue
+					}
+				}
+
 				// The broker will check that clients and proxies specify only
 				// candidates that map to the same GeoIP country and ASN as
 				// the client/proxy connection to the broker. This limits
@@ -1765,6 +1863,9 @@ func processSDPAddresses(
 				if candidateIsIPv6 {
 					hasIPv6 = true
 				}
+				if candidateIsPrivateIP {
+					hasPrivateIP = true
+				}
 
 				// These types are not reported:
 				// - CandidateTypeRelay: TURN servers are not used.
@@ -1797,7 +1898,8 @@ func processSDPAddresses(
 	}
 
 	metrics := &webRTCSDPMetrics{
-		hasIPv6: hasIPv6,
+		hasIPv6:      hasIPv6,
+		hasPrivateIP: hasPrivateIP,
 	}
 	for candidateType := range candidateTypes {
 		metrics.iceCandidateTypes = append(metrics.iceCandidateTypes, candidateType)
@@ -1895,6 +1997,48 @@ func (l *pionLogger) Errorf(format string, args ...interface{}) {
 	l.logger.WithTrace().Error(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
 }
 
+func hasInterfaceForPrivateIPAddress(IP net.IP) bool {
+
+	if !IP.IsPrivate() {
+		return false
+	}
+
+	// The anet package is used to work around net.Interfaces not working on
+	// Android at this time: https://github.com/golang/go/issues/40569.
+	//
+	// Any errors are silently dropped; the caller will proceed without using
+	// the input private IP; and equivilent anet calls are made in
+	// pionNetwork.Interfaces, with errors logged.
+
+	netInterfaces, err := anet.Interfaces()
+	if err != nil {
+		return false
+	}
+
+	for _, netInterface := range netInterfaces {
+		// Note: don't exclude interfaces with the net.FlagPointToPoint flag,
+		// which is set for certain mobile networks
+		if netInterface.Flags&net.FlagUp == 0 {
+			continue
+		}
+		addrs, err := anet.InterfaceAddrsByInterface(&netInterface)
+		if err != nil {
+			continue
+		}
+		for _, addr := range addrs {
+			_, IPNet, err := net.ParseCIDR(addr.String())
+			if err != nil {
+				continue
+			}
+			if IPNet.Contains(IP) {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
 // pionNetwork implements pion/transport.Net.
 //
 // Via the SettingsEngine, pion is configured to use a pionNetwork instance,
@@ -1936,9 +2080,6 @@ func (p *pionNetwork) Interfaces() ([]*transport.Interface, error) {
 	// should be the active, externally routable addresses, and the IPv6
 	// address should be the preferred, non-deprecated temporary IPv6 address.
 	//
-	// The anet package is used to work around net.Interfaces not working on
-	// Android at this time: https://github.com/golang/go/issues/40569.
-	//
 	// In post-ICE gathering processing, processSDPAddresses will also strip
 	// all bogon addresses, so there is no explicit bogon check here.
 	//
@@ -1971,10 +2112,12 @@ func (p *pionNetwork) Interfaces() ([]*transport.Interface, error) {
 		udpConnIPv6.Close()
 	}
 
+	// The anet package is used to work around net.Interfaces not working on
+	// Android at this time: https://github.com/golang/go/issues/40569.
+
 	transportInterfaces := []*transport.Interface{}
 
 	netInterfaces, err := anet.Interfaces()
-
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

+ 4 - 1
psiphon/common/protocol/packed.go

@@ -792,7 +792,10 @@ func init() {
 
 		{142, "statusData", rawJSONConverter},
 
-		// Last key value = 142
+		{143, "inproxy_webrtc_local_ice_candidate_is_private_IP", intConverter},
+		{144, "inproxy_webrtc_remote_ice_candidate_is_private_IP", intConverter},
+
+		// Next key value = 145
 	}
 
 	for _, spec := range packedAPIParameterSpecs {

+ 33 - 1
psiphon/config.go

@@ -34,6 +34,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"unicode"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -1072,6 +1073,10 @@ type Config struct {
 
 	tacticsAppliedReceiversMutex sync.Mutex
 	tacticsAppliedReceivers      []TacticsAppliedReceiver
+
+	signalComponentFailure atomic.Value
+
+	inproxyMustUpgradePosted int32
 }
 
 // TacticsAppliedReceiver specifies the interface for a component that is
@@ -1113,6 +1118,8 @@ func LoadConfig(configJson []byte) (*Config, error) {
 	config.loadTimestamp = common.TruncateTimestampToHour(
 		common.GetCurrentTimestamp())
 
+	config.signalComponentFailure.Store(func() {})
+
 	return &config, nil
 }
 
@@ -1400,7 +1407,9 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 		return errors.TraceNew("invalid ObfuscatedSSHAlgorithms")
 	}
 
-	if !config.DisableTunnels && config.InproxyEnableProxy &&
+	if !config.DisableTunnels &&
+		config.InproxyEnableProxy &&
+		!GetAllowOverlappingPersonalCompartmentIDs() &&
 		common.ContainsAny(
 			config.InproxyProxyPersonalCompartmentIDs,
 			config.InproxyClientPersonalCompartmentIDs) {
@@ -1782,6 +1791,10 @@ func (config *Config) GetNetworkID() string {
 	return config.networkIDGetter.GetNetworkID()
 }
 
+func (config *Config) SetSignalComponentFailure(signalComponentFailure func()) {
+	config.signalComponentFailure.Store(signalComponentFailure)
+}
+
 // IsInproxyPersonalPairingMode indicates that the client is in in-proxy
 // personal pairing mode, where connections are made only through in-proxy
 // proxies with corresponding personal compartment IDs.
@@ -1789,6 +1802,25 @@ func (config *Config) IsInproxyPersonalPairingMode() bool {
 	return len(config.InproxyClientPersonalCompartmentIDs) > 0
 }
 
+// OnInproxyMustUpgrade is invoked when the in-proxy broker returns the
+// MustUpgrade response. When either running a proxy, or when running a
+// client in personal-pairing mode -- two states that require in-proxy
+// functionality -- onInproxyMustUpgrade initiates a shutdown after emitting
+// the InproxyMustUpgrade notice.
+func (config *Config) OnInproxyMustUpgrade() {
+
+	// TODO: check if LimitTunnelProtocols is set to allow only INPROXY tunnel
+	// protocols; this is another case where in-proxy functionality is
+	// required.
+
+	if config.InproxyEnableProxy || config.IsInproxyPersonalPairingMode() {
+		if atomic.CompareAndSwapInt32(&config.inproxyMustUpgradePosted, 0, 1) {
+			NoticeInproxyMustUpgrade()
+		}
+		config.signalComponentFailure.Load().(func())()
+	}
+}
+
 func (config *Config) makeConfigParameters() map[string]interface{} {
 
 	// Build set of config values to apply to parameters.

+ 3 - 6
psiphon/controller.go

@@ -260,6 +260,7 @@ func NewController(config *Config) (controller *Controller, err error) {
 	}
 
 	controller.config.SetTacticsAppliedReceivers(tacticAppliedReceivers)
+	controller.config.SetSignalComponentFailure(controller.SignalComponentFailure)
 
 	return controller, nil
 }
@@ -2875,12 +2876,8 @@ func (controller *Controller) runInproxyProxy() {
 		MaxClients:                    controller.config.InproxyMaxClients,
 		LimitUpstreamBytesPerSecond:   controller.config.InproxyLimitUpstreamBytesPerSecond,
 		LimitDownstreamBytesPerSecond: controller.config.InproxyLimitDownstreamBytesPerSecond,
-
-		OperatorMessageHandler: func(messageJSON string) {
-			NoticeInproxyOperatorMessage(messageJSON)
-		},
-
-		ActivityUpdater: activityUpdater,
+		MustUpgrade:                   controller.config.OnInproxyMustUpgrade,
+		ActivityUpdater:               activityUpdater,
 	}
 
 	proxy, err := inproxy.NewProxy(config)

+ 64 - 0
psiphon/debug.go

@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2024, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"sync/atomic"
+)
+
+var allowOverlappingPersonalCompartmentIDs int32
+
+func GetAllowOverlappingPersonalCompartmentIDs() bool {
+	return atomic.LoadInt32(&allowOverlappingPersonalCompartmentIDs) == 1
+}
+
+// SetAllowOverlappingPersonalCompartmentIDs configures whether to allow
+// overlapping personal compartment IDs in InproxyProxyPersonalCompartmentIDs
+// and InproxyClientPersonalCompartmentIDs. Overlapping IDs are not allowed
+// in order to prevent a client matching its own proxy.
+// SetAllowOverlappingPersonalCompartmentIDs is for end-to-end testing on a
+// single host, and should be used only for testing purposes.
+func SetAllowOverlappingPersonalCompartmentIDs(allow bool) {
+	value := int32(0)
+	if allow {
+		value = 1
+	}
+	atomic.StoreInt32(&allowOverlappingPersonalCompartmentIDs, value)
+}
+
+var allowBogonWebRTCConnections int32
+
+func GetAllowBogonWebRTCConnections() bool {
+	return atomic.LoadInt32(&allowBogonWebRTCConnections) == 1
+}
+
+// SetAllowBogonWebRTCConnections configures whether to allow bogon ICE
+// candidates in WebRTC session descriptions. This included loopback and
+// private network candidates. By default, bogon addresses are exclude as
+// they are not expected to be useful and may expose private network
+// information. SetAllowBogonWebRTCConnections is for end-to-end testing on a
+// single host, and should be used only for testing purposes.
+func SetAllowBogonWebRTCConnections(allow bool) {
+	value := int32(0)
+	if allow {
+		value = 1
+	}
+	atomic.StoreInt32(&allowBogonWebRTCConnections, value)
+}

+ 9 - 5
psiphon/notice.go

@@ -1097,12 +1097,16 @@ func NoticeSkipServerEntry(format string, args ...interface{}) {
 		"SkipServerEntry", 0, "reason", reason)
 }
 
-// NoticeInproxyOperatorMessage emits a message to be displayed to the proxy
-// operator.
-func NoticeInproxyOperatorMessage(messageJSON string) {
+// NoticeInproxyMustUpgrade reports that an in-proxy component requires an app
+// upgrade. Currently this includes running a proxy; and running a client in
+// personal pairing mode. The receiver should alert the user to upgrade the
+// app.
+//
+// There is at most one InproxyMustUpgrade notice emitted per controller run,
+// and an InproxyMustUpgrade notice is followed by a tunnel-core shutdown.
+func NoticeInproxyMustUpgrade() {
 	singletonNoticeLogger.outputNotice(
-		"InproxyOperatorMessage", 0,
-		"message", messageJSON)
+		"InproxyMustUpgrade", 0)
 }
 
 // NoticeInproxyProxyActivity reports proxy usage statistics. The stats are

+ 40 - 2
psiphon/server/server_test.go

@@ -416,6 +416,23 @@ func TestInproxyTLSOSSH(t *testing.T) {
 		})
 }
 
+func TestInproxyPersonalPairing(t *testing.T) {
+	if !inproxy.Enabled() {
+		t.Skip("inproxy is not enabled")
+	}
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:       "INPROXY-WEBRTC-OSSH",
+			requireAuthorization: true,
+			doTunneledWebRequest: true,
+			doTunneledNTPRequest: true,
+			doDanglingTCPConn:    true,
+			doLogHostProvider:    true,
+			doTargetBrokerSpecs:  true,
+			doPersonalPairing:    true,
+		})
+}
+
 func TestHotReload(t *testing.T) {
 	runServer(t,
 		&runServerConfig{
@@ -654,6 +671,7 @@ type runServerConfig struct {
 	doSteeringIP         bool
 	doTargetBrokerSpecs  bool
 	useLegacyAPIEncoding bool
+	doPersonalPairing    bool
 }
 
 var (
@@ -1301,6 +1319,15 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		clientConfig.InproxyLimitDownstreamBytesPerSecond = 0
 		clientConfig.ServerEntrySignaturePublicKey = inproxyTestConfig.brokerServerEntrySignaturePublicKey
 
+		if runConfig.doPersonalPairing {
+
+			psiphon.SetAllowOverlappingPersonalCompartmentIDs(true)
+			defer psiphon.SetAllowOverlappingPersonalCompartmentIDs(false)
+
+			clientConfig.InproxyClientPersonalCompartmentIDs = []string{inproxyTestConfig.personalCompartmentID}
+			clientConfig.InproxyProxyPersonalCompartmentIDs = []string{inproxyTestConfig.personalCompartmentID}
+		}
+
 		// Simulate a CDN adding required HTTP headers by injecting them at
 		// the client.
 		headers := make(http.Header)
@@ -2385,6 +2412,10 @@ func checkExpectedServerTunnelLogFields(
 			return fmt.Errorf("unexpected inproxy_proxy_id '%s'", fields["inproxy_proxy_id"])
 		}
 
+		if fields["inproxy_matched_common_compartments"].(bool) != !runConfig.doPersonalPairing {
+			return fmt.Errorf("unexpected inproxy_matched_common_compartments '%s'", fields["inproxy_matched_common_compartments"])
+		}
+
 		if fields["inproxy_broker_fronting_provider_id"].(string) != inproxyTestConfig.brokerFrontingProviderID {
 			return fmt.Errorf("unexpected inproxy_broker_fronting_provider_id '%s'", fields["inproxy_broker_fronting_provider_id"])
 		}
@@ -3390,6 +3421,8 @@ type inproxyTestConfig struct {
 	proxySessionPublicKey           string
 	proxySessionPublicKeyCurve25519 string
 	proxySessionPrivateKey          string
+
+	personalCompartmentID string
 }
 
 func generateInproxyTestConfig(
@@ -3404,8 +3437,6 @@ func generateInproxyTestConfig(
 	// In this test, a single common compartment ID is issued to all clients;
 	// the test client will get it via tactics.
 	//
-	// TODO: exercise personal compartment IDs
-	//
 	// Because of singletons in the Psiphon client, there can only be a single
 	// Psiphon client instance in this test process, and so it must act as
 	// it's own in-proxy proxy.
@@ -3425,6 +3456,12 @@ func generateInproxyTestConfig(
 	}
 	commonCompartmentIDStr := commonCompartmentID.String()
 
+	personalCompartmentID, err := inproxy.MakeID()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	personalCompartmentIDStr := personalCompartmentID.String()
+
 	brokerSessionPrivateKey, err := inproxy.GenerateSessionPrivateKey()
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -3589,6 +3626,7 @@ func generateInproxyTestConfig(
 		proxySessionPublicKey:               proxySessionPublicKeyStr,
 		proxySessionPublicKeyCurve25519:     proxySessionPublicKeyCurve25519Str,
 		proxySessionPrivateKey:              proxySessionPrivateKeyStr,
+		personalCompartmentID:               personalCompartmentIDStr,
 	}
 
 	return config, nil

+ 1 - 0
psiphon/tunnel.go

@@ -1568,6 +1568,7 @@ func dialInproxy(
 		DialAddress:                  dialAddress,
 		RemoteAddrOverride:           remoteAddrOverride,
 		PackedDestinationServerEntry: dialParams.inproxyPackedSignedServerEntry,
+		MustUpgrade:                  config.OnInproxyMustUpgrade,
 	}
 
 	conn, err := inproxy.DialClient(ctx, clientConfig)