|
|
@@ -23,6 +23,7 @@ package discovery
|
|
|
import (
|
|
|
"context"
|
|
|
"net"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/server/psinet"
|
|
|
@@ -39,16 +40,16 @@ type clock interface {
|
|
|
NewTimer(d time.Duration) timer
|
|
|
}
|
|
|
|
|
|
-// RealClock implements clock using the time package in the Go standard library.
|
|
|
-type RealClock struct{}
|
|
|
+// realClock implements clock using the time package in the Go standard library.
|
|
|
+type realClock struct{}
|
|
|
|
|
|
-func (RealClock) Now() time.Time { return time.Now() }
|
|
|
+func (realClock) Now() time.Time { return time.Now() }
|
|
|
|
|
|
-func (RealClock) Until(t time.Time) time.Duration { return time.Until(t) }
|
|
|
+func (realClock) Until(t time.Time) time.Duration { return time.Until(t) }
|
|
|
|
|
|
-func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) }
|
|
|
+func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }
|
|
|
|
|
|
-func (RealClock) NewTimer(d time.Duration) timer { return &realTimer{t: time.NewTimer(d)} }
|
|
|
+func (realClock) NewTimer(d time.Duration) timer { return &realTimer{t: time.NewTimer(d)} }
|
|
|
|
|
|
// timer is an interface matching what Timer in the time package provides in
|
|
|
// the Go standard library, which enables using implementations in tests that
|
|
|
@@ -93,42 +94,57 @@ type DiscoveryStrategy interface {
|
|
|
// Discovery is the combination of a discovery strategy with a set of discovery
|
|
|
// servers. It's safe for concurrent usage.
|
|
|
type Discovery struct {
|
|
|
- all []*psinet.DiscoveryServer
|
|
|
- strategy DiscoveryStrategy
|
|
|
+ clk clock
|
|
|
+ all []*psinet.DiscoveryServer
|
|
|
+ strategy DiscoveryStrategy
|
|
|
+ cancelFunc context.CancelFunc
|
|
|
+ wg *sync.WaitGroup
|
|
|
}
|
|
|
|
|
|
-// NewDiscovery creates a new Discovery instance, which uses the specified
|
|
|
-// strategy with the given discovery servers. Servers are discoverable when the
|
|
|
-// current time falls within their discovery window, i.e.
|
|
|
-// DiscoveryDateRange[0] <= clk.Now() < DiscoveryDateRange[1].
|
|
|
-//
|
|
|
-// Cancelling ctx stops the underlying go routines and, therefore, ctx should
|
|
|
-// be cancelled as soon as Discovery is no longer needed. Discovery should not
|
|
|
-// be used after this because the set of discoverable servers will no longer be
|
|
|
-// updated so it may contain servers that are no longer in their discover
|
|
|
-// window and exclude servers that are.
|
|
|
-func NewDiscovery(
|
|
|
- ctx context.Context,
|
|
|
- clk clock,
|
|
|
+// MakeDiscovery creates a new Discovery instance, which uses the specified
|
|
|
+// strategy with the given discovery servers.
|
|
|
+func MakeDiscovery(
|
|
|
servers []*psinet.DiscoveryServer,
|
|
|
- strategy DiscoveryStrategy) (*Discovery, error) {
|
|
|
+ strategy DiscoveryStrategy) *Discovery {
|
|
|
|
|
|
- current, nextUpdate := discoverableServers(servers, clk)
|
|
|
+ return makeDiscovery(realClock{}, servers, strategy)
|
|
|
+}
|
|
|
|
|
|
- strategy.serversChanged(current)
|
|
|
+func makeDiscovery(
|
|
|
+ clk clock,
|
|
|
+ servers []*psinet.DiscoveryServer,
|
|
|
+ strategy DiscoveryStrategy) *Discovery {
|
|
|
|
|
|
d := Discovery{
|
|
|
+ clk: clk,
|
|
|
all: servers,
|
|
|
strategy: strategy,
|
|
|
+ wg: new(sync.WaitGroup),
|
|
|
}
|
|
|
|
|
|
+ return &d
|
|
|
+}
|
|
|
+
|
|
|
+// Start starts discovery. Servers are discoverable when the current time
|
|
|
+// falls within their discovery date range, i.e. DiscoveryDateRange[0] <=
|
|
|
+// clk.Now() < DiscoveryDateRange[1].
|
|
|
+func (d *Discovery) Start() {
|
|
|
+
|
|
|
+ current, nextUpdate := discoverableServers(d.all, d.clk)
|
|
|
+
|
|
|
+ d.strategy.serversChanged(current)
|
|
|
+
|
|
|
+ ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
+ d.cancelFunc = cancelFunc
|
|
|
+ d.wg.Add(1)
|
|
|
+
|
|
|
// Update the set of discovery servers used by the chosen discovery
|
|
|
// algorithm, and therefore discoverable with SelectServers, everytime a
|
|
|
- // server enters, or exits, its discovery window.
|
|
|
+ // server enters, or exits, its discovery date range.
|
|
|
go func() {
|
|
|
for ctx.Err() == nil {
|
|
|
// Wait until the next time a server enters, or exits, its
|
|
|
- // discovery window.
|
|
|
+ // discovery date range.
|
|
|
//
|
|
|
// Warning: NewTimer uses the monotonic clock but discovery uses
|
|
|
// the wall clock. If there is wall clock drift, then it is
|
|
|
@@ -138,7 +154,7 @@ func NewDiscovery(
|
|
|
// not handled. One solution would be to periodically check if set
|
|
|
// of discoverable servers has changed in conjunction with using a
|
|
|
// timer.
|
|
|
- t := clk.NewTimer(clk.Until(nextUpdate))
|
|
|
+ t := d.clk.NewTimer(d.clk.Until(nextUpdate))
|
|
|
|
|
|
select {
|
|
|
case <-t.C():
|
|
|
@@ -148,18 +164,18 @@ func NewDiscovery(
|
|
|
}
|
|
|
t.Stop()
|
|
|
|
|
|
- // Note: servers with a discovery window in the past are not
|
|
|
- // removed from d.all incase the wall clock has drifted; otherwise,
|
|
|
- // we risk removing them prematurely.
|
|
|
- servers, nextUpdate = discoverableServers(d.all, clk)
|
|
|
+ // Note: servers with a discovery date range in the past are not
|
|
|
+ // removed from d.all in case the wall clock has drifted;
|
|
|
+ // otherwise, we risk removing them prematurely.
|
|
|
+ servers, nextUpdate := discoverableServers(d.all, d.clk)
|
|
|
|
|
|
// Update the set of discoverable servers.
|
|
|
- strategy.serversChanged(servers)
|
|
|
+ d.strategy.serversChanged(servers)
|
|
|
|
|
|
if nextUpdate == (time.Time{}) {
|
|
|
- // The discovery windows of all candidate discovery servers are
|
|
|
- // in the past. No more serversChanged calls willl be made to
|
|
|
- // DiscoveryStrategy.
|
|
|
+ // The discovery date range of all candidate discovery servers
|
|
|
+ // are in the past. No more serversChanged calls will be made
|
|
|
+ // to DiscoveryStrategy.
|
|
|
//
|
|
|
// Warning: at this point if the wall clock has drifted but
|
|
|
// will correct itself in the future such that the set of
|
|
|
@@ -172,9 +188,18 @@ func NewDiscovery(
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
+ d.wg.Done()
|
|
|
}()
|
|
|
+}
|
|
|
|
|
|
- return &d, nil
|
|
|
+// Stop stops discovery and cleans up underlying resources. Stop should be
|
|
|
+// invoked as soon as Discovery is no longer needed. Discovery should not be
|
|
|
+// used after this because the set of discoverable servers will no longer be
|
|
|
+// updated, so it may contain servers that are no longer discoverable and
|
|
|
+// exclude servers that are.
|
|
|
+func (d *Discovery) Stop() {
|
|
|
+ d.cancelFunc()
|
|
|
+ d.wg.Wait()
|
|
|
}
|
|
|
|
|
|
// SelectServers selects new server entries to be "discovered" by the client,
|
|
|
@@ -186,7 +211,7 @@ func (d *Discovery) SelectServers(clientIP net.IP) []*psinet.DiscoveryServer {
|
|
|
|
|
|
// discoverableServers returns all servers in discoveryServers that are currently
|
|
|
// eligible for discovery along with the next time that a server in
|
|
|
-// discoveryServers will enter, or exit, its discovery window.
|
|
|
+// discoveryServers will enter, or exit, its discovery date range.
|
|
|
func discoverableServers(
|
|
|
discoveryServers []*psinet.DiscoveryServer,
|
|
|
clk clock) (discoverableServers []*psinet.DiscoveryServer, nextUpdate time.Time) {
|
|
|
@@ -198,29 +223,27 @@ func discoverableServers(
|
|
|
var nextServerRemove time.Time
|
|
|
|
|
|
for _, server := range discoveryServers {
|
|
|
- // All servers that are discoverable on this day are eligible for discovery
|
|
|
if len(server.DiscoveryDateRange) == 2 {
|
|
|
- if !now.Before(server.DiscoveryDateRange[0]) &&
|
|
|
- now.Before(server.DiscoveryDateRange[1]) {
|
|
|
-
|
|
|
+ if now.Before(server.DiscoveryDateRange[0]) {
|
|
|
+ // Next server that will enter its discovery date range.
|
|
|
+ if nextServerAdd == (time.Time{}) || server.DiscoveryDateRange[0].Before(nextServerAdd) {
|
|
|
+ nextServerAdd = server.DiscoveryDateRange[0]
|
|
|
+ }
|
|
|
+ } else if now.Before(server.DiscoveryDateRange[1]) {
|
|
|
discoverableServers = append(discoverableServers, server)
|
|
|
|
|
|
- // Next server that will exit its discovery window
|
|
|
+ // Next server that will exit its discovery date range.
|
|
|
if nextServerRemove == (time.Time{}) || server.DiscoveryDateRange[1].Before(nextServerRemove) {
|
|
|
nextServerRemove = server.DiscoveryDateRange[1]
|
|
|
}
|
|
|
- } else if now.Before(server.DiscoveryDateRange[0]) {
|
|
|
- // Next server that will enter its discovery window
|
|
|
- if nextServerAdd == (time.Time{}) || server.DiscoveryDateRange[0].Before(nextServerAdd) {
|
|
|
- nextServerAdd = server.DiscoveryDateRange[0]
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// The next time the set of servers eligible for discovery changes is
|
|
|
// whichever occurs first: the next time a server enters its discovery
|
|
|
- // window or the next time a server exits its discovery window.
|
|
|
+ // discovery date range or the next time a server exits its discovery
|
|
|
+ // date range.
|
|
|
nextUpdate = nextServerAdd
|
|
|
if nextServerAdd == (time.Time{}) ||
|
|
|
(nextServerRemove.Before(nextServerAdd) && nextServerRemove != (time.Time{})) {
|