|
|
@@ -50,11 +50,13 @@ type Proxy struct {
|
|
|
bytesDown atomic.Int64
|
|
|
peakBytesUp atomic.Int64
|
|
|
peakBytesDown atomic.Int64
|
|
|
- connectingClients int32
|
|
|
- connectedClients int32
|
|
|
+ announcing atomic.Int32
|
|
|
+ connectingClients atomic.Int32
|
|
|
+ connectedClients atomic.Int32
|
|
|
|
|
|
config *ProxyConfig
|
|
|
activityUpdateWrapper *activityUpdateWrapper
|
|
|
+ lastAnnouncing int32
|
|
|
lastConnectingClients int32
|
|
|
lastConnectedClients int32
|
|
|
|
|
|
@@ -65,6 +67,24 @@ type Proxy struct {
|
|
|
nextAnnounceMutex sync.Mutex
|
|
|
nextAnnounceBrokerClient *BrokerClient
|
|
|
nextAnnounceNotBefore time.Time
|
|
|
+
|
|
|
+ useReducedSettings bool
|
|
|
+ reducedStartMinute int
|
|
|
+ reducedEndMinute int
|
|
|
+
|
|
|
+ personalStatsMutex sync.Mutex
|
|
|
+ personalRegionActivity map[string]*RegionActivity
|
|
|
+
|
|
|
+ commonStatsMutex sync.Mutex
|
|
|
+ commonRegionActivity map[string]*RegionActivity
|
|
|
+}
|
|
|
+
|
|
|
+// RegionActivity holds metrics per-region for more detailed metric collection.
|
|
|
+type RegionActivity struct {
|
|
|
+ bytesUp atomic.Int64
|
|
|
+ bytesDown atomic.Int64
|
|
|
+ connectingClients atomic.Int32
|
|
|
+ connectedClients atomic.Int32
|
|
|
}
|
|
|
|
|
|
// TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
|
|
|
@@ -132,9 +152,13 @@ type ProxyConfig struct {
|
|
|
// restarting the proxy.
|
|
|
MustUpgrade func()
|
|
|
|
|
|
- // MaxClients is the maximum number of clients that are allowed to connect
|
|
|
- // to the proxy. Must be > 0.
|
|
|
- MaxClients int
|
|
|
+ // MaxCommonClients (formerly MaxClients) is the maximum number of common
|
|
|
+ // clients that are allowed to connect to the proxy. Must be > 0.
|
|
|
+ MaxCommonClients int
|
|
|
+
|
|
|
+ // MaxPersonalClients is the maximum number of personal clients that are
|
|
|
+ // allowed to connect to the proxy. Must be > 0.
|
|
|
+ MaxPersonalClients int
|
|
|
|
|
|
// LimitUpstreamBytesPerSecond limits the upstream data transfer rate for
|
|
|
// a single client. When 0, there is no limit.
|
|
|
@@ -144,32 +168,114 @@ type ProxyConfig struct {
|
|
|
// for a single client. When 0, there is no limit.
|
|
|
LimitDownstreamBytesPerSecond int
|
|
|
|
|
|
+ // ReducedStartTime specifies the local time of day (HH:MM, 24-hour, UTC)
|
|
|
+ // at which reduced client settings begin.
|
|
|
+ ReducedStartTime string
|
|
|
+
|
|
|
+ // ReducedEndTime specifies the local time of day (HH:MM, 24-hour, UTC) at
|
|
|
+ // which reduced client settings end.
|
|
|
+ ReducedEndTime string
|
|
|
+
|
|
|
+ // ReducedMaxCommonClients specifies the maximum number of common clients
|
|
|
+ // that are allowed to connect to the proxy during the reduced time range.
|
|
|
+ //
|
|
|
+ // Limitation: We currently do not support ReducedMaxPersonalClients.
|
|
|
+ // We assume that due to the importance of personal clients, users
|
|
|
+ // always prefer to have them connected.
|
|
|
+ //
|
|
|
+ // Clients connected when the reduced settings begin will not be
|
|
|
+ // disconnected.
|
|
|
+ ReducedMaxCommonClients int
|
|
|
+
|
|
|
+ // ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
|
|
|
+ // rate for a single client during the reduced time range. When 0,
|
|
|
+ // LimitUpstreamBytesPerSecond is the limit.
|
|
|
+ //
|
|
|
+ // Rates for clients already connected when the reduced settings begin or
|
|
|
+ // end will not change.
|
|
|
+ ReducedLimitUpstreamBytesPerSecond int
|
|
|
+
|
|
|
+ // ReducedLimitDownstreamBytesPerSecond limits the downstream data
|
|
|
+ // transfer rate for a single client during the reduced time range. When
|
|
|
+ // 0, LimitDownstreamBytesPerSecond is the limit.
|
|
|
+ //
|
|
|
+ // Rates for clients already connected when the reduced settings begin or
|
|
|
+ // end will not change.
|
|
|
+ ReducedLimitDownstreamBytesPerSecond int
|
|
|
+
|
|
|
// ActivityUpdater specifies an ActivityUpdater for activity associated
|
|
|
// with this proxy.
|
|
|
ActivityUpdater ActivityUpdater
|
|
|
}
|
|
|
|
|
|
-// ActivityUpdater is a callback that is invoked when clients connect and
|
|
|
-// disconnect and periodically with data transfer updates (unless idle). This
|
|
|
-// callback may be used to update an activity UI. This callback should post
|
|
|
-// this data to another thread or handler and return immediately and not
|
|
|
-// block on UI updates.
|
|
|
+// RegionActivitySnapshot holds a point-in-time copy of per-region metrics.
|
|
|
+// This is used for the ActivityUpdater callback and notice serialization.
|
|
|
+type RegionActivitySnapshot struct {
|
|
|
+ BytesUp int64 `json:"bytesUp"`
|
|
|
+ BytesDown int64 `json:"bytesDown"`
|
|
|
+ ConnectingClients int32 `json:"connectingClients"`
|
|
|
+ ConnectedClients int32 `json:"connectedClients"`
|
|
|
+}
|
|
|
+
|
|
|
+// ActivityUpdater is a callback that is invoked when the proxy announces
|
|
|
+// availability, when clients connect and disconnect, and periodically with
|
|
|
+// data transfer updates (unless idle). This callback may be used to update
|
|
|
+// an activity UI. This callback should post this data to another thread or
|
|
|
+// handler and return immediately and not block on UI updates.
|
|
|
+//
|
|
|
+// The personalRegionActivity and commonRegionActivity parameters contain per-region
|
|
|
+// metrics (bytes transferred, connecting/connected counts) segmented by client
|
|
|
+// region.
|
|
|
type ActivityUpdater func(
|
|
|
+ announcing int32,
|
|
|
connectingClients int32,
|
|
|
connectedClients int32,
|
|
|
bytesUp int64,
|
|
|
bytesDown int64,
|
|
|
- bytesDuration time.Duration)
|
|
|
+ bytesDuration time.Duration,
|
|
|
+ personalRegionActivitySnapshot map[string]RegionActivitySnapshot,
|
|
|
+ commonRegionActivitySnapshot map[string]RegionActivitySnapshot)
|
|
|
|
|
|
// NewProxy initializes a new Proxy with the specified configuration.
|
|
|
func NewProxy(config *ProxyConfig) (*Proxy, error) {
|
|
|
|
|
|
- if config.MaxClients <= 0 {
|
|
|
- return nil, errors.TraceNew("invalid MaxClients")
|
|
|
+ // Check if there are no clients who can connect
|
|
|
+ if config.MaxCommonClients+config.MaxPersonalClients <= 0 {
|
|
|
+ return nil, errors.TraceNew("invalid MaxCommonClients")
|
|
|
}
|
|
|
|
|
|
p := &Proxy{
|
|
|
- config: config,
|
|
|
+ config: config,
|
|
|
+ personalRegionActivity: make(map[string]*RegionActivity),
|
|
|
+ commonRegionActivity: make(map[string]*RegionActivity),
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.ReducedStartTime != "" ||
|
|
|
+ config.ReducedEndTime != "" ||
|
|
|
+ config.ReducedMaxCommonClients > 0 {
|
|
|
+
|
|
|
+ startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Tracef("invalid ReducedStartTime: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ endMinute, err := common.ParseTimeOfDayMinutes(config.ReducedEndTime)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Tracef("invalid ReducedEndTime: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if startMinute == endMinute {
|
|
|
+ return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.ReducedMaxCommonClients <= 0 ||
|
|
|
+ config.ReducedMaxCommonClients > config.MaxCommonClients {
|
|
|
+ return nil, errors.TraceNew("invalid ReducedMaxCommonClients")
|
|
|
+ }
|
|
|
+
|
|
|
+ p.useReducedSettings = true
|
|
|
+ p.reducedStartMinute = startMinute
|
|
|
+ p.reducedEndMinute = endMinute
|
|
|
}
|
|
|
|
|
|
p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
|
|
|
@@ -190,6 +296,24 @@ func (w *activityUpdateWrapper) UpdateProgress(bytesRead, bytesWritten int64, _
|
|
|
w.p.bytesDown.Add(bytesRead)
|
|
|
}
|
|
|
|
|
|
+// connectionActivityWrapper implements common.ActivityUpdater for a single
|
|
|
+// connection. It caches the RegionActivity pointer to enable atomic updates
|
|
|
+// with no mutex locking.
|
|
|
+type connectionActivityWrapper struct {
|
|
|
+ p *Proxy
|
|
|
+ regionActivity *RegionActivity
|
|
|
+}
|
|
|
+
|
|
|
+func (w *connectionActivityWrapper) UpdateProgress(bytesRead, bytesWritten int64, _ int64) {
|
|
|
+ w.p.bytesUp.Add(bytesWritten)
|
|
|
+ w.p.bytesDown.Add(bytesRead)
|
|
|
+
|
|
|
+ if w.regionActivity != nil {
|
|
|
+ w.regionActivity.bytesUp.Add(bytesWritten)
|
|
|
+ w.regionActivity.bytesDown.Add(bytesRead)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// Run runs the proxy. The proxy sends requests to the Broker announcing its
|
|
|
// availability; the Broker matches the proxy with clients, and facilitates
|
|
|
// an exchange of WebRTC connection information; the proxy and each client
|
|
|
@@ -206,12 +330,39 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
|
|
|
proxyWaitGroup := new(sync.WaitGroup)
|
|
|
|
|
|
+ // Capture activity updates every second, which is the required frequency
|
|
|
+ // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
|
|
|
+ // frequency for invoking the ActivityUpdater and updating UI widgets.
|
|
|
+
|
|
|
+ proxyWaitGroup.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer proxyWaitGroup.Done()
|
|
|
+
|
|
|
+ p.lastAnnouncing = 0
|
|
|
+ p.lastConnectingClients = 0
|
|
|
+ p.lastConnectedClients = 0
|
|
|
+
|
|
|
+ activityUpdatePeriod := 1 * time.Second
|
|
|
+ ticker := time.NewTicker(activityUpdatePeriod)
|
|
|
+ defer ticker.Stop()
|
|
|
+
|
|
|
+ loop:
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker.C:
|
|
|
+ p.activityUpdate(activityUpdatePeriod)
|
|
|
+ case <-ctx.Done():
|
|
|
+ break loop
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
// Launch the first proxy worker, passing a signal to be triggered once
|
|
|
// the very first announcement round trip is complete. The first round
|
|
|
// trip is awaited so that:
|
|
|
//
|
|
|
// - The first announce response will arrive with any new tactics,
|
|
|
- // which may be applied before launching additions workers.
|
|
|
+ // which may be applied before launching additional workers.
|
|
|
//
|
|
|
// - The first worker gets no announcement delay and is also guaranteed to
|
|
|
// be the shared session establisher. Since the announcement delays are
|
|
|
@@ -220,7 +371,21 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
// session establisher to be a different worker than the no-delay worker.
|
|
|
//
|
|
|
// The first worker is the only proxy worker which sets
|
|
|
- // ProxyAnnounceRequest.CheckTactics.
|
|
|
+ // ProxyAnnounceRequest.CheckTactics/PreCheckTactics. PreCheckTactics is
|
|
|
+ // used on the first announcement so the request returns immediately
|
|
|
+ // without awaiting a match. This allows all workers to be launched
|
|
|
+ // quickly.
|
|
|
+
|
|
|
+ commonProxiesToCreate, personalProxiesToCreate :=
|
|
|
+ p.config.MaxCommonClients, p.config.MaxPersonalClients
|
|
|
+
|
|
|
+	// Doing this outside of the goroutine to avoid race conditions
|
|
|
+ firstWorkerIsPersonal := p.config.MaxCommonClients <= 0
|
|
|
+ if firstWorkerIsPersonal {
|
|
|
+ personalProxiesToCreate -= 1
|
|
|
+ } else {
|
|
|
+ commonProxiesToCreate -= 1
|
|
|
+ }
|
|
|
|
|
|
signalFirstAnnounceCtx, signalFirstAnnounceDone :=
|
|
|
context.WithCancel(context.Background())
|
|
|
@@ -228,7 +393,7 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
proxyWaitGroup.Add(1)
|
|
|
go func() {
|
|
|
defer proxyWaitGroup.Done()
|
|
|
- p.proxyClients(ctx, signalFirstAnnounceDone)
|
|
|
+ p.proxyClients(ctx, signalFirstAnnounceDone, false, firstWorkerIsPersonal)
|
|
|
}()
|
|
|
|
|
|
select {
|
|
|
@@ -239,86 +404,136 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
|
|
|
// Launch the remaining workers.
|
|
|
|
|
|
- for i := 0; i < p.config.MaxClients-1; i++ {
|
|
|
+ for i := 0; i < commonProxiesToCreate; i++ {
|
|
|
+ isPersonal := false
|
|
|
+
|
|
|
+ // When reduced settings are in effect, a subset of workers will pause
|
|
|
+ // during the reduced time period. Since ReducedMaxCommonClients > 0 the
|
|
|
+ // first proxy worker is never paused.
|
|
|
+ workerNum := i + 1
|
|
|
+ reducedPause := p.useReducedSettings &&
|
|
|
+ workerNum >= p.config.ReducedMaxCommonClients
|
|
|
+
|
|
|
proxyWaitGroup.Add(1)
|
|
|
- go func() {
|
|
|
+ go func(reducedPause bool) {
|
|
|
defer proxyWaitGroup.Done()
|
|
|
- p.proxyClients(ctx, nil)
|
|
|
- }()
|
|
|
+ p.proxyClients(ctx, nil, reducedPause, isPersonal)
|
|
|
+ }(reducedPause)
|
|
|
}
|
|
|
|
|
|
- // Capture activity updates every second, which is the required frequency
|
|
|
- // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
|
|
|
- // frequency for invoking the ActivityUpdater and updating UI widgets.
|
|
|
-
|
|
|
- p.lastConnectingClients = 0
|
|
|
- p.lastConnectedClients = 0
|
|
|
+ for i := 0; i < personalProxiesToCreate; i++ {
|
|
|
+ // Limitation: There are no reduced settings for personal proxies
|
|
|
+ isPersonal := true
|
|
|
|
|
|
- activityUpdatePeriod := 1 * time.Second
|
|
|
- ticker := time.NewTicker(activityUpdatePeriod)
|
|
|
- defer ticker.Stop()
|
|
|
-
|
|
|
-loop:
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-ticker.C:
|
|
|
- p.activityUpdate(activityUpdatePeriod)
|
|
|
- case <-ctx.Done():
|
|
|
- break loop
|
|
|
- }
|
|
|
+ proxyWaitGroup.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer proxyWaitGroup.Done()
|
|
|
+ p.proxyClients(ctx, nil, false, isPersonal)
|
|
|
+ }()
|
|
|
}
|
|
|
|
|
|
proxyWaitGroup.Wait()
|
|
|
}
|
|
|
|
|
|
-// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
|
-// delay parameters from the current broker client.
|
|
|
-//
|
|
|
-// getAnnounceDelayParameters is used to configure a delay when
|
|
|
-// proxyOneClient fails. As having no broker clients is a possible
|
|
|
-// proxyOneClient failure case, GetBrokerClient errors are ignored here and
|
|
|
-// defaults used in that case.
|
|
|
-func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
|
|
|
- brokerClient, err := p.config.GetBrokerClient()
|
|
|
- if err != nil {
|
|
|
- return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
|
|
|
- }
|
|
|
- brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
|
|
|
- return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
|
|
|
- common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
|
|
|
- common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
func (p *Proxy) activityUpdate(period time.Duration) {
|
|
|
|
|
|
- connectingClients := atomic.LoadInt32(&p.connectingClients)
|
|
|
- connectedClients := atomic.LoadInt32(&p.connectedClients)
|
|
|
+ // Concurrency: activityUpdate is called by only the single goroutine
|
|
|
+ // created in Run.
|
|
|
+
|
|
|
+ announcing := p.announcing.Load()
|
|
|
+ connectingClients := p.connectingClients.Load()
|
|
|
+ connectedClients := p.connectedClients.Load()
|
|
|
bytesUp := p.bytesUp.Swap(0)
|
|
|
bytesDown := p.bytesDown.Swap(0)
|
|
|
|
|
|
greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
|
|
|
greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
|
|
|
|
|
|
- clientsChanged := connectingClients != p.lastConnectingClients ||
|
|
|
+ personalRegionActivity := p.snapshotAndResetRegionActivity(
|
|
|
+ &p.personalStatsMutex, p.personalRegionActivity)
|
|
|
+
|
|
|
+ commonRegionActivity := p.snapshotAndResetRegionActivity(
|
|
|
+ &p.commonStatsMutex, p.commonRegionActivity)
|
|
|
+
|
|
|
+ stateChanged := announcing != p.lastAnnouncing ||
|
|
|
+ connectingClients != p.lastConnectingClients ||
|
|
|
connectedClients != p.lastConnectedClients
|
|
|
|
|
|
+ p.lastAnnouncing = announcing
|
|
|
p.lastConnectingClients = connectingClients
|
|
|
p.lastConnectedClients = connectedClients
|
|
|
|
|
|
- if !clientsChanged &&
|
|
|
+ if !stateChanged &&
|
|
|
bytesUp == 0 &&
|
|
|
bytesDown == 0 {
|
|
|
- // Skip the activity callback on idle bytes or no change in client counts.
|
|
|
+ // Skip the activity callback on idle bytes or no change in worker state.
|
|
|
return
|
|
|
}
|
|
|
|
|
|
p.config.ActivityUpdater(
|
|
|
+ announcing,
|
|
|
connectingClients,
|
|
|
connectedClients,
|
|
|
bytesUp,
|
|
|
bytesDown,
|
|
|
- period)
|
|
|
+ period,
|
|
|
+ personalRegionActivity,
|
|
|
+ commonRegionActivity)
|
|
|
+}
|
|
|
+
|
|
|
+// getOrCreateRegionActivity returns the RegionActivity for a region, creating it
|
|
|
+// if needed. This should be called once at connection start to avoid multiple
|
|
|
+// lock usage.
|
|
|
+func (p *Proxy) getOrCreateRegionActivity(region string, isPersonal bool) *RegionActivity {
|
|
|
+ var mutex *sync.Mutex
|
|
|
+ var statsMap map[string]*RegionActivity
|
|
|
+ if isPersonal {
|
|
|
+ mutex = &p.personalStatsMutex
|
|
|
+ statsMap = p.personalRegionActivity
|
|
|
+ } else {
|
|
|
+ mutex = &p.commonStatsMutex
|
|
|
+ statsMap = p.commonRegionActivity
|
|
|
+ }
|
|
|
+ mutex.Lock()
|
|
|
+ defer mutex.Unlock()
|
|
|
+ stats, exists := statsMap[region]
|
|
|
+ if !exists {
|
|
|
+ stats = &RegionActivity{}
|
|
|
+ statsMap[region] = stats
|
|
|
+ }
|
|
|
+ return stats
|
|
|
+}
|
|
|
+
|
|
|
+// snapshotAndResetRegionActivity creates a copy of region stats with bytes reset
|
|
|
+// to zero, and prunes any entries that have no active connections and zero
|
|
|
+// bytes. The snapshot mechanism allows us to avoid holding locks during the
|
|
|
+// callback invocation.
|
|
|
+func (p *Proxy) snapshotAndResetRegionActivity(
|
|
|
+ mutex *sync.Mutex,
|
|
|
+ statsMap map[string]*RegionActivity,
|
|
|
+) map[string]RegionActivitySnapshot {
|
|
|
+ mutex.Lock()
|
|
|
+ defer mutex.Unlock()
|
|
|
+ result := make(map[string]RegionActivitySnapshot, len(statsMap))
|
|
|
+ regionsToDelete := []string{}
|
|
|
+ for region, stats := range statsMap {
|
|
|
+ snapshot := RegionActivitySnapshot{
|
|
|
+ BytesUp: stats.bytesUp.Swap(0),
|
|
|
+ BytesDown: stats.bytesDown.Swap(0),
|
|
|
+ ConnectingClients: stats.connectingClients.Load(),
|
|
|
+ ConnectedClients: stats.connectedClients.Load(),
|
|
|
+ }
|
|
|
+ if snapshot.BytesUp > 0 || snapshot.BytesDown > 0 ||
|
|
|
+ snapshot.ConnectingClients > 0 || snapshot.ConnectedClients > 0 {
|
|
|
+ result[region] = snapshot
|
|
|
+ } else {
|
|
|
+ regionsToDelete = append(regionsToDelete, region)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for _, region := range regionsToDelete {
|
|
|
+ delete(statsMap, region)
|
|
|
+ }
|
|
|
+ return result
|
|
|
}
|
|
|
|
|
|
func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
|
@@ -333,8 +548,95 @@ func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
|
+ if !p.useReducedSettings {
|
|
|
+ return p.config.MaxCommonClients, time.Time{}
|
|
|
+ }
|
|
|
+
|
|
|
+ now := time.Now().UTC()
|
|
|
+ minute := now.Hour()*60 + now.Minute()
|
|
|
+
|
|
|
+ isReduced := false
|
|
|
+ if p.reducedStartMinute < p.reducedEndMinute {
|
|
|
+ isReduced = minute >= p.reducedStartMinute && minute < p.reducedEndMinute
|
|
|
+ } else {
|
|
|
+ isReduced = minute >= p.reducedStartMinute || minute < p.reducedEndMinute
|
|
|
+ }
|
|
|
+
|
|
|
+ if !isReduced {
|
|
|
+ return p.config.MaxCommonClients, time.Time{}
|
|
|
+ }
|
|
|
+
|
|
|
+ endHour := p.reducedEndMinute / 60
|
|
|
+ endMinute := p.reducedEndMinute % 60
|
|
|
+ endTime := time.Date(
|
|
|
+ now.Year(),
|
|
|
+ now.Month(),
|
|
|
+ now.Day(),
|
|
|
+ endHour,
|
|
|
+ endMinute,
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ now.Location(),
|
|
|
+ )
|
|
|
+ if !endTime.After(now) {
|
|
|
+ endTime = endTime.AddDate(0, 0, 1)
|
|
|
+ }
|
|
|
+ return p.config.ReducedMaxCommonClients, endTime
|
|
|
+}
|
|
|
+
|
|
|
+func (p *Proxy) getLimits() (int, int, common.RateLimits) {
|
|
|
+
|
|
|
+ rateLimits := common.RateLimits{
|
|
|
+ ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
+ WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
+ }
|
|
|
+
|
|
|
+ maxCommonClients, reducedUntil := p.isReducedUntil()
|
|
|
+ if !reducedUntil.IsZero() {
|
|
|
+
|
|
|
+ upstream := p.config.ReducedLimitUpstreamBytesPerSecond
|
|
|
+ if upstream == 0 {
|
|
|
+ upstream = p.config.LimitUpstreamBytesPerSecond
|
|
|
+ }
|
|
|
+
|
|
|
+ downstream := p.config.ReducedLimitDownstreamBytesPerSecond
|
|
|
+ if downstream == 0 {
|
|
|
+ downstream = p.config.LimitDownstreamBytesPerSecond
|
|
|
+ }
|
|
|
+
|
|
|
+ rateLimits = common.RateLimits{
|
|
|
+ ReadBytesPerSecond: int64(upstream),
|
|
|
+ WriteBytesPerSecond: int64(downstream),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ maxPersonalClients := p.config.MaxPersonalClients
|
|
|
+
|
|
|
+ return maxCommonClients, maxPersonalClients, rateLimits
|
|
|
+}
|
|
|
+
|
|
|
+// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
|
+// delay parameters from the current broker client.
|
|
|
+//
|
|
|
+// getAnnounceDelayParameters is used to configure a delay when
|
|
|
+// proxyOneClient fails. As having no broker clients is a possible
|
|
|
+// proxyOneClient failure case, GetBrokerClient errors are ignored here and
|
|
|
+// defaults used in that case.
|
|
|
+func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
|
|
|
+ brokerClient, err := p.config.GetBrokerClient()
|
|
|
+ if err != nil {
|
|
|
+ return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
|
|
|
+ }
|
|
|
+ brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
|
|
|
+ return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
|
|
|
+ common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
|
|
|
+ common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
func (p *Proxy) proxyClients(
|
|
|
- ctx context.Context, signalAnnounceDone func()) {
|
|
|
+ ctx context.Context, signalAnnounceDone func(), reducedPause bool, isPersonal bool) {
|
|
|
|
|
|
// Proxy one client, repeating until ctx is done.
|
|
|
//
|
|
|
@@ -375,12 +677,39 @@ func (p *Proxy) proxyClients(
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+ preCheckTacticsDone := false
|
|
|
+
|
|
|
for ctx.Err() == nil {
|
|
|
|
|
|
if !p.config.WaitForNetworkConnectivity() {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
+ // Pause designated workers during the reduced time range. In-flight
|
|
|
+ // announces are not interrupted and connected clients are not
|
|
|
+ // disconnected, so there is a gradual transition into reduced mode.
|
|
|
+
|
|
|
+ if reducedPause {
|
|
|
+ _, reducedUntil := p.isReducedUntil()
|
|
|
+ if !reducedUntil.IsZero() {
|
|
|
+
|
|
|
+ pauseDuration := time.Until(reducedUntil)
|
|
|
+ p.config.Logger.WithTraceFields(common.LogFields{
|
|
|
+ "duration": pauseDuration.String(),
|
|
|
+ }).Info("pause worker")
|
|
|
+
|
|
|
+ timer := time.NewTimer(pauseDuration)
|
|
|
+ select {
|
|
|
+ case <-timer.C:
|
|
|
+ case <-ctx.Done():
|
|
|
+ }
|
|
|
+ timer.Stop()
|
|
|
+ if ctx.Err() != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if time.Since(startLogSampleTime) >= proxyAnnounceLogSamplePeriod {
|
|
|
logAnnounceCount = proxyAnnounceLogSampleSize
|
|
|
logErrorsCount = proxyAnnounceLogSampleSize
|
|
|
@@ -389,7 +718,7 @@ func (p *Proxy) proxyClients(
|
|
|
}
|
|
|
|
|
|
backOff, err := p.proxyOneClient(
|
|
|
- ctx, logAnnounce, signalAnnounceDone)
|
|
|
+ ctx, logAnnounce, &preCheckTacticsDone, signalAnnounceDone, isPersonal)
|
|
|
|
|
|
if !backOff || err == nil {
|
|
|
failureDelayFactor = 1
|
|
|
@@ -514,7 +843,9 @@ func (p *Proxy) doNetworkDiscovery(
|
|
|
func (p *Proxy) proxyOneClient(
|
|
|
ctx context.Context,
|
|
|
logAnnounce func() bool,
|
|
|
- signalAnnounceDone func()) (bool, error) {
|
|
|
+ preCheckTacticsDone *bool,
|
|
|
+ signalAnnounceDone func(),
|
|
|
+ isPersonal bool) (bool, error) {
|
|
|
|
|
|
// Cancel/close this connection immediately if the network changes.
|
|
|
if p.config.GetCurrentNetworkContext != nil {
|
|
|
@@ -588,7 +919,10 @@ func (p *Proxy) proxyOneClient(
|
|
|
|
|
|
// Only the first worker, which has signalAnnounceDone configured, checks
|
|
|
// for tactics.
|
|
|
- checkTactics := signalAnnounceDone != nil
|
|
|
+ checkTactics := signalAnnounceDone != nil && *preCheckTacticsDone
|
|
|
+ preCheckTactics := signalAnnounceDone != nil && !*preCheckTacticsDone
|
|
|
+
|
|
|
+ maxCommonClients, maxPersonalClients, rateLimits := p.getLimits()
|
|
|
|
|
|
// Get the base Psiphon API parameters and additional proxy metrics,
|
|
|
// including performance information, which is sent to the broker in the
|
|
|
@@ -601,7 +935,12 @@ func (p *Proxy) proxyOneClient(
|
|
|
// with the original network ID.
|
|
|
|
|
|
metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
|
|
|
- checkTactics, brokerCoordinator, webRTCCoordinator)
|
|
|
+ checkTactics || preCheckTactics,
|
|
|
+ brokerCoordinator,
|
|
|
+ webRTCCoordinator,
|
|
|
+ maxCommonClients,
|
|
|
+ maxPersonalClients,
|
|
|
+ rateLimits)
|
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
|
@@ -609,9 +948,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
// Set a delay before announcing, to stagger the announce request times.
|
|
|
// The delay helps to avoid triggering rate limits or similar errors from
|
|
|
// any intermediate CDN between the proxy and the broker; and provides a
|
|
|
- // nudge towards better load balancing across multiple large MaxClients
|
|
|
- // proxies, as the broker primarily matches enqueued announces in FIFO
|
|
|
- // order, since older announces expire earlier.
|
|
|
+ // nudge towards better load balancing across multiple large
|
|
|
+ // MaxCommonClients proxies, as the broker primarily matches enqueued
|
|
|
+ // announces in FIFO order, since older announces expire earlier.
|
|
|
//
|
|
|
// The delay is intended to be applied after doNetworkDiscovery, which has
|
|
|
// no reason to be delayed; and also after any waitToShareSession delay,
|
|
|
@@ -652,8 +991,16 @@ func (p *Proxy) proxyOneClient(
|
|
|
//
|
|
|
// ProxyAnnounce applies an additional request timeout to facilitate
|
|
|
// long-polling.
|
|
|
+
|
|
|
+ p.announcing.Add(1)
|
|
|
+
|
|
|
announceStartTime := time.Now()
|
|
|
- personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
|
|
|
+
|
|
|
+ // Ignore the personalCompartmentIDs if this proxy is not personal
|
|
|
+ var personalCompartmentIDs []ID
|
|
|
+ if isPersonal {
|
|
|
+ personalCompartmentIDs = brokerCoordinator.PersonalCompartmentIDs()
|
|
|
+ }
|
|
|
announceResponse, err := brokerClient.ProxyAnnounce(
|
|
|
ctx,
|
|
|
requestDelay,
|
|
|
@@ -661,6 +1008,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
PersonalCompartmentIDs: personalCompartmentIDs,
|
|
|
Metrics: metrics,
|
|
|
CheckTactics: checkTactics,
|
|
|
+ PreCheckTactics: preCheckTactics,
|
|
|
})
|
|
|
if logAnnounce() {
|
|
|
p.config.Logger.WithTraceFields(common.LogFields{
|
|
|
@@ -668,6 +1016,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
"elapsedTime": time.Since(announceStartTime).String(),
|
|
|
}).Info("announcement request")
|
|
|
}
|
|
|
+
|
|
|
+ p.announcing.Add(-1)
|
|
|
+
|
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
|
@@ -690,12 +1041,16 @@ func (p *Proxy) proxyOneClient(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Signal that the announce round trip is complete. At this point, the
|
|
|
- // broker Noise session should be established and any fresh tactics
|
|
|
- // applied.
|
|
|
+ // Signal that the announce round trip is complete, allowing other workers
|
|
|
+ // to launch. At this point, the broker Noise session should be established
|
|
|
+ // and any fresh tactics applied. Also toggle preCheckTacticsDone since
|
|
|
+ // there's no need to retry PreCheckTactics once a round trip succeeds.
|
|
|
if signalAnnounceDone != nil {
|
|
|
signalAnnounceDone()
|
|
|
}
|
|
|
+ if preCheckTactics {
|
|
|
+ *preCheckTacticsDone = true
|
|
|
+ }
|
|
|
|
|
|
// MustUpgrade has precedence over other cases, to ensure the callback is
|
|
|
// invoked. Trigger back-off back off when rate/entry limited or must
|
|
|
@@ -716,10 +1071,24 @@ func (p *Proxy) proxyOneClient(
|
|
|
|
|
|
} else if announceResponse.NoMatch {
|
|
|
|
|
|
+ // No backoff for no-match.
|
|
|
+ //
|
|
|
+ // This is also the expected response for CheckTactics with a tactics
|
|
|
+ // payload and PreCheckTactics with or without a tactics payload,
|
|
|
+ // distinct cases which should not back off.
|
|
|
+
|
|
|
return backOff, errors.TraceNew("no match")
|
|
|
|
|
|
}
|
|
|
|
|
|
+ if preCheckTactics && !announceResponse.NoMatch {
|
|
|
+
|
|
|
+ // Sanity check: the broker should always respond with no-match for
|
|
|
+ // PreCheckTactics.
|
|
|
+
|
|
|
+ return backOff, errors.TraceNew("unexpected PreCheckTactics response")
|
|
|
+ }
|
|
|
+
|
|
|
if announceResponse.SelectedProtocolVersion < ProtocolVersion1 ||
|
|
|
(announceResponse.UseMediaStreams &&
|
|
|
announceResponse.SelectedProtocolVersion < ProtocolVersion2) ||
|
|
|
@@ -731,23 +1100,36 @@ func (p *Proxy) proxyOneClient(
|
|
|
announceResponse.SelectedProtocolVersion)
|
|
|
}
|
|
|
|
|
|
+ clientRegion := announceResponse.ClientRegion
|
|
|
+ var regionActivity *RegionActivity
|
|
|
+ if clientRegion != "" {
|
|
|
+ regionActivity = p.getOrCreateRegionActivity(clientRegion, isPersonal)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create per-connection activity wrapper with cached regionActivity pointer
|
|
|
+ connActivityWrapper := &connectionActivityWrapper{
|
|
|
+ p: p,
|
|
|
+ regionActivity: regionActivity,
|
|
|
+ }
|
|
|
+
|
|
|
// Trigger back-off if the following WebRTC operations fail to establish a
|
|
|
// connections.
|
|
|
- //
|
|
|
- // Limitation: the proxy answer request to the broker may fail due to the
|
|
|
- // non-back-off reasons documented above for the proxy announcment request;
|
|
|
- // however, these should be unlikely assuming that the broker client is
|
|
|
- // using a persistent transport connection.
|
|
|
|
|
|
backOff = true
|
|
|
|
|
|
// For activity updates, indicate that a client connection is now underway.
|
|
|
|
|
|
- atomic.AddInt32(&p.connectingClients, 1)
|
|
|
+ p.connectingClients.Add(1)
|
|
|
+ if regionActivity != nil {
|
|
|
+ regionActivity.connectingClients.Add(1)
|
|
|
+ }
|
|
|
connected := false
|
|
|
defer func() {
|
|
|
if !connected {
|
|
|
- atomic.AddInt32(&p.connectingClients, -1)
|
|
|
+ p.connectingClients.Add(-1)
|
|
|
+ if regionActivity != nil {
|
|
|
+ regionActivity.connectingClients.Add(-1)
|
|
|
+ }
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
@@ -796,7 +1178,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
|
|
|
// Send answer request with SDP or error.
|
|
|
|
|
|
- _, err = brokerClient.ProxyAnswer(
|
|
|
+ answerResponse, err := brokerClient.ProxyAnswer(
|
|
|
ctx,
|
|
|
&ProxyAnswerRequest{
|
|
|
ConnectionID: announceResponse.ConnectionID,
|
|
|
@@ -809,6 +1191,11 @@ func (p *Proxy) proxyOneClient(
|
|
|
// Prioritize returning any WebRTC error for logging.
|
|
|
return backOff, webRTCErr
|
|
|
}
|
|
|
+
|
|
|
+	// Don't back off if the answer request fails due to possible transient
|
|
|
+ // request transport errors.
|
|
|
+
|
|
|
+ backOff = false
|
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
@@ -818,6 +1205,22 @@ func (p *Proxy) proxyOneClient(
|
|
|
return backOff, webRTCErr
|
|
|
}
|
|
|
|
|
|
+ // Exit if the client was no longer awaiting the answer. There is no
|
|
|
+	// back off in this case, and no error is returned, since the proxy did not fail,
|
|
|
+ // as it's not an unexpected outcome.
|
|
|
+ //
|
|
|
+ // Limitation: it's possible that the announce request responds quickly
|
|
|
+ // and the matched client offer is already close to timing out. The
|
|
|
+ // answer request will also respond quickly. There's an increased chance
|
|
|
+ // of hitting rate limits in this fast turn around scenario. This outcome
|
|
|
+ // is mitigated by InproxyBrokerMatcherOfferMinimumDeadline.
|
|
|
+
|
|
|
+ if answerResponse.NoAwaitingClient {
|
|
|
+
|
|
|
+ backOff = false
|
|
|
+ return backOff, nil
|
|
|
+ }
|
|
|
+
|
|
|
// Await the WebRTC connection.
|
|
|
|
|
|
// We could concurrently dial the destination, to have that network
|
|
|
@@ -881,10 +1284,17 @@ func (p *Proxy) proxyOneClient(
|
|
|
// For activity updates, indicate that a client connection is established.
|
|
|
|
|
|
connected = true
|
|
|
- atomic.AddInt32(&p.connectingClients, -1)
|
|
|
- atomic.AddInt32(&p.connectedClients, 1)
|
|
|
+ p.connectingClients.Add(-1)
|
|
|
+ p.connectedClients.Add(1)
|
|
|
+ if regionActivity != nil {
|
|
|
+ regionActivity.connectingClients.Add(-1)
|
|
|
+ regionActivity.connectedClients.Add(1)
|
|
|
+ }
|
|
|
defer func() {
|
|
|
- atomic.AddInt32(&p.connectedClients, -1)
|
|
|
+ p.connectedClients.Add(-1)
|
|
|
+ if regionActivity != nil {
|
|
|
+ regionActivity.connectedClients.Add(-1)
|
|
|
+ }
|
|
|
}()
|
|
|
|
|
|
// Throttle the relay connection.
|
|
|
@@ -892,17 +1302,18 @@ func (p *Proxy) proxyOneClient(
|
|
|
// Here, each client gets LimitUp/DownstreamBytesPerSecond. Proxy
|
|
|
// operators may want to limit their bandwidth usage with a single
|
|
|
// up/down value, an overall limit. The ProxyConfig can simply be
|
|
|
- // generated by dividing the limit by MaxClients. This approach favors
|
|
|
- // performance stability: each client gets the same throttling limits
|
|
|
- // regardless of how many other clients are connected.
|
|
|
+ // generated by dividing the limit by MaxCommonClients + MaxPersonalClients.
|
|
|
+ // This approach favors performance stability: each client gets the
|
|
|
+ // same throttling limits regardless of how many other clients are connected.
|
|
|
+ //
|
|
|
+ // Rate limits are applied only when a client connection is established;
|
|
|
+ // connected clients retain their initial limits even when reduced time
|
|
|
+ // starts or ends.
|
|
|
|
|
|
destinationConn = common.NewThrottledConn(
|
|
|
destinationConn,
|
|
|
announceResponse.NetworkProtocol.IsStream(),
|
|
|
- common.RateLimits{
|
|
|
- ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
- WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
- })
|
|
|
+ rateLimits)
|
|
|
|
|
|
// Hook up bytes transferred counting for activity updates.
|
|
|
|
|
|
@@ -920,7 +1331,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
proxyRelayInactivityTimeout)
|
|
|
|
|
|
destinationConn, err = common.NewActivityMonitoredConn(
|
|
|
- destinationConn, inactivityTimeout, false, nil, p.activityUpdateWrapper)
|
|
|
+ destinationConn, inactivityTimeout, false, nil, connActivityWrapper)
|
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
|
@@ -1012,7 +1423,10 @@ func (p *Proxy) proxyOneClient(
|
|
|
func (p *Proxy) getMetrics(
|
|
|
includeTacticsParameters bool,
|
|
|
brokerCoordinator BrokerDialCoordinator,
|
|
|
- webRTCCoordinator WebRTCDialCoordinator) (
|
|
|
+ webRTCCoordinator WebRTCDialCoordinator,
|
|
|
+ maxCommonClients int,
|
|
|
+ maxPersonalClients int,
|
|
|
+ rateLimits common.RateLimits) (
|
|
|
*ProxyMetrics, string, bool, error) {
|
|
|
|
|
|
// tacticsNetworkID records the exact network ID that corresponds to the
|
|
|
@@ -1040,11 +1454,12 @@ func (p *Proxy) getMetrics(
|
|
|
ProtocolVersion: LatestProtocolVersion,
|
|
|
NATType: webRTCCoordinator.NATType(),
|
|
|
PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
|
|
|
- MaxClients: int32(p.config.MaxClients),
|
|
|
- ConnectingClients: atomic.LoadInt32(&p.connectingClients),
|
|
|
- ConnectedClients: atomic.LoadInt32(&p.connectedClients),
|
|
|
- LimitUpstreamBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
- LimitDownstreamBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
+ MaxCommonClients: int32(maxCommonClients),
|
|
|
+ MaxPersonalClients: int32(maxPersonalClients),
|
|
|
+ ConnectingClients: p.connectingClients.Load(),
|
|
|
+ ConnectedClients: p.connectedClients.Load(),
|
|
|
+ LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
|
|
|
+ LimitDownstreamBytesPerSecond: rateLimits.WriteBytesPerSecond,
|
|
|
PeakUpstreamBytesPerSecond: p.peakBytesUp.Load(),
|
|
|
PeakDownstreamBytesPerSecond: p.peakBytesDown.Load(),
|
|
|
}, tacticsNetworkID, compressTactics, nil
|