activity.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package common
  20. import (
  21. "net"
  22. "sync/atomic"
  23. "time"
  24. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  26. )
  27. // ActivityMonitoredConn wraps a net.Conn, adding logic to deal with events
  28. // triggered by I/O activity.
  29. //
  30. // ActivityMonitoredConn uses lock-free concurrency synronization, avoiding an
  31. // additional mutex resource, making it suitable for wrapping many net.Conns
  32. // (e.g, each Psiphon port forward).
  33. //
  34. // When an inactivity timeout is specified, the network I/O will timeout after
  35. // the specified period of read inactivity. Optionally, for the purpose of
  36. // inactivity only, ActivityMonitoredConn will also consider the connection
  37. // active when data is written to it.
  38. //
  39. // When a LRUConnsEntry is specified, then the LRU entry is promoted on either
  40. // a successful read or write.
  41. //
  42. // When an ActivityUpdater is set, then its UpdateActivity method is called on
  43. // each read and write with the number of bytes transferred. The
  44. // durationNanoseconds, which is the time since the last read, is reported
  45. // only on reads.
  46. type ActivityMonitoredConn struct {
  47. net.Conn
  48. monotonicStartTime int64
  49. lastReadActivityTime atomic.Int64
  50. realStartTime time.Time
  51. inactivityTimeout time.Duration
  52. activeOnWrite bool
  53. activityUpdaters []ActivityUpdater
  54. lruEntry *LRUConnsEntry
  55. }
  56. // ActivityUpdater defines an interface for receiving updates for
  57. // ActivityMonitoredConn activity. Values passed to UpdateProgress are bytes
  58. // transferred and conn duration since the previous UpdateProgress.
  59. type ActivityUpdater interface {
  60. UpdateProgress(bytesRead, bytesWritten, durationNanoseconds int64)
  61. }
  62. // NewActivityMonitoredConn creates a new ActivityMonitoredConn.
  63. func NewActivityMonitoredConn(
  64. conn net.Conn,
  65. inactivityTimeout time.Duration,
  66. activeOnWrite bool,
  67. lruEntry *LRUConnsEntry,
  68. activityUpdaters ...ActivityUpdater) (*ActivityMonitoredConn, error) {
  69. if inactivityTimeout > 0 {
  70. err := conn.SetDeadline(time.Now().Add(inactivityTimeout))
  71. if err != nil {
  72. return nil, errors.Trace(err)
  73. }
  74. }
  75. // The "monotime" package is still used here as its time value is an int64,
  76. // which is compatible with atomic operations.
  77. now := int64(monotime.Now())
  78. activityConn := &ActivityMonitoredConn{
  79. Conn: conn,
  80. inactivityTimeout: inactivityTimeout,
  81. activeOnWrite: activeOnWrite,
  82. realStartTime: time.Now(),
  83. monotonicStartTime: now,
  84. lruEntry: lruEntry,
  85. activityUpdaters: activityUpdaters,
  86. }
  87. activityConn.lastReadActivityTime.Store(now)
  88. return activityConn, nil
  89. }
  90. // GetStartTime gets the time when the ActivityMonitoredConn was initialized.
  91. // Reported time is UTC.
  92. func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
  93. return conn.realStartTime.UTC()
  94. }
  95. // GetActiveDuration returns the time elapsed between the initialization of
  96. // the ActivityMonitoredConn and the last Read. Only reads are used for this
  97. // calculation since writes may succeed locally due to buffering.
  98. func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
  99. return time.Duration(conn.lastReadActivityTime.Load() - conn.monotonicStartTime)
  100. }
  101. func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
  102. n, err := conn.Conn.Read(buffer)
  103. if n > 0 {
  104. if conn.inactivityTimeout > 0 {
  105. err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
  106. if err != nil {
  107. return n, errors.Trace(err)
  108. }
  109. }
  110. lastReadActivityTime := conn.lastReadActivityTime.Load()
  111. readActivityTime := int64(monotime.Now())
  112. conn.lastReadActivityTime.Store(readActivityTime)
  113. for _, activityUpdater := range conn.activityUpdaters {
  114. activityUpdater.UpdateProgress(
  115. int64(n), 0, readActivityTime-lastReadActivityTime)
  116. }
  117. if conn.lruEntry != nil {
  118. conn.lruEntry.Touch()
  119. }
  120. }
  121. // Note: no trace error to preserve error type
  122. return n, err
  123. }
  124. func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
  125. n, err := conn.Conn.Write(buffer)
  126. if n > 0 {
  127. // Bytes written are reported regardless of activeOnWrite. Inactivity
  128. // deadline extension and LRU updates are conditional on activeOnWrite.
  129. for _, activityUpdater := range conn.activityUpdaters {
  130. activityUpdater.UpdateProgress(0, int64(n), 0)
  131. }
  132. if conn.activeOnWrite {
  133. if conn.inactivityTimeout > 0 {
  134. err = conn.Conn.SetDeadline(time.Now().Add(conn.inactivityTimeout))
  135. if err != nil {
  136. return n, errors.Trace(err)
  137. }
  138. }
  139. if conn.lruEntry != nil {
  140. conn.lruEntry.Touch()
  141. }
  142. }
  143. }
  144. // Note: no trace error to preserve error type
  145. return n, err
  146. }
  147. // IsClosed implements the Closer iterface. The return value indicates whether
  148. // the underlying conn has been closed.
  149. func (conn *ActivityMonitoredConn) IsClosed() bool {
  150. closer, ok := conn.Conn.(Closer)
  151. if !ok {
  152. return false
  153. }
  154. return closer.IsClosed()
  155. }