|
|
@@ -29,6 +29,7 @@ import (
|
|
|
"os"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
+ "sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
|
|
|
@@ -57,12 +58,14 @@ var (
|
|
|
datastoreInproxyCommonCompartmentIDsKey = []byte("inproxyCommonCompartmentIDs")
|
|
|
datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
|
|
|
datastorePersistentStatTypeFailedTunnel = string(datastoreFailedTunnelStatsBucket)
|
|
|
+ datastoreCheckServerEntryTagsEndTimeKey = "checkServerEntryTagsEndTime"
|
|
|
datastoreServerEntryFetchGCThreshold = 10
|
|
|
|
|
|
datastoreReferenceCountMutex sync.RWMutex
|
|
|
datastoreReferenceCount int64
|
|
|
datastoreMutex sync.RWMutex
|
|
|
activeDatastoreDB *datastoreDB
|
|
|
+ disableCheckServerEntryTags atomic.Bool
|
|
|
)
|
|
|
|
|
|
// OpenDataStore opens and initializes the singleton datastore instance.
|
|
|
@@ -595,6 +598,7 @@ type ServerEntryIterator struct {
|
|
|
serverEntryIndex int
|
|
|
isTacticsServerEntryIterator bool
|
|
|
isTargetServerEntryIterator bool
|
|
|
+ isPruneServerEntryIterator bool
|
|
|
hasNextTargetServerEntry bool
|
|
|
targetServerEntry *protocol.ServerEntry
|
|
|
}
|
|
|
@@ -658,6 +662,23 @@ func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error)
|
|
|
return iterator, nil
|
|
|
}
|
|
|
|
|
|
+func NewPruneServerEntryIterator(config *Config) (*ServerEntryIterator, error) {
|
|
|
+
|
|
|
+ // There is no TargetServerEntry case when pruning.
|
|
|
+
|
|
|
+ iterator := &ServerEntryIterator{
|
|
|
+ config: config,
|
|
|
+ isPruneServerEntryIterator: true,
|
|
|
+ }
|
|
|
+
|
|
|
+ err := iterator.reset(true)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return iterator, nil
|
|
|
+}
|
|
|
+
|
|
|
// newTargetServerEntryIterator is a helper for initializing the TargetServerEntry case
|
|
|
func newTargetServerEntryIterator(config *Config, isTactics bool) (bool, *ServerEntryIterator, error) {
|
|
|
|
|
|
@@ -765,12 +786,21 @@ func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
|
|
|
serverEntryIDs = make([][]byte, 0)
|
|
|
shuffleHead := 0
|
|
|
|
|
|
+ // The prune case, isPruneServerEntryIterator, skips all
|
|
|
+ // move-to-front operations and uses a pure random shuffle in order
|
|
|
+ // to uniformly select server entries to prune check. There may be a
|
|
|
+ // benefit to inverting the move, and moving affinity and potential
|
|
|
+ // replay servers to the _back_ if they're less likely to be pruned;
|
|
|
+ // however, the replay logic here doesn't check the replay TTL and
|
|
|
+ // even potential replay servers might be pruned.
|
|
|
+
|
|
|
var affinityServerEntryID []byte
|
|
|
|
|
|
// In the first round only, move any server affinity candidate to the
|
|
|
// very first position.
|
|
|
|
|
|
- if isInitialRound &&
|
|
|
+ if !iterator.isPruneServerEntryIterator &&
|
|
|
+ isInitialRound &&
|
|
|
iterator.applyServerAffinity {
|
|
|
|
|
|
affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
|
|
|
@@ -812,7 +842,8 @@ func (iterator *ServerEntryIterator) reset(isInitialRound bool) error {
|
|
|
|
|
|
p := iterator.config.GetParameters().Get()
|
|
|
|
|
|
- if (isInitialRound || p.WeightedCoinFlip(parameters.ReplayLaterRoundMoveToFrontProbability)) &&
|
|
|
+ if !iterator.isPruneServerEntryIterator &&
|
|
|
+ (isInitialRound || p.WeightedCoinFlip(parameters.ReplayLaterRoundMoveToFrontProbability)) &&
|
|
|
p.Int(parameters.ReplayCandidateCount) != 0 {
|
|
|
|
|
|
networkID := []byte(iterator.config.GetNetworkID())
|
|
|
@@ -1068,7 +1099,11 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
|
|
|
|
|
|
// Check filter requirements
|
|
|
|
|
|
- if iterator.isTacticsServerEntryIterator {
|
|
|
+ if iterator.isPruneServerEntryIterator {
|
|
|
+ // No region filter for the prune case.
|
|
|
+ break
|
|
|
+
|
|
|
+ } else if iterator.isTacticsServerEntryIterator {
|
|
|
|
|
|
// Tactics doesn't filter by egress region.
|
|
|
if len(serverEntry.GetSupportedTacticsProtocols()) > 0 {
|
|
|
@@ -1103,23 +1138,28 @@ func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.Serv
|
|
|
// PruneServerEntry deletes the server entry, along with associated data,
|
|
|
// corresponding to the specified server entry tag. Pruning is subject to an
|
|
|
// age check. In the case of an error, a notice is emitted.
|
|
|
-func PruneServerEntry(config *Config, serverEntryTag string) {
|
|
|
- err := pruneServerEntry(config, serverEntryTag)
|
|
|
+func PruneServerEntry(config *Config, serverEntryTag string) bool {
|
|
|
+ pruned, err := pruneServerEntry(config, serverEntryTag)
|
|
|
if err != nil {
|
|
|
NoticeWarning(
|
|
|
"PruneServerEntry failed: %s: %s",
|
|
|
serverEntryTag, errors.Trace(err))
|
|
|
- return
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if pruned {
|
|
|
+ NoticePruneServerEntry(serverEntryTag)
|
|
|
}
|
|
|
- NoticePruneServerEntry(serverEntryTag)
|
|
|
+ return pruned
|
|
|
}
|
|
|
|
|
|
-func pruneServerEntry(config *Config, serverEntryTag string) error {
|
|
|
+func pruneServerEntry(config *Config, serverEntryTag string) (bool, error) {
|
|
|
|
|
|
minimumAgeForPruning := config.GetParameters().Get().Duration(
|
|
|
parameters.ServerEntryMinimumAgeForPruning)
|
|
|
|
|
|
- return datastoreUpdate(func(tx *datastoreTx) error {
|
|
|
+ pruned := false
|
|
|
+
|
|
|
+ err := datastoreUpdate(func(tx *datastoreTx) error {
|
|
|
|
|
|
serverEntries := tx.bucket(datastoreServerEntriesBucket)
|
|
|
serverEntryTags := tx.bucket(datastoreServerEntryTagsBucket)
|
|
|
@@ -1196,8 +1236,12 @@ func pruneServerEntry(config *Config, serverEntryTag string) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ pruned = true
|
|
|
+
|
|
|
return nil
|
|
|
})
|
|
|
+
|
|
|
+ return pruned, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
// DeleteServerEntry deletes the specified server entry and associated data.
|
|
|
@@ -1592,16 +1636,26 @@ func CountUnreportedPersistentStats() int {
|
|
|
// set to StateReporting. If the records are successfully reported, clear them
|
|
|
// with ClearReportedPersistentStats. If the records are not successfully
|
|
|
// reported, restore them with PutBackUnreportedPersistentStats.
|
|
|
-func TakeOutUnreportedPersistentStats(config *Config) (map[string][][]byte, error) {
|
|
|
+func TakeOutUnreportedPersistentStats(
|
|
|
+ config *Config,
|
|
|
+ adjustMaxSendBytes int) (map[string][][]byte, int, error) {
|
|
|
+
|
|
|
+ // TODO: add a failsafe like disableCheckServerEntryTags, to avoid repeatedly resending
|
|
|
+ // persistent stats in the case of a local error? Also consider just dropping persistent stats
|
|
|
+ // which fail to send due to a network disconnection, rather than invoking
|
|
|
+ // PutBackUnreportedPersistentStats -- especially if it's likely that the server received the
|
|
|
+ // stats and the disconnection occurs just before the request is acknowledged.
|
|
|
|
|
|
stats := make(map[string][][]byte)
|
|
|
|
|
|
maxSendBytes := config.GetParameters().Get().Int(
|
|
|
parameters.PersistentStatsMaxSendBytes)
|
|
|
|
|
|
- err := datastoreUpdate(func(tx *datastoreTx) error {
|
|
|
+ maxSendBytes -= adjustMaxSendBytes
|
|
|
+
|
|
|
+ sendBytes := 0
|
|
|
|
|
|
- sendBytes := 0
|
|
|
+ err := datastoreUpdate(func(tx *datastoreTx) error {
|
|
|
|
|
|
for _, statType := range persistentStatTypes {
|
|
|
|
|
|
@@ -1653,10 +1707,10 @@ func TakeOutUnreportedPersistentStats(config *Config) (map[string][][]byte, erro
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
- return nil, errors.Trace(err)
|
|
|
+ return nil, 0, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- return stats, nil
|
|
|
+ return stats, sendBytes, nil
|
|
|
}
|
|
|
|
|
|
// PutBackUnreportedPersistentStats restores a list of persistent
|
|
|
@@ -1752,6 +1806,171 @@ func resetAllPersistentStatsToUnreported() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// IsCheckServerEntryTagsDue indicates that a new prune check is due, based on
|
|
|
+// the time of the previous check ending.
|
|
|
+func IsCheckServerEntryTagsDue(config *Config) bool {
|
|
|
+
|
|
|
+ // disableCheckServerEntryTags is a failsafe, enabled in error cases below
|
|
|
+ // and in UpdateCheckServerEntryTagsEndTime to prevent constantly
|
|
|
+ // resending prune check payloads if the scheduling mechanism fails.
|
|
|
+ if disableCheckServerEntryTags.Load() {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ // Whether the next check is due is based on time elapsed since the time
|
|
|
+ // of the previous check ending, with the elapsed time set in tactics.
|
|
|
+ // The previous end time, rather the next due time, is stored, to allow
|
|
|
+ // changes to this tactic to have immediate effect.
|
|
|
+
|
|
|
+ p := config.GetParameters().Get()
|
|
|
+ enabled := p.Bool(parameters.CheckServerEntryTagsEnabled)
|
|
|
+ checkPeriod := p.Duration(parameters.CheckServerEntryTagsPeriod)
|
|
|
+ p.Close()
|
|
|
+
|
|
|
+ if !enabled {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ lastEndTimeValue, err := GetKeyValue(datastoreCheckServerEntryTagsEndTimeKey)
|
|
|
+ if err != nil {
|
|
|
+ NoticeWarning("IsCheckServerEntryTagsDue GetKeyValue failed: %s", errors.Trace(err))
|
|
|
+ disableCheckServerEntryTags.Store(true)
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ if lastEndTimeValue == "" {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ lastEndTime, err := time.Parse(time.RFC3339, lastEndTimeValue)
|
|
|
+ if err != nil {
|
|
|
+ NoticeWarning("IsCheckServerEntryTagsDue time.Parse failed: %s", errors.Trace(err))
|
|
|
+ disableCheckServerEntryTags.Store(true)
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ return time.Now().After(lastEndTime.Add(checkPeriod))
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateCheckServerEntryTagsEndTime should be called after a prune check is
|
|
|
+// complete. The end time is set, extending the time until the next check,
|
|
|
+// unless there's a sufficiently high ratio of pruned servers from the last
|
|
|
+// check.
|
|
|
+func UpdateCheckServerEntryTagsEndTime(config *Config, checkCount int, pruneCount int) {
|
|
|
+
|
|
|
+ p := config.GetParameters().Get()
|
|
|
+ ratio := p.Float(parameters.CheckServerEntryTagsRepeatRatio)
|
|
|
+ minimum := p.Int(parameters.CheckServerEntryTagsRepeatMinimum)
|
|
|
+ p.Close()
|
|
|
+
|
|
|
+ // When there's a sufficiently high ratio of pruned/checked from
|
|
|
+ // the _previous_ check operation, don't mark the check as ended. This
|
|
|
+ // will result in the next status request performing another check. It's
|
|
|
+ // assumed that the ratio will decrease over the course of repeated
|
|
|
+ // checks as more server entries are pruned, and random selection for
|
|
|
+ // checking will include fewer and fewer invalid server entry tags.
|
|
|
+ //
|
|
|
+ // The rate of repeated checking is also limited by the status request
|
|
|
+ // schedule, where PsiphonAPIStatusRequestPeriodMin/Max defaults to 5-10
|
|
|
+ // minutes.
|
|
|
+
|
|
|
+ if pruneCount >= minimum && ratio > 0 && float64(pruneCount)/float64(checkCount) >= ratio {
|
|
|
+ NoticeInfo("UpdateCheckServerEntryTagsEndTime: %d/%d: repeat", pruneCount, checkCount)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ err := SetKeyValue(
|
|
|
+ datastoreCheckServerEntryTagsEndTimeKey,
|
|
|
+ time.Now().Format(time.RFC3339))
|
|
|
+ if err != nil {
|
|
|
+ NoticeWarning("UpdateCheckServerEntryTagsEndTime SetKeyValue failed: %s", errors.Trace(err))
|
|
|
+ disableCheckServerEntryTags.Store(true)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ NoticeInfo("UpdateCheckServerEntryTagsEndTime: %d/%d: done", pruneCount, checkCount)
|
|
|
+}
|
|
|
+
|
|
|
+// GetCheckServerEntryTags returns a random selection of server entry tags to
|
|
|
+// be checked for pruning. An empty list is returned if a check is not yet
|
|
|
+// due.
|
|
|
+func GetCheckServerEntryTags(config *Config) ([]string, int, error) {
|
|
|
+
|
|
|
+ if disableCheckServerEntryTags.Load() {
|
|
|
+ return nil, 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if !IsCheckServerEntryTagsDue(config) {
|
|
|
+ return nil, 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // maxSendBytes is intended to limit the request memory overhead and
|
|
|
+ // network size. maxWorkTime ensures that slow devices -- with datastore
|
|
|
+ // operations and JSON unmarshaling particularly slow -- will launch a
|
|
|
+ // request in a timely fashion.
|
|
|
+
|
|
|
+ p := config.GetParameters().Get()
|
|
|
+ maxSendBytes := p.Int(parameters.CheckServerEntryTagsMaxSendBytes)
|
|
|
+ maxWorkTime := p.Duration(parameters.CheckServerEntryTagsMaxWorkTime)
|
|
|
+ minimumAgeForPruning := p.Duration(parameters.ServerEntryMinimumAgeForPruning)
|
|
|
+ p.Close()
|
|
|
+
|
|
|
+ iterator, err := NewPruneServerEntryIterator(config)
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ var checkTags []string
|
|
|
+ bytes := 0
|
|
|
+ startWork := time.Now()
|
|
|
+
|
|
|
+ for {
|
|
|
+
|
|
|
+ serverEntry, err := iterator.Next()
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if serverEntry == nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ // Skip checking the server entry if PruneServerEntry won't prune it
|
|
|
+ // anyway, due to ServerEntryMinimumAgeForPruning.
|
|
|
+ serverEntryLocalTimestamp, err := time.Parse(time.RFC3339, serverEntry.LocalTimestamp)
|
|
|
+ if err != nil {
|
|
|
+ return nil, 0, errors.Trace(err)
|
|
|
+ }
|
|
|
+ if serverEntryLocalTimestamp.Add(minimumAgeForPruning).After(time.Now()) {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // Server entries with replay records are not skipped. It's possible that replay records are
|
|
|
+ // retained, due to ReplayRetainFailedProbability, even if the server entry is no longer
|
|
|
+ // valid. Inspecting replay would also require an additional JSON unmarshal of the
|
|
|
+ // DialParameters, in order to check the replay TTL.
|
|
|
+ //
|
|
|
+ // A potential future enhancement could be to add and check a new index that tracks how
|
|
|
+ // recently a server entry connection got as far as completing the SSH handshake, which
|
|
|
+ // verifies the Psiphon server running at that server entry network address. This would
|
|
|
+ // exclude from prune checking all recently known-valid servers regardless of whether they
|
|
|
+ // ultimately pass the liveness test, establish a tunnel, or reach the replay data transfer
|
|
|
+ // targets.
|
|
|
+
|
|
|
+ checkTags = append(checkTags, serverEntry.Tag)
|
|
|
+
|
|
|
+ // Approximate the size of the JSON encoding of the string array,
|
|
|
+ // including quotes and commas.
|
|
|
+ bytes += len(serverEntry.Tag) + 3
|
|
|
+
|
|
|
+ if bytes >= maxSendBytes || (maxWorkTime > 0 && time.Since(startWork) > maxWorkTime) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return checkTags, bytes, nil
|
|
|
+}
|
|
|
+
|
|
|
// CountSLOKs returns the total number of SLOK records.
|
|
|
func CountSLOKs() int {
|
|
|
|