agent.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package stun
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. )
  7. // NoopHandler just discards any event.
  8. func NoopHandler() Handler {
  9. return func(e Event) {}
  10. }
  11. // NewAgent initializes and returns new Agent with provided handler.
  12. // If h is nil, the NoopHandler will be used.
  13. func NewAgent(h Handler) *Agent {
  14. if h == nil {
  15. h = NoopHandler()
  16. }
  17. a := &Agent{
  18. transactions: make(map[transactionID]agentTransaction),
  19. handler: h,
  20. }
  21. return a
  22. }
  23. // Agent is low-level abstraction over transaction list that
  24. // handles concurrency (all calls are goroutine-safe) and
  25. // time outs (via Collect call).
  26. type Agent struct {
  27. // transactions is map of transactions that are currently
  28. // in progress. Event handling is done in such way when
  29. // transaction is unregistered before agentTransaction access,
  30. // minimizing mux lock and protecting agentTransaction from
  31. // data races via unexpected concurrent access.
  32. transactions map[transactionID]agentTransaction
  33. closed bool // all calls are invalid if true
  34. mux sync.Mutex // protects transactions and closed
  35. handler Handler // handles transactions
  36. }
  37. // Handler handles state changes of transaction.
  38. //
  39. // Handler is called on transaction state change.
  40. // Usage of e is valid only during call, user must
  41. // copy needed fields explicitly.
  42. type Handler func(e Event)
  43. // Event is passed to Handler describing the transaction event.
  44. // Do not reuse outside Handler.
  45. type Event struct {
  46. TransactionID [TransactionIDSize]byte
  47. Message *Message
  48. Error error
  49. }
  50. // agentTransaction represents transaction in progress.
  51. // Concurrent access is invalid.
  52. type agentTransaction struct {
  53. id transactionID
  54. deadline time.Time
  55. }
  56. var (
  57. // ErrTransactionStopped indicates that transaction was manually stopped.
  58. ErrTransactionStopped = errors.New("transaction is stopped")
  59. // ErrTransactionNotExists indicates that agent failed to find transaction.
  60. ErrTransactionNotExists = errors.New("transaction not exists")
  61. // ErrTransactionExists indicates that transaction with same id is already
  62. // registered.
  63. ErrTransactionExists = errors.New("transaction exists with same id")
  64. )
  65. // StopWithError removes transaction from list and calls handler with
  66. // provided error. Can return ErrTransactionNotExists and ErrAgentClosed.
  67. func (a *Agent) StopWithError(id [TransactionIDSize]byte, err error) error {
  68. a.mux.Lock()
  69. if a.closed {
  70. a.mux.Unlock()
  71. return ErrAgentClosed
  72. }
  73. t, exists := a.transactions[id]
  74. delete(a.transactions, id)
  75. h := a.handler
  76. a.mux.Unlock()
  77. if !exists {
  78. return ErrTransactionNotExists
  79. }
  80. h(Event{
  81. TransactionID: t.id,
  82. Error: err,
  83. })
  84. return nil
  85. }
  86. // Stop stops transaction by id with ErrTransactionStopped, blocking
  87. // until handler returns.
  88. func (a *Agent) Stop(id [TransactionIDSize]byte) error {
  89. return a.StopWithError(id, ErrTransactionStopped)
  90. }
  91. // ErrAgentClosed indicates that agent is in closed state and is unable
  92. // to handle transactions.
  93. var ErrAgentClosed = errors.New("agent is closed")
  94. // Start registers transaction with provided id and deadline.
  95. // Could return ErrAgentClosed, ErrTransactionExists.
  96. //
  97. // Agent handler is guaranteed to be eventually called.
  98. func (a *Agent) Start(id [TransactionIDSize]byte, deadline time.Time) error {
  99. a.mux.Lock()
  100. defer a.mux.Unlock()
  101. if a.closed {
  102. return ErrAgentClosed
  103. }
  104. _, exists := a.transactions[id]
  105. if exists {
  106. return ErrTransactionExists
  107. }
  108. a.transactions[id] = agentTransaction{
  109. id: id,
  110. deadline: deadline,
  111. }
  112. return nil
  113. }
  114. // agentCollectCap is initial capacity for Agent.Collect slices,
  115. // sufficient to make function zero-alloc in most cases.
  116. const agentCollectCap = 100
  117. // ErrTransactionTimeOut indicates that transaction has reached deadline.
  118. var ErrTransactionTimeOut = errors.New("transaction is timed out")
  119. // Collect terminates all transactions that have deadline before provided
  120. // time, blocking until all handlers will process ErrTransactionTimeOut.
  121. // Will return ErrAgentClosed if agent is already closed.
  122. //
  123. // It is safe to call Collect concurrently but makes no sense.
  124. func (a *Agent) Collect(gcTime time.Time) error {
  125. toRemove := make([]transactionID, 0, agentCollectCap)
  126. a.mux.Lock()
  127. if a.closed {
  128. // Doing nothing if agent is closed.
  129. // All transactions should be already closed
  130. // during Close() call.
  131. a.mux.Unlock()
  132. return ErrAgentClosed
  133. }
  134. // Adding all transactions with deadline before gcTime
  135. // to toCall and toRemove slices.
  136. // No allocs if there are less than agentCollectCap
  137. // timed out transactions.
  138. for id, t := range a.transactions {
  139. if t.deadline.Before(gcTime) {
  140. toRemove = append(toRemove, id)
  141. }
  142. }
  143. // Un-registering timed out transactions.
  144. for _, id := range toRemove {
  145. delete(a.transactions, id)
  146. }
  147. // Calling handler does not require locked mutex,
  148. // reducing lock time.
  149. h := a.handler
  150. a.mux.Unlock()
  151. // Sending ErrTransactionTimeOut to handler for all transactions,
  152. // blocking until last one.
  153. event := Event{
  154. Error: ErrTransactionTimeOut,
  155. }
  156. for _, id := range toRemove {
  157. event.TransactionID = id
  158. h(event)
  159. }
  160. return nil
  161. }
  162. // Process incoming message, synchronously passing it to handler.
  163. func (a *Agent) Process(m *Message) error {
  164. e := Event{
  165. TransactionID: m.TransactionID,
  166. Message: m,
  167. }
  168. a.mux.Lock()
  169. if a.closed {
  170. a.mux.Unlock()
  171. return ErrAgentClosed
  172. }
  173. h := a.handler
  174. delete(a.transactions, m.TransactionID)
  175. a.mux.Unlock()
  176. h(e)
  177. return nil
  178. }
  179. // SetHandler sets agent handler to h.
  180. func (a *Agent) SetHandler(h Handler) error {
  181. a.mux.Lock()
  182. if a.closed {
  183. a.mux.Unlock()
  184. return ErrAgentClosed
  185. }
  186. a.handler = h
  187. a.mux.Unlock()
  188. return nil
  189. }
  190. // Close terminates all transactions with ErrAgentClosed and renders Agent to
  191. // closed state.
  192. func (a *Agent) Close() error {
  193. e := Event{
  194. Error: ErrAgentClosed,
  195. }
  196. a.mux.Lock()
  197. if a.closed {
  198. a.mux.Unlock()
  199. return ErrAgentClosed
  200. }
  201. for _, t := range a.transactions {
  202. e.TransactionID = t.id
  203. a.handler(e)
  204. }
  205. a.transactions = nil
  206. a.closed = true
  207. a.handler = nil
  208. a.mux.Unlock()
  209. return nil
  210. }
  211. type transactionID [TransactionIDSize]byte