burst.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package common
  20. import (
  21. "net"
  22. "sync"
  23. "time"
  24. )
  25. // BurstMonitoredConn wraps a net.Conn and monitors for data transfer bursts.
  26. // Upstream (read) and downstream (write) bursts are tracked independently.
  27. //
  28. // A burst is defined as a transfer of "target" bytes, possibly across
  29. // multiple I/O operations, where the total time elapsed does not exceed
  30. // "deadline". Both a non-zero deadline and theshold must be set to enable
  31. // monitoring. Four bursts are reported: the first, the last, the min (by
  32. // rate) and max.
  33. //
  34. // The burst monitoring is heuristical in nature and may not capture all
  35. // bursts. The reported rates will be more accurate for larger target values
  36. // and shorter deadlines, but these settings may fail to register bursts on
  37. // slower connections. Tune the deadline/target as required. The threshold
  38. // should be set to account for buffering (e.g, the local host socket
  39. // send/receive buffer) but this is not enforced by BurstMonitoredConn.
  40. //
  41. // Overhead: BurstMonitoredConn adds mutexes but does not use timers.
  42. type BurstMonitoredConn struct {
  43. net.Conn
  44. isServer bool
  45. readTargetBytes int64
  46. readDeadline time.Duration
  47. writeTargetBytes int64
  48. writeDeadline time.Duration
  49. readMutex sync.Mutex
  50. currentReadBurst burst
  51. readBursts burstHistory
  52. writeMutex sync.Mutex
  53. currentWriteBurst burst
  54. writeBursts burstHistory
  55. }
  56. // NewBurstMonitoredConn creates a new BurstMonitoredConn.
  57. func NewBurstMonitoredConn(
  58. conn net.Conn,
  59. isServer bool,
  60. upstreamTargetBytes int64,
  61. upstreamDeadline time.Duration,
  62. downstreamTargetBytes int64,
  63. downstreamDeadline time.Duration) *BurstMonitoredConn {
  64. burstConn := &BurstMonitoredConn{
  65. Conn: conn,
  66. isServer: isServer,
  67. }
  68. if isServer {
  69. burstConn.readTargetBytes = upstreamTargetBytes
  70. burstConn.readDeadline = upstreamDeadline
  71. burstConn.writeTargetBytes = downstreamTargetBytes
  72. burstConn.writeDeadline = downstreamDeadline
  73. } else {
  74. burstConn.readTargetBytes = downstreamTargetBytes
  75. burstConn.readDeadline = downstreamDeadline
  76. burstConn.writeTargetBytes = upstreamTargetBytes
  77. burstConn.writeDeadline = upstreamDeadline
  78. }
  79. return burstConn
  80. }
  81. type burst struct {
  82. startTime time.Time
  83. endTime time.Time
  84. bytes int64
  85. }
  86. func (b *burst) isZero() bool {
  87. return b.startTime.IsZero()
  88. }
  89. func (b *burst) offset(baseTime time.Time) time.Duration {
  90. offset := b.startTime.Sub(baseTime)
  91. if offset <= 0 {
  92. return 0
  93. }
  94. return offset
  95. }
  96. func (b *burst) duration() time.Duration {
  97. duration := b.endTime.Sub(b.startTime)
  98. if duration <= 0 {
  99. return 0
  100. }
  101. return duration
  102. }
  103. func (b *burst) rate() int64 {
  104. duration := b.duration()
  105. if duration <= 0 {
  106. return 0
  107. }
  108. return int64(
  109. (float64(b.bytes) * float64(time.Second)) /
  110. float64(duration))
  111. }
  112. func (b *burst) reset() {
  113. b.startTime = time.Time{}
  114. b.endTime = time.Time{}
  115. b.bytes = 0
  116. }
  117. type burstHistory struct {
  118. first burst
  119. last burst
  120. min burst
  121. max burst
  122. }
  123. func (conn *BurstMonitoredConn) Read(buffer []byte) (int, error) {
  124. if conn.readTargetBytes <= 0 || conn.readDeadline <= 0 {
  125. return conn.Conn.Read(buffer)
  126. }
  127. start := time.Now()
  128. n, err := conn.Conn.Read(buffer)
  129. end := time.Now()
  130. if n > 0 {
  131. conn.readMutex.Lock()
  132. conn.updateBurst(
  133. start,
  134. end,
  135. int64(n),
  136. conn.readTargetBytes,
  137. conn.readDeadline,
  138. &conn.currentReadBurst,
  139. &conn.readBursts)
  140. conn.readMutex.Unlock()
  141. }
  142. // Note: no context error to preserve error type
  143. return n, err
  144. }
  145. func (conn *BurstMonitoredConn) Write(buffer []byte) (int, error) {
  146. if conn.writeTargetBytes <= 0 || conn.writeDeadline <= 0 {
  147. return conn.Conn.Write(buffer)
  148. }
  149. start := time.Now()
  150. n, err := conn.Conn.Write(buffer)
  151. end := time.Now()
  152. if n > 0 {
  153. conn.writeMutex.Lock()
  154. conn.updateBurst(
  155. start,
  156. end,
  157. int64(n),
  158. conn.writeTargetBytes,
  159. conn.writeDeadline,
  160. &conn.currentWriteBurst,
  161. &conn.writeBursts)
  162. conn.writeMutex.Unlock()
  163. }
  164. // Note: no context error to preserve error type
  165. return n, err
  166. }
  167. // IsClosed implements the Closer iterface. The return value indicates whether
  168. // the underlying conn has been closed.
  169. func (conn *BurstMonitoredConn) IsClosed() bool {
  170. closer, ok := conn.Conn.(Closer)
  171. if !ok {
  172. return false
  173. }
  174. return closer.IsClosed()
  175. }
  176. // GetMetrics returns log fields with burst metrics for the first, last, min
  177. // (by rate), and max bursts for this conn. Time/duration values are reported
  178. // in milliseconds. Rate is reported in bytes per second.
  179. func (conn *BurstMonitoredConn) GetMetrics(baseTime time.Time) LogFields {
  180. logFields := make(LogFields)
  181. addFields := func(prefix string, burst *burst) {
  182. if burst.isZero() {
  183. return
  184. }
  185. logFields[prefix+"offset"] = int64(burst.offset(baseTime) / time.Millisecond)
  186. logFields[prefix+"duration"] = int64(burst.duration() / time.Millisecond)
  187. logFields[prefix+"bytes"] = burst.bytes
  188. logFields[prefix+"rate"] = burst.rate()
  189. }
  190. addHistory := func(prefix string, history *burstHistory) {
  191. addFields(prefix+"first_", &history.first)
  192. addFields(prefix+"last_", &history.last)
  193. addFields(prefix+"min_", &history.min)
  194. addFields(prefix+"max_", &history.max)
  195. }
  196. var upstreamBursts *burstHistory
  197. var downstreamBursts *burstHistory
  198. if conn.isServer {
  199. upstreamBursts = &conn.readBursts
  200. downstreamBursts = &conn.writeBursts
  201. } else {
  202. upstreamBursts = &conn.writeBursts
  203. downstreamBursts = &conn.readBursts
  204. }
  205. addHistory("burst_upstream_", upstreamBursts)
  206. addHistory("burst_downstream_", downstreamBursts)
  207. return logFields
  208. }
  209. func (conn *BurstMonitoredConn) updateBurst(
  210. operationStart time.Time,
  211. operationEnd time.Time,
  212. operationBytes int64,
  213. thresholdBytes int64,
  214. deadline time.Duration,
  215. currentBurst *burst,
  216. history *burstHistory) {
  217. // Assumes the associated mutex is locked.
  218. if !currentBurst.isZero() &&
  219. operationEnd.Sub(currentBurst.startTime) > deadline {
  220. // Partial burst failed to reach the target, so discard it.
  221. currentBurst.reset()
  222. }
  223. if operationEnd.Sub(operationStart) > deadline {
  224. // Operation exceeded deadline, so no burst.
  225. return
  226. }
  227. if currentBurst.isZero() {
  228. // Start a new burst.
  229. currentBurst.startTime = operationStart
  230. }
  231. currentBurst.bytes += operationBytes
  232. if currentBurst.bytes >= thresholdBytes {
  233. // Burst completed. Bytes in excess of the target are included in the burst
  234. // for a more accurate rate calculation: we know, roughly, when the last
  235. // byte arrived, but not the last target byte. For the same reason, we do
  236. // not count the excess bytes towards a subsequent burst.
  237. currentBurst.endTime = operationEnd
  238. if history.first.isZero() {
  239. history.first = *currentBurst
  240. }
  241. history.last = *currentBurst
  242. rate := currentBurst.rate()
  243. if history.min.isZero() || history.min.rate() > rate {
  244. history.min = *currentBurst
  245. }
  246. if history.max.isZero() || history.max.rate() < rate {
  247. history.max = *currentBurst
  248. }
  249. currentBurst.reset()
  250. }
  251. }