|
|
@@ -36,19 +36,6 @@ import (
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
|
|
|
)
|
|
|
|
|
|
-// The BoltDB dataStore implementation is an alternative to the sqlite3-based
|
|
|
-// implementation in dataStore.go. Both implementations have the same interface.
|
|
|
-//
|
|
|
-// BoltDB is pure Go, and is intended to be used in cases where we have trouble
|
|
|
-// building sqlite3/CGO (e.g., currently go mobile due to
|
|
|
-// https://github.com/mattn/go-sqlite3/issues/201), and perhaps ultimately as
|
|
|
-// the primary dataStore implementation.
|
|
|
-//
|
|
|
-type dataStore struct {
|
|
|
- init sync.Once
|
|
|
- db *bolt.DB
|
|
|
-}
|
|
|
-
|
|
|
const (
|
|
|
serverEntriesBucket = "serverEntries"
|
|
|
rankedServerEntriesBucket = "rankedServerEntries"
|
|
|
@@ -74,129 +61,177 @@ const (
|
|
|
PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
|
|
|
)
|
|
|
|
|
|
-var singleton dataStore
|
|
|
+var (
|
|
|
+ datastoreInitalizeMutex sync.Mutex
|
|
|
+ datastoreReferenceMutex sync.Mutex
|
|
|
+ datastoreDB *bolt.DB
|
|
|
+)
|
|
|
|
|
|
-// InitDataStore initializes the singleton instance of dataStore. This
|
|
|
-// function uses a sync.Once and is safe for use by concurrent goroutines.
|
|
|
-// The underlying sql.DB connection pool is also safe.
|
|
|
-//
|
|
|
-// Note: the sync.Once was more useful when initDataStore was private and
|
|
|
-// called on-demand by the public functions below. Now we require an explicit
|
|
|
-// InitDataStore() call with the filename passed in. The on-demand calls
|
|
|
-// have been replaced by checkInitDataStore() to assert that Init was called.
|
|
|
-func InitDataStore(config *Config) (err error) {
|
|
|
- singleton.init.Do(func() {
|
|
|
- // Need to gather the list of migratable server entries before
|
|
|
- // initializing the boltdb store (as prepareMigrationEntries
|
|
|
- // checks for the existence of the bolt db file)
|
|
|
- migratableServerEntries := prepareMigrationEntries(config)
|
|
|
-
|
|
|
- filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
|
|
|
- var db *bolt.DB
|
|
|
-
|
|
|
- for retry := 0; retry < 3; retry++ {
|
|
|
-
|
|
|
- if retry > 0 {
|
|
|
- NoticeAlert("InitDataStore retry: %d", retry)
|
|
|
- }
|
|
|
+// OpenDataStore opens and initializes the singleton data store instance.
|
|
|
+func OpenDataStore(config *Config) error {
|
|
|
|
|
|
- db, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
|
+ datastoreInitalizeMutex.Lock()
|
|
|
+ defer datastoreInitalizeMutex.Unlock()
|
|
|
|
|
|
- // The datastore file may be corrupt, so attempt to delete and try again
|
|
|
- if err != nil {
|
|
|
- NoticeAlert("bolt.Open error: %s", err)
|
|
|
- os.Remove(filename)
|
|
|
- continue
|
|
|
- }
|
|
|
+ datastoreReferenceMutex.Lock()
|
|
|
+ existingDB := datastoreDB
|
|
|
+ datastoreReferenceMutex.Unlock()
|
|
|
|
|
|
- // Run consistency checks on datastore and emit errors for diagnostics purposes
|
|
|
- // We assume this will complete quickly for typical size Psiphon datastores.
|
|
|
- err = db.View(func(tx *bolt.Tx) error {
|
|
|
- return tx.SynchronousCheck()
|
|
|
- })
|
|
|
+ if existingDB != nil {
|
|
|
+ return common.ContextError(errors.New("db already open"))
|
|
|
+ }
|
|
|
|
|
|
- // The datastore file may be corrupt, so attempt to delete and try again
|
|
|
- if err != nil {
|
|
|
- NoticeAlert("bolt.SynchronousCheck error: %s", err)
|
|
|
- db.Close()
|
|
|
- os.Remove(filename)
|
|
|
- continue
|
|
|
- }
|
|
|
+ // Need to gather the list of migratable server entries before
|
|
|
+ // initializing the boltdb store (as prepareMigrationEntries
|
|
|
+ // checks for the existence of the bolt db file)
|
|
|
+ migratableServerEntries := prepareMigrationEntries(config)
|
|
|
|
|
|
- break
|
|
|
+ filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
|
|
|
+
|
|
|
+ var newDB *bolt.DB
|
|
|
+ var err error
|
|
|
+
|
|
|
+ for retry := 0; retry < 3; retry++ {
|
|
|
+
|
|
|
+ if retry > 0 {
|
|
|
+ NoticeAlert("OpenDataStore retry: %d", retry)
|
|
|
}
|
|
|
|
|
|
+ newDB, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
|
+
|
|
|
+ // The datastore file may be corrupt, so attempt to delete and try again
|
|
|
if err != nil {
|
|
|
- // Note: intending to set the err return value for InitDataStore
|
|
|
- err = fmt.Errorf("initDataStore failed to open database: %s", err)
|
|
|
- return
|
|
|
+ NoticeAlert("bolt.Open error: %s", err)
|
|
|
+ os.Remove(filename)
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
- err = db.Update(func(tx *bolt.Tx) error {
|
|
|
- requiredBuckets := []string{
|
|
|
- serverEntriesBucket,
|
|
|
- rankedServerEntriesBucket,
|
|
|
- splitTunnelRouteETagsBucket,
|
|
|
- splitTunnelRouteDataBucket,
|
|
|
- urlETagsBucket,
|
|
|
- keyValueBucket,
|
|
|
- tunnelStatsBucket,
|
|
|
- remoteServerListStatsBucket,
|
|
|
- slokBucket,
|
|
|
- tacticsBucket,
|
|
|
- speedTestSamplesBucket,
|
|
|
- }
|
|
|
- for _, bucket := range requiredBuckets {
|
|
|
- _, err := tx.CreateBucketIfNotExists([]byte(bucket))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
+ // Run consistency checks on datastore and emit errors for diagnostics purposes
|
|
|
+ // We assume this will complete quickly for typical size Psiphon datastores.
|
|
|
+ err = newDB.View(func(tx *bolt.Tx) error {
|
|
|
+ return tx.SynchronousCheck()
|
|
|
})
|
|
|
+
|
|
|
+ // The datastore file may be corrupt, so attempt to delete and try again
|
|
|
if err != nil {
|
|
|
- err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
|
|
|
- return
|
|
|
+ NoticeAlert("bolt.SynchronousCheck error: %s", err)
|
|
|
+ newDB.Close()
|
|
|
+ os.Remove(filename)
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
- // Cleanup obsolete tunnel (session) stats bucket, if one still exists
|
|
|
+ break
|
|
|
+ }
|
|
|
|
|
|
- err = db.Update(func(tx *bolt.Tx) error {
|
|
|
- tunnelStatsBucket := []byte("tunnelStats")
|
|
|
- if tx.Bucket(tunnelStatsBucket) != nil {
|
|
|
- err := tx.DeleteBucket(tunnelStatsBucket)
|
|
|
- if err != nil {
|
|
|
- NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
|
|
|
- // Continue, since this is not fatal
|
|
|
- }
|
|
|
+ if err != nil {
|
|
|
+ return common.ContextError(fmt.Errorf("failed to open database: %s", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ err = newDB.Update(func(tx *bolt.Tx) error {
|
|
|
+ requiredBuckets := []string{
|
|
|
+ serverEntriesBucket,
|
|
|
+ rankedServerEntriesBucket,
|
|
|
+ splitTunnelRouteETagsBucket,
|
|
|
+ splitTunnelRouteDataBucket,
|
|
|
+ urlETagsBucket,
|
|
|
+ keyValueBucket,
|
|
|
+ tunnelStatsBucket,
|
|
|
+ remoteServerListStatsBucket,
|
|
|
+ slokBucket,
|
|
|
+ tacticsBucket,
|
|
|
+ speedTestSamplesBucket,
|
|
|
+ }
|
|
|
+ for _, bucket := range requiredBuckets {
|
|
|
+ _, err := tx.CreateBucketIfNotExists([]byte(bucket))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
- return nil
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- err = fmt.Errorf("initDataStore failed to create buckets: %s", err)
|
|
|
- return
|
|
|
}
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
|
|
|
+ }
|
|
|
|
|
|
- singleton.db = db
|
|
|
-
|
|
|
- // The migrateServerEntries function requires the data store is
|
|
|
- // initialized prior to execution so that migrated entries can be stored
|
|
|
+ // Cleanup obsolete tunnel (session) stats bucket, if one still exists
|
|
|
|
|
|
- if len(migratableServerEntries) > 0 {
|
|
|
- migrateEntries(
|
|
|
- config, migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
|
|
|
+ err = newDB.Update(func(tx *bolt.Tx) error {
|
|
|
+ tunnelStatsBucket := []byte("tunnelStats")
|
|
|
+ if tx.Bucket(tunnelStatsBucket) != nil {
|
|
|
+ err := tx.DeleteBucket(tunnelStatsBucket)
|
|
|
+ if err != nil {
|
|
|
+ NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
|
|
|
+ // Continue, since this is not fatal
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- resetAllPersistentStatsToUnreported()
|
|
|
+ return nil
|
|
|
})
|
|
|
+ if err != nil {
|
|
|
+ return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
|
|
|
+ }
|
|
|
+
|
|
|
+ datastoreReferenceMutex.Lock()
|
|
|
+ datastoreDB = newDB
|
|
|
+ datastoreReferenceMutex.Unlock()
|
|
|
+
|
|
|
+ // The migrateServerEntries function requires the data store is
|
|
|
+ // initialized prior to execution so that migrated entries can be stored
|
|
|
+
|
|
|
+ if len(migratableServerEntries) > 0 {
|
|
|
+ migrateEntries(
|
|
|
+ config, migratableServerEntries, filepath.Join(config.DataStoreDirectory, LEGACY_DATA_STORE_FILENAME))
|
|
|
+ }
|
|
|
+
|
|
|
+ _ = resetAllPersistentStatsToUnreported()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
|
|
|
- return err
|
|
|
+// CloseDataStore closes the singleton data store instance, if open.
|
|
|
+func CloseDataStore() {
|
|
|
+
|
|
|
+ datastoreInitalizeMutex.Lock()
|
|
|
+ defer datastoreInitalizeMutex.Unlock()
|
|
|
+
|
|
|
+ datastoreReferenceMutex.Lock()
|
|
|
+ defer datastoreReferenceMutex.Unlock()
|
|
|
+
|
|
|
+ if datastoreDB == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ err := datastoreDB.Close()
|
|
|
+ if err != nil {
|
|
|
+ NoticeAlert("failed to close database: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ datastoreDB = nil
|
|
|
}
|
|
|
|
|
|
-func checkInitDataStore() {
|
|
|
- if singleton.db == nil {
|
|
|
- panic("checkInitDataStore: datastore not initialized")
|
|
|
+func dataStoreView(fn func(tx *bolt.Tx) error) error {
|
|
|
+
|
|
|
+ datastoreReferenceMutex.Lock()
|
|
|
+ db := datastoreDB
|
|
|
+ datastoreReferenceMutex.Unlock()
|
|
|
+
|
|
|
+ if db == nil {
|
|
|
+ return common.ContextError(errors.New("database not open"))
|
|
|
}
|
|
|
+
|
|
|
+ return db.View(fn)
|
|
|
+}
|
|
|
+
|
|
|
+func dataStoreUpdate(fn func(tx *bolt.Tx) error) error {
|
|
|
+
|
|
|
+ datastoreReferenceMutex.Lock()
|
|
|
+ db := datastoreDB
|
|
|
+ datastoreReferenceMutex.Unlock()
|
|
|
+
|
|
|
+ if db == nil {
|
|
|
+ return common.ContextError(errors.New("database not open"))
|
|
|
+ }
|
|
|
+
|
|
|
+ return db.Update(fn)
|
|
|
}
|
|
|
|
|
|
// StoreServerEntry adds the server entry to the data store.
|
|
|
@@ -212,7 +247,6 @@ func checkInitDataStore() {
|
|
|
// If the server entry data is malformed, an alert notice is issued and
|
|
|
// the entry is skipped; no error is returned.
|
|
|
func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExists bool) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
// Server entries should already be validated before this point,
|
|
|
// so instead of skipping we fail with an error.
|
|
|
@@ -230,7 +264,7 @@ func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExi
|
|
|
// values (e.g., many servers support all protocols), performance
|
|
|
// is expected to be acceptable.
|
|
|
|
|
|
- err = singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err = dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
serverEntries := tx.Bucket([]byte(serverEntriesBucket))
|
|
|
|
|
|
@@ -292,8 +326,6 @@ func StoreServerEntries(
|
|
|
serverEntries []protocol.ServerEntryFields,
|
|
|
replaceIfExists bool) error {
|
|
|
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
for _, serverEntryFields := range serverEntries {
|
|
|
err := StoreServerEntry(serverEntryFields, replaceIfExists)
|
|
|
if err != nil {
|
|
|
@@ -311,8 +343,6 @@ func StreamingStoreServerEntries(
|
|
|
serverEntries *protocol.StreamingServerEntryDecoder,
|
|
|
replaceIfExists bool) error {
|
|
|
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
// Note: both StreamingServerEntryDecoder.Next and StoreServerEntry
|
|
|
// allocate temporary memory buffers for hex/JSON decoding/encoding,
|
|
|
// so this isn't true constant-memory streaming (it depends on garbage
|
|
|
@@ -343,9 +373,7 @@ func StreamingStoreServerEntries(
|
|
|
// iterated in decending rank order, so this server entry will be
|
|
|
// the first candidate in a subsequent tunnel establishment.
|
|
|
func PromoteServerEntry(config *Config, ipAddress string) error {
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
// Ensure the corresponding entry exists before
|
|
|
// inserting into rank.
|
|
|
@@ -399,7 +427,7 @@ func hasServerEntryFilterChanged(config *Config) (bool, error) {
|
|
|
}
|
|
|
|
|
|
changed := false
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err = dataStoreView(func(tx *bolt.Tx) error {
|
|
|
|
|
|
// previousFilter will be nil not found (not previously
|
|
|
// set) which will never match any current filter.
|
|
|
@@ -525,8 +553,6 @@ func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error)
|
|
|
return newTargetServerEntryIterator(config, false)
|
|
|
}
|
|
|
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
filterChanged, err := hasServerEntryFilterChanged(config)
|
|
|
if err != nil {
|
|
|
return false, nil, common.ContextError(err)
|
|
|
@@ -555,8 +581,6 @@ func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error)
|
|
|
return iterator, err
|
|
|
}
|
|
|
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
iterator := &ServerEntryIterator{
|
|
|
shuffleHeadLength: 0,
|
|
|
isTacticsServerEntryIterator: true,
|
|
|
@@ -669,7 +693,7 @@ func (iterator *ServerEntryIterator) Reset() error {
|
|
|
|
|
|
var serverEntryIds []string
|
|
|
|
|
|
- err := singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
var err error
|
|
|
serverEntryIds, err = getRankedServerEntries(tx)
|
|
|
if err != nil {
|
|
|
@@ -717,8 +741,8 @@ func (iterator *ServerEntryIterator) Close() {
|
|
|
// Returns nil with no error when there is no next item.
|
|
|
func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
|
|
|
|
|
|
- var err error
|
|
|
var serverEntry *protocol.ServerEntry
|
|
|
+ var err error
|
|
|
|
|
|
defer func() {
|
|
|
if err != nil {
|
|
|
@@ -747,7 +771,8 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
|
|
|
iterator.serverEntryIndex += 1
|
|
|
|
|
|
var data []byte
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+
|
|
|
+ err = dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(serverEntriesBucket))
|
|
|
value := bucket.Get([]byte(serverEntryId))
|
|
|
if value != nil {
|
|
|
@@ -811,7 +836,7 @@ func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.Serv
|
|
|
}
|
|
|
|
|
|
func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
|
|
|
- err := singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(serverEntriesBucket))
|
|
|
cursor := bucket.Cursor()
|
|
|
|
|
|
@@ -840,8 +865,6 @@ func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
|
|
|
// CountServerEntries returns a count of stored servers for the
|
|
|
// specified region and tunnel protocols.
|
|
|
func CountServerEntries(useUpstreamProxy bool, region string, tunnelProtocols []string) int {
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
count := 0
|
|
|
err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
|
|
|
if (region == "" || serverEntry.Region == region) &&
|
|
|
@@ -868,8 +891,6 @@ func CountNonImpairedProtocols(
|
|
|
region string,
|
|
|
limitTunnelProtocols, impairedProtocols []string) int {
|
|
|
|
|
|
- checkInitDataStore()
|
|
|
-
|
|
|
distinctProtocols := make(map[string]bool)
|
|
|
|
|
|
err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
|
|
|
@@ -899,7 +920,6 @@ func CountNonImpairedProtocols(
|
|
|
|
|
|
// ReportAvailableRegions prints a notice with the available egress regions.
|
|
|
func ReportAvailableRegions(config *Config) {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
limitTunnelProtocols := config.clientParameters.Get().TunnelProtocols(
|
|
|
parameters.LimitTunnelProtocols)
|
|
|
@@ -935,11 +955,10 @@ func ReportAvailableRegions(config *Config) {
|
|
|
|
|
|
// GetServerEntryIpAddresses returns an array containing
|
|
|
// all stored server IP addresses.
|
|
|
-func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetServerEntryIpAddresses() ([]string, error) {
|
|
|
|
|
|
- ipAddresses = make([]string, 0)
|
|
|
- err = scanServerEntries(func(serverEntry *protocol.ServerEntry) {
|
|
|
+ ipAddresses := make([]string, 0)
|
|
|
+ err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
|
|
|
ipAddresses = append(ipAddresses, serverEntry.IpAddress)
|
|
|
})
|
|
|
|
|
|
@@ -954,9 +973,8 @@ func GetServerEntryIpAddresses() (ipAddresses []string, err error) {
|
|
|
// the given region. The associated etag is also stored and
|
|
|
// used to make efficient web requests for updates to the data.
|
|
|
func SetSplitTunnelRoutes(region, etag string, data []byte) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
|
|
|
err := bucket.Put([]byte(region), []byte(etag))
|
|
|
|
|
|
@@ -973,10 +991,11 @@ func SetSplitTunnelRoutes(region, etag string, data []byte) error {
|
|
|
|
|
|
// GetSplitTunnelRoutesETag retrieves the etag for cached routes
|
|
|
// data for the specified region. If not found, it returns an empty string value.
|
|
|
-func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetSplitTunnelRoutesETag(region string) (string, error) {
|
|
|
+
|
|
|
+ var etag string
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
|
|
|
etag = string(bucket.Get([]byte(region)))
|
|
|
return nil
|
|
|
@@ -990,10 +1009,11 @@ func GetSplitTunnelRoutesETag(region string) (etag string, err error) {
|
|
|
|
|
|
// GetSplitTunnelRoutesData retrieves the cached routes data
|
|
|
// for the specified region. If not found, it returns a nil value.
|
|
|
-func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetSplitTunnelRoutesData(region string) ([]byte, error) {
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ var data []byte
|
|
|
+
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
|
|
|
value := bucket.Get([]byte(region))
|
|
|
if value != nil {
|
|
|
@@ -1014,9 +1034,8 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
|
|
|
// Note: input URL is treated as a string, and is not
|
|
|
// encoded or decoded or otherwise canonicalized.
|
|
|
func SetUrlETag(url, etag string) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(urlETagsBucket))
|
|
|
err := bucket.Put([]byte(url), []byte(etag))
|
|
|
return err
|
|
|
@@ -1030,10 +1049,11 @@ func SetUrlETag(url, etag string) error {
|
|
|
|
|
|
// GetUrlETag retrieves a previously stored an ETag for the
|
|
|
// specfied URL. If not found, it returns an empty string value.
|
|
|
-func GetUrlETag(url string) (etag string, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetUrlETag(url string) (string, error) {
|
|
|
+
|
|
|
+ var etag string
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(urlETagsBucket))
|
|
|
etag = string(bucket.Get([]byte(url)))
|
|
|
return nil
|
|
|
@@ -1047,9 +1067,8 @@ func GetUrlETag(url string) (etag string, err error) {
|
|
|
|
|
|
// SetKeyValue stores a key/value pair.
|
|
|
func SetKeyValue(key, value string) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(keyValueBucket))
|
|
|
err := bucket.Put([]byte(key), []byte(value))
|
|
|
return err
|
|
|
@@ -1063,10 +1082,11 @@ func SetKeyValue(key, value string) error {
|
|
|
|
|
|
// GetKeyValue retrieves the value for a given key. If not found,
|
|
|
// it returns an empty string value.
|
|
|
-func GetKeyValue(key string) (value string, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetKeyValue(key string) (string, error) {
|
|
|
+
|
|
|
+ var value string
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(keyValueBucket))
|
|
|
value = string(bucket.Get([]byte(key)))
|
|
|
return nil
|
|
|
@@ -1106,13 +1126,12 @@ var persistentStatTypes = []string{
|
|
|
// is currently satisfied by the fields sessionId + tunnelNumber
|
|
|
// for tunnel stats, and URL + ETag for remote server list stats.
|
|
|
func StorePersistentStat(statType string, stat []byte) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
if !common.Contains(persistentStatTypes, statType) {
|
|
|
return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
|
|
|
}
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(statType))
|
|
|
err := bucket.Put(stat, persistentStatStateUnreported)
|
|
|
return err
|
|
|
@@ -1128,11 +1147,10 @@ func StorePersistentStat(statType string, stat []byte) error {
|
|
|
// CountUnreportedPersistentStats returns the number of persistent
|
|
|
// stat records in StateUnreported.
|
|
|
func CountUnreportedPersistentStats() int {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
unreported := 0
|
|
|
|
|
|
- err := singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
|
|
|
for _, statType := range persistentStatTypes {
|
|
|
|
|
|
@@ -1162,11 +1180,10 @@ func CountUnreportedPersistentStats() int {
|
|
|
// with ClearReportedPersistentStats. If the records are not successfully
|
|
|
// reported, restore them with PutBackUnreportedPersistentStats.
|
|
|
func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error) {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
stats := make(map[string][][]byte)
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
count := 0
|
|
|
|
|
|
@@ -1226,9 +1243,8 @@ func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error)
|
|
|
// PutBackUnreportedPersistentStats restores a list of persistent
|
|
|
// stat records to StateUnreported.
|
|
|
func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
for _, statType := range persistentStatTypes {
|
|
|
|
|
|
@@ -1254,9 +1270,8 @@ func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
|
|
|
// ClearReportedPersistentStats deletes a list of persistent
|
|
|
// stat records that were successfully reported.
|
|
|
func ClearReportedPersistentStats(stats map[string][][]byte) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
for _, statType := range persistentStatTypes {
|
|
|
|
|
|
@@ -1284,9 +1299,8 @@ func ClearReportedPersistentStats(stats map[string][][]byte) error {
|
|
|
// datastore is initialized at start up, as we do not know if
|
|
|
// persistent records in StateReporting were reported or not.
|
|
|
func resetAllPersistentStatsToUnreported() error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
|
|
|
for _, statType := range persistentStatTypes {
|
|
|
|
|
|
@@ -1320,11 +1334,10 @@ func resetAllPersistentStatsToUnreported() error {
|
|
|
|
|
|
// CountSLOKs returns the total number of SLOK records.
|
|
|
func CountSLOKs() int {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
count := 0
|
|
|
|
|
|
- err := singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(slokBucket))
|
|
|
cursor := bucket.Cursor()
|
|
|
for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
|
|
|
@@ -1343,9 +1356,8 @@ func CountSLOKs() int {
|
|
|
|
|
|
// DeleteSLOKs deletes all SLOK records.
|
|
|
func DeleteSLOKs() error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(slokBucket))
|
|
|
return bucket.ForEach(
|
|
|
func(id, _ []byte) error {
|
|
|
@@ -1363,11 +1375,10 @@ func DeleteSLOKs() error {
|
|
|
// SetSLOK stores a SLOK key, referenced by its ID. The bool
|
|
|
// return value indicates whether the SLOK was already stored.
|
|
|
func SetSLOK(id, key []byte) (bool, error) {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
var duplicate bool
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(slokBucket))
|
|
|
duplicate = bucket.Get(id) != nil
|
|
|
err := bucket.Put([]byte(id), []byte(key))
|
|
|
@@ -1383,10 +1394,11 @@ func SetSLOK(id, key []byte) (bool, error) {
|
|
|
|
|
|
// GetSLOK returns a SLOK key for the specified ID. The return
|
|
|
// value is nil if the SLOK is not found.
|
|
|
-func GetSLOK(id []byte) (key []byte, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func GetSLOK(id []byte) ([]byte, error) {
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ var key []byte
|
|
|
+
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket([]byte(slokBucket))
|
|
|
key = bucket.Get(id)
|
|
|
return nil
|
|
|
@@ -1425,9 +1437,8 @@ func GetTacticsStorer() *TacticsStorer {
|
|
|
}
|
|
|
|
|
|
func setBucketValue(bucket, key, value []byte) error {
|
|
|
- checkInitDataStore()
|
|
|
|
|
|
- err := singleton.db.Update(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreUpdate(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket(bucket)
|
|
|
err := bucket.Put(key, value)
|
|
|
return err
|
|
|
@@ -1440,10 +1451,11 @@ func setBucketValue(bucket, key, value []byte) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func getBucketValue(bucket, key []byte) (value []byte, err error) {
|
|
|
- checkInitDataStore()
|
|
|
+func getBucketValue(bucket, key []byte) ([]byte, error) {
|
|
|
+
|
|
|
+ var value []byte
|
|
|
|
|
|
- err = singleton.db.View(func(tx *bolt.Tx) error {
|
|
|
+ err := dataStoreView(func(tx *bolt.Tx) error {
|
|
|
bucket := tx.Bucket(bucket)
|
|
|
value = bucket.Get(key)
|
|
|
return nil
|