|
@@ -58,10 +58,18 @@ const (
|
|
|
urlETagsBucket = "urlETags"
|
|
urlETagsBucket = "urlETags"
|
|
|
keyValueBucket = "keyValues"
|
|
keyValueBucket = "keyValues"
|
|
|
tunnelStatsBucket = "tunnelStats"
|
|
tunnelStatsBucket = "tunnelStats"
|
|
|
|
|
+ remoteServerListStatsBucket = "remoteServerListStats"
|
|
|
slokBucket = "SLOKs"
|
|
slokBucket = "SLOKs"
|
|
|
rankedServerEntryCount = 100
|
|
rankedServerEntryCount = 100
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+const (
|
|
|
|
|
+ DATA_STORE_LAST_CONNECTED_KEY = "lastConnected"
|
|
|
|
|
+ DATA_STORE_OSL_DIRECTORY_KEY = "OSLDirectory"
|
|
|
|
|
+ PERSISTENT_STAT_TYPE_TUNNEL = tunnelStatsBucket
|
|
|
|
|
+ PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
var singleton dataStore
|
|
var singleton dataStore
|
|
|
|
|
|
|
|
// InitDataStore initializes the singleton instance of dataStore. This
|
|
// InitDataStore initializes the singleton instance of dataStore. This
|
|
@@ -105,6 +113,7 @@ func InitDataStore(config *Config) (err error) {
|
|
|
urlETagsBucket,
|
|
urlETagsBucket,
|
|
|
keyValueBucket,
|
|
keyValueBucket,
|
|
|
tunnelStatsBucket,
|
|
tunnelStatsBucket,
|
|
|
|
|
+ remoteServerListStatsBucket,
|
|
|
slokBucket,
|
|
slokBucket,
|
|
|
}
|
|
}
|
|
|
for _, bucket := range requiredBuckets {
|
|
for _, bucket := range requiredBuckets {
|
|
@@ -139,7 +148,7 @@ func InitDataStore(config *Config) (err error) {
|
|
|
migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
|
|
migrateEntries(migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- resetAllTunnelStatsToUnreported()
|
|
|
|
|
|
|
+ resetAllPersistentStatsToUnreported()
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
return err
|
|
return err
|
|
@@ -806,196 +815,241 @@ func GetKeyValue(key string) (value string, err error) {
|
|
|
return value, nil
|
|
return value, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Tunnel stats records in the tunnelStatsStateUnreported
|
|
|
|
|
|
|
+// Persistent stat records in the persistentStatStateUnreported
|
|
|
// state are available for take out.
|
|
// state are available for take out.
|
|
|
-// Records in the tunnelStatsStateReporting have been
|
|
|
|
|
-// taken out and are pending either deleting (for a
|
|
|
|
|
-// successful request) or change to StateUnreported (for
|
|
|
|
|
-// a failed request).
|
|
|
|
|
-// All tunnel stats records are reverted to StateUnreported
|
|
|
|
|
|
|
+//
|
|
|
|
|
+// Records in the persistentStatStateReporting have been taken
|
|
|
|
|
+// out and are pending either deletion (for a successful request)
|
|
|
|
|
+// or change to StateUnreported (for a failed request).
|
|
|
|
|
+//
|
|
|
|
|
+// All persistent stat records are reverted to StateUnreported
|
|
|
// when the datastore is initialized at start up.
|
|
// when the datastore is initialized at start up.
|
|
|
|
|
|
|
|
-var tunnelStatsStateUnreported = []byte("0")
|
|
|
|
|
-var tunnelStatsStateReporting = []byte("1")
|
|
|
|
|
|
|
+var persistentStatStateUnreported = []byte("0")
|
|
|
|
|
+var persistentStatStateReporting = []byte("1")
|
|
|
|
|
|
|
|
-// StoreTunnelStats adds a new tunnel stats record, which is
|
|
|
|
|
-// set to StateUnreported and is an immediate candidate for
|
|
|
|
|
|
|
+var persistentStatTypes = []string{
|
|
|
|
|
+ PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
|
|
|
|
|
+ PERSISTENT_STAT_TYPE_TUNNEL,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// StorePersistentStats adds a new persistent stat record, which
|
|
|
|
|
+// is set to StateUnreported and is an immediate candidate for
|
|
|
// reporting.
|
|
// reporting.
|
|
|
-// tunnelStats is a JSON byte array containing fields as
|
|
|
|
|
-// required by the Psiphon server API (see RecordTunnelStats).
|
|
|
|
|
-// It's assumed that the JSON value contains enough unique
|
|
|
|
|
-// information for the value to function as a key in the
|
|
|
|
|
-// key/value datastore. This assumption is currently satisfied
|
|
|
|
|
-// by the fields sessionId + tunnelNumber.
|
|
|
|
|
-func StoreTunnelStats(tunnelStats []byte) error {
|
|
|
|
|
|
|
+//
|
|
|
|
|
+// The stat is a JSON byte array containing fields as
|
|
|
|
|
+// required by the Psiphon server API. It's assumed that the
|
|
|
|
|
+// JSON value contains enough unique information for the value to
|
|
|
|
|
+// function as a key in the key/value datastore. This assumption
|
|
|
|
|
+// is currently satisfied by the fields sessionId + tunnelNumber
|
|
|
|
|
+// for tunnel stats, and URL + ETag for remote server list stats.
|
|
|
|
|
+func StorePersistentStat(statType string, stat []byte) error {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
|
|
+ if !common.Contains(persistentStatTypes, statType) {
|
|
|
|
|
+ return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- err := bucket.Put(tunnelStats, tunnelStatsStateUnreported)
|
|
|
|
|
|
|
+ bucket := tx.Bucket([]byte(statType))
|
|
|
|
|
+ err := bucket.Put(stat, persistentStatStateUnreported)
|
|
|
return err
|
|
return err
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return common.ContextError(err)
|
|
return common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// CountUnreportedTunnelStats returns the number of tunnel
|
|
|
|
|
-// stats records in StateUnreported.
|
|
|
|
|
-func CountUnreportedTunnelStats() int {
|
|
|
|
|
|
|
+// CountUnreportedPersistentStats returns the number of persistent
|
|
|
|
|
+// stat records in StateUnreported.
|
|
|
|
|
+func CountUnreportedPersistentStats() int {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
unreported := 0
|
|
unreported := 0
|
|
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- cursor := bucket.Cursor()
|
|
|
|
|
- for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
|
|
|
- if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
|
|
|
|
|
- unreported++
|
|
|
|
|
- break
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for _, statType := range persistentStatTypes {
|
|
|
|
|
+
|
|
|
|
|
+ bucket := tx.Bucket([]byte(statType))
|
|
|
|
|
+ cursor := bucket.Cursor()
|
|
|
|
|
+ for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
|
|
|
+ if 0 == bytes.Compare(value, persistentStatStateUnreported) {
|
|
|
|
|
+ unreported++
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
return nil
|
|
return nil
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- NoticeAlert("CountUnreportedTunnelStats failed: %s", err)
|
|
|
|
|
|
|
+ NoticeAlert("CountUnreportedPersistentStats failed: %s", err)
|
|
|
return 0
|
|
return 0
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return unreported
|
|
return unreported
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// TakeOutUnreportedTunnelStats returns up to maxCount tunnel
|
|
|
|
|
-// stats records that are in StateUnreported. The records are set
|
|
|
|
|
-// to StateReporting. If the records are successfully reported,
|
|
|
|
|
-// clear them with ClearReportedTunnelStats. If the records are
|
|
|
|
|
-// not successfully reported, restore them with
|
|
|
|
|
-// PutBackUnreportedTunnelStats.
|
|
|
|
|
-func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
|
|
|
|
|
|
|
+// TakeOutUnreportedPersistentStats returns up to maxCount persistent
|
|
|
|
|
+// stats records that are in StateUnreported. The records are set to
|
|
|
|
|
+// StateReporting. If the records are successfully reported, clear them
|
|
|
|
|
+// with ClearReportedPersistentStats. If the records are not successfully
|
|
|
|
|
+// reported, restore them with PutBackUnreportedPersistentStats.
|
|
|
|
|
+func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
- tunnelStats := make([][]byte, 0)
|
|
|
|
|
|
|
+ stats := make(map[string][][]byte)
|
|
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- cursor := bucket.Cursor()
|
|
|
|
|
- for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
|
|
|
|
|
|
|
|
- // Perform a test JSON unmarshaling. In case of data corruption or a bug,
|
|
|
|
|
- // skip the record.
|
|
|
|
|
- var jsonData interface{}
|
|
|
|
|
- err := json.Unmarshal(key, &jsonData)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- NoticeAlert(
|
|
|
|
|
- "Invalid key in TakeOutUnreportedTunnelStats: %s: %s",
|
|
|
|
|
- string(key), err)
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ count := 0
|
|
|
|
|
|
|
|
- if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
|
|
|
|
|
- // Must make a copy as slice is only valid within transaction.
|
|
|
|
|
- data := make([]byte, len(key))
|
|
|
|
|
- copy(data, key)
|
|
|
|
|
- tunnelStats = append(tunnelStats, data)
|
|
|
|
|
- if len(tunnelStats) >= maxCount {
|
|
|
|
|
|
|
+ for _, statType := range persistentStatTypes {
|
|
|
|
|
+
|
|
|
|
|
+ stats[statType] = make([][]byte, 0)
|
|
|
|
|
+
|
|
|
|
|
+ bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
+ cursor := bucket.Cursor()
|
|
|
|
|
+ for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
|
|
|
+
|
|
|
|
|
+ if count >= maxCount {
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // Perform a test JSON unmarshaling. In case of data corruption or a bug,
|
|
|
|
|
+ // skip the record.
|
|
|
|
|
+ var jsonData interface{}
|
|
|
|
|
+ err := json.Unmarshal(key, &jsonData)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ NoticeAlert(
|
|
|
|
|
+ "Invalid key in TakeOutUnreportedPersistentStats: %s: %s",
|
|
|
|
|
+ string(key), err)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if 0 == bytes.Compare(value, persistentStatStateUnreported) {
|
|
|
|
|
+ // Must make a copy as slice is only valid within transaction.
|
|
|
|
|
+ data := make([]byte, len(key))
|
|
|
|
|
+ copy(data, key)
|
|
|
|
|
+ stats[statType] = append(stats[statType], data)
|
|
|
|
|
+ count += 1
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
- for _, key := range tunnelStats {
|
|
|
|
|
- err := bucket.Put(key, tunnelStatsStateReporting)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for _, key := range stats[statType] {
|
|
|
|
|
+ err := bucket.Put(key, persistentStatStateReporting)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
|
|
+ }
|
|
|
return nil
|
|
return nil
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, common.ContextError(err)
|
|
return nil, common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
- return tunnelStats, nil
|
|
|
|
|
|
|
+
|
|
|
|
|
+ return stats, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// PutBackUnreportedTunnelStats restores a list of tunnel
|
|
|
|
|
-// stats records to StateUnreported.
|
|
|
|
|
-func PutBackUnreportedTunnelStats(tunnelStats [][]byte) error {
|
|
|
|
|
|
|
+// PutBackUnreportedPersistentStats restores a list of persistent
|
|
|
|
|
+// stat records to StateUnreported.
|
|
|
|
|
+func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- for _, key := range tunnelStats {
|
|
|
|
|
- err := bucket.Put(key, tunnelStatsStateUnreported)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for _, statType := range persistentStatTypes {
|
|
|
|
|
+
|
|
|
|
|
+ bucket := tx.Bucket([]byte(statType))
|
|
|
|
|
+ for _, key := range stats[statType] {
|
|
|
|
|
+ err := bucket.Put(key, persistentStatStateUnreported)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return common.ContextError(err)
|
|
return common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// ClearReportedTunnelStats deletes a list of tunnel
|
|
|
|
|
-// stats records that were succesdfully reported.
|
|
|
|
|
-func ClearReportedTunnelStats(tunnelStats [][]byte) error {
|
|
|
|
|
|
|
+// ClearReportedPersistentStats deletes a list of persistent
|
|
|
|
|
+// stat records that were successfully reported.
|
|
|
|
|
+func ClearReportedPersistentStats(stats map[string][][]byte) error {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- for _, key := range tunnelStats {
|
|
|
|
|
- err := bucket.Delete(key)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for _, statType := range persistentStatTypes {
|
|
|
|
|
+
|
|
|
|
|
+ bucket := tx.Bucket([]byte(statType))
|
|
|
|
|
+ for _, key := range stats[statType] {
|
|
|
|
|
+ err := bucket.Delete(key)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return common.ContextError(err)
|
|
return common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// resetAllTunnelStatsToUnreported sets all tunnel
|
|
|
|
|
-// stats records to StateUnreported. This reset is called
|
|
|
|
|
-// when the datastore is initialized at start up, as we do
|
|
|
|
|
-// not know if tunnel records in StateReporting were reported
|
|
|
|
|
-// or not.
|
|
|
|
|
-func resetAllTunnelStatsToUnreported() error {
|
|
|
|
|
|
|
+// resetAllPersistentStatsToUnreported sets all persistent stat
|
|
|
|
|
+// records to StateUnreported. This reset is called when the
|
|
|
|
|
+// datastore is initialized at start up, as we do not know if
|
|
|
|
|
+// persistent records in StateReporting were reported or not.
|
|
|
|
|
+func resetAllPersistentStatsToUnreported() error {
|
|
|
checkInitDataStore()
|
|
checkInitDataStore()
|
|
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
- bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
|
|
- resetKeys := make([][]byte, 0)
|
|
|
|
|
- cursor := bucket.Cursor()
|
|
|
|
|
- for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
|
|
|
|
|
- resetKeys = append(resetKeys, key)
|
|
|
|
|
- }
|
|
|
|
|
- // TODO: data mutation is done outside cursor. Is this
|
|
|
|
|
- // strictly necessary in this case?
|
|
|
|
|
- // https://godoc.org/github.com/boltdb/bolt#Cursor
|
|
|
|
|
- for _, key := range resetKeys {
|
|
|
|
|
- err := bucket.Put(key, tunnelStatsStateUnreported)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for _, statType := range persistentStatTypes {
|
|
|
|
|
+
|
|
|
|
|
+ bucket := tx.Bucket([]byte(statType))
|
|
|
|
|
+ resetKeys := make([][]byte, 0)
|
|
|
|
|
+ cursor := bucket.Cursor()
|
|
|
|
|
+ for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
|
|
|
|
|
+ resetKeys = append(resetKeys, key)
|
|
|
|
|
+ }
|
|
|
|
|
+ // TODO: data mutation is done outside cursor. Is this
|
|
|
|
|
+ // strictly necessary in this case? As is, this means
|
|
|
|
|
+ // all stats need to be loaded into memory at once.
|
|
|
|
|
+ // https://godoc.org/github.com/boltdb/bolt#Cursor
|
|
|
|
|
+ for _, key := range resetKeys {
|
|
|
|
|
+ err := bucket.Put(key, persistentStatStateUnreported)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return common.ContextError(err)
|
|
return common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1014,6 +1068,7 @@ func DeleteSLOKs() error {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return common.ContextError(err)
|
|
return common.ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|