|
@@ -71,6 +71,20 @@ type Proxy struct {
|
|
|
useReducedSettings bool
|
|
useReducedSettings bool
|
|
|
reducedStartMinute int
|
|
reducedStartMinute int
|
|
|
reducedEndMinute int
|
|
reducedEndMinute int
|
|
|
|
|
+
|
|
|
|
|
+ personalStatsMutex sync.Mutex
|
|
|
|
|
+ personalRegionActivity map[string]*RegionActivity
|
|
|
|
|
+
|
|
|
|
|
+ commonStatsMutex sync.Mutex
|
|
|
|
|
+ commonRegionActivity map[string]*RegionActivity
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// RegionActivity holds metrics per-region for more detailed metric collection.
|
|
|
|
|
+type RegionActivity struct {
|
|
|
|
|
+ bytesUp atomic.Int64
|
|
|
|
|
+ bytesDown atomic.Int64
|
|
|
|
|
+ connectingClients atomic.Int32
|
|
|
|
|
+ connectedClients atomic.Int32
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
|
|
// TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
|
|
@@ -138,9 +152,13 @@ type ProxyConfig struct {
|
|
|
// restarting the proxy.
|
|
// restarting the proxy.
|
|
|
MustUpgrade func()
|
|
MustUpgrade func()
|
|
|
|
|
|
|
|
- // MaxClients is the maximum number of clients that are allowed to connect
|
|
|
|
|
- // to the proxy. Must be > 0.
|
|
|
|
|
- MaxClients int
|
|
|
|
|
|
|
+ // MaxCommonClients (formerly MaxClients) is the maximum number of common
|
|
|
|
|
+ // clients that are allowed to connect to the proxy. Must be > 0.
|
|
|
|
|
+ MaxCommonClients int
|
|
|
|
|
+
|
|
|
|
|
+ // MaxPersonalClients is the maximum number of personal clients that are
|
|
|
|
|
+ // allowed to connect to the proxy. Must be > 0.
|
|
|
|
|
+ MaxPersonalClients int
|
|
|
|
|
|
|
|
// LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
|
|
// LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
|
|
|
// a single client. When 0, there is no limit.
|
|
// a single client. When 0, there is no limit.
|
|
@@ -158,12 +176,16 @@ type ProxyConfig struct {
|
|
|
// which reduced client settings end.
|
|
// which reduced client settings end.
|
|
|
ReducedEndTime string
|
|
ReducedEndTime string
|
|
|
|
|
|
|
|
- // ReducedMaxClients specifies the maximum number of clients that are
|
|
|
|
|
- // allowed to connect to the proxy during the reduced time range.
|
|
|
|
|
|
|
+ // ReducedMaxCommonClients specifies the maximum number of common clients
|
|
|
|
|
+ // that are allowed to connect to the proxy during the reduced time range.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Limitation: We currently do not support ReducedMaxPersonalClients.
|
|
|
|
|
+ // We assume that due to the importance of personal clients, users
|
|
|
|
|
+ // always prefer to have them connected.
|
|
|
//
|
|
//
|
|
|
// Clients connected when the reduced settings begin will not be
|
|
// Clients connected when the reduced settings begin will not be
|
|
|
// disconnected.
|
|
// disconnected.
|
|
|
- ReducedMaxClients int
|
|
|
|
|
|
|
+ ReducedMaxCommonClients int
|
|
|
|
|
|
|
|
// ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
|
|
// ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
|
|
|
// rate for a single client during the reduced time range. When 0,
|
|
// rate for a single client during the reduced time range. When 0,
|
|
@@ -186,33 +208,51 @@ type ProxyConfig struct {
|
|
|
ActivityUpdater ActivityUpdater
|
|
ActivityUpdater ActivityUpdater
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// RegionActivitySnapshot holds a point-in-time copy of per-region metrics.
|
|
|
|
|
+// This is used for the ActivityUpdater callback and notice serialization.
|
|
|
|
|
+type RegionActivitySnapshot struct {
|
|
|
|
|
+ BytesUp int64 `json:"bytesUp"`
|
|
|
|
|
+ BytesDown int64 `json:"bytesDown"`
|
|
|
|
|
+ ConnectingClients int32 `json:"connectingClients"`
|
|
|
|
|
+ ConnectedClients int32 `json:"connectedClients"`
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// ActivityUpdater is a callback that is invoked when the proxy announces
|
|
// ActivityUpdater is a callback that is invoked when the proxy announces
|
|
|
// availability, when clients connect and disconnect, and periodically with
|
|
// availability, when clients connect and disconnect, and periodically with
|
|
|
// data transfer updates (unless idle). This callback may be used to update
|
|
// data transfer updates (unless idle). This callback may be used to update
|
|
|
// an activity UI. This callback should post this data to another thread or
|
|
// an activity UI. This callback should post this data to another thread or
|
|
|
// handler and return immediately and not block on UI updates.
|
|
// handler and return immediately and not block on UI updates.
|
|
|
|
|
+//
|
|
|
|
|
+// The personalRegionActivity and commonRegionActivity parameters contain per-region
|
|
|
|
|
+// metrics (bytes transferred, connecting/connected counts) segmented by client
|
|
|
|
|
+// region.
|
|
|
type ActivityUpdater func(
|
|
type ActivityUpdater func(
|
|
|
announcing int32,
|
|
announcing int32,
|
|
|
connectingClients int32,
|
|
connectingClients int32,
|
|
|
connectedClients int32,
|
|
connectedClients int32,
|
|
|
bytesUp int64,
|
|
bytesUp int64,
|
|
|
bytesDown int64,
|
|
bytesDown int64,
|
|
|
- bytesDuration time.Duration)
|
|
|
|
|
|
|
+ bytesDuration time.Duration,
|
|
|
|
|
+ personalRegionActivitySnapshot map[string]RegionActivitySnapshot,
|
|
|
|
|
+ commonRegionActivitySnapshot map[string]RegionActivitySnapshot)
|
|
|
|
|
|
|
|
// NewProxy initializes a new Proxy with the specified configuration.
|
|
// NewProxy initializes a new Proxy with the specified configuration.
|
|
|
func NewProxy(config *ProxyConfig) (*Proxy, error) {
|
|
func NewProxy(config *ProxyConfig) (*Proxy, error) {
|
|
|
|
|
|
|
|
- if config.MaxClients <= 0 {
|
|
|
|
|
- return nil, errors.TraceNew("invalid MaxClients")
|
|
|
|
|
|
|
+ // Check if there are no clients who can connect
|
|
|
|
|
+ if config.MaxCommonClients+config.MaxPersonalClients <= 0 {
|
|
|
|
|
+ return nil, errors.TraceNew("invalid MaxCommonClients")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
p := &Proxy{
|
|
p := &Proxy{
|
|
|
- config: config,
|
|
|
|
|
|
|
+ config: config,
|
|
|
|
|
+ personalRegionActivity: make(map[string]*RegionActivity),
|
|
|
|
|
+ commonRegionActivity: make(map[string]*RegionActivity),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if config.ReducedStartTime != "" ||
|
|
if config.ReducedStartTime != "" ||
|
|
|
config.ReducedEndTime != "" ||
|
|
config.ReducedEndTime != "" ||
|
|
|
- config.ReducedMaxClients > 0 {
|
|
|
|
|
|
|
+ config.ReducedMaxCommonClients > 0 {
|
|
|
|
|
|
|
|
startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
|
|
startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -228,9 +268,9 @@ func NewProxy(config *ProxyConfig) (*Proxy, error) {
|
|
|
return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
|
|
return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if config.ReducedMaxClients <= 0 ||
|
|
|
|
|
- config.ReducedMaxClients > config.MaxClients {
|
|
|
|
|
- return nil, errors.TraceNew("invalid ReducedMaxClients")
|
|
|
|
|
|
|
+ if config.ReducedMaxCommonClients <= 0 ||
|
|
|
|
|
+ config.ReducedMaxCommonClients > config.MaxCommonClients {
|
|
|
|
|
+ return nil, errors.TraceNew("invalid ReducedMaxCommonClients")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
p.useReducedSettings = true
|
|
p.useReducedSettings = true
|
|
@@ -256,6 +296,24 @@ func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _
|
|
|
w.p.bytesDown.Add(bytesRead)
|
|
w.p.bytesDown.Add(bytesRead)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// connectionActivityWrapper implements common.ActivityUpdater for a single
|
|
|
|
|
+// connection. It caches the RegionActivity pointer to enable atomic updates
|
|
|
|
|
+// with no mutex locking.
|
|
|
|
|
+type connectionActivityWrapper struct {
|
|
|
|
|
+ p *Proxy
|
|
|
|
|
+ regionActivity *RegionActivity
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (w *connectionActivityWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
|
|
|
|
|
+ w.p.bytesUp.Add(bytesWritten)
|
|
|
|
|
+ w.p.bytesDown.Add(bytesRead)
|
|
|
|
|
+
|
|
|
|
|
+ if w.regionActivity != nil {
|
|
|
|
|
+ w.regionActivity.bytesUp.Add(bytesWritten)
|
|
|
|
|
+ w.regionActivity.bytesDown.Add(bytesRead)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// Run runs the proxy. The proxy sends requests to the Broker announcing its
|
|
// Run runs the proxy. The proxy sends requests to the Broker announcing its
|
|
|
// availability; the Broker matches the proxy with clients, and facilitates
|
|
// availability; the Broker matches the proxy with clients, and facilitates
|
|
|
// an exchange of WebRTC connection information; the proxy and each client
|
|
// an exchange of WebRTC connection information; the proxy and each client
|
|
@@ -314,6 +372,13 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
//
|
|
//
|
|
|
// The first worker is the only proxy worker which sets
|
|
// The first worker is the only proxy worker which sets
|
|
|
// ProxyAnnounceRequest.CheckTactics.
|
|
// ProxyAnnounceRequest.CheckTactics.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Limitation: currently, the first proxy is always common (unless
|
|
|
|
|
+ // MaxCommonClients == 0). We might want to change this later
|
|
|
|
|
+ // so that the first message is just an announcement, and not a full
|
|
|
|
|
+ // proxy, so we don't have to decide its type.
|
|
|
|
|
+
|
|
|
|
|
+ commonProxiesToCreate, personalProxiesToCreate := p.config.MaxCommonClients, p.config.MaxPersonalClients
|
|
|
|
|
|
|
|
signalFirstAnnounceCtx, signalFirstAnnounceDone :=
|
|
signalFirstAnnounceCtx, signalFirstAnnounceDone :=
|
|
|
context.WithCancel(context.Background())
|
|
context.WithCancel(context.Background())
|
|
@@ -321,7 +386,15 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
proxyWaitGroup.Add(1)
|
|
proxyWaitGroup.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
|
defer proxyWaitGroup.Done()
|
|
defer proxyWaitGroup.Done()
|
|
|
- p.proxyClients(ctx, signalFirstAnnounceDone, false)
|
|
|
|
|
|
|
+ if p.config.MaxCommonClients <= 0 {
|
|
|
|
|
+ // Create personal if no common clients are allowed
|
|
|
|
|
+ p.proxyClients(ctx, signalFirstAnnounceDone, false, true)
|
|
|
|
|
+ personalProxiesToCreate -= 1
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // Create common
|
|
|
|
|
+ p.proxyClients(ctx, signalFirstAnnounceDone, false, false)
|
|
|
|
|
+ commonProxiesToCreate -= 1
|
|
|
|
|
+ }
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
@@ -332,22 +405,34 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
|
|
|
|
|
// Launch the remaining workers.
|
|
// Launch the remaining workers.
|
|
|
|
|
|
|
|
- for i := 0; i < p.config.MaxClients-1; i++ {
|
|
|
|
|
|
|
+ for i := 0; i < commonProxiesToCreate; i++ {
|
|
|
|
|
+ isPersonal := false
|
|
|
|
|
|
|
|
// When reduced settings are in effect, a subset of workers will pause
|
|
// When reduced settings are in effect, a subset of workers will pause
|
|
|
- // during the reduced time period. Since ReducedMaxClients > 0 the
|
|
|
|
|
|
|
+ // during the reduced time period. Since ReducedMaxCommonClients > 0 the
|
|
|
// first proxy worker is never paused.
|
|
// first proxy worker is never paused.
|
|
|
workerNum := i + 1
|
|
workerNum := i + 1
|
|
|
reducedPause := p.useReducedSettings &&
|
|
reducedPause := p.useReducedSettings &&
|
|
|
- workerNum >= p.config.ReducedMaxClients
|
|
|
|
|
|
|
+ workerNum >= p.config.ReducedMaxCommonClients
|
|
|
|
|
|
|
|
proxyWaitGroup.Add(1)
|
|
proxyWaitGroup.Add(1)
|
|
|
go func(reducedPause bool) {
|
|
go func(reducedPause bool) {
|
|
|
defer proxyWaitGroup.Done()
|
|
defer proxyWaitGroup.Done()
|
|
|
- p.proxyClients(ctx, nil, reducedPause)
|
|
|
|
|
|
|
+ p.proxyClients(ctx, nil, reducedPause, isPersonal)
|
|
|
}(reducedPause)
|
|
}(reducedPause)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ for i := 0; i < personalProxiesToCreate; i++ {
|
|
|
|
|
+ // Limitation: There are no reduced settings for personal proxies
|
|
|
|
|
+ isPersonal := true
|
|
|
|
|
+
|
|
|
|
|
+ proxyWaitGroup.Add(1)
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ defer proxyWaitGroup.Done()
|
|
|
|
|
+ p.proxyClients(ctx, nil, false, isPersonal)
|
|
|
|
|
+ }()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
proxyWaitGroup.Wait()
|
|
proxyWaitGroup.Wait()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -365,6 +450,9 @@ func (p *Proxy) activityUpdate(period time.Duration) {
|
|
|
greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
|
|
greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
|
|
|
greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
|
|
greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
|
|
|
|
|
|
|
|
|
|
+ personalRegionActivity := p.snapshotAndResetRegionActivity(&p.personalStatsMutex, p.personalRegionActivity)
|
|
|
|
|
+ commonRegionActivity := p.snapshotAndResetRegionActivity(&p.commonStatsMutex, p.commonRegionActivity)
|
|
|
|
|
+
|
|
|
stateChanged := announcing != p.lastAnnouncing ||
|
|
stateChanged := announcing != p.lastAnnouncing ||
|
|
|
connectingClients != p.lastConnectingClients ||
|
|
connectingClients != p.lastConnectingClients ||
|
|
|
connectedClients != p.lastConnectedClients
|
|
connectedClients != p.lastConnectedClients
|
|
@@ -386,7 +474,64 @@ func (p *Proxy) activityUpdate(period time.Duration) {
|
|
|
connectedClients,
|
|
connectedClients,
|
|
|
bytesUp,
|
|
bytesUp,
|
|
|
bytesDown,
|
|
bytesDown,
|
|
|
- period)
|
|
|
|
|
|
|
+ period,
|
|
|
|
|
+ personalRegionActivity,
|
|
|
|
|
+ commonRegionActivity)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// getOrCreateRegionActivity returns the RegionActivity for a region, creating it
|
|
|
|
|
+// if needed. This should be called once at connection start to avoid multiple
|
|
|
|
|
+// lock usage.
|
|
|
|
|
+func (p *Proxy) getOrCreateRegionActivity(region string, isPersonal bool) *RegionActivity {
|
|
|
|
|
+ var mutex *sync.Mutex
|
|
|
|
|
+ var statsMap map[string]*RegionActivity
|
|
|
|
|
+ if isPersonal {
|
|
|
|
|
+ mutex = &p.personalStatsMutex
|
|
|
|
|
+ statsMap = p.personalRegionActivity
|
|
|
|
|
+ } else {
|
|
|
|
|
+ mutex = &p.commonStatsMutex
|
|
|
|
|
+ statsMap = p.commonRegionActivity
|
|
|
|
|
+ }
|
|
|
|
|
+ mutex.Lock()
|
|
|
|
|
+ defer mutex.Unlock()
|
|
|
|
|
+ stats, exists := statsMap[region]
|
|
|
|
|
+ if !exists {
|
|
|
|
|
+ stats = &RegionActivity{}
|
|
|
|
|
+ statsMap[region] = stats
|
|
|
|
|
+ }
|
|
|
|
|
+ return stats
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// snapshotAndResetRegionActivity creates a copy of region stats with bytes reset
|
|
|
|
|
+// to zero, and prunes any entries that have no active connections and zero
|
|
|
|
|
+// bytes. The snapshot mechanism allows us to avoid holding locks during the
|
|
|
|
|
+// callback invocation.
|
|
|
|
|
+func (p *Proxy) snapshotAndResetRegionActivity(
|
|
|
|
|
+ mutex *sync.Mutex,
|
|
|
|
|
+ statsMap map[string]*RegionActivity,
|
|
|
|
|
+) map[string]RegionActivitySnapshot {
|
|
|
|
|
+ mutex.Lock()
|
|
|
|
|
+ defer mutex.Unlock()
|
|
|
|
|
+ result := make(map[string]RegionActivitySnapshot, len(statsMap))
|
|
|
|
|
+ regionsToDelete := []string{}
|
|
|
|
|
+ for region, stats := range statsMap {
|
|
|
|
|
+ snapshot := RegionActivitySnapshot{
|
|
|
|
|
+ BytesUp: stats.bytesUp.Swap(0),
|
|
|
|
|
+ BytesDown: stats.bytesDown.Swap(0),
|
|
|
|
|
+ ConnectingClients: stats.connectingClients.Load(),
|
|
|
|
|
+ ConnectedClients: stats.connectedClients.Load(),
|
|
|
|
|
+ }
|
|
|
|
|
+ if snapshot.BytesUp > 0 || snapshot.BytesDown > 0 ||
|
|
|
|
|
+ snapshot.ConnectingClients > 0 || snapshot.ConnectedClients > 0 {
|
|
|
|
|
+ result[region] = snapshot
|
|
|
|
|
+ } else {
|
|
|
|
|
+ regionsToDelete = append(regionsToDelete, region)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, region := range regionsToDelete {
|
|
|
|
|
+ delete(statsMap, region)
|
|
|
|
|
+ }
|
|
|
|
|
+ return result
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
@@ -403,7 +548,7 @@ func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
|
|
|
|
|
|
func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
|
if !p.useReducedSettings {
|
|
if !p.useReducedSettings {
|
|
|
- return p.config.MaxClients, time.Time{}
|
|
|
|
|
|
|
+ return p.config.MaxCommonClients, time.Time{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
now := time.Now().UTC()
|
|
now := time.Now().UTC()
|
|
@@ -417,7 +562,7 @@ func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if !isReduced {
|
|
if !isReduced {
|
|
|
- return p.config.MaxClients, time.Time{}
|
|
|
|
|
|
|
+ return p.config.MaxCommonClients, time.Time{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
endHour := p.reducedEndMinute / 60
|
|
endHour := p.reducedEndMinute / 60
|
|
@@ -435,17 +580,17 @@ func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
|
if !endTime.After(now) {
|
|
if !endTime.After(now) {
|
|
|
endTime = endTime.AddDate(0, 0, 1)
|
|
endTime = endTime.AddDate(0, 0, 1)
|
|
|
}
|
|
}
|
|
|
- return p.config.ReducedMaxClients, endTime
|
|
|
|
|
|
|
+ return p.config.ReducedMaxCommonClients, endTime
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (p *Proxy) getLimits() (int, common.RateLimits) {
|
|
|
|
|
|
|
+func (p *Proxy) getLimits() (int, int, common.RateLimits) {
|
|
|
|
|
|
|
|
rateLimits := common.RateLimits{
|
|
rateLimits := common.RateLimits{
|
|
|
ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- maxClients, reducedUntil := p.isReducedUntil()
|
|
|
|
|
|
|
+ maxCommonClients, reducedUntil := p.isReducedUntil()
|
|
|
if !reducedUntil.IsZero() {
|
|
if !reducedUntil.IsZero() {
|
|
|
|
|
|
|
|
upstream := p.config.ReducedLimitUpstreamBytesPerSecond
|
|
upstream := p.config.ReducedLimitUpstreamBytesPerSecond
|
|
@@ -464,7 +609,9 @@ func (p *Proxy) getLimits() (int, common.RateLimits) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return maxClients, rateLimits
|
|
|
|
|
|
|
+ maxPersonalClients := p.config.MaxPersonalClients
|
|
|
|
|
+
|
|
|
|
|
+ return maxCommonClients, maxPersonalClients, rateLimits
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
@@ -487,7 +634,7 @@ func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, floa
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (p *Proxy) proxyClients(
|
|
func (p *Proxy) proxyClients(
|
|
|
- ctx context.Context, signalAnnounceDone func(), reducedPause bool) {
|
|
|
|
|
|
|
+ ctx context.Context, signalAnnounceDone func(), reducedPause bool, isPersonal bool) {
|
|
|
|
|
|
|
|
// Proxy one client, repeating until ctx is done.
|
|
// Proxy one client, repeating until ctx is done.
|
|
|
//
|
|
//
|
|
@@ -567,7 +714,7 @@ func (p *Proxy) proxyClients(
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
backOff, err := p.proxyOneClient(
|
|
backOff, err := p.proxyOneClient(
|
|
|
- ctx, logAnnounce, signalAnnounceDone)
|
|
|
|
|
|
|
+ ctx, logAnnounce, signalAnnounceDone, isPersonal)
|
|
|
|
|
|
|
|
if !backOff || err == nil {
|
|
if !backOff || err == nil {
|
|
|
failureDelayFactor = 1
|
|
failureDelayFactor = 1
|
|
@@ -692,7 +839,8 @@ func (p *Proxy) doNetworkDiscovery(
|
|
|
func (p *Proxy) proxyOneClient(
|
|
func (p *Proxy) proxyOneClient(
|
|
|
ctx context.Context,
|
|
ctx context.Context,
|
|
|
logAnnounce func() bool,
|
|
logAnnounce func() bool,
|
|
|
- signalAnnounceDone func()) (bool, error) {
|
|
|
|
|
|
|
+ signalAnnounceDone func(),
|
|
|
|
|
+ isPersonal bool) (bool, error) {
|
|
|
|
|
|
|
|
// Cancel/close this connection immediately if the network changes.
|
|
// Cancel/close this connection immediately if the network changes.
|
|
|
if p.config.GetCurrentNetworkContext != nil {
|
|
if p.config.GetCurrentNetworkContext != nil {
|
|
@@ -768,7 +916,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
// for tactics.
|
|
// for tactics.
|
|
|
checkTactics := signalAnnounceDone != nil
|
|
checkTactics := signalAnnounceDone != nil
|
|
|
|
|
|
|
|
- maxClients, rateLimits := p.getLimits()
|
|
|
|
|
|
|
+ maxCommonClients, maxPersonalClients, rateLimits := p.getLimits()
|
|
|
|
|
|
|
|
// Get the base Psiphon API parameters and additional proxy metrics,
|
|
// Get the base Psiphon API parameters and additional proxy metrics,
|
|
|
// including performance information, which is sent to the broker in the
|
|
// including performance information, which is sent to the broker in the
|
|
@@ -781,7 +929,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
// with the original network ID.
|
|
// with the original network ID.
|
|
|
|
|
|
|
|
metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
|
|
metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
|
|
|
- checkTactics, brokerCoordinator, webRTCCoordinator, maxClients, rateLimits)
|
|
|
|
|
|
|
+ checkTactics, brokerCoordinator, webRTCCoordinator, maxCommonClients, maxPersonalClients, rateLimits)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
}
|
|
@@ -789,9 +937,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
// Set a delay before announcing, to stagger the announce request times.
|
|
// Set a delay before announcing, to stagger the announce request times.
|
|
|
// The delay helps to avoid triggering rate limits or similar errors from
|
|
// The delay helps to avoid triggering rate limits or similar errors from
|
|
|
// any intermediate CDN between the proxy and the broker; and provides a
|
|
// any intermediate CDN between the proxy and the broker; and provides a
|
|
|
- // nudge towards better load balancing across multiple large MaxClients
|
|
|
|
|
- // proxies, as the broker primarily matches enqueued announces in FIFO
|
|
|
|
|
- // order, since older announces expire earlier.
|
|
|
|
|
|
|
+ // nudge towards better load balancing across multiple large
|
|
|
|
|
+ // MaxCommonClients proxies, as the broker primarily matches enqueued
|
|
|
|
|
+ // announces in FIFO order, since older announces expire earlier.
|
|
|
//
|
|
//
|
|
|
// The delay is intended to be applied after doNetworkDiscovery, which has
|
|
// The delay is intended to be applied after doNetworkDiscovery, which has
|
|
|
// no reason to be delayed; and also after any waitToShareSession delay,
|
|
// no reason to be delayed; and also after any waitToShareSession delay,
|
|
@@ -836,7 +984,12 @@ func (p *Proxy) proxyOneClient(
|
|
|
p.announcing.Add(1)
|
|
p.announcing.Add(1)
|
|
|
|
|
|
|
|
announceStartTime := time.Now()
|
|
announceStartTime := time.Now()
|
|
|
- personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Ignore the personalCompartmentIDs if this proxy is not personal
|
|
|
|
|
+ var personalCompartmentIDs []ID
|
|
|
|
|
+ if isPersonal {
|
|
|
|
|
+ personalCompartmentIDs = brokerCoordinator.PersonalCompartmentIDs()
|
|
|
|
|
+ }
|
|
|
announceResponse, err := brokerClient.ProxyAnnounce(
|
|
announceResponse, err := brokerClient.ProxyAnnounce(
|
|
|
ctx,
|
|
ctx,
|
|
|
requestDelay,
|
|
requestDelay,
|
|
@@ -917,6 +1070,18 @@ func (p *Proxy) proxyOneClient(
|
|
|
announceResponse.SelectedProtocolVersion)
|
|
announceResponse.SelectedProtocolVersion)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ clientRegion := announceResponse.ClientRegion
|
|
|
|
|
+ var regionActivity *RegionActivity
|
|
|
|
|
+ if clientRegion != "" {
|
|
|
|
|
+ regionActivity = p.getOrCreateRegionActivity(clientRegion, isPersonal)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Create per-connection activity wrapper with cached regionActivity pointer
|
|
|
|
|
+ connActivityWrapper := &connectionActivityWrapper{
|
|
|
|
|
+ p: p,
|
|
|
|
|
+ regionActivity: regionActivity,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// Trigger back-off if the following WebRTC operations fail to establish a
|
|
// Trigger back-off if the following WebRTC operations fail to establish a
|
|
|
// connections.
|
|
// connections.
|
|
|
//
|
|
//
|
|
@@ -930,10 +1095,16 @@ func (p *Proxy) proxyOneClient(
|
|
|
// For activity updates, indicate that a client connection is now underway.
|
|
// For activity updates, indicate that a client connection is now underway.
|
|
|
|
|
|
|
|
p.connectingClients.Add(1)
|
|
p.connectingClients.Add(1)
|
|
|
|
|
+ if regionActivity != nil {
|
|
|
|
|
+ regionActivity.connectingClients.Add(1)
|
|
|
|
|
+ }
|
|
|
connected := false
|
|
connected := false
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if !connected {
|
|
if !connected {
|
|
|
p.connectingClients.Add(-1)
|
|
p.connectingClients.Add(-1)
|
|
|
|
|
+ if regionActivity != nil {
|
|
|
|
|
+ regionActivity.connectingClients.Add(-1)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
@@ -1069,8 +1240,15 @@ func (p *Proxy) proxyOneClient(
|
|
|
connected = true
|
|
connected = true
|
|
|
p.connectingClients.Add(-1)
|
|
p.connectingClients.Add(-1)
|
|
|
p.connectedClients.Add(1)
|
|
p.connectedClients.Add(1)
|
|
|
|
|
+ if regionActivity != nil {
|
|
|
|
|
+ regionActivity.connectingClients.Add(-1)
|
|
|
|
|
+ regionActivity.connectedClients.Add(1)
|
|
|
|
|
+ }
|
|
|
defer func() {
|
|
defer func() {
|
|
|
p.connectedClients.Add(-1)
|
|
p.connectedClients.Add(-1)
|
|
|
|
|
+ if regionActivity != nil {
|
|
|
|
|
+ regionActivity.connectedClients.Add(-1)
|
|
|
|
|
+ }
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
// Throttle the relay connection.
|
|
// Throttle the relay connection.
|
|
@@ -1078,9 +1256,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
// Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
|
|
// Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
|
|
|
// operators may to want to limit their bandwidth usage with a single
|
|
// operators may to want to limit their bandwidth usage with a single
|
|
|
// up/down value, an overall limit. The ProxyConfig can simply be
|
|
// up/down value, an overall limit. The ProxyConfig can simply be
|
|
|
- // generated by dividing the limit by MaxClients. This approach favors
|
|
|
|
|
- // performance stability: each client gets the same throttling limits
|
|
|
|
|
- // regardless of how many other clients are connected.
|
|
|
|
|
|
|
+ // generated by dividing the limit by MaxCommonClients + MaxPersonalClients.
|
|
|
|
|
+ // This approach favors performance stability: each client gets the
|
|
|
|
|
+ // same throttling limits regardless of how many other clients are connected.
|
|
|
//
|
|
//
|
|
|
// Rate limits are applied only when a client connection is established;
|
|
// Rate limits are applied only when a client connection is established;
|
|
|
// connected clients retain their initial limits even when reduced time
|
|
// connected clients retain their initial limits even when reduced time
|
|
@@ -1107,7 +1285,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
proxyRelayInactivityTimeout)
|
|
proxyRelayInactivityTimeout)
|
|
|
|
|
|
|
|
destinationConn, err = common.NewActivityMonitoredConn(
|
|
destinationConn, err = common.NewActivityMonitoredConn(
|
|
|
- destinationConn, inactivityTimeout, false, nil, p.activityUpdateWrapper)
|
|
|
|
|
|
|
+ destinationConn, inactivityTimeout, false, nil, connActivityWrapper)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
}
|
|
@@ -1200,7 +1378,8 @@ func (p *Proxy) getMetrics(
|
|
|
includeTacticsParameters bool,
|
|
includeTacticsParameters bool,
|
|
|
brokerCoordinator BrokerDialCoordinator,
|
|
brokerCoordinator BrokerDialCoordinator,
|
|
|
webRTCCoordinator WebRTCDialCoordinator,
|
|
webRTCCoordinator WebRTCDialCoordinator,
|
|
|
- maxClients int,
|
|
|
|
|
|
|
+ maxCommonClients int,
|
|
|
|
|
+ maxPersonalClients int,
|
|
|
rateLimits common.RateLimits) (
|
|
rateLimits common.RateLimits) (
|
|
|
*ProxyMetrics, string, bool, error) {
|
|
*ProxyMetrics, string, bool, error) {
|
|
|
|
|
|
|
@@ -1229,7 +1408,8 @@ func (p *Proxy) getMetrics(
|
|
|
ProtocolVersion: LatestProtocolVersion,
|
|
ProtocolVersion: LatestProtocolVersion,
|
|
|
NATType: webRTCCoordinator.NATType(),
|
|
NATType: webRTCCoordinator.NATType(),
|
|
|
PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
|
|
PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
|
|
|
- MaxClients: int32(maxClients),
|
|
|
|
|
|
|
+ MaxCommonClients: int32(maxCommonClients),
|
|
|
|
|
+ MaxPersonalClients: int32(maxPersonalClients),
|
|
|
ConnectingClients: p.connectingClients.Load(),
|
|
ConnectingClients: p.connectingClients.Load(),
|
|
|
ConnectedClients: p.connectedClients.Load(),
|
|
ConnectedClients: p.connectedClients.Load(),
|
|
|
LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
|
|
LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
|