activity.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package common
  20. import (
  21. "net"
  22. "sync/atomic"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  26. )
  27. // ActivityMonitoredConn wraps a net.Conn, adding logic to deal with events
  28. // triggered by I/O activity.
  29. //
  30. // ActivityMonitoredConn uses lock-free concurrency synronization, avoiding an
  31. // additional mutex resource, making it suitable for wrapping many net.Conns
  32. // (e.g, each Psiphon port forward).
  33. //
  34. // When an inactivity timeout is specified, the network I/O will timeout after
  35. // the specified period of read inactivity. Optionally, for the purpose of
  36. // inactivity only, ActivityMonitoredConn will also consider the connection
  37. // active when data is written to it.
  38. //
  39. // When a LRUConnsEntry is specified, then the LRU entry is promoted on either
  40. // a successful read or write.
  41. //
  42. // When an ActivityUpdater is set, then its UpdateActivity method is called on
  43. // each read and write with the number of bytes transferred. The
  44. // durationNanoseconds, which is the time since the last read, is reported
  45. // only on reads.
  46. type ActivityMonitoredConn struct {
  47. // Note: 64-bit ints used with atomic operations are placed
  48. // at the start of struct to ensure 64-bit alignment.
  49. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  50. monotonicStartTime int64
  51. lastReadActivityTime int64
  52. realStartTime time.Time
  53. net.Conn
  54. inactivityTimeout time.Duration
  55. activeOnWrite bool
  56. activityUpdaters []ActivityUpdater
  57. lruEntry *LRUConnsEntry
  58. }
  59. // ActivityUpdater defines an interface for receiving updates for
  60. // ActivityMonitoredConn activity. Values passed to UpdateProgress are bytes
  61. // transferred and conn duration since the previous UpdateProgress.
  62. type ActivityUpdater interface {
  63. UpdateProgress(bytesRead, bytesWritten, durationNanoseconds int64)
  64. }
  65. // NewActivityMonitoredConn creates a new ActivityMonitoredConn.
  66. func NewActivityMonitoredConn(
  67. conn net.Conn,
  68. inactivityTimeout time.Duration,
  69. activeOnWrite bool,
  70. lruEntry *LRUConnsEntry,
  71. activityUpdaters ...ActivityUpdater) (*ActivityMonitoredConn, error) {
  72. if inactivityTimeout > 0 {
  73. err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
  74. if err != nil {
  75. return nil, errors.Trace(err)
  76. }
  77. }
  78. // The "monotime" package is still used here as its time value is an int64,
  79. // which is compatible with atomic operations.
  80. now := int64(monotime.Now())
  81. return &ActivityMonitoredConn{
  82. Conn: conn,
  83. inactivityTimeout: inactivityTimeout,
  84. activeOnWrite: activeOnWrite,
  85. realStartTime: time.Now(),
  86. monotonicStartTime: now,
  87. lastReadActivityTime: now,
  88. lruEntry: lruEntry,
  89. activityUpdaters: activityUpdaters,
  90. }, nil
  91. }
  92. // GetStartTime gets the time when the ActivityMonitoredConn was initialized.
  93. // Reported time is UTC.
  94. func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
  95. return conn.realStartTime.UTC()
  96. }
  97. // GetActiveDuration returns the time elapsed between the initialization of
  98. // the ActivityMonitoredConn and the last Read. Only reads are used for this
  99. // calculation since writes may succeed locally due to buffering.
  100. func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
  101. return time.Duration(atomic.LoadInt64(&conn.lastReadActivityTime) - conn.monotonicStartTime)
  102. }
  103. func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
  104. n, err := conn.Conn.Read(buffer)
  105. if n > 0 {
  106. if conn.inactivityTimeout > 0 {
  107. err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
  108. if err != nil {
  109. return n, errors.Trace(err)
  110. }
  111. }
  112. lastReadActivityTime := atomic.LoadInt64(&conn.lastReadActivityTime)
  113. readActivityTime := int64(monotime.Now())
  114. atomic.StoreInt64(&conn.lastReadActivityTime, readActivityTime)
  115. for _, activityUpdater := range conn.activityUpdaters {
  116. activityUpdater.UpdateProgress(
  117. int64(n), 0, readActivityTime-lastReadActivityTime)
  118. }
  119. if conn.lruEntry != nil {
  120. conn.lruEntry.Touch()
  121. }
  122. }
  123. // Note: no trace error to preserve error type
  124. return n, err
  125. }
  126. func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
  127. n, err := conn.Conn.Write(buffer)
  128. if n > 0 && conn.activeOnWrite {
  129. if conn.inactivityTimeout > 0 {
  130. err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
  131. if err != nil {
  132. return n, errors.Trace(err)
  133. }
  134. }
  135. for _, activityUpdater := range conn.activityUpdaters {
  136. activityUpdater.UpdateProgress(0, int64(n), 0)
  137. }
  138. if conn.lruEntry != nil {
  139. conn.lruEntry.Touch()
  140. }
  141. }
  142. // Note: no trace error to preserve error type
  143. return n, err
  144. }
  145. // IsClosed implements the Closer iterface. The return value indicates whether
  146. // the underlying conn has been closed.
  147. func (conn *ActivityMonitoredConn) IsClosed() bool {
  148. closer, ok := conn.Conn.(Closer)
  149. if !ok {
  150. return false
  151. }
  152. return closer.IsClosed()
  153. }