rtx_timer.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. package sctp
  4. import (
  5. "math"
  6. "sync"
  7. "time"
  8. )
  // Protocol parameters used by the retransmission timer code in this file.
// All RTO values are expressed in milliseconds.
const (
	// rtoInitial is RTO.Initial in msec.
	rtoInitial float64 = 1000
	// rtoMin is RTO.Min in msec.
	rtoMin float64 = 1000
	// defaultRTOMax is the RTO.Max in msec that is used when the caller
	// passes 0 as rtoMax.
	defaultRTOMax float64 = 60 * 1000
	// rtoAlpha is RTO.Alpha.
	rtoAlpha float64 = 1.0 / 8
	// rtoBeta is RTO.Beta.
	rtoBeta float64 = 1.0 / 4

	// maxInitRetrans is Max.Init.Retransmits.
	maxInitRetrans uint = 8
	// pathMaxRetrans is Path.Max.Retrans.
	pathMaxRetrans uint = 5
	// noMaxRetrans is the maxRetrans value (0) for which rtxTimer never
	// reports a retransmission failure.
	noMaxRetrans uint = 0
)
  // rtoManager manages Rtx timeout values.
// This is an implementation of RFC 4960 sec 6.3.1.
//
// All methods take the embedded mutex, so a single rtoManager may be shared
// between goroutines.
type rtoManager struct {
	// srtt is the smoothed RTT. A value of 0 is treated by setNewRTT() as
	// "no measurement has been taken yet".
	srtt float64
	// rttvar is the RTT variation.
	rttvar float64
	// rto is the current retransmission timeout in msec.
	rto float64
	// noUpdate, when true, makes setNewRTT() and reset() leave the values
	// above untouched. It is set through setRTO().
	noUpdate bool
	// mutex guards every other field of this struct.
	mutex sync.RWMutex
	// rtoMax is the upper bound (in msec) applied to rto by setNewRTT().
	rtoMax float64
}
  36. // newRTOManager creates a new rtoManager.
  37. func newRTOManager(rtoMax float64) *rtoManager {
  38. mgr := rtoManager{
  39. rto: rtoInitial,
  40. rtoMax: rtoMax,
  41. }
  42. if mgr.rtoMax == 0 {
  43. mgr.rtoMax = defaultRTOMax
  44. }
  45. return &mgr
  46. }
  47. // setNewRTT takes a newly measured RTT then adjust the RTO in msec.
  48. func (m *rtoManager) setNewRTT(rtt float64) float64 {
  49. m.mutex.Lock()
  50. defer m.mutex.Unlock()
  51. if m.noUpdate {
  52. return m.srtt
  53. }
  54. if m.srtt == 0 {
  55. // First measurement
  56. m.srtt = rtt
  57. m.rttvar = rtt / 2
  58. } else {
  59. // Subsequent rtt measurement
  60. m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
  61. m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
  62. }
  63. m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), m.rtoMax)
  64. return m.srtt
  65. }
  66. // getRTO simply returns the current RTO in msec.
  67. func (m *rtoManager) getRTO() float64 {
  68. m.mutex.RLock()
  69. defer m.mutex.RUnlock()
  70. return m.rto
  71. }
  72. // reset resets the RTO variables to the initial values.
  73. func (m *rtoManager) reset() {
  74. m.mutex.Lock()
  75. defer m.mutex.Unlock()
  76. if m.noUpdate {
  77. return
  78. }
  79. m.srtt = 0
  80. m.rttvar = 0
  81. m.rto = rtoInitial
  82. }
  83. // set RTO value for testing
  84. func (m *rtoManager) setRTO(rto float64, noUpdate bool) {
  85. m.mutex.Lock()
  86. defer m.mutex.Unlock()
  87. m.rto = rto
  88. m.noUpdate = noUpdate
  89. }
  // rtxTimerObserver is the interface to a timer observer.
// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
// from within these callbacks.
type rtxTimerObserver interface {
	// onRetransmissionTimeout is called each time the timer expires while
	// retransmission is still allowed. timerID is the id given to
	// newRTXTimer() and n is the number of expirations so far (1 for the
	// first one).
	onRetransmissionTimeout(timerID int, n uint)
	// onRetransmissionFailure is called when the timer expires more than
	// maxRetrans times. It is never called when maxRetrans is 0.
	onRetransmissionFailure(timerID int)
}
  // rtxTimer provides the retransmission timer that conforms with RFC 4960 Sec 6.3.1
type rtxTimer struct {
	// id is passed back to the observer in every callback.
	id int
	// observer receives the timeout and failure callbacks.
	observer rtxTimerObserver
	// maxRetrans is the number of expirations after which a failure is
	// reported; 0 means there is no limit.
	maxRetrans uint
	// stopFunc cancels the running timer loop. It is non-nil only while the
	// timer is running.
	stopFunc stopTimerLoop
	// closed is set by close(); once set, start() refuses to run.
	closed bool
	// mutex guards stopFunc and closed.
	mutex sync.RWMutex
	// rtoMax is the upper bound (in msec) for the backed-off timeout.
	rtoMax float64
}
  // stopTimerLoop is the function stored in rtxTimer.stopFunc; calling it
// signals the running timer loop to exit.
type stopTimerLoop func()
  108. // newRTXTimer creates a new retransmission timer.
  109. // if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
  110. // (it will never make onRetransmissionFailure() callback.
  111. func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint,
  112. rtoMax float64,
  113. ) *rtxTimer {
  114. timer := rtxTimer{
  115. id: id,
  116. observer: observer,
  117. maxRetrans: maxRetrans,
  118. rtoMax: rtoMax,
  119. }
  120. if timer.rtoMax == 0 {
  121. timer.rtoMax = defaultRTOMax
  122. }
  123. return &timer
  124. }
  125. // start starts the timer.
  126. func (t *rtxTimer) start(rto float64) bool {
  127. t.mutex.Lock()
  128. defer t.mutex.Unlock()
  129. // this timer is already closed
  130. if t.closed {
  131. return false
  132. }
  133. // this is a noop if the timer is always running
  134. if t.stopFunc != nil {
  135. return false
  136. }
  137. // Note: rto value is intentionally not capped by RTO.Min to allow
  138. // fast timeout for the tests. Non-test code should pass in the
  139. // rto generated by rtoManager getRTO() method which caps the
  140. // value at RTO.Min or at RTO.Max.
  141. var nRtos uint
  142. cancelCh := make(chan struct{})
  143. go func() {
  144. canceling := false
  145. timer := time.NewTimer(math.MaxInt64)
  146. timer.Stop()
  147. for !canceling {
  148. timeout := calculateNextTimeout(rto, nRtos, t.rtoMax)
  149. timer.Reset(time.Duration(timeout) * time.Millisecond)
  150. select {
  151. case <-timer.C:
  152. nRtos++
  153. if t.maxRetrans == 0 || nRtos <= t.maxRetrans {
  154. t.observer.onRetransmissionTimeout(t.id, nRtos)
  155. } else {
  156. t.stop()
  157. t.observer.onRetransmissionFailure(t.id)
  158. }
  159. case <-cancelCh:
  160. canceling = true
  161. timer.Stop()
  162. }
  163. }
  164. }()
  165. t.stopFunc = func() {
  166. close(cancelCh)
  167. }
  168. return true
  169. }
  170. // stop stops the timer.
  171. func (t *rtxTimer) stop() {
  172. t.mutex.Lock()
  173. defer t.mutex.Unlock()
  174. if t.stopFunc != nil {
  175. t.stopFunc()
  176. t.stopFunc = nil
  177. }
  178. }
  179. // closes the timer. this is similar to stop() but subsequent start() call
  180. // will fail (the timer is no longer usable)
  181. func (t *rtxTimer) close() {
  182. t.mutex.Lock()
  183. defer t.mutex.Unlock()
  184. if t.stopFunc != nil {
  185. t.stopFunc()
  186. t.stopFunc = nil
  187. }
  188. t.closed = true
  189. }
  190. // isRunning tests if the timer is running.
  191. // Debug purpose only
  192. func (t *rtxTimer) isRunning() bool {
  193. t.mutex.RLock()
  194. defer t.mutex.RUnlock()
  195. return (t.stopFunc != nil)
  196. }
  // calculateNextTimeout returns the timeout to use after nRtos expirations:
// rto doubled nRtos times, bounded above by rtoMax. All values are in the
// same unit as rto.
//
// RFC 4960 sec 6.3.3. Handle T3-rtx Expiration
//
//	E2) For the destination address for which the timer expires, set RTO
//	    <- RTO * 2 ("back off the timer"). The maximum value discussed
//	    in rule C7 above (RTO.max) may be used to provide an upper bound
//	    to this doubling operation.
func calculateNextTimeout(rto float64, nRtos uint, rtoMax float64) float64 {
	// From this many doublings on, rtoMax is returned directly, which also
	// keeps the shift below within range.
	const maxDoublings = 31

	if nRtos >= maxDoublings {
		return rtoMax
	}

	backoff := float64(int64(1) << nRtos)

	return math.Min(rto*backoff, rtoMax)
}