|
|
@@ -28,29 +28,27 @@ import (
|
|
|
// BurstMonitoredConn wraps a net.Conn and monitors for data transfer bursts.
|
|
|
// Upstream (read) and downstream (write) bursts are tracked independently.
|
|
|
//
|
|
|
-// A burst is defined as a transfer of at least "threshold" bytes, across
|
|
|
-// multiple I/O operations where the delay between operations does not exceed
|
|
|
+// A burst is defined as a transfer of "target" bytes, possibly across
|
|
|
+// multiple I/O operations, where the total time elapsed does not exceed
|
|
|
// "deadline". Both a non-zero deadline and theshold must be set to enable
|
|
|
// monitoring. Four bursts are reported: the first, the last, the min (by
|
|
|
// rate) and max.
|
|
|
//
|
|
|
-// The reported rates will be more accurate for larger data transfers,
|
|
|
-// especially for higher transfer rates. Tune the deadline/threshold as
|
|
|
-// required. The threshold should be set to account for buffering (e.g, the
|
|
|
-// local host socket send/receive buffer) but this is not enforced by
|
|
|
-// BurstMonitoredConn.
|
|
|
-//
|
|
|
-// Close must be called to complete any outstanding bursts. For complete
|
|
|
-// results, call GetMetrics only after Close is called.
|
|
|
+// The burst monitoring is heuristical in nature and may not capture all
|
|
|
+// bursts. The reported rates will be more accurate for larger target values
|
|
|
+// and shorter deadlines, but these settings may fail to register bursts on
|
|
|
+// slower connections. Tune the deadline/target as required. The threshold
|
|
|
+// should be set to account for buffering (e.g, the local host socket
|
|
|
+// send/receive buffer) but this is not enforced by BurstMonitoredConn.
|
|
|
//
|
|
|
// Overhead: BurstMonitoredConn adds mutexes but does not use timers.
|
|
|
type BurstMonitoredConn struct {
|
|
|
net.Conn
|
|
|
- isServer bool
|
|
|
- readDeadline time.Duration
|
|
|
- readThresholdBytes int64
|
|
|
- writeDeadline time.Duration
|
|
|
- writeThresholdBytes int64
|
|
|
+ isServer bool
|
|
|
+ readTargetBytes int64
|
|
|
+ readDeadline time.Duration
|
|
|
+ writeTargetBytes int64
|
|
|
+ writeDeadline time.Duration
|
|
|
|
|
|
readMutex sync.Mutex
|
|
|
currentReadBurst burst
|
|
|
@@ -65,10 +63,10 @@ type BurstMonitoredConn struct {
|
|
|
func NewBurstMonitoredConn(
|
|
|
conn net.Conn,
|
|
|
isServer bool,
|
|
|
+ upstreamTargetBytes int64,
|
|
|
upstreamDeadline time.Duration,
|
|
|
- upstreamThresholdBytes int64,
|
|
|
- downstreamDeadline time.Duration,
|
|
|
- downstreamThresholdBytes int64) *BurstMonitoredConn {
|
|
|
+ downstreamTargetBytes int64,
|
|
|
+ downstreamDeadline time.Duration) *BurstMonitoredConn {
|
|
|
|
|
|
burstConn := &BurstMonitoredConn{
|
|
|
Conn: conn,
|
|
|
@@ -76,24 +74,24 @@ func NewBurstMonitoredConn(
|
|
|
}
|
|
|
|
|
|
if isServer {
|
|
|
+ burstConn.readTargetBytes = upstreamTargetBytes
|
|
|
burstConn.readDeadline = upstreamDeadline
|
|
|
- burstConn.readThresholdBytes = upstreamThresholdBytes
|
|
|
+ burstConn.writeTargetBytes = downstreamTargetBytes
|
|
|
burstConn.writeDeadline = downstreamDeadline
|
|
|
- burstConn.writeThresholdBytes = downstreamThresholdBytes
|
|
|
} else {
|
|
|
+ burstConn.readTargetBytes = downstreamTargetBytes
|
|
|
burstConn.readDeadline = downstreamDeadline
|
|
|
- burstConn.readThresholdBytes = downstreamThresholdBytes
|
|
|
+ burstConn.writeTargetBytes = upstreamTargetBytes
|
|
|
burstConn.writeDeadline = upstreamDeadline
|
|
|
- burstConn.writeThresholdBytes = upstreamThresholdBytes
|
|
|
}
|
|
|
|
|
|
return burstConn
|
|
|
}
|
|
|
|
|
|
type burst struct {
|
|
|
- startTime time.Time
|
|
|
- lastByteTime time.Time
|
|
|
- bytes int64
|
|
|
+ startTime time.Time
|
|
|
+ endTime time.Time
|
|
|
+ bytes int64
|
|
|
}
|
|
|
|
|
|
func (b *burst) isZero() bool {
|
|
|
@@ -109,7 +107,7 @@ func (b *burst) offset(baseTime time.Time) time.Duration {
|
|
|
}
|
|
|
|
|
|
func (b *burst) duration() time.Duration {
|
|
|
- duration := b.lastByteTime.Sub(b.startTime)
|
|
|
+ duration := b.endTime.Sub(b.startTime)
|
|
|
if duration <= 0 {
|
|
|
return 0
|
|
|
}
|
|
|
@@ -117,9 +115,19 @@ func (b *burst) duration() time.Duration {
|
|
|
}
|
|
|
|
|
|
func (b *burst) rate() int64 {
|
|
|
+ duration := b.duration()
|
|
|
+ if duration <= 0 {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
return int64(
|
|
|
(float64(b.bytes) * float64(time.Second)) /
|
|
|
- float64(b.duration()))
|
|
|
+ float64(duration))
|
|
|
+}
|
|
|
+
|
|
|
+func (b *burst) reset() {
|
|
|
+ b.startTime = time.Time{}
|
|
|
+ b.endTime = time.Time{}
|
|
|
+ b.bytes = 0
|
|
|
}
|
|
|
|
|
|
type burstHistory struct {
|
|
|
@@ -131,7 +139,7 @@ type burstHistory struct {
|
|
|
|
|
|
func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
|
|
|
|
|
|
- if conn.readDeadline <= 0 || conn.readThresholdBytes <= 0 {
|
|
|
+ if conn.readTargetBytes <= 0 || conn.readDeadline <= 0 {
|
|
|
return conn.Conn.Read(buffer)
|
|
|
}
|
|
|
|
|
|
@@ -145,8 +153,8 @@ func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
|
|
|
start,
|
|
|
end,
|
|
|
int64(n),
|
|
|
+ conn.readTargetBytes,
|
|
|
conn.readDeadline,
|
|
|
- conn.readThresholdBytes,
|
|
|
&conn.currentReadBurst,
|
|
|
&conn.readBursts)
|
|
|
conn.readMutex.Unlock()
|
|
|
@@ -158,7 +166,7 @@ func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
|
|
|
|
|
|
func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
|
|
|
|
|
|
- if conn.writeDeadline <= 0 || conn.writeThresholdBytes <= 0 {
|
|
|
+ if conn.writeTargetBytes <= 0 || conn.writeDeadline <= 0 {
|
|
|
return conn.Conn.Write(buffer)
|
|
|
}
|
|
|
|
|
|
@@ -172,8 +180,8 @@ func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
|
|
|
start,
|
|
|
end,
|
|
|
int64(n),
|
|
|
+ conn.writeTargetBytes,
|
|
|
conn.writeDeadline,
|
|
|
- conn.writeThresholdBytes,
|
|
|
&conn.currentWriteBurst,
|
|
|
&conn.writeBursts)
|
|
|
conn.writeMutex.Unlock()
|
|
|
@@ -183,31 +191,6 @@ func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
|
|
|
return n, err
|
|
|
}
|
|
|
|
|
|
-func (conn *BurstMonitoredConn) Close() error {
|
|
|
- err := conn.Conn.Close()
|
|
|
-
|
|
|
- if conn.readDeadline > 0 && conn.readThresholdBytes > 0 {
|
|
|
- conn.readMutex.Lock()
|
|
|
- conn.endBurst(
|
|
|
- conn.readThresholdBytes,
|
|
|
- &conn.currentReadBurst,
|
|
|
- &conn.readBursts)
|
|
|
- conn.readMutex.Unlock()
|
|
|
- }
|
|
|
-
|
|
|
- if conn.writeDeadline > 0 && conn.writeThresholdBytes > 0 {
|
|
|
- conn.writeMutex.Lock()
|
|
|
- conn.endBurst(
|
|
|
- conn.writeThresholdBytes,
|
|
|
- &conn.currentWriteBurst,
|
|
|
- &conn.writeBursts)
|
|
|
- conn.writeMutex.Unlock()
|
|
|
- }
|
|
|
-
|
|
|
- // Note: no context error to preserve error type
|
|
|
- return err
|
|
|
-}
|
|
|
-
|
|
|
// IsClosed implements the Closer iterface. The return value indicates whether
|
|
|
// the underlying conn has been closed.
|
|
|
func (conn *BurstMonitoredConn) IsClosed() bool {
|
|
|
@@ -262,65 +245,51 @@ func (conn *BurstMonitoredConn) updateBurst(
|
|
|
operationStart time.Time,
|
|
|
operationEnd time.Time,
|
|
|
operationBytes int64,
|
|
|
- deadline time.Duration,
|
|
|
thresholdBytes int64,
|
|
|
+ deadline time.Duration,
|
|
|
currentBurst *burst,
|
|
|
history *burstHistory) {
|
|
|
|
|
|
// Assumes the associated mutex is locked.
|
|
|
|
|
|
- if currentBurst.isZero() {
|
|
|
- currentBurst.startTime = operationStart
|
|
|
- currentBurst.lastByteTime = operationEnd
|
|
|
- currentBurst.bytes = operationBytes
|
|
|
-
|
|
|
- } else {
|
|
|
-
|
|
|
- if operationStart.Sub(currentBurst.lastByteTime) >
|
|
|
- deadline {
|
|
|
-
|
|
|
- conn.endBurst(thresholdBytes, currentBurst, history)
|
|
|
- currentBurst.startTime = operationStart
|
|
|
- }
|
|
|
-
|
|
|
- currentBurst.lastByteTime = operationEnd
|
|
|
- currentBurst.bytes += operationBytes
|
|
|
+ if operationEnd.Sub(currentBurst.startTime) > deadline {
|
|
|
+ // Partial burst failed to reach the target, so discard it.
|
|
|
+ currentBurst.reset()
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
-
|
|
|
-func (conn *BurstMonitoredConn) endBurst(
|
|
|
- thresholdBytes int64,
|
|
|
- currentBurst *burst,
|
|
|
- history *burstHistory) {
|
|
|
-
|
|
|
- // Assumes the associated mutex is locked.
|
|
|
-
|
|
|
- if currentBurst.isZero() {
|
|
|
+ if operationEnd.Sub(operationStart) > deadline {
|
|
|
+ // Operation exceeded deadline, so no burst.
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- burst := *currentBurst
|
|
|
+ if currentBurst.isZero() {
|
|
|
+ // Start a new burst.
|
|
|
+ currentBurst.startTime = operationStart
|
|
|
+ }
|
|
|
|
|
|
- currentBurst.startTime = time.Time{}
|
|
|
- currentBurst.lastByteTime = time.Time{}
|
|
|
- currentBurst.bytes = 0
|
|
|
+ currentBurst.bytes += operationBytes
|
|
|
|
|
|
- if burst.bytes < thresholdBytes {
|
|
|
- return
|
|
|
- }
|
|
|
+ if currentBurst.bytes >= thresholdBytes {
|
|
|
|
|
|
- if history.first.isZero() {
|
|
|
- history.first = burst
|
|
|
- }
|
|
|
+ // Burst completed. Bytes in excess of the target are included in the burst
|
|
|
+ // for a more accurate rate calculation: we know, roughly, when the last
|
|
|
+ // byte arrived, but not the last target byte. For the same reason, we do
|
|
|
+ // not count the excess bytes towards a subsequent burst.
|
|
|
|
|
|
- history.last = burst
|
|
|
+ currentBurst.endTime = operationEnd
|
|
|
|
|
|
- if history.min.isZero() || history.min.rate() > burst.rate() {
|
|
|
- history.min = burst
|
|
|
- }
|
|
|
+ if history.first.isZero() {
|
|
|
+ history.first = *currentBurst
|
|
|
+ }
|
|
|
+ history.last = *currentBurst
|
|
|
+ rate := currentBurst.rate()
|
|
|
+ if history.min.isZero() || history.min.rate() > rate {
|
|
|
+ history.min = *currentBurst
|
|
|
+ }
|
|
|
+ if history.max.isZero() || history.max.rate() < rate {
|
|
|
+ history.max = *currentBurst
|
|
|
+ }
|
|
|
|
|
|
- if history.max.isZero() || history.max.rate() < burst.rate() {
|
|
|
- history.max = burst
|
|
|
+ currentBurst.reset()
|
|
|
}
|
|
|
}
|