|
|
@@ -107,6 +107,16 @@ func InitDataStore(config *Config) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ // Run consistency checks on datastore and emit errors for diagnostics purposes
|
|
|
+ // We assume this will complete quickly for typical size Psiphon datastores.
|
|
|
+ db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := <-tx.Check()
|
|
|
+ if err != nil {
|
|
|
+ NoticeAlert("boltdb Check(): %s", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
singleton.db = db
|
|
|
|
|
|
// The migrateServerEntries function requires the data store is
|
|
|
@@ -239,6 +249,18 @@ func PromoteServerEntry(ipAddress string) error {
|
|
|
checkInitDataStore()
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+
|
|
|
+ // Ensure the corresponding entry exists before
|
|
|
+ // inserting into rank.
|
|
|
+ bucket := tx.Bucket([]byte(serverEntriesBucket))
|
|
|
+ data := bucket.Get([]byte(ipAddress))
|
|
|
+ if data == nil {
|
|
|
+ NoticeAlert(
|
|
|
+ "PromoteServerEntry: ignoring unknown server entry: %s",
|
|
|
+ ipAddress)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
return insertRankedServerEntry(tx, ipAddress, 0)
|
|
|
})
|
|
|
|
|
|
@@ -500,7 +522,12 @@ func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error
|
|
|
var data []byte
|
|
|
err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(serverEntriesBucket))
|
|
|
- data = bucket.Get([]byte(serverEntryId))
|
|
|
+ value := bucket.Get([]byte(serverEntryId))
|
|
|
+ if value != nil {
|
|
|
+ // Must make a copy as slice is only valid within transaction.
|
|
|
+ data = make([]byte, len(value))
|
|
|
+ copy(data, value)
|
|
|
+ }
|
|
|
return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
@@ -684,7 +711,12 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
|
|
|
|
|
|
err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
|
|
|
- data = bucket.Get([]byte(region))
|
|
|
+ value := bucket.Get([]byte(region))
|
|
|
+ if value != nil {
|
|
|
+ // Must make a copy as slice is only valid within transaction.
|
|
|
+ data = make([]byte, len(value))
|
|
|
+ copy(data, value)
|
|
|
+ }
|
|
|
return nil
|
|
|
})
|
|
|
|
|
|
@@ -840,17 +872,35 @@ func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
|
|
|
bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
cursor := bucket.Cursor()
|
|
|
for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
|
+
|
|
|
+ // Perform a test JSON unmarshaling. In case of data corruption or a bug,
|
|
|
+ // skip the record.
|
|
|
+ var jsonData interface{}
|
|
|
+ err := json.Unmarshal(key, &jsonData)
|
|
|
+ if err != nil {
|
|
|
+ NoticeAlert(
|
|
|
+ "Invalid key in TakeOutUnreportedTunnelStats: %s: %s",
|
|
|
+ string(key), err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
|
|
|
- err := bucket.Put(key, tunnelStatsStateReporting)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- tunnelStats = append(tunnelStats, key)
|
|
|
+ // Must make a copy as slice is only valid within transaction.
|
|
|
+ data := make([]byte, len(key))
|
|
|
+ copy(data, key)
|
|
|
+ tunnelStats = append(tunnelStats, data)
|
|
|
if len(tunnelStats) >= maxCount {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ for _, key := range tunnelStats {
|
|
|
+ err := bucket.Put(key, tunnelStatsStateReporting)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
})
|
|
|
|
|
|
@@ -914,8 +964,15 @@ func resetAllTunnelStatsToUnreported() error {
|
|
|
|
|
|
err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(tunnelStatsBucket))
|
|
|
+ resetKeys := make([][]byte, 0)
|
|
|
cursor := bucket.Cursor()
|
|
|
for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
|
|
|
+ resetKeys = append(resetKeys, key)
|
|
|
+ }
|
|
|
+ // TODO: data mutation is done outside cursor. Is this
|
|
|
+ // strictly necessary in this case?
|
|
|
+ // https://godoc.org/github.com/boltdb/bolt#Cursor
|
|
|
+ for _, key := range resetKeys {
|
|
|
err := bucket.Put(key, tunnelStatsStateUnreported)
|
|
|
if err != nil {
|
|
|
return err
|