|
@@ -50,11 +50,13 @@ type Proxy struct {
|
|
|
bytesDown atomic.Int64
|
|
bytesDown atomic.Int64
|
|
|
peakBytesUp atomic.Int64
|
|
peakBytesUp atomic.Int64
|
|
|
peakBytesDown atomic.Int64
|
|
peakBytesDown atomic.Int64
|
|
|
- connectingClients int32
|
|
|
|
|
- connectedClients int32
|
|
|
|
|
|
|
+ announcing atomic.Int32
|
|
|
|
|
+ connectingClients atomic.Int32
|
|
|
|
|
+ connectedClients atomic.Int32
|
|
|
|
|
|
|
|
config *ProxyConfig
|
|
config *ProxyConfig
|
|
|
activityUpdateWrapper *activityUpdateWrapper
|
|
activityUpdateWrapper *activityUpdateWrapper
|
|
|
|
|
+ lastAnnouncing int32
|
|
|
lastConnectingClients int32
|
|
lastConnectingClients int32
|
|
|
lastConnectedClients int32
|
|
lastConnectedClients int32
|
|
|
|
|
|
|
@@ -65,6 +67,10 @@ type Proxy struct {
|
|
|
nextAnnounceMutex sync.Mutex
|
|
nextAnnounceMutex sync.Mutex
|
|
|
nextAnnounceBrokerClient *BrokerClient
|
|
nextAnnounceBrokerClient *BrokerClient
|
|
|
nextAnnounceNotBefore time.Time
|
|
nextAnnounceNotBefore time.Time
|
|
|
|
|
+
|
|
|
|
|
+ useReducedSettings bool
|
|
|
|
|
+ reducedStartMinute int
|
|
|
|
|
+ reducedEndMinute int
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
|
|
// TODO: add PublicNetworkAddress/ListenNetworkAddress to facilitate manually
|
|
@@ -144,17 +150,49 @@ type ProxyConfig struct {
|
|
|
// for a single client. When 0, there is no limit.
|
|
// for a single client. When 0, there is no limit.
|
|
|
LimitDownstreamBytesPerSecond int
|
|
LimitDownstreamBytesPerSecond int
|
|
|
|
|
|
|
|
|
|
+ // ReducedStartTime specifies the local time of day (HH:MM, 24-hour, UTC)
|
|
|
|
|
+ // at which reduced client settings begin.
|
|
|
|
|
+ ReducedStartTime string
|
|
|
|
|
+
|
|
|
|
|
+ // ReducedEndTime specifies the local time of day (HH:MM, 24-hour, UTC) at
|
|
|
|
|
+ // which reduced client settings end.
|
|
|
|
|
+ ReducedEndTime string
|
|
|
|
|
+
|
|
|
|
|
+ // ReducedMaxClients specifies the maximum number of clients that are
|
|
|
|
|
+ // allowed to connect to the proxy during the reduced time range.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Clients connected when the reduced settings begin will not be
|
|
|
|
|
+ // disconnected.
|
|
|
|
|
+ ReducedMaxClients int
|
|
|
|
|
+
|
|
|
|
|
+ // ReducedLimitUpstreamBytesPerSecond limits the upstream data transfer
|
|
|
|
|
+ // rate for a single client during the reduced time range. When 0,
|
|
|
|
|
+ // LimitUpstreamBytesPerSecond is the limit.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Rates for clients already connected when the reduced settings begin or
|
|
|
|
|
+ // end will not change.
|
|
|
|
|
+ ReducedLimitUpstreamBytesPerSecond int
|
|
|
|
|
+
|
|
|
|
|
+ // ReducedLimitDownstreamBytesPerSecond limits the downstream data
|
|
|
|
|
+ // transfer rate for a single client during the reduced time range. When
|
|
|
|
|
+ // 0, LimitDownstreamBytesPerSecond is the limit.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Rates for clients already connected when the reduced settings begin or
|
|
|
|
|
+ // end will not change.
|
|
|
|
|
+ ReducedLimitDownstreamBytesPerSecond int
|
|
|
|
|
+
|
|
|
// ActivityUpdater specifies an ActivityUpdater for activity associated
|
|
// ActivityUpdater specifies an ActivityUpdater for activity associated
|
|
|
// with this proxy.
|
|
// with this proxy.
|
|
|
ActivityUpdater ActivityUpdater
|
|
ActivityUpdater ActivityUpdater
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// ActivityUpdater is a callback that is invoked when clients connect and
|
|
|
|
|
-// disconnect and periodically with data transfer updates (unless idle). This
|
|
|
|
|
-// callback may be used to update an activity UI. This callback should post
|
|
|
|
|
-// this data to another thread or handler and return immediately and not
|
|
|
|
|
-// block on UI updates.
|
|
|
|
|
|
|
+// ActivityUpdater is a callback that is invoked when the proxy announces
|
|
|
|
|
+// availability, when clients connect and disconnect, and periodically with
|
|
|
|
|
+// data transfer updates (unless idle). This callback may be used to update
|
|
|
|
|
+// an activity UI. This callback should post this data to another thread or
|
|
|
|
|
+// handler and return immediately and not block on UI updates.
|
|
|
type ActivityUpdater func(
|
|
type ActivityUpdater func(
|
|
|
|
|
+ announcing int32,
|
|
|
connectingClients int32,
|
|
connectingClients int32,
|
|
|
connectedClients int32,
|
|
connectedClients int32,
|
|
|
bytesUp int64,
|
|
bytesUp int64,
|
|
@@ -172,6 +210,34 @@ func NewProxy(config *ProxyConfig) (*Proxy, error) {
|
|
|
config: config,
|
|
config: config,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if config.ReducedStartTime != "" ||
|
|
|
|
|
+ config.ReducedEndTime != "" ||
|
|
|
|
|
+ config.ReducedMaxClients > 0 {
|
|
|
|
|
+
|
|
|
|
|
+ startMinute, err := common.ParseTimeOfDayMinutes(config.ReducedStartTime)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, errors.Tracef("invalid ReducedStartTime: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ endMinute, err := common.ParseTimeOfDayMinutes(config.ReducedEndTime)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, errors.Tracef("invalid ReducedEndTime: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if startMinute == endMinute {
|
|
|
|
|
+ return nil, errors.TraceNew("invalid ReducedStartTime/ReducedEndTime")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if config.ReducedMaxClients <= 0 ||
|
|
|
|
|
+ config.ReducedMaxClients > config.MaxClients {
|
|
|
|
|
+ return nil, errors.TraceNew("invalid ReducedMaxClients")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ p.useReducedSettings = true
|
|
|
|
|
+ p.reducedStartMinute = startMinute
|
|
|
|
|
+ p.reducedEndMinute = endMinute
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
|
|
p.activityUpdateWrapper = &activityUpdateWrapper{p: p}
|
|
|
|
|
|
|
|
return p, nil
|
|
return p, nil
|
|
@@ -206,12 +272,39 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
|
|
|
|
|
proxyWaitGroup := new(sync.WaitGroup)
|
|
proxyWaitGroup := new(sync.WaitGroup)
|
|
|
|
|
|
|
|
|
|
+ // Capture activity updates every second, which is the required frequency
|
|
|
|
|
+ // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
|
|
|
|
|
+ // frequency for invoking the ActivityUpdater and updating UI widgets.
|
|
|
|
|
+
|
|
|
|
|
+ proxyWaitGroup.Add(1)
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ defer proxyWaitGroup.Done()
|
|
|
|
|
+
|
|
|
|
|
+ p.lastAnnouncing = 0
|
|
|
|
|
+ p.lastConnectingClients = 0
|
|
|
|
|
+ p.lastConnectedClients = 0
|
|
|
|
|
+
|
|
|
|
|
+ activityUpdatePeriod := 1 * time.Second
|
|
|
|
|
+ ticker := time.NewTicker(activityUpdatePeriod)
|
|
|
|
|
+ defer ticker.Stop()
|
|
|
|
|
+
|
|
|
|
|
+ loop:
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-ticker.C:
|
|
|
|
|
+ p.activityUpdate(activityUpdatePeriod)
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ break loop
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
// Launch the first proxy worker, passing a signal to be triggered once
|
|
// Launch the first proxy worker, passing a signal to be triggered once
|
|
|
// the very first announcement round trip is complete. The first round
|
|
// the very first announcement round trip is complete. The first round
|
|
|
// trip is awaited so that:
|
|
// trip is awaited so that:
|
|
|
//
|
|
//
|
|
|
// - The first announce response will arrive with any new tactics,
|
|
// - The first announce response will arrive with any new tactics,
|
|
|
- // which may be applied before launching additions workers.
|
|
|
|
|
|
|
+ // which may be applied before launching additional workers.
|
|
|
//
|
|
//
|
|
|
// - The first worker gets no announcement delay and is also guaranteed to
|
|
// - The first worker gets no announcement delay and is also guaranteed to
|
|
|
// be the shared session establisher. Since the announcement delays are
|
|
// be the shared session establisher. Since the announcement delays are
|
|
@@ -228,7 +321,7 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
proxyWaitGroup.Add(1)
|
|
proxyWaitGroup.Add(1)
|
|
|
go func() {
|
|
go func() {
|
|
|
defer proxyWaitGroup.Done()
|
|
defer proxyWaitGroup.Done()
|
|
|
- p.proxyClients(ctx, signalFirstAnnounceDone)
|
|
|
|
|
|
|
+ p.proxyClients(ctx, signalFirstAnnounceDone, false)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
@@ -240,80 +333,55 @@ func (p *Proxy) Run(ctx context.Context) {
|
|
|
// Launch the remaining workers.
|
|
// Launch the remaining workers.
|
|
|
|
|
|
|
|
for i := 0; i < p.config.MaxClients-1; i++ {
|
|
for i := 0; i < p.config.MaxClients-1; i++ {
|
|
|
- proxyWaitGroup.Add(1)
|
|
|
|
|
- go func() {
|
|
|
|
|
- defer proxyWaitGroup.Done()
|
|
|
|
|
- p.proxyClients(ctx, nil)
|
|
|
|
|
- }()
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- // Capture activity updates every second, which is the required frequency
|
|
|
|
|
- // for PeakUp/DownstreamBytesPerSecond. This is also a reasonable
|
|
|
|
|
- // frequency for invoking the ActivityUpdater and updating UI widgets.
|
|
|
|
|
-
|
|
|
|
|
- p.lastConnectingClients = 0
|
|
|
|
|
- p.lastConnectedClients = 0
|
|
|
|
|
|
|
+ // When reduced settings are in effect, a subset of workers will pause
|
|
|
|
|
+ // during the reduced time period. Since ReducedMaxClients > 0 the
|
|
|
|
|
+ // first proxy worker is never paused.
|
|
|
|
|
+ workerNum := i + 1
|
|
|
|
|
+ reducedPause := p.useReducedSettings &&
|
|
|
|
|
+ workerNum >= p.config.ReducedMaxClients
|
|
|
|
|
|
|
|
- activityUpdatePeriod := 1 * time.Second
|
|
|
|
|
- ticker := time.NewTicker(activityUpdatePeriod)
|
|
|
|
|
- defer ticker.Stop()
|
|
|
|
|
-
|
|
|
|
|
-loop:
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case <-ticker.C:
|
|
|
|
|
- p.activityUpdate(activityUpdatePeriod)
|
|
|
|
|
- case <-ctx.Done():
|
|
|
|
|
- break loop
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ proxyWaitGroup.Add(1)
|
|
|
|
|
+ go func(reducedPause bool) {
|
|
|
|
|
+ defer proxyWaitGroup.Done()
|
|
|
|
|
+ p.proxyClients(ctx, nil, reducedPause)
|
|
|
|
|
+ }(reducedPause)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
proxyWaitGroup.Wait()
|
|
proxyWaitGroup.Wait()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
|
|
|
-// delay parameters from the current broker client.
|
|
|
|
|
-//
|
|
|
|
|
-// getAnnounceDelayParameters is used to configure a delay when
|
|
|
|
|
-// proxyOneClient fails. As having no broker clients is a possible
|
|
|
|
|
-// proxyOneClient failure case, GetBrokerClient errors are ignored here and
|
|
|
|
|
-// defaults used in that case.
|
|
|
|
|
-func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
|
|
|
|
|
- brokerClient, err := p.config.GetBrokerClient()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
|
|
|
|
|
- }
|
|
|
|
|
- brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
|
|
|
|
|
- return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
|
|
|
|
|
- common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
|
|
|
|
|
- common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
|
|
|
|
|
-
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func (p *Proxy) activityUpdate(period time.Duration) {
|
|
func (p *Proxy) activityUpdate(period time.Duration) {
|
|
|
|
|
|
|
|
- connectingClients := atomic.LoadInt32(&p.connectingClients)
|
|
|
|
|
- connectedClients := atomic.LoadInt32(&p.connectedClients)
|
|
|
|
|
|
|
+ // Concurrency: activityUpdate is called by only the single goroutine
|
|
|
|
|
+ // created in Run.
|
|
|
|
|
+
|
|
|
|
|
+ announcing := p.announcing.Load()
|
|
|
|
|
+ connectingClients := p.connectingClients.Load()
|
|
|
|
|
+ connectedClients := p.connectedClients.Load()
|
|
|
bytesUp := p.bytesUp.Swap(0)
|
|
bytesUp := p.bytesUp.Swap(0)
|
|
|
bytesDown := p.bytesDown.Swap(0)
|
|
bytesDown := p.bytesDown.Swap(0)
|
|
|
|
|
|
|
|
greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
|
|
greaterThanSwapInt64(&p.peakBytesUp, bytesUp)
|
|
|
greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
|
|
greaterThanSwapInt64(&p.peakBytesDown, bytesDown)
|
|
|
|
|
|
|
|
- clientsChanged := connectingClients != p.lastConnectingClients ||
|
|
|
|
|
|
|
+ stateChanged := announcing != p.lastAnnouncing ||
|
|
|
|
|
+ connectingClients != p.lastConnectingClients ||
|
|
|
connectedClients != p.lastConnectedClients
|
|
connectedClients != p.lastConnectedClients
|
|
|
|
|
|
|
|
|
|
+ p.lastAnnouncing = announcing
|
|
|
p.lastConnectingClients = connectingClients
|
|
p.lastConnectingClients = connectingClients
|
|
|
p.lastConnectedClients = connectedClients
|
|
p.lastConnectedClients = connectedClients
|
|
|
|
|
|
|
|
- if !clientsChanged &&
|
|
|
|
|
|
|
+ if !stateChanged &&
|
|
|
bytesUp == 0 &&
|
|
bytesUp == 0 &&
|
|
|
bytesDown == 0 {
|
|
bytesDown == 0 {
|
|
|
- // Skip the activity callback on idle bytes or no change in client counts.
|
|
|
|
|
|
|
+ // Skip the activity callback on idle bytes or no change in worker state.
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
p.config.ActivityUpdater(
|
|
p.config.ActivityUpdater(
|
|
|
|
|
+ announcing,
|
|
|
connectingClients,
|
|
connectingClients,
|
|
|
connectedClients,
|
|
connectedClients,
|
|
|
bytesUp,
|
|
bytesUp,
|
|
@@ -333,8 +401,93 @@ func greaterThanSwapInt64(addr *atomic.Int64, new int64) bool {
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (p *Proxy) isReducedUntil() (int, time.Time) {
|
|
|
|
|
+ if !p.useReducedSettings {
|
|
|
|
|
+ return p.config.MaxClients, time.Time{}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ now := time.Now().UTC()
|
|
|
|
|
+ minute := now.Hour()*60 + now.Minute()
|
|
|
|
|
+
|
|
|
|
|
+ isReduced := false
|
|
|
|
|
+ if p.reducedStartMinute < p.reducedEndMinute {
|
|
|
|
|
+ isReduced = minute >= p.reducedStartMinute && minute < p.reducedEndMinute
|
|
|
|
|
+ } else {
|
|
|
|
|
+ isReduced = minute >= p.reducedStartMinute || minute < p.reducedEndMinute
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if !isReduced {
|
|
|
|
|
+ return p.config.MaxClients, time.Time{}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ endHour := p.reducedEndMinute / 60
|
|
|
|
|
+ endMinute := p.reducedEndMinute % 60
|
|
|
|
|
+ endTime := time.Date(
|
|
|
|
|
+ now.Year(),
|
|
|
|
|
+ now.Month(),
|
|
|
|
|
+ now.Day(),
|
|
|
|
|
+ endHour,
|
|
|
|
|
+ endMinute,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ now.Location(),
|
|
|
|
|
+ )
|
|
|
|
|
+ if !endTime.After(now) {
|
|
|
|
|
+ endTime = endTime.AddDate(0, 0, 1)
|
|
|
|
|
+ }
|
|
|
|
|
+ return p.config.ReducedMaxClients, endTime
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (p *Proxy) getLimits() (int, common.RateLimits) {
|
|
|
|
|
+
|
|
|
|
|
+ rateLimits := common.RateLimits{
|
|
|
|
|
+ ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
|
|
+ WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ maxClients, reducedUntil := p.isReducedUntil()
|
|
|
|
|
+ if !reducedUntil.IsZero() {
|
|
|
|
|
+
|
|
|
|
|
+ upstream := p.config.ReducedLimitUpstreamBytesPerSecond
|
|
|
|
|
+ if upstream == 0 {
|
|
|
|
|
+ upstream = p.config.LimitUpstreamBytesPerSecond
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ downstream := p.config.ReducedLimitDownstreamBytesPerSecond
|
|
|
|
|
+ if downstream == 0 {
|
|
|
|
|
+ downstream = p.config.LimitDownstreamBytesPerSecond
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ rateLimits = common.RateLimits{
|
|
|
|
|
+ ReadBytesPerSecond: int64(upstream),
|
|
|
|
|
+ WriteBytesPerSecond: int64(downstream),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return maxClients, rateLimits
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// getAnnounceDelayParameters is a helper that fetches the proxy announcement
|
|
|
|
|
+// delay parameters from the current broker client.
|
|
|
|
|
+//
|
|
|
|
|
+// getAnnounceDelayParameters is used to configure a delay when
|
|
|
|
|
+// proxyOneClient fails. As having no broker clients is a possible
|
|
|
|
|
+// proxyOneClient failure case, GetBrokerClient errors are ignored here and
|
|
|
|
|
+// defaults used in that case.
|
|
|
|
|
+func (p *Proxy) getAnnounceDelayParameters() (time.Duration, time.Duration, float64) {
|
|
|
|
|
+ brokerClient, err := p.config.GetBrokerClient()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return proxyAnnounceDelay, proxyAnnounceMaxBackoffDelay, proxyAnnounceDelayJitter
|
|
|
|
|
+ }
|
|
|
|
|
+ brokerCoordinator := brokerClient.GetBrokerDialCoordinator()
|
|
|
|
|
+ return common.ValueOrDefault(brokerCoordinator.AnnounceDelay(), proxyAnnounceDelay),
|
|
|
|
|
+ common.ValueOrDefault(brokerCoordinator.AnnounceMaxBackoffDelay(), proxyAnnounceMaxBackoffDelay),
|
|
|
|
|
+ common.ValueOrDefault(brokerCoordinator.AnnounceDelayJitter(), proxyAnnounceDelayJitter)
|
|
|
|
|
+
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (p *Proxy) proxyClients(
|
|
func (p *Proxy) proxyClients(
|
|
|
- ctx context.Context, signalAnnounceDone func()) {
|
|
|
|
|
|
|
+ ctx context.Context, signalAnnounceDone func(), reducedPause bool) {
|
|
|
|
|
|
|
|
// Proxy one client, repeating until ctx is done.
|
|
// Proxy one client, repeating until ctx is done.
|
|
|
//
|
|
//
|
|
@@ -381,6 +534,31 @@ func (p *Proxy) proxyClients(
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Pause designated workers during the reduced time range. In-flight
|
|
|
|
|
+ // announces are not interrupted and connected clients are not
|
|
|
|
|
+ // disconnected, so there is a gradual transition into reduced mode.
|
|
|
|
|
+
|
|
|
|
|
+ if reducedPause {
|
|
|
|
|
+ _, reducedUntil := p.isReducedUntil()
|
|
|
|
|
+ if !reducedUntil.IsZero() {
|
|
|
|
|
+
|
|
|
|
|
+ pauseDuration := time.Until(reducedUntil)
|
|
|
|
|
+ p.config.Logger.WithTraceFields(common.LogFields{
|
|
|
|
|
+ "duration": pauseDuration.String(),
|
|
|
|
|
+ }).Info("pause worker")
|
|
|
|
|
+
|
|
|
|
|
+ timer := time.NewTimer(pauseDuration)
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-timer.C:
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ }
|
|
|
|
|
+ timer.Stop()
|
|
|
|
|
+ if ctx.Err() != nil {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if time.Since(startLogSampleTime) >= proxyAnnounceLogSamplePeriod {
|
|
if time.Since(startLogSampleTime) >= proxyAnnounceLogSamplePeriod {
|
|
|
logAnnounceCount = proxyAnnounceLogSampleSize
|
|
logAnnounceCount = proxyAnnounceLogSampleSize
|
|
|
logErrorsCount = proxyAnnounceLogSampleSize
|
|
logErrorsCount = proxyAnnounceLogSampleSize
|
|
@@ -590,6 +768,8 @@ func (p *Proxy) proxyOneClient(
|
|
|
// for tactics.
|
|
// for tactics.
|
|
|
checkTactics := signalAnnounceDone != nil
|
|
checkTactics := signalAnnounceDone != nil
|
|
|
|
|
|
|
|
|
|
+ maxClients, rateLimits := p.getLimits()
|
|
|
|
|
+
|
|
|
// Get the base Psiphon API parameters and additional proxy metrics,
|
|
// Get the base Psiphon API parameters and additional proxy metrics,
|
|
|
// including performance information, which is sent to the broker in the
|
|
// including performance information, which is sent to the broker in the
|
|
|
// proxy announcment.
|
|
// proxy announcment.
|
|
@@ -601,7 +781,7 @@ func (p *Proxy) proxyOneClient(
|
|
|
// with the original network ID.
|
|
// with the original network ID.
|
|
|
|
|
|
|
|
metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
|
|
metrics, tacticsNetworkID, compressTactics, err := p.getMetrics(
|
|
|
- checkTactics, brokerCoordinator, webRTCCoordinator)
|
|
|
|
|
|
|
+ checkTactics, brokerCoordinator, webRTCCoordinator, maxClients, rateLimits)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
}
|
|
@@ -652,6 +832,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
//
|
|
//
|
|
|
// ProxyAnnounce applies an additional request timeout to facilitate
|
|
// ProxyAnnounce applies an additional request timeout to facilitate
|
|
|
// long-polling.
|
|
// long-polling.
|
|
|
|
|
+
|
|
|
|
|
+ p.announcing.Add(1)
|
|
|
|
|
+
|
|
|
announceStartTime := time.Now()
|
|
announceStartTime := time.Now()
|
|
|
personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
|
|
personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()
|
|
|
announceResponse, err := brokerClient.ProxyAnnounce(
|
|
announceResponse, err := brokerClient.ProxyAnnounce(
|
|
@@ -668,6 +851,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
"elapsedTime": time.Since(announceStartTime).String(),
|
|
"elapsedTime": time.Since(announceStartTime).String(),
|
|
|
}).Info("announcement request")
|
|
}).Info("announcement request")
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ p.announcing.Add(-1)
|
|
|
|
|
+
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return backOff, errors.Trace(err)
|
|
return backOff, errors.Trace(err)
|
|
|
}
|
|
}
|
|
@@ -743,11 +929,11 @@ func (p *Proxy) proxyOneClient(
|
|
|
|
|
|
|
|
// For activity updates, indicate that a client connection is now underway.
|
|
// For activity updates, indicate that a client connection is now underway.
|
|
|
|
|
|
|
|
- atomic.AddInt32(&p.connectingClients, 1)
|
|
|
|
|
|
|
+ p.connectingClients.Add(1)
|
|
|
connected := false
|
|
connected := false
|
|
|
defer func() {
|
|
defer func() {
|
|
|
if !connected {
|
|
if !connected {
|
|
|
- atomic.AddInt32(&p.connectingClients, -1)
|
|
|
|
|
|
|
+ p.connectingClients.Add(-1)
|
|
|
}
|
|
}
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
@@ -881,10 +1067,10 @@ func (p *Proxy) proxyOneClient(
|
|
|
// For activity updates, indicate that a client connection is established.
|
|
// For activity updates, indicate that a client connection is established.
|
|
|
|
|
|
|
|
connected = true
|
|
connected = true
|
|
|
- atomic.AddInt32(&p.connectingClients, -1)
|
|
|
|
|
- atomic.AddInt32(&p.connectedClients, 1)
|
|
|
|
|
|
|
+ p.connectingClients.Add(-1)
|
|
|
|
|
+ p.connectedClients.Add(1)
|
|
|
defer func() {
|
|
defer func() {
|
|
|
- atomic.AddInt32(&p.connectedClients, -1)
|
|
|
|
|
|
|
+ p.connectedClients.Add(-1)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
// Throttle the relay connection.
|
|
// Throttle the relay connection.
|
|
@@ -895,14 +1081,15 @@ func (p *Proxy) proxyOneClient(
|
|
|
// generated by dividing the limit by MaxClients. This approach favors
|
|
// generated by dividing the limit by MaxClients. This approach favors
|
|
|
// performance stability: each client gets the same throttling limits
|
|
// performance stability: each client gets the same throttling limits
|
|
|
// regardless of how many other clients are connected.
|
|
// regardless of how many other clients are connected.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Rate limits are applied only when a client connection is established;
|
|
|
|
|
+ // connected clients retain their initial limits even when reduced time
|
|
|
|
|
+ // starts or ends.
|
|
|
|
|
|
|
|
destinationConn = common.NewThrottledConn(
|
|
destinationConn = common.NewThrottledConn(
|
|
|
destinationConn,
|
|
destinationConn,
|
|
|
announceResponse.NetworkProtocol.IsStream(),
|
|
announceResponse.NetworkProtocol.IsStream(),
|
|
|
- common.RateLimits{
|
|
|
|
|
- ReadBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
|
|
- WriteBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
|
|
- })
|
|
|
|
|
|
|
+ rateLimits)
|
|
|
|
|
|
|
|
// Hook up bytes transferred counting for activity updates.
|
|
// Hook up bytes transferred counting for activity updates.
|
|
|
|
|
|
|
@@ -1012,7 +1199,9 @@ func (p *Proxy) proxyOneClient(
|
|
|
func (p *Proxy) getMetrics(
|
|
func (p *Proxy) getMetrics(
|
|
|
includeTacticsParameters bool,
|
|
includeTacticsParameters bool,
|
|
|
brokerCoordinator BrokerDialCoordinator,
|
|
brokerCoordinator BrokerDialCoordinator,
|
|
|
- webRTCCoordinator WebRTCDialCoordinator) (
|
|
|
|
|
|
|
+ webRTCCoordinator WebRTCDialCoordinator,
|
|
|
|
|
+ maxClients int,
|
|
|
|
|
+ rateLimits common.RateLimits) (
|
|
|
*ProxyMetrics, string, bool, error) {
|
|
*ProxyMetrics, string, bool, error) {
|
|
|
|
|
|
|
|
// tacticsNetworkID records the exact network ID that corresponds to the
|
|
// tacticsNetworkID records the exact network ID that corresponds to the
|
|
@@ -1040,11 +1229,11 @@ func (p *Proxy) getMetrics(
|
|
|
ProtocolVersion: LatestProtocolVersion,
|
|
ProtocolVersion: LatestProtocolVersion,
|
|
|
NATType: webRTCCoordinator.NATType(),
|
|
NATType: webRTCCoordinator.NATType(),
|
|
|
PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
|
|
PortMappingTypes: webRTCCoordinator.PortMappingTypes(),
|
|
|
- MaxClients: int32(p.config.MaxClients),
|
|
|
|
|
- ConnectingClients: atomic.LoadInt32(&p.connectingClients),
|
|
|
|
|
- ConnectedClients: atomic.LoadInt32(&p.connectedClients),
|
|
|
|
|
- LimitUpstreamBytesPerSecond: int64(p.config.LimitUpstreamBytesPerSecond),
|
|
|
|
|
- LimitDownstreamBytesPerSecond: int64(p.config.LimitDownstreamBytesPerSecond),
|
|
|
|
|
|
|
+ MaxClients: int32(maxClients),
|
|
|
|
|
+ ConnectingClients: p.connectingClients.Load(),
|
|
|
|
|
+ ConnectedClients: p.connectedClients.Load(),
|
|
|
|
|
+ LimitUpstreamBytesPerSecond: rateLimits.ReadBytesPerSecond,
|
|
|
|
|
+ LimitDownstreamBytesPerSecond: rateLimits.WriteBytesPerSecond,
|
|
|
PeakUpstreamBytesPerSecond: p.peakBytesUp.Load(),
|
|
PeakUpstreamBytesPerSecond: p.peakBytesUp.Load(),
|
|
|
PeakDownstreamBytesPerSecond: p.peakBytesDown.Load(),
|
|
PeakDownstreamBytesPerSecond: p.peakBytesDown.Load(),
|
|
|
}, tacticsNetworkID, compressTactics, nil
|
|
}, tacticsNetworkID, compressTactics, nil
|