discovery.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. /*
  2. * Copyright (c) 2024, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Package discovery implements the Psiphon discovery algorithms.
  20. package discovery
  21. import (
  22. "context"
  23. "net"
  24. "sync"
  25. "time"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/psinet"
  27. )
  28. // clock is an interface of functions required by discovery that exist in
  29. // the time package in the Go standard library, which enables using
  30. // implementations in tests that do not rely on the monotonic clock or wall
  31. // clock.
  32. type clock interface {
  33. Now() time.Time
  34. Until(t time.Time) time.Duration
  35. After(d time.Duration) <-chan time.Time
  36. NewTimer(d time.Duration) timer
  37. }
  38. // realClock implements clock using the time package in the Go standard library.
  39. type realClock struct{}
  40. func (realClock) Now() time.Time { return time.Now() }
  41. func (realClock) Until(t time.Time) time.Duration { return time.Until(t) }
  42. func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }
  43. func (realClock) NewTimer(d time.Duration) timer { return &realTimer{t: time.NewTimer(d)} }
  44. // timer is an interface matching what Timer in the time package provides in
  45. // the Go standard library, which enables using implementations in tests that
  46. // do not rely on the monotonic clock or wall clock.
  47. type timer interface {
  48. C() <-chan time.Time
  49. Stop() bool
  50. Reset(d time.Duration) bool
  51. }
  52. // realTimer implements timer using the time package in the Go standard library.
  53. type realTimer struct {
  54. t *time.Timer
  55. }
  56. func (t *realTimer) C() <-chan time.Time {
  57. return t.t.C
  58. }
  59. func (t *realTimer) Stop() bool {
  60. return t.t.Stop()
  61. }
  62. func (t *realTimer) Reset(d time.Duration) bool {
  63. return t.t.Reset(d)
  64. }
  65. // DiscoveryStrategy represents a discovery algorithm that selects server
  66. // entries to be "discovered" by a client. Implementations must be safe for
  67. // concurrent usage.
  68. type DiscoveryStrategy interface {
  69. // selectServers selects discovery servers to give out to the client based
  70. // on its IP address and, possibly, other strategies that are internal to
  71. // the discovery strategy implementation.
  72. selectServers(clientIP net.IP) []*psinet.DiscoveryServer
  73. // serversChanged is called with the set of currently discoverable servers
  74. // whever that set changes. The discovery strategy implementation must
  75. // replace its set of discoverable servers with these servers.
  76. serversChanged(servers []*psinet.DiscoveryServer)
  77. }
  78. // Discovery is the combination of a discovery strategy with a set of discovery
  79. // servers. It's safe for concurrent usage.
  80. type Discovery struct {
  81. clk clock
  82. all []*psinet.DiscoveryServer
  83. strategy DiscoveryStrategy
  84. cancelFunc context.CancelFunc
  85. wg *sync.WaitGroup
  86. }
  87. // MakeDiscovery creates a new Discovery instance, which uses the specified
  88. // strategy with the given discovery servers.
  89. func MakeDiscovery(
  90. servers []*psinet.DiscoveryServer,
  91. strategy DiscoveryStrategy) *Discovery {
  92. return makeDiscovery(realClock{}, servers, strategy)
  93. }
  94. func makeDiscovery(
  95. clk clock,
  96. servers []*psinet.DiscoveryServer,
  97. strategy DiscoveryStrategy) *Discovery {
  98. d := Discovery{
  99. clk: clk,
  100. all: servers,
  101. strategy: strategy,
  102. wg: new(sync.WaitGroup),
  103. }
  104. return &d
  105. }
  106. // Start starts discovery. Servers are discoverable when the current time
  107. // falls within their discovery date range, i.e. DiscoveryDateRange[0] <=
  108. // clk.Now() < DiscoveryDateRange[1].
  109. func (d *Discovery) Start() {
  110. current, nextUpdate := discoverableServers(d.all, d.clk)
  111. d.strategy.serversChanged(current)
  112. ctx, cancelFunc := context.WithCancel(context.Background())
  113. d.cancelFunc = cancelFunc
  114. d.wg.Add(1)
  115. // Update the set of discovery servers used by the chosen discovery
  116. // algorithm, and therefore discoverable with SelectServers, everytime a
  117. // server enters, or exits, its discovery date range.
  118. go func() {
  119. for ctx.Err() == nil {
  120. // Wait until the next time a server enters, or exits, its
  121. // discovery date range.
  122. //
  123. // Warning: NewTimer uses the monotonic clock but discovery uses
  124. // the wall clock. If there is wall clock drift, then it is
  125. // possible that the wall clock surpasses nextUpdate or, more
  126. // generally, by the wall clock time the set of discoverable
  127. // servers should change before the timer fires. This scenario is
  128. // not handled. One solution would be to periodically check if set
  129. // of discoverable servers has changed in conjunction with using a
  130. // timer.
  131. t := d.clk.NewTimer(d.clk.Until(nextUpdate))
  132. select {
  133. case <-t.C():
  134. case <-ctx.Done():
  135. t.Stop()
  136. continue
  137. }
  138. t.Stop()
  139. // Note: servers with a discovery date range in the past are not
  140. // removed from d.all in case the wall clock has drifted;
  141. // otherwise, we risk removing them prematurely.
  142. var servers []*psinet.DiscoveryServer
  143. servers, nextUpdate = discoverableServers(d.all, d.clk)
  144. // Update the set of discoverable servers.
  145. d.strategy.serversChanged(servers)
  146. if nextUpdate == (time.Time{}) {
  147. // The discovery date range of all candidate discovery servers
  148. // are in the past. No more serversChanged calls will be made
  149. // to DiscoveryStrategy.
  150. //
  151. // Warning: at this point if the wall clock has drifted but
  152. // will correct itself in the future such that the set of
  153. // discoverable servers changes, then serversChanged will
  154. // not be called on the discovery strategies with the new set
  155. // of discoverable servers. One workaround for this scenario
  156. // would be to periodically check if set of discoverable
  157. // servers has changed after this point and restart this loop
  158. // if they have.
  159. break
  160. }
  161. }
  162. d.wg.Done()
  163. }()
  164. }
  165. // Stop stops discovery and cleans up underlying resources. Stop should be
  166. // invoked as soon as Discovery is no longer needed. Discovery should not be
  167. // used after this because the set of discoverable servers will no longer be
  168. // updated, so it may contain servers that are no longer discoverable and
  169. // exclude servers that are.
  170. func (d *Discovery) Stop() {
  171. d.cancelFunc()
  172. d.wg.Wait()
  173. }
  174. // SelectServers selects new server entries to be "discovered" by the client,
  175. // using the client's IP address as the input into the configured discovery
  176. // algorithm.
  177. func (d *Discovery) SelectServers(clientIP net.IP) []*psinet.DiscoveryServer {
  178. return d.strategy.selectServers(clientIP)
  179. }
  180. // discoverableServers returns all servers in discoveryServers that are currently
  181. // eligible for discovery along with the next time that a server in
  182. // discoveryServers will enter, or exit, its discovery date range.
  183. func discoverableServers(
  184. discoveryServers []*psinet.DiscoveryServer,
  185. clk clock) (discoverableServers []*psinet.DiscoveryServer, nextUpdate time.Time) {
  186. now := clk.Now().UTC()
  187. discoverableServers = make([]*psinet.DiscoveryServer, 0)
  188. var nextServerAdd time.Time
  189. var nextServerRemove time.Time
  190. for _, server := range discoveryServers {
  191. if len(server.DiscoveryDateRange) == 2 {
  192. if now.Before(server.DiscoveryDateRange[0]) {
  193. // Next server that will enter its discovery date range.
  194. if nextServerAdd == (time.Time{}) || server.DiscoveryDateRange[0].Before(nextServerAdd) {
  195. nextServerAdd = server.DiscoveryDateRange[0]
  196. }
  197. } else if now.Before(server.DiscoveryDateRange[1]) {
  198. discoverableServers = append(discoverableServers, server)
  199. // Next server that will exit its discovery date range.
  200. if nextServerRemove == (time.Time{}) || server.DiscoveryDateRange[1].Before(nextServerRemove) {
  201. nextServerRemove = server.DiscoveryDateRange[1]
  202. }
  203. }
  204. }
  205. }
  206. // The next time the set of servers eligible for discovery changes is
  207. // whichever occurs first: the next time a server enters its discovery
  208. // discovery date range or the next time a server exits its discovery
  209. // date range.
  210. nextUpdate = nextServerAdd
  211. if nextServerAdd == (time.Time{}) ||
  212. (nextServerRemove.Before(nextServerAdd) && nextServerRemove != (time.Time{})) {
  213. nextUpdate = nextServerRemove
  214. }
  215. return discoverableServers, nextUpdate
  216. }