throttled.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package common
  20. import (
  21. "net"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  26. "golang.org/x/time/rate"
  27. )
  28. // RateLimits specify the rate limits for a ThrottledConn.
  29. type RateLimits struct {
  30. // ReadUnthrottledBytes specifies the number of bytes to
  31. // read, approximately, before starting rate limiting.
  32. ReadUnthrottledBytes int64
  33. // ReadBytesPerSecond specifies a rate limit for read
  34. // data transfer. The default, 0, is no limit.
  35. ReadBytesPerSecond int64
  36. // WriteUnthrottledBytes specifies the number of bytes to
  37. // write, approximately, before starting rate limiting.
  38. WriteUnthrottledBytes int64
  39. // WriteBytesPerSecond specifies a rate limit for write
  40. // data transfer. The default, 0, is no limit.
  41. WriteBytesPerSecond int64
  42. // CloseAfterExhausted indicates that the underlying
  43. // net.Conn should be closed once either the read or
  44. // write unthrottled bytes have been exhausted. In this
  45. // case, throttling is never applied.
  46. CloseAfterExhausted bool
  47. }
  48. // ThrottledConn wraps a net.Conn with read and write rate limiters.
  49. // Rates are specified as bytes per second. Optional unlimited byte
  50. // counts allow for a number of bytes to read or write before
  51. // applying rate limiting. Specify limit values of 0 to set no rate
  52. // limit (unlimited counts are ignored in this case).
  53. // The underlying rate limiter uses the token bucket algorithm to
  54. // calculate delay times for read and write operations.
  55. type ThrottledConn struct {
  56. // Note: 64-bit ints used with atomic operations are placed
  57. // at the start of struct to ensure 64-bit alignment.
  58. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  59. readUnthrottledBytes int64
  60. readBytesPerSecond int64
  61. writeUnthrottledBytes int64
  62. writeBytesPerSecond int64
  63. closeAfterExhausted int32
  64. readLock sync.Mutex
  65. readRateLimiter *rate.Limiter
  66. readDelayTimer *time.Timer
  67. writeLock sync.Mutex
  68. writeRateLimiter *rate.Limiter
  69. writeDelayTimer *time.Timer
  70. isClosed int32
  71. stopBroadcast chan struct{}
  72. isStream bool
  73. net.Conn
  74. }
  75. // NewThrottledConn initializes a new ThrottledConn.
  76. //
  77. // Set isStreamConn to true when conn is stream-oriented, such as TCP, and
  78. // false when the conn is packet-oriented, such as UDP. When conn is a
  79. // stream, reads and writes may be split to accomodate rate limits.
  80. func NewThrottledConn(
  81. conn net.Conn, isStream bool, limits RateLimits) *ThrottledConn {
  82. throttledConn := &ThrottledConn{
  83. Conn: conn,
  84. isStream: isStream,
  85. stopBroadcast: make(chan struct{}),
  86. }
  87. throttledConn.SetLimits(limits)
  88. return throttledConn
  89. }
  90. // SetLimits modifies the rate limits of an existing
  91. // ThrottledConn. It is safe to call SetLimits while
  92. // other goroutines are calling Read/Write. This function
  93. // will not block, and the new rate limits will be
  94. // applied within Read/Write, but not necessarily until
  95. // some further I/O at previous rates.
  96. func (conn *ThrottledConn) SetLimits(limits RateLimits) {
  97. // Using atomic instead of mutex to avoid blocking
  98. // this function on throttled I/O in an ongoing
  99. // read or write. Precise synchronized application
  100. // of the rate limit values is not required.
  101. // Negative rates are invalid and -1 is a special
  102. // value to used to signal throttling initialized
  103. // state. Silently normalize negative values to 0.
  104. rate := limits.ReadBytesPerSecond
  105. if rate < 0 {
  106. rate = 0
  107. }
  108. atomic.StoreInt64(&conn.readBytesPerSecond, rate)
  109. atomic.StoreInt64(&conn.readUnthrottledBytes, limits.ReadUnthrottledBytes)
  110. rate = limits.WriteBytesPerSecond
  111. if rate < 0 {
  112. rate = 0
  113. }
  114. atomic.StoreInt64(&conn.writeBytesPerSecond, rate)
  115. atomic.StoreInt64(&conn.writeUnthrottledBytes, limits.WriteUnthrottledBytes)
  116. closeAfterExhausted := int32(0)
  117. if limits.CloseAfterExhausted {
  118. closeAfterExhausted = 1
  119. }
  120. atomic.StoreInt32(&conn.closeAfterExhausted, closeAfterExhausted)
  121. }
  122. func (conn *ThrottledConn) Read(buffer []byte) (int, error) {
  123. // A mutex is used to ensure conformance with net.Conn concurrency semantics.
  124. // The atomic.SwapInt64 and subsequent assignment of readRateLimiter or
  125. // readDelayTimer could be a race condition with concurrent reads.
  126. conn.readLock.Lock()
  127. defer conn.readLock.Unlock()
  128. if atomic.LoadInt32(&conn.isClosed) == 1 {
  129. return 0, errors.TraceNew("throttled conn closed")
  130. }
  131. // Use the base conn until the unthrottled count is
  132. // exhausted. This is only an approximate enforcement
  133. // since this read, or concurrent reads, could exceed
  134. // the remaining count.
  135. if atomic.LoadInt64(&conn.readUnthrottledBytes) > 0 {
  136. n, err := conn.Conn.Read(buffer)
  137. atomic.AddInt64(&conn.readUnthrottledBytes, -int64(n))
  138. return n, err
  139. }
  140. if atomic.LoadInt32(&conn.closeAfterExhausted) == 1 {
  141. conn.Conn.Close()
  142. return 0, errors.TraceNew("throttled conn exhausted")
  143. }
  144. readRate := atomic.SwapInt64(&conn.readBytesPerSecond, -1)
  145. if readRate != -1 {
  146. // SetLimits has been called and a new rate limiter
  147. // must be initialized. When no limit is specified,
  148. // the reader/writer is simply the base conn.
  149. // No state is retained from the previous rate limiter,
  150. // so a pending I/O throttle sleep may be skipped when
  151. // the old and new rate are similar.
  152. if readRate == 0 {
  153. conn.readRateLimiter = nil
  154. } else {
  155. conn.readRateLimiter =
  156. rate.NewLimiter(rate.Limit(readRate), int(readRate))
  157. }
  158. }
  159. // The number of bytes read cannot exceed the rate limiter burst size,
  160. // which is enforced by rate.Limiter.ReserveN. Reduce any read buffer
  161. // size to be at most the burst size.
  162. //
  163. // Read should still return as soon as read bytes are available; and the
  164. // number of bytes that will be received is unknown; so there is no loop
  165. // here to read more bytes. Reducing the read buffer size minimizes
  166. // latency for the up-to-burst-size bytes read, whereas allowing a full
  167. // read followed by multiple ReserveN calls and sleeps would increase
  168. // latency.
  169. //
  170. // In practise, with Psiphon tunnels, throttling is not applied until
  171. // after the Psiphon API handshake, so read buffer reductions won't
  172. // impact early obfuscation traffic shaping; and reads are on the order
  173. // of one SSH "packet", up to 32K, unlikely to be split for all but the
  174. // most restrictive of rate limits.
  175. if conn.readRateLimiter != nil {
  176. burst := conn.readRateLimiter.Burst()
  177. if len(buffer) > burst {
  178. if !conn.isStream {
  179. return 0, errors.TraceNew("non-stream read buffer exceeds burst")
  180. }
  181. buffer = buffer[:burst]
  182. }
  183. }
  184. n, err := conn.Conn.Read(buffer)
  185. if n > 0 && conn.readRateLimiter != nil {
  186. // While rate.Limiter.WaitN would be simpler to use, internally Wait
  187. // creates a new timer for every call which must sleep, which is
  188. // expected to be most calls. Instead, call ReserveN to get the sleep
  189. // time and reuse one timer without allocation.
  190. //
  191. // TODO: avoid allocation: ReserveN allocates a *Reservation; while
  192. // the internal reserveN returns a struct, not a pointer.
  193. reservation := conn.readRateLimiter.ReserveN(time.Now(), n)
  194. if !reservation.OK() {
  195. // This error is not expected, given the buffer size adjustment.
  196. return 0, errors.TraceNew("burst size exceeded")
  197. }
  198. sleepDuration := reservation.Delay()
  199. if sleepDuration > 0 {
  200. if conn.readDelayTimer == nil {
  201. conn.readDelayTimer = time.NewTimer(sleepDuration)
  202. } else {
  203. conn.readDelayTimer.Reset(sleepDuration)
  204. }
  205. select {
  206. case <-conn.readDelayTimer.C:
  207. case <-conn.stopBroadcast:
  208. if !conn.readDelayTimer.Stop() {
  209. <-conn.readDelayTimer.C
  210. }
  211. }
  212. }
  213. }
  214. // Don't wrap I/O errors
  215. return n, err
  216. }
  217. func (conn *ThrottledConn) Write(buffer []byte) (int, error) {
  218. // See comments in Read.
  219. conn.writeLock.Lock()
  220. defer conn.writeLock.Unlock()
  221. if atomic.LoadInt32(&conn.isClosed) == 1 {
  222. return 0, errors.TraceNew("throttled conn closed")
  223. }
  224. if atomic.LoadInt64(&conn.writeUnthrottledBytes) > 0 {
  225. n, err := conn.Conn.Write(buffer)
  226. atomic.AddInt64(&conn.writeUnthrottledBytes, -int64(n))
  227. return n, err
  228. }
  229. if atomic.LoadInt32(&conn.closeAfterExhausted) == 1 {
  230. conn.Conn.Close()
  231. return 0, errors.TraceNew("throttled conn exhausted")
  232. }
  233. writeRate := atomic.SwapInt64(&conn.writeBytesPerSecond, -1)
  234. if writeRate != -1 {
  235. if writeRate == 0 {
  236. conn.writeRateLimiter = nil
  237. } else {
  238. conn.writeRateLimiter =
  239. rate.NewLimiter(rate.Limit(writeRate), int(writeRate))
  240. }
  241. }
  242. if conn.writeRateLimiter == nil {
  243. n, err := conn.Conn.Write(buffer)
  244. // Don't wrap I/O errors
  245. return n, err
  246. }
  247. // The number of bytes written cannot exceed the rate limiter burst size,
  248. // which is enforced by rate.Limiter.ReserveN. Split writes to be at most
  249. // the burst size.
  250. //
  251. // Splitting writes may have some effect on the shape of TCP packets sent
  252. // on the network.
  253. //
  254. // In practise, with Psiphon tunnels, throttling is not applied until
  255. // after the Psiphon API handshake, so write splits won't impact early
  256. // obfuscation traffic shaping; and writes are on the order of one
  257. // SSH "packet", up to 32K, unlikely to be split for all but the most
  258. // restrictive of rate limits.
  259. burst := conn.writeRateLimiter.Burst()
  260. if !conn.isStream && len(buffer) > burst {
  261. return 0, errors.TraceNew("non-stream write exceeds burst")
  262. }
  263. totalWritten := 0
  264. for i := 0; i < len(buffer); i += burst {
  265. j := i + burst
  266. if j > len(buffer) {
  267. j = len(buffer)
  268. }
  269. b := buffer[i:j]
  270. // See comment in Read regarding rate.Limiter.ReserveN vs.
  271. // rate.Limiter.WaitN.
  272. reservation := conn.writeRateLimiter.ReserveN(time.Now(), len(b))
  273. if !reservation.OK() {
  274. // This error is not expected, given the write split adjustments.
  275. return 0, errors.TraceNew("burst size exceeded")
  276. }
  277. sleepDuration := reservation.Delay()
  278. if sleepDuration > 0 {
  279. if conn.writeDelayTimer == nil {
  280. conn.writeDelayTimer = time.NewTimer(sleepDuration)
  281. } else {
  282. conn.writeDelayTimer.Reset(sleepDuration)
  283. }
  284. select {
  285. case <-conn.writeDelayTimer.C:
  286. case <-conn.stopBroadcast:
  287. if !conn.writeDelayTimer.Stop() {
  288. <-conn.writeDelayTimer.C
  289. }
  290. }
  291. }
  292. n, err := conn.Conn.Write(b)
  293. totalWritten += n
  294. if err != nil {
  295. // Don't wrap I/O errors
  296. return totalWritten, err
  297. }
  298. }
  299. return totalWritten, nil
  300. }
  301. func (conn *ThrottledConn) Close() error {
  302. // Ensure close channel only called once.
  303. if !atomic.CompareAndSwapInt32(&conn.isClosed, 0, 1) {
  304. return nil
  305. }
  306. close(conn.stopBroadcast)
  307. return errors.Trace(conn.Conn.Close())
  308. }