Răsfoiți Sursa

Add region activity tracking and split common/personal proxy config

sinku 1 lună în urmă
părinte
comite
3561db46b6

+ 54 - 2
MobileLibrary/Android/PsiphonTunnel/PsiphonTunnel.java

@@ -48,6 +48,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
@@ -66,6 +69,18 @@ import psi.PsiphonProviderNetwork;
 import psi.PsiphonProviderNoticeHandler;
 
 public class PsiphonTunnel {
+
+    /**
+     * A point-in-time snapshot of per-region proxy activity metrics.
+     * Used in onInproxyProxyActivity
+     */
+    public static class RegionActivitySnapshot {
+        public long bytesUp;
+        public long bytesDown;
+        public int connectingClients;
+        public int connectedClients;
+    }
+
     public interface HostLogger {
         default void onDiagnosticMessage(String message) {}
     }
@@ -140,9 +155,17 @@ public class PsiphonTunnel {
          * @param connectedClients Number of clients currently connected to the proxy.
          * @param bytesUp  Bytes uploaded through the proxy since the last report.
          * @param bytesDown Bytes downloaded through the proxy since the last report.
+         * @param personalRegionActivity Per-region activity metrics for personal proxy clients
+         * @param commonRegionActivity Per-region activity metrics for common proxy clients
          */
         default void onInproxyProxyActivity(
-            int announcing, int connectingClients, int connectedClients,long bytesUp, long bytesDown) {}
+            int announcing, 
+            int connectingClients, 
+            int connectedClients,
+            long bytesUp, 
+            long bytesDown,
+            Map<String, RegionActivitySnapshot> personalRegionActivity,
+            Map<String, RegionActivitySnapshot> commonRegionActivity) {}
         /**
          * Called when tunnel-core reports connected server region information.
          * @param region The server region received.
@@ -937,12 +960,18 @@ public class PsiphonTunnel {
                 mHostService.onInproxyMustUpgrade();
             } else if (noticeType.equals("InproxyProxyActivity")) {
                 JSONObject data = notice.getJSONObject("data");
+                Map<String, RegionActivitySnapshot> personalRegionActivity =
+                        parseRegionActivity(data.getJSONObject("personalRegionActivity"));
+                Map<String, RegionActivitySnapshot> commonRegionActivity =
+                        parseRegionActivity(data.getJSONObject("commonRegionActivity"));
                 mHostService.onInproxyProxyActivity(
                         data.getInt("announcing"),
                         data.getInt("connectingClients"),
                         data.getInt("connectedClients"),
                         data.getLong("bytesUp"),
-                        data.getLong("bytesDown"));
+                        data.getLong("bytesDown"),
+                        personalRegionActivity,
+                        commonRegionActivity);
             }
 
             if (diagnostic) {
@@ -955,6 +984,29 @@ public class PsiphonTunnel {
         }
     }
 
+    private static Map<String, RegionActivitySnapshot> parseRegionActivity(
+            JSONObject json) throws JSONException {
+        // creates a Map and populates it with the data from all available
+        // regions. This function also makes sure that the map is never null
+        if (json == null) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, RegionActivitySnapshot> result = new HashMap<>();
+        Iterator<String> keys = json.keys();
+        while (keys.hasNext()) {
+            String region = keys.next();
+            JSONObject regionData = json.getJSONObject(region);
+            RegionActivitySnapshot snapshot = new RegionActivitySnapshot();
+            snapshot.bytesUp = regionData.getLong("bytesUp");
+            snapshot.bytesDown = regionData.getLong("bytesDown");
+            snapshot.connectingClients = regionData.getInt("connectingClients");
+            snapshot.connectedClients = regionData.getInt("connectedClients");
+            result.put(region, snapshot);
+        }
+        return result;
+    }
+
     private static String getDeviceRegion(Context context) {
         String region = "";
         TelephonyManager telephonyManager = (TelephonyManager)context.getSystemService(Context.TELEPHONY_SERVICE);

+ 2 - 0
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.h

@@ -318,6 +318,8 @@ followed by a tunnel-core shutdown.
  @param bytesUp Bytes uploaded through the proxy since the last report.
  @param bytesDown Bytes downloaded through the proxy since the last report.
  */
+// TODO: Add personalRegionActivity and commonRegionActivity parameters
+// to match the new fields in the InproxyProxyActivity notice.
 - (void)onInproxyProxyActivity:(int)announcing
               connectingClients:(int)connectingClients
               connectedClients:(int)connectedClients

+ 3 - 0
MobileLibrary/iOS/PsiphonTunnel/PsiphonTunnel/PsiphonTunnel.m

@@ -1196,6 +1196,9 @@ typedef NS_ERROR_ENUM(PsiphonTunnelErrorDomain, PsiphonTunnelErrorCode) {
         }
     }
     else if ([noticeType isEqualToString:@"InproxyProxyActivity"]) {
+        // TODO: Parse and forward personalRegionActivity and
+        // commonRegionActivity. This should be done when the conduit iOS app
+        // supports the tunnel functionality correctly
         id announcing = [notice valueForKeyPath:@"data.announcing"];
         id connectingClients = [notice valueForKeyPath:@"data.connectingClients"];
         id connectedClients = [notice valueForKeyPath:@"data.connectedClients"];

+ 8 - 2
psiphon/common/inproxy/api.go

@@ -255,18 +255,21 @@ func (p NetworkProtocol) IsStream() bool {
 // ProxyMetrics are network topolology and resource metrics provided by a
 // proxy to a broker. The broker uses this information when matching proxies
 // and clients.
+// Limitation: Currently, there is no MaxReducedPersonalClients config, as
+// We assumed that users would not want the personal connections to be reduced.
 type ProxyMetrics struct {
 	BaseAPIParameters             protocol.PackedAPIParameters `cbor:"1,keyasint,omitempty"`
 	ProtocolVersion               int32                        `cbor:"2,keyasint,omitempty"`
 	NATType                       NATType                      `cbor:"3,keyasint,omitempty"`
 	PortMappingTypes              PortMappingTypes             `cbor:"4,keyasint,omitempty"`
-	MaxClients                    int32                        `cbor:"6,keyasint,omitempty"`
+	MaxCommonClients              int32                        `cbor:"6,keyasint,omitempty"`
 	ConnectingClients             int32                        `cbor:"7,keyasint,omitempty"`
 	ConnectedClients              int32                        `cbor:"8,keyasint,omitempty"`
 	LimitUpstreamBytesPerSecond   int64                        `cbor:"9,keyasint,omitempty"`
 	LimitDownstreamBytesPerSecond int64                        `cbor:"10,keyasint,omitempty"`
 	PeakUpstreamBytesPerSecond    int64                        `cbor:"11,keyasint,omitempty"`
 	PeakDownstreamBytesPerSecond  int64                        `cbor:"12,keyasint,omitempty"`
+	MaxPersonalClients            int32                        `cbor:"13,keyasint,omitempty"`
 }
 
 // ClientMetrics are network topolology metrics provided by a client to a
@@ -349,6 +352,7 @@ type ProxyAnnounceResponse struct {
 	TrafficShapingParameters    *TrafficShapingParameters `cbor:"10,keyasint,omitempty"`
 	NetworkProtocol             NetworkProtocol           `cbor:"11,keyasint,omitempty"`
 	DestinationAddress          string                    `cbor:"12,keyasint,omitempty"`
+	ClientRegion                string                    `cbor:"15,keyasint,omitempty"`
 }
 
 // ClientOfferRequest is an API request sent from a client to a broker,
@@ -662,7 +666,9 @@ func (metrics *ProxyMetrics) ValidateAndGetParametersAndLogFields(
 	logFields[logFieldPrefix+"protocol_version"] = metrics.ProtocolVersion
 	logFields[logFieldPrefix+"nat_type"] = metrics.NATType
 	logFields[logFieldPrefix+"port_mapping_types"] = metrics.PortMappingTypes
-	logFields[logFieldPrefix+"max_clients"] = metrics.MaxClients
+	logFields[logFieldPrefix+"max_common_clients"] = metrics.MaxCommonClients
+	logFields[logFieldPrefix+"max_personal_clients"] = metrics.MaxPersonalClients
+	logFields[logFieldPrefix+"max_clients"] = metrics.MaxCommonClients + metrics.MaxPersonalClients
 	logFields[logFieldPrefix+"connecting_clients"] = metrics.ConnectingClients
 	logFields[logFieldPrefix+"connected_clients"] = metrics.ConnectedClients
 	logFields[logFieldPrefix+"limit_upstream_bytes_per_second"] = metrics.LimitUpstreamBytesPerSecond

+ 1 - 0
psiphon/common/inproxy/broker.go

@@ -952,6 +952,7 @@ func (b *Broker) handleProxyAnnounce(
 			TrafficShapingParameters:    clientOffer.TrafficShapingParameters,
 			NetworkProtocol:             clientOffer.NetworkProtocol,
 			DestinationAddress:          clientOffer.DestinationAddress,
+			ClientRegion:                clientOffer.Properties.GeoIPData.Country,
 		})
 	if err != nil {
 		return nil, errors.Trace(err)

+ 132 - 9
psiphon/common/inproxy/inproxy_test.go

@@ -81,6 +81,8 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 	testCompartmentID, _ := MakeID()
 	testCommonCompartmentIDs := []ID{testCompartmentID}
+	personalCompartmentID, _ := MakeID()
+	testPersonalCompartmentIDs := []ID{personalCompartmentID}
 
 	testNetworkID := "NETWORK-ID-1"
 	testNetworkType := NetworkTypeUnknown
@@ -109,6 +111,13 @@ func runTestInproxy(doMustUpgrade bool) error {
 	roundTripperFailed := func(RoundTripper) { atomic.AddInt32(&roundTripperFailedCount, 1) }
 	noMatch := func(RoundTripper) {}
 
+	// Per-region activity tracking for testing the region activity feature.
+	var regionTrackingMutex sync.Mutex
+	seenCommonRegions := make(map[string]bool)
+	seenPersonalRegions := make(map[string]bool)
+	var totalCommonRegionBytesUp, totalCommonRegionBytesDown int64
+	var totalPersonalRegionBytesUp, totalPersonalRegionBytesDown int64
+
 	var receivedProxyMustUpgrade chan struct{}
 	var receivedClientMustUpgrade chan struct{}
 	if doMustUpgrade {
@@ -481,6 +490,7 @@ func runTestInproxy(doMustUpgrade bool) error {
 			brokerClientPrivateKey:      proxyPrivateKey,
 			brokerPublicKey:             brokerPublicKey,
 			brokerRootObfuscationSecret: brokerRootObfuscationSecret,
+			personalCompartmentIDs:      testPersonalCompartmentIDs,
 			brokerClientRoundTripper: newHTTPRoundTripper(
 				brokerListener.Addr().String(), "proxy"),
 			brokerClientRoundTripperSucceeded: roundTripperSucceded,
@@ -552,18 +562,40 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 			HandleTacticsPayload: makeHandleTacticsPayload(proxyPrivateKey, tacticsNetworkID),
 
-			MaxClients:                    proxyMaxClients,
+			MaxCommonClients:              proxyMaxClients,
+			MaxPersonalClients:            proxyMaxClients,
 			LimitUpstreamBytesPerSecond:   bytesToSend / targetElapsedSeconds,
 			LimitDownstreamBytesPerSecond: bytesToSend / targetElapsedSeconds,
 
 			ActivityUpdater: func(
 				announcing int32,
 				connectingClients int32, connectedClients int32,
-				bytesUp int64, bytesDown int64, bytesDuration time.Duration) {
+				bytesUp int64, bytesDown int64, bytesDuration time.Duration,
+				personalRegionActivity map[string]RegionActivitySnapshot,
+				commonRegionActivity map[string]RegionActivitySnapshot) {
 
 				fmt.Printf("[%s][%s] ACTIVITY: %d announcing, %d connecting, %d connected, %d up, %d down\n",
 					time.Now().UTC().Format(time.RFC3339), name,
 					announcing, connectingClients, connectedClients, bytesUp, bytesDown)
+
+				regionTrackingMutex.Lock()
+				for region, stats := range commonRegionActivity {
+					seenCommonRegions[region] = true
+					totalCommonRegionBytesUp += stats.BytesUp
+					totalCommonRegionBytesDown += stats.BytesDown
+					fmt.Printf("[%s][%s] COMMON REGION %s: connecting=%d, connected=%d, up=%d, down=%d\n",
+						time.Now().UTC().Format(time.RFC3339), name, region,
+						stats.ConnectingClients, stats.ConnectedClients, stats.BytesUp, stats.BytesDown)
+				}
+				for region, stats := range personalRegionActivity {
+					seenPersonalRegions[region] = true
+					totalPersonalRegionBytesUp += stats.BytesUp
+					totalPersonalRegionBytesDown += stats.BytesDown
+					fmt.Printf("[%s][%s] PERSONAL REGION %s: connecting=%d, connected=%d, up=%d, down=%d\n",
+						time.Now().UTC().Format(time.RFC3339), name, region,
+						stats.ConnectingClients, stats.ConnectedClients, stats.BytesUp, stats.BytesDown)
+				}
+				regionTrackingMutex.Unlock()
 			},
 
 			MustUpgrade: func() {
@@ -781,7 +813,9 @@ func runTestInproxy(doMustUpgrade bool) error {
 	}
 
 	newClientBrokerClient := func(
-		disableWaitToShareSession bool) (*BrokerClient, error) {
+		disableWaitToShareSession bool,
+		commonCompartmentIDs []ID,
+		personalCompartmentIDs []ID) (*BrokerClient, error) {
 
 		clientPrivateKey, err := GenerateSessionPrivateKey()
 		if err != nil {
@@ -792,7 +826,8 @@ func runTestInproxy(doMustUpgrade bool) error {
 			networkID:   testNetworkID,
 			networkType: testNetworkType,
 
-			commonCompartmentIDs: testCommonCompartmentIDs,
+			commonCompartmentIDs:   commonCompartmentIDs,
+			personalCompartmentIDs: personalCompartmentIDs,
 
 			disableWaitToShareSession: disableWaitToShareSession,
 
@@ -885,12 +920,22 @@ func runTestInproxy(doMustUpgrade bool) error {
 		return webRTCCoordinator, nil
 	}
 
-	sharedBrokerClient, err := newClientBrokerClient(false)
+	sharedCommonBrokerClient, err := newClientBrokerClient(false, testCommonCompartmentIDs, nil)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	sharedCommonBrokerClientDisableWait, err := newClientBrokerClient(true, testCommonCompartmentIDs, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	sharedBrokerClientDisableWait, err := newClientBrokerClient(true)
+	sharedPersonalBrokerClient, err := newClientBrokerClient(false, nil, testPersonalCompartmentIDs)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	sharedPersonalBrokerClientDisableWait, err := newClientBrokerClient(true, nil, testPersonalCompartmentIDs)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -904,16 +949,34 @@ func runTestInproxy(doMustUpgrade bool) error {
 		isMobile := i%4 == 0
 		useMediaStreams := i%4 < 2
 
+		// First half of clients are personal, second half are common.
+		isPersonalClient := i < numClients/2
+
 		// Exercise BrokerClients shared by multiple clients, but also create
 		// several broker clients.
+		//
+		// Per-region testing is handled by the HTTP server alternating regions
+		// based on request count.
 		var brokerClient *BrokerClient
 		switch i % 3 {
 		case 0:
-			brokerClient = sharedBrokerClient
+			if isPersonalClient {
+				brokerClient = sharedPersonalBrokerClient
+			} else {
+				brokerClient = sharedCommonBrokerClient
+			}
 		case 1:
-			brokerClient = sharedBrokerClientDisableWait
+			if isPersonalClient {
+				brokerClient = sharedPersonalBrokerClientDisableWait
+			} else {
+				brokerClient = sharedCommonBrokerClientDisableWait
+			}
 		case 2:
-			brokerClient, err = newClientBrokerClient(true)
+			if isPersonalClient {
+				brokerClient, err = newClientBrokerClient(true, nil, testPersonalCompartmentIDs)
+			} else {
+				brokerClient, err = newClientBrokerClient(true, testCommonCompartmentIDs, nil)
+			}
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -1008,6 +1071,40 @@ func runTestInproxy(doMustUpgrade bool) error {
 		if atomic.LoadInt32(&roundTripperFailedCount) > 0 {
 			return errors.TraceNew("unexpected round tripper failed count")
 		}
+
+		// Check per-region activity tracking
+		regionTrackingMutex.Lock()
+		if !seenCommonRegions["REGION-A"] {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected to see REGION-A in common region activity")
+		}
+		if !seenCommonRegions["REGION-B"] {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected to see REGION-B in common region activity")
+		}
+		if totalCommonRegionBytesUp == 0 {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected non-zero per-region bytes up")
+		}
+		if totalCommonRegionBytesDown == 0 {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected non-zero per-region bytes down")
+		}
+		if !seenPersonalRegions["REGION-A"] && !seenPersonalRegions["REGION-B"] {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected to see personal region activity")
+		}
+		if totalPersonalRegionBytesUp == 0 {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected non-zero personal per-region bytes up")
+		}
+		if totalPersonalRegionBytesDown == 0 {
+			regionTrackingMutex.Unlock()
+			return errors.TraceNew("expected non-zero personal per-region bytes down")
+		}
+		fmt.Printf("Per-region test passed: seen regions %v, total bytes up=%d, down=%d\n",
+			seenCommonRegions, totalCommonRegionBytesUp, totalCommonRegionBytesDown)
+		regionTrackingMutex.Unlock()
 	}
 
 	// Await shutdowns
@@ -1025,14 +1122,40 @@ func runTestInproxy(doMustUpgrade bool) error {
 
 func runHTTPServer(listener net.Listener, broker *Broker) error {
 
+	// Track client regions by RemoteAddr
+	var clientRegionsMutex sync.Mutex
+	clientRegions := make(map[string]string)
+	var clientRegionCount atomic.Int32
+
 	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 
 		// For this test, clients set the path to "/client" and proxies
 		// set the path to "/proxy" and we use that to create stub GeoIP
 		// data to pass the not-same-ASN condition.
+		//
+		// For per-region testing, each client gets an alternating sticky region
+		// (REGION-A or REGION-B) based on its RemoteAddr
 		var geoIPData common.GeoIPData
 		geoIPData.ASN = r.URL.Path
 
+		path := r.URL.Path
+		if strings.HasPrefix(path, "/client") {
+			clientRegionsMutex.Lock()
+			clientRegion, ok := clientRegions[r.RemoteAddr]
+			if !ok {
+				if clientRegionCount.Add(1)%2 == 0 {
+					clientRegion = "REGION-A"
+				} else {
+					clientRegion = "REGION-B"
+				}
+				clientRegions[r.RemoteAddr] = clientRegion
+			}
+			clientRegionsMutex.Unlock()
+			geoIPData.Country = clientRegion
+		} else if strings.HasPrefix(path, "/proxy") {
+			geoIPData.Country = "PROXY-REGION"
+		}
+
 		requestPayload, err := ioutil.ReadAll(
 			http.MaxBytesReader(w, r.Body, BrokerMaxRequestBodySize))
 		if err != nil {

+ 221 - 41
psiphon/common/inproxy/proxy.go

@@ -71,6 +71,20 @@ type Proxy struct {
 	useReducedSettings bool
 	reducedStartMinute int
 	reducedEndMinute   int
+
+	personalStatsMutex     sync.Mutex
+	personalRegionActivity map[string]*RegionActivity
+
+	commonStatsMutex     sync.Mutex
+	commonRegionActivity map[string]*RegionActivity
+}
+
+// RegionActivity holds metrics per-region for more detailed metric collection.
+type RegionActivity struct {
+	bytesUp           atomic.Int64
+	bytesDown         atomic.Int64
+	connectingClients atomic.Int32
+	connectedClients  atomic.Int32
 }
 
 // TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
@@ -138,9 +152,13 @@ type ProxyConfig struct {
 	// restarting the proxy.
 	MustUpgrade func()
 
-	// MaxClients is the maximum number of clients that are allowed to connect
-	// to the proxy. Must be > 0.
-	MaxClients int
+	// MaxCommonClients (formerly MaxClients) is the maximum number of common
+	// clients that are allowed to connect to the proxy. Must be > 0.
+	MaxCommonClients int
+
+	// MaxPersonalClients is the maximum number of personal clients that are
+	// allowed to connect to the proxy. Must be > 0.
+	MaxPersonalClients int
 
 	// LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
 	// a single client. When 0, there is no limit.
@@ -158,12 +176,16 @@ type ProxyConfig struct {
 	// which reduced client settings end.
 	ReducedEndTime string
 
-	// ReducedMaxClients specifies the maximum number of clients that are
-	// allowed to connect to the proxy during the reduced time range.
+	// ReducedMaxCommonClients specifies the maximum number of common clients
+	// that are allowed to connect to the proxy during the reduced time range.
+	//
+	// Limitation: We currently do not support ReducedMaxPersonalClients.
+	// We assume that due to the importance of personal clients, users
+	// always prefer to have them connected.
 	//
 	// Clients connected when the reduced settings begin will not be
 	// disconnected.
-	ReducedMaxClients int
+	ReducedMaxCommonClients int
 
 	// ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
 	// rate for a single client during the reduced time range. When 0,
@@ -186,33 +208,51 @@ type ProxyConfig struct {
 	ActivityUpdater ActivityUpdater
 }
 
+// RegionActivitySnapshot holds a point-in-time copy of per-region metrics.
+// This is used for the ActivityUpdater callback and notice serialization.
+type RegionActivitySnapshot struct {
+	BytesUp           int64 `json:"bytesUp"`
+	BytesDown         int64 `json:"bytesDown"`
+	ConnectingClients int32 `json:"connectingClients"`
+	ConnectedClients  int32 `json:"connectedClients"`
+}
+
 // ActivityUpdater is a callback that is invoked when the proxy announces
 // availability, when clients connect and disconnect, and periodically with
 // data transfer updates (unless idle). This callback may be used to update
 // an activity UI. This callback should post this data to another thread or
 // handler and return immediately and not block on UI updates.
+//
+// The personalRegionActivity and commonRegionActivity parameters contain per-region
+// metrics (bytes transferred, connecting/connected counts) segmented by client
+// region.
 type ActivityUpdater func(
 	announcing int32,
 	connectingClients int32,
 	connectedClients int32,
 	bytesUp int64,
 	bytesDown int64,
-	bytesDuration time.Duration)
+	bytesDuration time.Duration,
+	personalRegionActivitySnapshot map[string]RegionActivitySnapshot,
+	commonRegionActivitySnapshot map[string]RegionActivitySnapshot)
 
 // NewProxy initializes a new Proxy with the specified configuration.
 func NewProxy(config *ProxyConfig) (*Proxy, error) {
 
-	if config.MaxClients <= 0 {
-		return nil, errors.TraceNew("invalid MaxClients")
+	// Check if there are no clients who can connect
+	if config.MaxCommonClients+config.MaxPersonalClients <= 0 {
+		return nil, errors.TraceNew("invalid MaxCommonClients")
 	}
 
 	p := &Proxy{
-		config: config,
+		config:                 config,
+		personalRegionActivity: make(map[string]*RegionActivity),
+		commonRegionActivity:   make(map[string]*RegionActivity),
 	}
 
 	if config.ReducedStartTime != "" ||
 		config.ReducedEndTime != "" ||
-		config.ReducedMaxClients > 0 {
+		config.ReducedMaxCommonClients > 0 {
 
 		startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
 		if err != nil {
@@ -228,9 +268,9 @@ func NewProxy(config *ProxyConfig) (*Proxy, error) {
 			return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
 		}
 
-		if config.ReducedMaxClients <= 0 ||
-			config.ReducedMaxClients > config.MaxClients {
-			return nil, errors.TraceNew("invalid ReducedMaxClients")
+		if config.ReducedMaxCommonClients <= 0 ||
+			config.ReducedMaxCommonClients > config.MaxCommonClients {
+			return nil, errors.TraceNew("invalid ReducedMaxCommonClients")
 		}
 
 		p.useReducedSettings = true
@@ -256,6 +296,24 @@ func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _
 	w.p.bytesDown.Add(bytesRead)
 }
 
+// connectionActivityWrapper implements common.ActivityUpdater for a single
+// connection. It caches the RegionActivity pointer to enable atomic updates
+// with no mutex locking.
+type connectionActivityWrapper struct {
+	p              *Proxy
+	regionActivity *RegionActivity
+}
+
+func (w *connectionActivityWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
+	w.p.bytesUp.Add(bytesWritten)
+	w.p.bytesDown.Add(bytesRead)
+
+	if w.regionActivity != nil {
+		w.regionActivity.bytesUp.Add(bytesWritten)
+		w.regionActivity.bytesDown.Add(bytesRead)
+	}
+}
+
 // Run runs the proxy. The proxy sends requests to the Broker announcing its
 // availability; the Broker matches the proxy with clients, and facilitates
 // an exchange of WebRTC connection information; the proxy and each client
@@ -314,6 +372,13 @@ func (p *Proxy) Run(ctx context.Context) {
 	//
 	// The first worker is the only proxy worker which sets
 	// ProxyAnnounceRequest.CheckTactics.
+	//
+	// Limitation: currently, the first proxy is always common (unless
+	// MaxCommonClients == 0). We might want to change this later
+	// so that the first message is just an announcement, and not a full
+	// proxy, so we don't have to decide its type.
+
+	commonProxiesToCreate, personalProxiesToCreate := p.config.MaxCommonClients, p.config.MaxPersonalClients
 
 	signalFirstAnnounceCtx, signalFirstAnnounceDone :=
 		context.WithCancel(context.Background())
@@ -321,7 +386,15 @@ func (p *Proxy) Run(ctx context.Context) {
 	proxyWaitGroup.Add(1)
 	go func() {
 		defer proxyWaitGroup.Done()
-		p.proxyClients(ctx, signalFirstAnnounceDone, false)
+		if p.config.MaxCommonClients <= 0 {
+			// Create personal if no common clients are allowed
+			p.proxyClients(ctx, signalFirstAnnounceDone, false, true)
+			personalProxiesToCreate -= 1
+		} else {
+			// Create common
+			p.proxyClients(ctx, signalFirstAnnounceDone, false, false)
+			commonProxiesToCreate -= 1
+		}
 	}()
 
 	select {
@@ -332,22 +405,34 @@ func (p *Proxy) Run(ctx context.Context) {
 
 	// Launch the remaining workers.
 
-	for i := 0; i < p.config.MaxClients-1; i++ {
+	for i := 0; i < commonProxiesToCreate; i++ {
+		isPersonal := false
 
 		// When reduced settings are in effect, a subset of workers will pause
-		// during the reduced time period. Since ReducedMaxClients > 0 the
+		// during the reduced time period. Since ReducedMaxCommonClients > 0 the
 		// first proxy worker is never paused.
 		workerNum := i + 1
 		reducedPause := p.useReducedSettings &&
-			workerNum >= p.config.ReducedMaxClients
+			workerNum >= p.config.ReducedMaxCommonClients
 
 		proxyWaitGroup.Add(1)
 		go func(reducedPause bool) {
 			defer proxyWaitGroup.Done()
-			p.proxyClients(ctx, nil, reducedPause)
+			p.proxyClients(ctx, nil, reducedPause, isPersonal)
 		}(reducedPause)
 	}
 
+	for i := 0; i < personalProxiesToCreate; i++ {
+		// Limitation: There are no reduced settings for personal proxies
+		isPersonal := true
+
+		proxyWaitGroup.Add(1)
+		go func() {
+			defer proxyWaitGroup.Done()
+			p.proxyClients(ctx, nil, false, isPersonal)
+		}()
+	}
+
 	proxyWaitGroup.Wait()
 }
 
@@ -365,6 +450,9 @@ func (p *Proxy) activityUpdate(period time.Duration) {
 	greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
 	greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
 
+	personalRegionActivity := p.snapshotAndResetRegionActivity(&p.personalStatsMutex, p.personalRegionActivity)
+	commonRegionActivity := p.snapshotAndResetRegionActivity(&p.commonStatsMutex, p.commonRegionActivity)
+
 	stateChanged := announcing != p.lastAnnouncing ||
 		connectingClients != p.lastConnectingClients ||
 		connectedClients != p.lastConnectedClients
@@ -386,7 +474,64 @@ func (p *Proxy) activityUpdate(period time.Duration) {
 		connectedClients,
 		bytesUp,
 		bytesDown,
-		period)
+		period,
+		personalRegionActivity,
+		commonRegionActivity)
+}
+
+// getOrCreateRegionActivity returns the RegionActivity for a region, creating it
+// if needed. This should be called once at connection start to avoid multiple
+// lock usage.
+func (p *Proxy) getOrCreateRegionActivity(region string, isPersonal bool) *RegionActivity {
+	var mutex *sync.Mutex
+	var statsMap map[string]*RegionActivity
+	if isPersonal {
+		mutex = &p.personalStatsMutex
+		statsMap = p.personalRegionActivity
+	} else {
+		mutex = &p.commonStatsMutex
+		statsMap = p.commonRegionActivity
+	}
+	mutex.Lock()
+	defer mutex.Unlock()
+	stats, exists := statsMap[region]
+	if !exists {
+		stats = &RegionActivity{}
+		statsMap[region] = stats
+	}
+	return stats
+}
+
+// snapshotAndResetRegionActivity creates a copy of region stats with bytes reset
+// to zero, and prunes any entries that have no active connections and zero
+// bytes. The snapshot mechanism allows us to avoid holding locks during the
+// callback invocation.
+func (p *Proxy) snapshotAndResetRegionActivity(
+	mutex *sync.Mutex,
+	statsMap map[string]*RegionActivity,
+) map[string]RegionActivitySnapshot {
+	mutex.Lock()
+	defer mutex.Unlock()
+	result := make(map[string]RegionActivitySnapshot, len(statsMap))
+	regionsToDelete := []string{}
+	for region, stats := range statsMap {
+		snapshot := RegionActivitySnapshot{
+			BytesUp:           stats.bytesUp.Swap(0),
+			BytesDown:         stats.bytesDown.Swap(0),
+			ConnectingClients: stats.connectingClients.Load(),
+			ConnectedClients:  stats.connectedClients.Load(),
+		}
+		if snapshot.BytesUp > 0 || snapshot.BytesDown > 0 ||
+			snapshot.ConnectingClients > 0 || snapshot.ConnectedClients > 0 {
+			result[region] = snapshot
+		} else {
+			regionsToDelete = append(regionsToDelete, region)
+		}
+	}
+	for _, region := range regionsToDelete {
+		delete(statsMap, region)
+	}
+	return result
 }
 
 func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
@@ -403,7 +548,7 @@ func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
 
 func (p *Proxy) isReducedUntil() (int, time.Time) {
 	if !p.useReducedSettings {
-		return p.config.MaxClients, time.Time{}
+		return p.config.MaxCommonClients, time.Time{}
 	}
 
 	now := time.Now().UTC()
@@ -417,7 +562,7 @@ func (p *Proxy) isReducedUntil() (int, time.Time) {
 	}
 
 	if !isReduced {
-		return p.config.MaxClients, time.Time{}
+		return p.config.MaxCommonClients, time.Time{}
 	}
 
 	endHour := p.reducedEndMinute / 60
@@ -435,17 +580,17 @@ func (p *Proxy) isReducedUntil() (int, time.Time) {
 	if !endTime.After(now) {
 		endTime = endTime.AddDate(0, 0, 1)
 	}
-	return p.config.ReducedMaxClients, endTime
+	return p.config.ReducedMaxCommonClients, endTime
 }
 
-func (p *Proxy) getLimits() (int, common.RateLimits) {
+func (p *Proxy) getLimits() (int, int, common.RateLimits) {
 
 	rateLimits := common.RateLimits{
 		ReadBytesPerSecond:  int64(p.config.LimitUpstreamBytesPerSecond),
 		WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
 	}
 
-	maxClients, reducedUntil := p.isReducedUntil()
+	maxCommonClients, reducedUntil := p.isReducedUntil()
 	if !reducedUntil.IsZero() {
 
 		upstream := p.config.ReducedLimitUpstreamBytesPerSecond
@@ -464,7 +609,9 @@ func (p *Proxy) getLimits() (int, common.RateLimits) {
 		}
 	}
 
-	return maxClients, rateLimits
+	maxPersonalClients := p.config.MaxPersonalClients
+
+	return maxCommonClients, maxPersonalClients, rateLimits
 }
 
 // getAnnounceDelayParameters is a helper that fetches the proxy announcement
@@ -487,7 +634,7 @@ func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, floa
 }
 
 func (p *Proxy) proxyClients(
-	ctx context.Context, signalAnnounceDone func(), reducedPause bool) {
+	ctx context.Context, signalAnnounceDone func(), reducedPause bool, isPersonal bool) {
 
 	// Proxy one client, repeating until ctx is done.
 	//
@@ -567,7 +714,7 @@ func (p *Proxy) proxyClients(
 		}
 
 		backOff, err := p.proxyOneClient(
-			ctx, logAnnounce, signalAnnounceDone)
+			ctx, logAnnounce, signalAnnounceDone, isPersonal)
 
 		if !backOff || err == nil {
 			failureDelayFactor = 1
@@ -692,7 +839,8 @@ func (p *Proxy) doNetworkDiscovery(
 func (p *Proxy) proxyOneClient(
 	ctx context.Context,
 	logAnnounce func() bool,
-	signalAnnounceDone func()) (bool, error) {
+	signalAnnounceDone func(),
+	isPersonal bool) (bool, error) {
 
 	// Cancel/close this connection immediately if the network changes.
 	if p.config.GetCurrentNetworkContext != nil {
@@ -768,7 +916,7 @@ func (p *Proxy) proxyOneClient(
 	// for tactics.
 	checkTactics := signalAnnounceDone != nil
 
-	maxClients, rateLimits := p.getLimits()
+	maxCommonClients, maxPersonalClients, rateLimits := p.getLimits()
 
 	// Get the base Psiphon API parameters and additional proxy metrics,
 	// including performance information, which is sent to the broker in the
@@ -781,7 +929,7 @@ func (p *Proxy) proxyOneClient(
 	// with the original network ID.
 
 	metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
-		checkTactics, brokerCoordinator, webRTCCoordinator, maxClients, rateLimits)
+		checkTactics, brokerCoordinator, webRTCCoordinator, maxCommonClients, maxPersonalClients, rateLimits)
 	if err != nil {
 		return backOff, errors.Trace(err)
 	}
@@ -789,9 +937,9 @@ func (p *Proxy) proxyOneClient(
 	// Set a delay before announcing, to stagger the announce request times.
 	// The delay helps to avoid triggering rate limits or similar errors from
 	// any intermediate CDN between the proxy and the broker; and provides a
-	// nudge towards better load balancing across multiple large MaxClients
-	// proxies, as the broker primarily matches enqueued announces in FIFO
-	// order, since older announces expire earlier.
+	// nudge towards better load balancing across multiple large
+	// MaxCommonClients proxies, as the broker primarily matches enqueued
+	// announces in FIFO order, since older announces expire earlier.
 	//
 	// The delay is intended to be applied after doNetworkDiscovery, which has
 	// no reason to be delayed; and also after any waitToShareSession delay,
@@ -836,7 +984,12 @@ func (p *Proxy) proxyOneClient(
 	p.announcing.Add(1)
 
 	announceStartTime := time.Now()
-	personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
+
+	// Ignore the personalCompartmentIDs if this proxy is not personal
+	var personalCompartmentIDs []ID
+	if isPersonal {
+		personalCompartmentIDs = brokerCoordinator.PersonalCompartmentIDs()
+	}
 	announceResponse, err := brokerClient.ProxyAnnounce(
 		ctx,
 		requestDelay,
@@ -917,6 +1070,18 @@ func (p *Proxy) proxyOneClient(
 			announceResponse.SelectedProtocolVersion)
 	}
 
+	clientRegion := announceResponse.ClientRegion
+	var regionActivity *RegionActivity
+	if clientRegion != "" {
+		regionActivity = p.getOrCreateRegionActivity(clientRegion, isPersonal)
+	}
+
+	// Create per-connection activity wrapper with cached regionActivity pointer
+	connActivityWrapper := &connectionActivityWrapper{
+		p:              p,
+		regionActivity: regionActivity,
+	}
+
 	// Trigger back-off if the following WebRTC operations fail to establish a
 	// connections.
 	//
@@ -930,10 +1095,16 @@ func (p *Proxy) proxyOneClient(
 	// For activity updates, indicate that a client connection is now underway.
 
 	p.connectingClients.Add(1)
+	if regionActivity != nil {
+		regionActivity.connectingClients.Add(1)
+	}
 	connected := false
 	defer func() {
 		if !connected {
 			p.connectingClients.Add(-1)
+			if regionActivity != nil {
+				regionActivity.connectingClients.Add(-1)
+			}
 		}
 	}()
 
@@ -1069,8 +1240,15 @@ func (p *Proxy) proxyOneClient(
 	connected = true
 	p.connectingClients.Add(-1)
 	p.connectedClients.Add(1)
+	if regionActivity != nil {
+		regionActivity.connectingClients.Add(-1)
+		regionActivity.connectedClients.Add(1)
+	}
 	defer func() {
 		p.connectedClients.Add(-1)
+		if regionActivity != nil {
+			regionActivity.connectedClients.Add(-1)
+		}
 	}()
 
 	// Throttle the relay connection.
@@ -1078,9 +1256,9 @@ func (p *Proxy) proxyOneClient(
 	// Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
 	// operators may to want to limit their bandwidth usage with a single
 	// up/down value, an overall limit. The ProxyConfig can simply be
-	// generated by dividing the limit by MaxClients. This approach favors
-	// performance stability: each client gets the same throttling limits
-	// regardless of how many other clients are connected.
+	// generated by dividing the limit by MaxCommonClients + MaxPersonalClients.
+	// This approach favors performance stability: each client gets the
+	// same throttling limits regardless of how many other clients are connected.
 	//
 	// Rate limits are applied only when a client connection is established;
 	// connected clients retain their initial limits even when reduced time
@@ -1107,7 +1285,7 @@ func (p *Proxy) proxyOneClient(
 			proxyRelayInactivityTimeout)
 
 	destinationConn, err = common.NewActivityMonitoredConn(
-		destinationConn, inactivityTimeout, false, nil, p.activityUpdateWrapper)
+		destinationConn, inactivityTimeout, false, nil, connActivityWrapper)
 	if err != nil {
 		return backOff, errors.Trace(err)
 	}
@@ -1200,7 +1378,8 @@ func (p *Proxy) getMetrics(
 	includeTacticsParameters bool,
 	brokerCoordinator BrokerDialCoordinator,
 	webRTCCoordinator WebRTCDialCoordinator,
-	maxClients int,
+	maxCommonClients int,
+	maxPersonalClients int,
 	rateLimits common.RateLimits) (
 	*ProxyMetrics, string, bool, error) {
 
@@ -1229,7 +1408,8 @@ func (p *Proxy) getMetrics(
 		ProtocolVersion:               LatestProtocolVersion,
 		NATType:                       webRTCCoordinator.NATType(),
 		PortMappingTypes:              webRTCCoordinator.PortMappingTypes(),
-		MaxClients:                    int32(maxClients),
+		MaxCommonClients:              int32(maxCommonClients),
+		MaxPersonalClients:            int32(maxPersonalClients),
 		ConnectingClients:             p.connectingClients.Load(),
 		ConnectedClients:              p.connectedClients.Load(),
 		LimitUpstreamBytesPerSecond:   rateLimits.ReadBytesPerSecond,

+ 4 - 4
psiphon/common/inproxy/reduced_test.go

@@ -52,8 +52,8 @@ func runTestReduced() error {
 	end := addMinutes(minuteOfDay, 60)
 
 	config := &ProxyConfig{
-		MaxClients:                           10,
-		ReducedMaxClients:                    5,
+		MaxCommonClients:                     10,
+		ReducedMaxCommonClients:              5,
 		LimitUpstreamBytesPerSecond:          100,
 		LimitDownstreamBytesPerSecond:        200,
 		ReducedLimitUpstreamBytesPerSecond:   10,
@@ -69,7 +69,7 @@ func runTestReduced() error {
 	}
 
 	maxClients1, until := p.isReducedUntil()
-	maxClients2, limits := p.getLimits()
+	maxClients2, _, limits := p.getLimits()
 
 	if maxClients1 != 5 || maxClients2 != 5 {
 		return errors.TraceNew("unexpected maxClients")
@@ -95,7 +95,7 @@ func runTestReduced() error {
 	}
 
 	maxClients1, until = p.isReducedUntil()
-	maxClients2, limits = p.getLimits()
+	maxClients2, _, limits = p.getLimits()
 
 	if maxClients1 != 10 || maxClients2 != 10 {
 		return errors.TraceNew("unexpected maxClients")

+ 49 - 11
psiphon/config.go

@@ -658,10 +658,24 @@ type Config struct {
 	// ephemeral key will be generated.
 	InproxyProxySessionPrivateKey string `json:",omitempty"`
 
-	// InproxyMaxClients specifies the maximum number of in-proxy clients to
-	// be proxied concurrently. Must be > 0 when InproxyEnableProxy is set.
+	// InproxyMaxClients specifies the maximum number of common in-proxy
+	// clients to be proxied concurrently. When InproxyEnableProxy is set,
+	// it can only be 0 when InProxyMaxPersonalClients is > 0.
+	//
+	// Deprecated: Use InproxyMaxCommonClients. When InproxyMaxCommonClients
+	// is not nil, this parameter is ignored.
 	InproxyMaxClients int `json:",omitempty"`
 
+	// InproxyMaxCommonClients specifies the maximum number of common
+	// in-proxy clients to be proxied concurrently. When InproxyEnableProxy
+	// is set, it can only be 0 when InProxyMaxPersonalClients is > 0.
+	InproxyMaxCommonClients int `json:",omitempty"`
+
+	// InproxyMaxPersonalClients specifies the maximum number of personal
+	// in-proxy clients to be proxied concurrently. When InproxyEnableProxy
+	// is set, it can only be 0 when InProxyMaxCommonClients is > 0.
+	InproxyMaxPersonalClients int `json:",omitempty"`
+
 	// InproxyLimitUpstreamBytesPerSecond specifies the upstream byte transfer
 	// rate limit for each proxied client. When 0, there is no limit.
 	InproxyLimitUpstreamBytesPerSecond int `json:",omitempty"`
@@ -678,14 +692,26 @@ type Config struct {
 	// UTC) at which reduced in-proxy settings end.
 	InproxyReducedEndTime string `json:",omitempty"`
 
-	// InproxyReducedMaxClients specifies the maximum number of in-proxy
-	// clients to be proxied concurrently during the reduced time range.
-	// When set, must be > 0 and <= InproxyMaxClients.
+	// InproxyReducedMaxClients specifies the maximum number of common
+	// in-proxy clients to be proxied concurrently during the reduced
+	// time range. When set, must be > 0 and <= InproxyMaxCommonClients.
 	//
 	// Clients connected when the reduced settings begin will not be
 	// disconnected, so InproxyReducedMaxClients is a soft limit.
+	//
+	// Deprecated: Use InproxyReducedMaxCommon Clients. When
+	// InproxyMaxCommonClients is not nil, this parameter is ignored.
 	InproxyReducedMaxClients int `json:",omitempty"`
 
+	// InproxyReducedMaxCommonClients specifies the maximum number of
+	// common in-proxy clients to be proxied concurrently during the
+	// reduced time range. When set, must be > 0 and
+	// <= InproxyMaxCommonClients.
+	//
+	// Clients connected when the reduced settings begin will not be
+	// disconnected, so InproxyReducedMaxCommonClients is a soft limit.
+	InproxyReducedMaxCommonClients int `json:",omitempty"`
+
 	// InproxyReducedLimitUpstreamBytesPerSecond specifies the upstream byte
 	// transfer rate limit for each proxied client during the reduced time
 	// range. When 0, InproxyLimitUpstreamBytesPerSecond is the limit.
@@ -1436,6 +1462,14 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 		config.MigrateUpgradeDownloadFilename = config.UpgradeDownloadFilename
 	}
 
+	if config.InproxyMaxClients != 0 && config.InproxyMaxCommonClients == 0 {
+		config.InproxyMaxCommonClients = config.InproxyMaxClients
+	}
+
+	if config.InproxyReducedMaxClients != 0 && config.InproxyReducedMaxCommonClients == 0 {
+		config.InproxyReducedMaxCommonClients = config.InproxyReducedMaxClients
+	}
+
 	// Supply default values.
 
 	// Create datastore directory.
@@ -1539,13 +1573,17 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 
 	if config.InproxyEnableProxy {
 
-		if config.InproxyMaxClients <= 0 {
-			return errors.TraceNew("invalid InproxyMaxClients")
+		if config.InproxyMaxCommonClients+config.InproxyMaxPersonalClients <= 0 {
+			return errors.TraceNew("invalid InproxyMaxCommonClients and InproxyMaxPersonalClients")
+		}
+
+		if len(config.InproxyProxyPersonalCompartmentID) > 0 && config.InproxyMaxPersonalClients <= 0 {
+			return errors.TraceNew("invalid InproxyMaxPersonalClients when personal compartment IDs are provided")
 		}
 
 		if config.InproxyReducedStartTime != "" ||
 			config.InproxyReducedEndTime != "" ||
-			config.InproxyReducedMaxClients > 0 {
+			config.InproxyReducedMaxCommonClients > 0 {
 
 			startMinute, err := common.ParseTimeOfDayMinutes(config.InproxyReducedStartTime)
 			if err != nil {
@@ -1562,9 +1600,9 @@ func (config *Config) Commit(migrateFromLegacyFields bool) error {
 				return errors.TraceNew("invalid InproxyReducedStartTime/InproxyReducedEndTime")
 			}
 
-			if config.InproxyReducedMaxClients <= 0 ||
-				config.InproxyReducedMaxClients > config.InproxyMaxClients {
-				return errors.TraceNew("invalid InproxyReducedMaxClients")
+			if config.InproxyReducedMaxCommonClients <= 0 ||
+				config.InproxyReducedMaxCommonClients > config.InproxyMaxCommonClients {
+				return errors.TraceNew("invalid InproxyReducedMaxCommonClients")
 			}
 
 			// InproxyReducedLimitUpstream/DownstreamBytesPerSecond don't necessarily

+ 7 - 4
psiphon/controller.go

@@ -3307,7 +3307,9 @@ func (controller *Controller) runInproxyProxy() {
 		connectedClients int32,
 		bytesUp int64,
 		bytesDown int64,
-		_ time.Duration) {
+		_ time.Duration,
+		personalRegionActivity map[string]inproxy.RegionActivitySnapshot,
+		commonRegionActivity map[string]inproxy.RegionActivitySnapshot) {
 
 		// This emit logic mirrors the logic for NoticeBytesTransferred and
 		// NoticeTotalBytesTransferred in tunnel.operateTunnel.
@@ -3325,7 +3327,7 @@ func (controller *Controller) runInproxyProxy() {
 				connectedClients != lastActivityConnectedClients) {
 
 			NoticeInproxyProxyActivity(
-				announcing, connectingClients, connectedClients, bytesUp, bytesDown)
+				announcing, connectingClients, connectedClients, bytesUp, bytesDown, personalRegionActivity, commonRegionActivity)
 
 			lastAnnouncing = announcing
 			lastActivityConnectingClients = connectingClients
@@ -3367,12 +3369,13 @@ func (controller *Controller) runInproxyProxy() {
 		GetBaseAPIParameters:                 controller.inproxyGetProxyAPIParameters,
 		MakeWebRTCDialCoordinator:            controller.inproxyMakeProxyWebRTCDialCoordinator,
 		HandleTacticsPayload:                 controller.inproxyHandleProxyTacticsPayload,
-		MaxClients:                           controller.config.InproxyMaxClients,
+		MaxCommonClients:                     controller.config.InproxyMaxCommonClients,
+		MaxPersonalClients:                   controller.config.InproxyMaxPersonalClients,
 		LimitUpstreamBytesPerSecond:          controller.config.InproxyLimitUpstreamBytesPerSecond,
 		LimitDownstreamBytesPerSecond:        controller.config.InproxyLimitDownstreamBytesPerSecond,
 		ReducedStartTime:                     controller.config.InproxyReducedStartTime,
 		ReducedEndTime:                       controller.config.InproxyReducedEndTime,
-		ReducedMaxClients:                    controller.config.InproxyReducedMaxClients,
+		ReducedMaxCommonClients:              controller.config.InproxyReducedMaxCommonClients,
 		ReducedLimitUpstreamBytesPerSecond:   controller.config.InproxyReducedLimitUpstreamBytesPerSecond,
 		ReducedLimitDownstreamBytesPerSecond: controller.config.InproxyReducedLimitDownstreamBytesPerSecond,
 		MustUpgrade:                          controller.config.OnInproxyMustUpgrade,

+ 7 - 2
psiphon/notice.go

@@ -35,6 +35,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/buildinfo"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
@@ -1155,7 +1156,9 @@ func NoticeInproxyProxyActivity(
 	connectingClients int32,
 	connectedClients int32,
 	bytesUp int64,
-	bytesDown int64) {
+	bytesDown int64,
+	personalRegionActivity map[string]inproxy.RegionActivitySnapshot,
+	commonRegionActivity map[string]inproxy.RegionActivitySnapshot) {
 
 	singletonNoticeLogger.outputNotice(
 		"InproxyProxyActivity", noticeIsNotDiagnostic,
@@ -1163,7 +1166,9 @@ func NoticeInproxyProxyActivity(
 		"connectingClients", connectingClients,
 		"connectedClients", connectedClients,
 		"bytesUp", bytesUp,
-		"bytesDown", bytesDown)
+		"bytesDown", bytesDown,
+		"personalRegionActivity", personalRegionActivity,
+		"commonRegionActivity", commonRegionActivity)
 }
 
 // NoticeInproxyProxyTotalActivity reports how many proxied bytes have been

+ 4 - 1
psiphon/server/server_test.go

@@ -1726,7 +1726,9 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		clientConfig.InproxySkipAwaitFullyConnected = true
 
 		clientConfig.InproxyProxySessionPrivateKey = inproxyTestConfig.proxySessionPrivateKey
-		clientConfig.InproxyMaxClients = 1
+		clientConfig.InproxyMaxClients = 1 // Deprecated; kept to make sure nothing breaks.
+		clientConfig.InproxyMaxCommonClients = 1
+		clientConfig.InproxyMaxPersonalClients = 0
 		clientConfig.InproxyLimitUpstreamBytesPerSecond = 0
 		clientConfig.InproxyLimitDownstreamBytesPerSecond = 0
 		clientConfig.ServerEntrySignaturePublicKey = inproxyTestConfig.brokerServerEntrySignaturePublicKey
@@ -1738,6 +1740,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 			clientConfig.InproxyClientPersonalCompartmentID = inproxyTestConfig.personalCompartmentID
 			clientConfig.InproxyProxyPersonalCompartmentID = inproxyTestConfig.personalCompartmentID
+			clientConfig.InproxyMaxPersonalClients = 1
 		}
 
 		// Simulate a CDN adding required HTTP headers by injecting them at