throttled.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package common
  20. import (
  21. "errors"
  22. "io"
  23. "net"
  24. "sync"
  25. "sync/atomic"
  26. "github.com/Psiphon-Inc/ratelimit"
  27. )
  28. // RateLimits specify the rate limits for a ThrottledConn.
  29. type RateLimits struct {
  30. // ReadUnthrottledBytes specifies the number of bytes to
  31. // read, approximately, before starting rate limiting.
  32. ReadUnthrottledBytes int64
  33. // ReadBytesPerSecond specifies a rate limit for read
  34. // data transfer. The default, 0, is no limit.
  35. ReadBytesPerSecond int64
  36. // WriteUnthrottledBytes specifies the number of bytes to
  37. // write, approximately, before starting rate limiting.
  38. WriteUnthrottledBytes int64
  39. // WriteBytesPerSecond specifies a rate limit for write
  40. // data transfer. The default, 0, is no limit.
  41. WriteBytesPerSecond int64
  42. // CloseAfterExhausted indicates that the underlying
  43. // net.Conn should be closed once either the read or
  44. // write unthrottled bytes have been exhausted. In this
  45. // case, throttling is never applied.
  46. CloseAfterExhausted bool
  47. }
  48. // ThrottledConn wraps a net.Conn with read and write rate limiters.
  49. // Rates are specified as bytes per second. Optional unlimited byte
  50. // counts allow for a number of bytes to read or write before
  51. // applying rate limiting. Specify limit values of 0 to set no rate
  52. // limit (unlimited counts are ignored in this case).
  53. // The underlying rate limiter uses the token bucket algorithm to
  54. // calculate delay times for read and write operations.
  55. type ThrottledConn struct {
  56. // Note: 64-bit ints used with atomic operations are at placed
  57. // at the start of struct to ensure 64-bit alignment.
  58. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  59. readUnthrottledBytes int64
  60. readBytesPerSecond int64
  61. writeUnthrottledBytes int64
  62. writeBytesPerSecond int64
  63. closeAfterExhausted int32
  64. readLock sync.Mutex
  65. throttledReader io.Reader
  66. writeLock sync.Mutex
  67. throttledWriter io.Writer
  68. net.Conn
  69. }
  70. // NewThrottledConn initializes a new ThrottledConn.
  71. func NewThrottledConn(conn net.Conn, limits RateLimits) *ThrottledConn {
  72. throttledConn := &ThrottledConn{Conn: conn}
  73. throttledConn.SetLimits(limits)
  74. return throttledConn
  75. }
  76. // SetLimits modifies the rate limits of an existing
  77. // ThrottledConn. It is safe to call SetLimits while
  78. // other goroutines are calling Read/Write. This function
  79. // will not block, and the new rate limits will be
  80. // applied within Read/Write, but not necessarily until
  81. // some futher I/O at previous rates.
  82. func (conn *ThrottledConn) SetLimits(limits RateLimits) {
  83. // Using atomic instead of mutex to avoid blocking
  84. // this function on throttled I/O in an ongoing
  85. // read or write. Precise synchronized application
  86. // of the rate limit values is not required.
  87. // Negative rates are invalid and -1 is a special
  88. // value to used to signal throttling initialized
  89. // state. Silently normalize negative values to 0.
  90. rate := limits.ReadBytesPerSecond
  91. if rate < 0 {
  92. rate = 0
  93. }
  94. atomic.StoreInt64(&conn.readBytesPerSecond, rate)
  95. atomic.StoreInt64(&conn.readUnthrottledBytes, limits.ReadUnthrottledBytes)
  96. rate = limits.WriteBytesPerSecond
  97. if rate < 0 {
  98. rate = 0
  99. }
  100. atomic.StoreInt64(&conn.writeBytesPerSecond, rate)
  101. atomic.StoreInt64(&conn.writeUnthrottledBytes, limits.WriteUnthrottledBytes)
  102. closeAfterExhausted := int32(0)
  103. if limits.CloseAfterExhausted {
  104. closeAfterExhausted = 1
  105. }
  106. atomic.StoreInt32(&conn.closeAfterExhausted, closeAfterExhausted)
  107. }
  108. func (conn *ThrottledConn) Read(buffer []byte) (int, error) {
  109. // A mutex is used to ensure conformance with net.Conn
  110. // concurrency semantics. The atomic.SwapInt64 and
  111. // subsequent assignment of throttledReader could be
  112. // a race condition with concurrent reads.
  113. conn.readLock.Lock()
  114. defer conn.readLock.Unlock()
  115. // Use the base conn until the unthrottled count is
  116. // exhausted. This is only an approximate enforcement
  117. // since this read, or concurrent reads, could exceed
  118. // the remaining count.
  119. if atomic.LoadInt64(&conn.readUnthrottledBytes) > 0 {
  120. n, err := conn.Conn.Read(buffer)
  121. atomic.AddInt64(&conn.readUnthrottledBytes, -int64(n))
  122. return n, err
  123. }
  124. if atomic.LoadInt32(&conn.closeAfterExhausted) == 1 {
  125. conn.Conn.Close()
  126. return 0, errors.New("throttled conn exhausted")
  127. }
  128. rate := atomic.SwapInt64(&conn.readBytesPerSecond, -1)
  129. if rate != -1 {
  130. // SetLimits has been called and a new rate limiter
  131. // must be initialized. When no limit is specified,
  132. // the reader/writer is simply the base conn.
  133. // No state is retained from the previous rate limiter,
  134. // so a pending I/O throttle sleep may be skipped when
  135. // the old and new rate are similar.
  136. if rate == 0 {
  137. conn.throttledReader = conn.Conn
  138. } else {
  139. conn.throttledReader = ratelimit.Reader(
  140. conn.Conn,
  141. ratelimit.NewBucketWithRate(float64(rate), rate))
  142. }
  143. }
  144. return conn.throttledReader.Read(buffer)
  145. }
  146. func (conn *ThrottledConn) Write(buffer []byte) (int, error) {
  147. // See comments in Read.
  148. conn.writeLock.Lock()
  149. defer conn.writeLock.Unlock()
  150. if atomic.LoadInt64(&conn.writeUnthrottledBytes) > 0 {
  151. n, err := conn.Conn.Write(buffer)
  152. atomic.AddInt64(&conn.writeUnthrottledBytes, -int64(n))
  153. return n, err
  154. }
  155. if atomic.LoadInt32(&conn.closeAfterExhausted) == 1 {
  156. conn.Conn.Close()
  157. return 0, errors.New("throttled conn exhausted")
  158. }
  159. rate := atomic.SwapInt64(&conn.writeBytesPerSecond, -1)
  160. if rate != -1 {
  161. if rate == 0 {
  162. conn.throttledWriter = conn.Conn
  163. } else {
  164. conn.throttledWriter = ratelimit.Writer(
  165. conn.Conn,
  166. ratelimit.NewBucketWithRate(float64(rate), rate))
  167. }
  168. }
  169. return conn.throttledWriter.Write(buffer)
  170. }