Explore the Code

Low-memory environment datastore changes

- Implement a datastore integration shim; can
  now select, via build tags, new candidate
  datastores: Badger DB and a simple file
  system-based store.

  The new candidates are default-disabled
  pending further testing and evaluation.

  Both alternative datastore implementations
  avoid the low-memory issue with BoltDB:
  it memory-maps the entire datastore file
  and could potentially load the entire file
  into RSS if each record is touched.

- Add additional forced-GCs during operations
  that load and unmarshal many server entry
  records.

- Replace rankedServerEntries with simpler,
  single affinityServerEntryID.

- Use []byte for serverEntryID instead of
  string, to avoid unnecessary conversions.
Rod Hynes 7 years ago
parent
commit
15e9566c6c

+ 199 - 364
psiphon/dataStore.go

@@ -25,45 +25,32 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"os"
-	"path/filepath"
 	"sync"
-	"time"
 
-	"github.com/Psiphon-Labs/bolt"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 )
 
-const (
-	serverEntriesBucket         = "serverEntries"
-	rankedServerEntriesBucket   = "rankedServerEntries"
-	rankedServerEntriesKey      = "rankedServerEntries"
-	splitTunnelRouteETagsBucket = "splitTunnelRouteETags"
-	splitTunnelRouteDataBucket  = "splitTunnelRouteData"
-	urlETagsBucket              = "urlETags"
-	keyValueBucket              = "keyValues"
-	tunnelStatsBucket           = "tunnelStats"
-	remoteServerListStatsBucket = "remoteServerListStats"
-	slokBucket                  = "SLOKs"
-	tacticsBucket               = "tactics"
-	speedTestSamplesBucket      = "speedTestSamples"
-
-	rankedServerEntryCount = 100
-)
-
-const (
-	DATA_STORE_FILENAME                     = "psiphon.boltdb"
-	DATA_STORE_LAST_CONNECTED_KEY           = "lastConnected"
-	DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY = "lastServerEntryFilter"
-	PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST = remoteServerListStatsBucket
-)
-
 var (
+	datastoreServerEntriesBucket                = []byte("serverEntries")
+	datastoreSplitTunnelRouteETagsBucket        = []byte("splitTunnelRouteETags")
+	datastoreSplitTunnelRouteDataBucket         = []byte("splitTunnelRouteData")
+	datastoreUrlETagsBucket                     = []byte("urlETags")
+	datastoreKeyValueBucket                     = []byte("keyValues")
+	datastoreRemoteServerListStatsBucket        = []byte("remoteServerListStats")
+	datastoreSLOKsBucket                        = []byte("SLOKs")
+	datastoreTacticsBucket                      = []byte("tactics")
+	datastoreSpeedTestSamplesBucket             = []byte("speedTestSamples")
+	datastoreLastConnectedKey                   = "lastConnected"
+	datastoreLastServerEntryFilterKey           = []byte("lastServerEntryFilter")
+	datastoreAffinityServerEntryIDKey           = []byte("affinityServerEntryID")
+	datastorePersistentStatTypeRemoteServerList = string(datastoreRemoteServerListStatsBucket)
+	datastoreServerEntryFetchGCThreshold        = 20
+
 	datastoreInitalizeMutex sync.Mutex
 	datastoreReferenceMutex sync.Mutex
-	datastoreDB             *bolt.DB
+	activeDatastoreDB       *datastoreDB
 )
 
 // OpenDataStore opens and initializes the singleton data store instance.
@@ -73,99 +60,20 @@ func OpenDataStore(config *Config) error {
 	defer datastoreInitalizeMutex.Unlock()
 
 	datastoreReferenceMutex.Lock()
-	existingDB := datastoreDB
+	existingDB := activeDatastoreDB
 	datastoreReferenceMutex.Unlock()
 
 	if existingDB != nil {
 		return common.ContextError(errors.New("db already open"))
 	}
 
-	filename := filepath.Join(config.DataStoreDirectory, DATA_STORE_FILENAME)
-
-	var newDB *bolt.DB
-	var err error
-
-	for retry := 0; retry < 3; retry++ {
-
-		if retry > 0 {
-			NoticeAlert("OpenDataStore retry: %d", retry)
-		}
-
-		newDB, err = bolt.Open(filename, 0600, &bolt.Options{Timeout: 1 * time.Second})
-
-		// The datastore file may be corrupt, so attempt to delete and try again
-		if err != nil {
-			NoticeAlert("bolt.Open error: %s", err)
-			os.Remove(filename)
-			continue
-		}
-
-		// Run consistency checks on datastore and emit errors for diagnostics purposes
-		// We assume this will complete quickly for typical size Psiphon datastores.
-		err = newDB.View(func(tx *bolt.Tx) error {
-			return tx.SynchronousCheck()
-		})
-
-		// The datastore file may be corrupt, so attempt to delete and try again
-		if err != nil {
-			NoticeAlert("bolt.SynchronousCheck error: %s", err)
-			newDB.Close()
-			os.Remove(filename)
-			continue
-		}
-
-		break
-	}
-
-	if err != nil {
-		return common.ContextError(fmt.Errorf("failed to open database: %s", err))
-	}
-
-	err = newDB.Update(func(tx *bolt.Tx) error {
-		requiredBuckets := []string{
-			serverEntriesBucket,
-			rankedServerEntriesBucket,
-			splitTunnelRouteETagsBucket,
-			splitTunnelRouteDataBucket,
-			urlETagsBucket,
-			keyValueBucket,
-			tunnelStatsBucket,
-			remoteServerListStatsBucket,
-			slokBucket,
-			tacticsBucket,
-			speedTestSamplesBucket,
-		}
-		for _, bucket := range requiredBuckets {
-			_, err := tx.CreateBucketIfNotExists([]byte(bucket))
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	})
-	if err != nil {
-		return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
-	}
-
-	// Cleanup obsolete tunnel (session) stats bucket, if one still exists
-
-	err = newDB.Update(func(tx *bolt.Tx) error {
-		tunnelStatsBucket := []byte("tunnelStats")
-		if tx.Bucket(tunnelStatsBucket) != nil {
-			err := tx.DeleteBucket(tunnelStatsBucket)
-			if err != nil {
-				NoticeAlert("DeleteBucket %s error: %s", tunnelStatsBucket, err)
-				// Continue, since this is not fatal
-			}
-		}
-		return nil
-	})
+	newDB, err := datastoreOpenDB(config.DataStoreDirectory)
 	if err != nil {
-		return common.ContextError(fmt.Errorf("failed to create buckets: %s", err))
+		return common.ContextError(err)
 	}
 
 	datastoreReferenceMutex.Lock()
-	datastoreDB = newDB
+	activeDatastoreDB = newDB
 	datastoreReferenceMutex.Unlock()
 
 	_ = resetAllPersistentStatsToUnreported()
@@ -182,49 +90,53 @@ func CloseDataStore() {
 	datastoreReferenceMutex.Lock()
 	defer datastoreReferenceMutex.Unlock()
 
-	if datastoreDB == nil {
+	if activeDatastoreDB == nil {
 		return
 	}
 
-	err := datastoreDB.Close()
+	err := activeDatastoreDB.close()
 	if err != nil {
-		NoticeAlert("failed to close database: %s", err)
+		NoticeAlert("failed to close database: %s", common.ContextError(err))
 	}
 
-	datastoreDB = nil
+	activeDatastoreDB = nil
 }
 
-func dataStoreView(fn func(tx *bolt.Tx) error) error {
+func datastoreView(fn func(tx *datastoreTx) error) error {
 
 	datastoreReferenceMutex.Lock()
-	db := datastoreDB
+	db := activeDatastoreDB
 	datastoreReferenceMutex.Unlock()
 
 	if db == nil {
 		return common.ContextError(errors.New("database not open"))
 	}
 
-	return db.View(fn)
+	err := db.view(fn)
+	if err != nil {
+		err = common.ContextError(err)
+	}
+	return err
 }
 
-func dataStoreUpdate(fn func(tx *bolt.Tx) error) error {
+func datastoreUpdate(fn func(tx *datastoreTx) error) error {
 
 	datastoreReferenceMutex.Lock()
-	db := datastoreDB
+	db := activeDatastoreDB
 	datastoreReferenceMutex.Unlock()
 
 	if db == nil {
 		return common.ContextError(errors.New("database not open"))
 	}
 
-	return db.Update(fn)
+	err := db.update(fn)
+	if err != nil {
+		err = common.ContextError(err)
+	}
+	return err
 }
 
 // StoreServerEntry adds the server entry to the data store.
-// A newly stored (or re-stored) server entry is assigned the next-to-top
-// rank for iteration order (the previous top ranked entry is promoted). The
-// purpose of inserting at next-to-top is to keep the last selected server
-// as the top ranked server.
 //
 // When a server entry already exists for a given server, it will be
 // replaced only if replaceIfExists is set or if the the ConfigurationVersion
@@ -250,16 +162,16 @@ func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExi
 	// values (e.g., many servers support all protocols), performance
 	// is expected to be acceptable.
 
-	err = dataStoreUpdate(func(tx *bolt.Tx) error {
+	err = datastoreUpdate(func(tx *datastoreTx) error {
 
-		serverEntries := tx.Bucket([]byte(serverEntriesBucket))
+		serverEntries := tx.bucket(datastoreServerEntriesBucket)
 
 		ipAddress := serverEntryFields.GetIPAddress()
 
 		// Check not only that the entry exists, but is valid. This
 		// will replace in the rare case where the data is corrupt.
 		existingConfigurationVersion := -1
-		existingData := serverEntries.Get([]byte(ipAddress))
+		existingData := serverEntries.get([]byte(ipAddress))
 		if existingData != nil {
 			var existingServerEntry *protocol.ServerEntry
 			err := json.Unmarshal(existingData, &existingServerEntry)
@@ -284,12 +196,7 @@ func StoreServerEntry(serverEntryFields protocol.ServerEntryFields, replaceIfExi
 		if err != nil {
 			return common.ContextError(err)
 		}
-		err = serverEntries.Put([]byte(ipAddress), data)
-		if err != nil {
-			return common.ContextError(err)
-		}
-
-		err = insertRankedServerEntry(tx, ipAddress, 1)
+		err = serverEntries.put([]byte(ipAddress), data)
 		if err != nil {
 			return common.ContextError(err)
 		}
@@ -334,6 +241,7 @@ func StreamingStoreServerEntries(
 	// so this isn't true constant-memory streaming (it depends on garbage
 	// collection).
 
+	n := 0
 	for {
 		serverEntry, err := serverEntries.Next()
 		if err != nil {
@@ -349,22 +257,28 @@ func StreamingStoreServerEntries(
 		if err != nil {
 			return common.ContextError(err)
 		}
+
+		n += 1
+		if n == datastoreServerEntryFetchGCThreshold {
+			defaultGarbageCollection()
+			n = 0
+		}
 	}
 
 	return nil
 }
 
-// PromoteServerEntry assigns the top rank (one more than current
-// max rank) to the specified server entry. Server candidates are
-// iterated in decending rank order, so this server entry will be
-// the first candidate in a subsequent tunnel establishment.
+// PromoteServerEntry sets the server affinity server entry ID to the
+// specified server entry IP address.
 func PromoteServerEntry(config *Config, ipAddress string) error {
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+
+		serverEntryID := []byte(ipAddress)
 
-		// Ensure the corresponding entry exists before
-		// inserting into rank.
-		bucket := tx.Bucket([]byte(serverEntriesBucket))
-		data := bucket.Get([]byte(ipAddress))
+		// Ensure the corresponding server entry exists before
+		// setting server affinity.
+		bucket := tx.bucket(datastoreServerEntriesBucket)
+		data := bucket.get(serverEntryID)
 		if data == nil {
 			NoticeAlert(
 				"PromoteServerEntry: ignoring unknown server entry: %s",
@@ -372,7 +286,8 @@ func PromoteServerEntry(config *Config, ipAddress string) error {
 			return nil
 		}
 
-		err := insertRankedServerEntry(tx, ipAddress, 0)
+		bucket = tx.bucket(datastoreKeyValueBucket)
+		err := bucket.put(datastoreAffinityServerEntryIDKey, serverEntryID)
 		if err != nil {
 			return err
 		}
@@ -386,8 +301,8 @@ func PromoteServerEntry(config *Config, ipAddress string) error {
 		if err != nil {
 			return err
 		}
-		bucket = tx.Bucket([]byte(keyValueBucket))
-		return bucket.Put([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY), currentFilter)
+
+		return bucket.put(datastoreLastServerEntryFilterKey, currentFilter)
 	})
 
 	if err != nil {
@@ -413,13 +328,13 @@ func hasServerEntryFilterChanged(config *Config) (bool, error) {
 	}
 
 	changed := false
-	err = dataStoreView(func(tx *bolt.Tx) error {
+	err = datastoreView(func(tx *datastoreTx) error {
 
 		// previousFilter will be nil not found (not previously
 		// set) which will never match any current filter.
 
-		bucket := tx.Bucket([]byte(keyValueBucket))
-		previousFilter := bucket.Get([]byte(DATA_STORE_LAST_SERVER_ENTRY_FILTER_KEY))
+		bucket := tx.bucket(datastoreKeyValueBucket)
+		previousFilter := bucket.get(datastoreLastServerEntryFilterKey)
 		if bytes.Compare(previousFilter, currentFilter) != 0 {
 			changed = true
 		}
@@ -432,87 +347,12 @@ func hasServerEntryFilterChanged(config *Config) (bool, error) {
 	return changed, nil
 }
 
-func getRankedServerEntries(tx *bolt.Tx) ([]string, error) {
-	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
-	data := bucket.Get([]byte(rankedServerEntriesKey))
-
-	if data == nil {
-		return []string{}, nil
-	}
-
-	rankedServerEntries := make([]string, 0)
-	err := json.Unmarshal(data, &rankedServerEntries)
-	if err != nil {
-		return nil, common.ContextError(err)
-	}
-	return rankedServerEntries, nil
-}
-
-func setRankedServerEntries(tx *bolt.Tx, rankedServerEntries []string) error {
-	data, err := json.Marshal(rankedServerEntries)
-	if err != nil {
-		return common.ContextError(err)
-	}
-
-	bucket := tx.Bucket([]byte(rankedServerEntriesBucket))
-	err = bucket.Put([]byte(rankedServerEntriesKey), data)
-	if err != nil {
-		return common.ContextError(err)
-	}
-
-	return nil
-}
-
-func insertRankedServerEntry(tx *bolt.Tx, serverEntryId string, position int) error {
-	rankedServerEntries, err := getRankedServerEntries(tx)
-	if err != nil {
-		return common.ContextError(err)
-	}
-
-	// BoltDB implementation note:
-	// For simplicity, we store the ranked server ids in an array serialized to
-	// a single key value. To ensure this value doesn't grow without bound,
-	// it's capped at rankedServerEntryCount. For now, this cap should be large
-	// enough to meet the shuffleHeadLength = config.TunnelPoolSize criteria, for
-	// any reasonable configuration of config.TunnelPoolSize.
-
-	// Using: https://github.com/golang/go/wiki/SliceTricks
-
-	// When serverEntryId is already ranked, remove it first to avoid duplicates
-
-	for i, rankedServerEntryId := range rankedServerEntries {
-		if rankedServerEntryId == serverEntryId {
-			rankedServerEntries = append(
-				rankedServerEntries[:i], rankedServerEntries[i+1:]...)
-			break
-		}
-	}
-
-	// SliceTricks insert, with length cap enforced
-
-	if len(rankedServerEntries) < rankedServerEntryCount {
-		rankedServerEntries = append(rankedServerEntries, "")
-	}
-	if position >= len(rankedServerEntries) {
-		position = len(rankedServerEntries) - 1
-	}
-	copy(rankedServerEntries[position+1:], rankedServerEntries[position:])
-	rankedServerEntries[position] = serverEntryId
-
-	err = setRankedServerEntries(tx, rankedServerEntries)
-	if err != nil {
-		return common.ContextError(err)
-	}
-
-	return nil
-}
-
 // ServerEntryIterator is used to iterate over
 // stored server entries in rank order.
 type ServerEntryIterator struct {
 	config                       *Config
-	shuffleHeadLength            int
-	serverEntryIds               []string
+	applyServerAffinity          bool
+	serverEntryIDs               [][]byte
 	serverEntryIndex             int
 	isTacticsServerEntryIterator bool
 	isTargetServerEntryIterator  bool
@@ -547,8 +387,8 @@ func NewServerEntryIterator(config *Config) (bool, *ServerEntryIterator, error)
 	applyServerAffinity := !filterChanged
 
 	iterator := &ServerEntryIterator{
-		config:            config,
-		shuffleHeadLength: config.TunnelPoolSize,
+		config:              config,
+		applyServerAffinity: applyServerAffinity,
 	}
 
 	err = iterator.Reset()
@@ -568,7 +408,6 @@ func NewTacticsServerEntryIterator(config *Config) (*ServerEntryIterator, error)
 	}
 
 	iterator := &ServerEntryIterator{
-		shuffleHeadLength:            0,
 		isTacticsServerEntryIterator: true,
 	}
 
@@ -634,11 +473,6 @@ func (iterator *ServerEntryIterator) Reset() error {
 		return nil
 	}
 
-	// This query implements the Psiphon server candidate selection
-	// algorithm: the first TunnelPoolSize server candidates are in rank
-	// (priority) order, to favor previously successful servers; then the
-	// remaining long tail is shuffled to raise up less recent candidates.
-
 	// BoltDB implementation note:
 	// We don't keep a transaction open for the duration of the iterator
 	// because this would expose the following semantics to consumer code:
@@ -650,44 +484,51 @@ func (iterator *ServerEntryIterator) Reset() error {
 	//     transaction is open.
 	//     (https://github.com/boltdb/bolt)
 	//
-	// So the underlying serverEntriesBucket could change after the serverEntryIds
+	// So the underlying serverEntriesBucket could change after the serverEntryIDs
 	// list is built.
 
-	var serverEntryIds []string
+	var serverEntryIDs [][]byte
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		var err error
-		serverEntryIds, err = getRankedServerEntries(tx)
-		if err != nil {
-			return err
-		}
+	err := datastoreView(func(tx *datastoreTx) error {
+
+		bucket := tx.bucket(datastoreKeyValueBucket)
 
-		skipServerEntryIds := make(map[string]bool)
-		for _, serverEntryId := range serverEntryIds {
-			skipServerEntryIds[serverEntryId] = true
+		serverEntryIDs = make([][]byte, 0)
+		shuffleHead := 0
+
+		var affinityServerEntryID []byte
+		if iterator.applyServerAffinity {
+			affinityServerEntryID = bucket.get(datastoreAffinityServerEntryIDKey)
+			if affinityServerEntryID != nil {
+				serverEntryIDs = append(serverEntryIDs, append([]byte(nil), affinityServerEntryID...))
+				shuffleHead = 1
+			}
 		}
 
-		bucket := tx.Bucket([]byte(serverEntriesBucket))
-		cursor := bucket.Cursor()
-		for key, _ := cursor.Last(); key != nil; key, _ = cursor.Prev() {
-			serverEntryId := string(key)
-			if _, ok := skipServerEntryIds[serverEntryId]; ok {
-				continue
+		bucket = tx.bucket(datastoreServerEntriesBucket)
+		cursor := bucket.cursor()
+		for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
+			if affinityServerEntryID != nil {
+				if bytes.Equal(affinityServerEntryID, key) {
+					continue
+				}
 			}
-			serverEntryIds = append(serverEntryIds, serverEntryId)
+			serverEntryIDs = append(serverEntryIDs, append([]byte(nil), key...))
+		}
+		cursor.close()
+
+		for i := len(serverEntryIDs) - 1; i > shuffleHead-1; i-- {
+			j := rand.Intn(i+1-shuffleHead) + shuffleHead
+			serverEntryIDs[i], serverEntryIDs[j] = serverEntryIDs[j], serverEntryIDs[i]
 		}
+
 		return nil
 	})
 	if err != nil {
 		return common.ContextError(err)
 	}
 
-	for i := len(serverEntryIds) - 1; i > iterator.shuffleHeadLength-1; i-- {
-		j := rand.Intn(i+1-iterator.shuffleHeadLength) + iterator.shuffleHeadLength
-		serverEntryIds[i], serverEntryIds[j] = serverEntryIds[j], serverEntryIds[i]
-	}
-
-	iterator.serverEntryIds = serverEntryIds
+	iterator.serverEntryIDs = serverEntryIDs
 	iterator.serverEntryIndex = 0
 
 	return nil
@@ -695,7 +536,7 @@ func (iterator *ServerEntryIterator) Reset() error {
 
 // Close cleans up resources associated with a ServerEntryIterator.
 func (iterator *ServerEntryIterator) Close() {
-	iterator.serverEntryIds = nil
+	iterator.serverEntryIDs = nil
 	iterator.serverEntryIndex = 0
 }
 
@@ -724,19 +565,19 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
 	// Loop until we have the next server entry that matches the iterator
 	// filter requirements.
 	for {
-		if iterator.serverEntryIndex >= len(iterator.serverEntryIds) {
+		if iterator.serverEntryIndex >= len(iterator.serverEntryIDs) {
 			// There is no next item
 			return nil, nil
 		}
 
-		serverEntryId := iterator.serverEntryIds[iterator.serverEntryIndex]
+		serverEntryID := iterator.serverEntryIDs[iterator.serverEntryIndex]
 		iterator.serverEntryIndex += 1
 
 		var data []byte
 
-		err = dataStoreView(func(tx *bolt.Tx) error {
-			bucket := tx.Bucket([]byte(serverEntriesBucket))
-			value := bucket.Get([]byte(serverEntryId))
+		err = datastoreView(func(tx *datastoreTx) error {
+			bucket := tx.bucket(datastoreServerEntriesBucket)
+			value := bucket.get(serverEntryID)
 			if value != nil {
 				// Must make a copy as slice is only valid within transaction.
 				data = make([]byte, len(value))
@@ -751,7 +592,7 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
 		if data == nil {
 			// In case of data corruption or a bug causing this condition,
 			// do not stop iterating.
-			NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", serverEntryId)
+			NoticeAlert("ServerEntryIterator.Next: unexpected missing server entry: %s", string(serverEntryID))
 			continue
 		}
 
@@ -763,6 +604,10 @@ func (iterator *ServerEntryIterator) Next() (*protocol.ServerEntry, error) {
 			continue
 		}
 
+		if iterator.serverEntryIndex%datastoreServerEntryFetchGCThreshold == 0 {
+			defaultGarbageCollection()
+		}
+
 		// Check filter requirements
 
 		if iterator.isTacticsServerEntryIterator {
@@ -798,13 +643,13 @@ func MakeCompatibleServerEntry(serverEntry *protocol.ServerEntry) *protocol.Serv
 }
 
 func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(serverEntriesBucket))
-		cursor := bucket.Cursor()
-
-		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
-			serverEntry := new(protocol.ServerEntry)
-			err := json.Unmarshal(value, serverEntry)
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreServerEntriesBucket)
+		cursor := bucket.cursor()
+		n := 0
+		for key, value := cursor.first(); key != nil; key, value = cursor.next() {
+			var serverEntry *protocol.ServerEntry
+			err := json.Unmarshal(value, &serverEntry)
 			if err != nil {
 				// In case of data corruption or a bug causing this condition,
 				// do not stop iterating.
@@ -812,8 +657,14 @@ func scanServerEntries(scanner func(*protocol.ServerEntry)) error {
 				continue
 			}
 			scanner(serverEntry)
-		}
 
+			n += 1
+			if n == datastoreServerEntryFetchGCThreshold {
+				defaultGarbageCollection()
+				n = 0
+			}
+		}
+		cursor.close()
 		return nil
 	})
 
@@ -906,33 +757,17 @@ func ReportAvailableRegions(config *Config, limitState *limitTunnelProtocolsStat
 	NoticeAvailableEgressRegions(regionList)
 }
 
-// GetServerEntryIpAddresses returns an array containing
-// all stored server IP addresses.
-func GetServerEntryIpAddresses() ([]string, error) {
-
-	ipAddresses := make([]string, 0)
-	err := scanServerEntries(func(serverEntry *protocol.ServerEntry) {
-		ipAddresses = append(ipAddresses, serverEntry.IpAddress)
-	})
-
-	if err != nil {
-		return nil, common.ContextError(err)
-	}
-
-	return ipAddresses, nil
-}
-
 // SetSplitTunnelRoutes updates the cached routes data for
 // the given region. The associated etag is also stored and
 // used to make efficient web requests for updates to the data.
 func SetSplitTunnelRoutes(region, etag string, data []byte) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
-		err := bucket.Put([]byte(region), []byte(etag))
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
+		err := bucket.put([]byte(region), []byte(etag))
 
-		bucket = tx.Bucket([]byte(splitTunnelRouteDataBucket))
-		err = bucket.Put([]byte(region), data)
+		bucket = tx.bucket(datastoreSplitTunnelRouteDataBucket)
+		err = bucket.put([]byte(region), data)
 		return err
 	})
 
@@ -948,9 +783,9 @@ func GetSplitTunnelRoutesETag(region string) (string, error) {
 
 	var etag string
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteETagsBucket))
-		etag = string(bucket.Get([]byte(region)))
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSplitTunnelRouteETagsBucket)
+		etag = string(bucket.get([]byte(region)))
 		return nil
 	})
 
@@ -966,9 +801,9 @@ func GetSplitTunnelRoutesData(region string) ([]byte, error) {
 
 	var data []byte
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
-		value := bucket.Get([]byte(region))
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSplitTunnelRouteDataBucket)
+		value := bucket.get([]byte(region))
 		if value != nil {
 			// Must make a copy as slice is only valid within transaction.
 			data = make([]byte, len(value))
@@ -988,9 +823,9 @@ func GetSplitTunnelRoutesData(region string) ([]byte, error) {
 // encoded or decoded or otherwise canonicalized.
 func SetUrlETag(url, etag string) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(urlETagsBucket))
-		err := bucket.Put([]byte(url), []byte(etag))
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreUrlETagsBucket)
+		err := bucket.put([]byte(url), []byte(etag))
 		return err
 	})
 
@@ -1006,9 +841,9 @@ func GetUrlETag(url string) (string, error) {
 
 	var etag string
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(urlETagsBucket))
-		etag = string(bucket.Get([]byte(url)))
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreUrlETagsBucket)
+		etag = string(bucket.get([]byte(url)))
 		return nil
 	})
 
@@ -1021,9 +856,9 @@ func GetUrlETag(url string) (string, error) {
 // SetKeyValue stores a key/value pair.
 func SetKeyValue(key, value string) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(keyValueBucket))
-		err := bucket.Put([]byte(key), []byte(value))
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreKeyValueBucket)
+		err := bucket.put([]byte(key), []byte(value))
 		return err
 	})
 
@@ -1039,9 +874,9 @@ func GetKeyValue(key string) (string, error) {
 
 	var value string
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(keyValueBucket))
-		value = string(bucket.Get([]byte(key)))
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreKeyValueBucket)
+		value = string(bucket.get([]byte(key)))
 		return nil
 	})
 
@@ -1065,7 +900,7 @@ var persistentStatStateUnreported = []byte("0")
 var persistentStatStateReporting = []byte("1")
 
 var persistentStatTypes = []string{
-	PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST,
+	datastorePersistentStatTypeRemoteServerList,
 }
 
 // StorePersistentStat adds a new persistent stat record, which
@@ -1084,9 +919,9 @@ func StorePersistentStat(statType string, stat []byte) error {
 		return common.ContextError(fmt.Errorf("invalid persistent stat type: %s", statType))
 	}
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(statType))
-		err := bucket.Put(stat, persistentStatStateUnreported)
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket([]byte(statType))
+		err := bucket.put(stat, persistentStatStateUnreported)
 		return err
 	})
 
@@ -1103,18 +938,19 @@ func CountUnreportedPersistentStats() int {
 
 	unreported := 0
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
+	err := datastoreView(func(tx *datastoreTx) error {
 
 		for _, statType := range persistentStatTypes {
 
-			bucket := tx.Bucket([]byte(statType))
-			cursor := bucket.Cursor()
-			for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			bucket := tx.bucket([]byte(statType))
+			cursor := bucket.cursor()
+			for key, value := cursor.first(); key != nil; key, value = cursor.next() {
 				if 0 == bytes.Compare(value, persistentStatStateUnreported) {
 					unreported++
 					break
 				}
 			}
+			cursor.close()
 		}
 		return nil
 	})
@@ -1136,15 +972,15 @@ func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error)
 
 	stats := make(map[string][][]byte)
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
+	err := datastoreUpdate(func(tx *datastoreTx) error {
 
 		count := 0
 
 		for _, statType := range persistentStatTypes {
 
-			bucket := tx.Bucket([]byte(statType))
-			cursor := bucket.Cursor()
-			for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+			bucket := tx.bucket([]byte(statType))
+			cursor := bucket.cursor()
+			for key, value := cursor.first(); key != nil; key, value = cursor.next() {
 
 				if count >= maxCount {
 					break
@@ -1174,9 +1010,10 @@ func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error)
 					count += 1
 				}
 			}
+			cursor.close()
 
 			for _, key := range stats[statType] {
-				err := bucket.Put(key, persistentStatStateReporting)
+				err := bucket.put(key, persistentStatStateReporting)
 				if err != nil {
 					return err
 				}
@@ -1197,13 +1034,13 @@ func TakeOutUnreportedPersistentStats(maxCount int) (map[string][][]byte, error)
 // stat records to StateUnreported.
 func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
+	err := datastoreUpdate(func(tx *datastoreTx) error {
 
 		for _, statType := range persistentStatTypes {
 
-			bucket := tx.Bucket([]byte(statType))
+			bucket := tx.bucket([]byte(statType))
 			for _, key := range stats[statType] {
-				err := bucket.Put(key, persistentStatStateUnreported)
+				err := bucket.put(key, persistentStatStateUnreported)
 				if err != nil {
 					return err
 				}
@@ -1224,13 +1061,13 @@ func PutBackUnreportedPersistentStats(stats map[string][][]byte) error {
 // stat records that were successfully reported.
 func ClearReportedPersistentStats(stats map[string][][]byte) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
+	err := datastoreUpdate(func(tx *datastoreTx) error {
 
 		for _, statType := range persistentStatTypes {
 
-			bucket := tx.Bucket([]byte(statType))
+			bucket := tx.bucket([]byte(statType))
 			for _, key := range stats[statType] {
-				err := bucket.Delete(key)
+				err := bucket.delete(key)
 				if err != nil {
 					return err
 				}
@@ -1253,22 +1090,23 @@ func ClearReportedPersistentStats(stats map[string][][]byte) error {
 // persistent records in StateReporting were reported or not.
 func resetAllPersistentStatsToUnreported() error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
+	err := datastoreUpdate(func(tx *datastoreTx) error {
 
 		for _, statType := range persistentStatTypes {
 
-			bucket := tx.Bucket([]byte(statType))
+			bucket := tx.bucket([]byte(statType))
 			resetKeys := make([][]byte, 0)
-			cursor := bucket.Cursor()
-			for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
+			cursor := bucket.cursor()
+			for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
 				resetKeys = append(resetKeys, key)
 			}
+			cursor.close()
 			// TODO: data mutation is done outside cursor. Is this
 			// strictly necessary in this case? As is, this means
 			// all stats need to be loaded into memory at once.
 			// https://godoc.org/github.com/boltdb/bolt#Cursor
 			for _, key := range resetKeys {
-				err := bucket.Put(key, persistentStatStateUnreported)
+				err := bucket.put(key, persistentStatStateUnreported)
 				if err != nil {
 					return err
 				}
@@ -1290,12 +1128,13 @@ func CountSLOKs() int {
 
 	count := 0
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(slokBucket))
-		cursor := bucket.Cursor()
-		for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSLOKsBucket)
+		cursor := bucket.cursor()
+		for key := cursor.firstKey(); key != nil; key = cursor.nextKey() {
 			count++
 		}
+		cursor.close()
 		return nil
 	})
 
@@ -1310,12 +1149,8 @@ func CountSLOKs() int {
 // DeleteSLOKs deletes all SLOK records.
 func DeleteSLOKs() error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(slokBucket))
-		return bucket.ForEach(
-			func(id, _ []byte) error {
-				return bucket.Delete(id)
-			})
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		return tx.clearBucket(datastoreSLOKsBucket)
 	})
 
 	if err != nil {
@@ -1331,10 +1166,10 @@ func SetSLOK(id, key []byte) (bool, error) {
 
 	var duplicate bool
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(slokBucket))
-		duplicate = bucket.Get(id) != nil
-		err := bucket.Put([]byte(id), []byte(key))
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSLOKsBucket)
+		duplicate = bucket.get(id) != nil
+		err := bucket.put([]byte(id), []byte(key))
 		return err
 	})
 
@@ -1351,9 +1186,9 @@ func GetSLOK(id []byte) ([]byte, error) {
 
 	var key []byte
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket([]byte(slokBucket))
-		key = bucket.Get(id)
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(datastoreSLOKsBucket)
+		key = bucket.get(id)
 		return nil
 	})
 
@@ -1369,19 +1204,19 @@ type TacticsStorer struct {
 }
 
 func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
-	return setBucketValue([]byte(tacticsBucket), []byte(networkID), record)
+	return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
 }
 
 func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
-	return getBucketValue([]byte(tacticsBucket), []byte(networkID))
+	return getBucketValue(datastoreTacticsBucket, []byte(networkID))
 }
 
 func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
-	return setBucketValue([]byte(speedTestSamplesBucket), []byte(networkID), record)
+	return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
 }
 
 func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
-	return getBucketValue([]byte(speedTestSamplesBucket), []byte(networkID))
+	return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
 }
 
 // GetTacticsStorer creates a TacticsStorer.
@@ -1391,9 +1226,9 @@ func GetTacticsStorer() *TacticsStorer {
 
 func setBucketValue(bucket, key, value []byte) error {
 
-	err := dataStoreUpdate(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket(bucket)
-		err := bucket.Put(key, value)
+	err := datastoreUpdate(func(tx *datastoreTx) error {
+		bucket := tx.bucket(bucket)
+		err := bucket.put(key, value)
 		return err
 	})
 
@@ -1408,9 +1243,9 @@ func getBucketValue(bucket, key []byte) ([]byte, error) {
 
 	var value []byte
 
-	err := dataStoreView(func(tx *bolt.Tx) error {
-		bucket := tx.Bucket(bucket)
-		value = bucket.Get(key)
+	err := datastoreView(func(tx *datastoreTx) error {
+		bucket := tx.bucket(bucket)
+		value = bucket.get(key)
 		return nil
 	})
 

+ 225 - 0
psiphon/dataStore_badger.go

@@ -0,0 +1,225 @@
+// +build BADGER_DB
+
+/*
+ * Copyright (c) 2018, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"os"
+	"path/filepath"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/badger/options"
+)
+
+const (
+	DATA_STORE_DIRECTORY = "psiphon.badgerdb"
+)
+
+// datastoreDB adapts Badger DB to the common datastore interface.
+type datastoreDB struct {
+	badgerDB *badger.DB
+}
+
+// datastoreTx wraps a single Badger transaction.
+type datastoreTx struct {
+	badgerTx *badger.Txn
+}
+
+// datastoreBucket emulates a bucket by prefixing every key with the
+// bucket name within Badger's single, flat key space.
+type datastoreBucket struct {
+	name []byte
+	tx   *datastoreTx
+}
+
+// datastoreCursor iterates over the keys sharing one bucket-name prefix.
+type datastoreCursor struct {
+	badgerIterator *badger.Iterator
+	prefix         []byte
+}
+
+// datastoreOpenDB opens (creating if necessary) the Badger datastore
+// rooted under rootDataDirectory. The options are tuned for low-memory
+// environments: file-backed loading modes and minimal table/memtable
+// counts trade throughput for a smaller RSS footprint.
+func datastoreOpenDB(rootDataDirectory string) (*datastoreDB, error) {
+
+	// Use the declared constant; the original duplicated the
+	// "psiphon.badgerdb" literal here, leaving DATA_STORE_DIRECTORY unused.
+	dbDirectory := filepath.Join(rootDataDirectory, DATA_STORE_DIRECTORY)
+
+	err := os.MkdirAll(dbDirectory, 0700)
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+
+	opts := badger.DefaultOptions
+
+	opts.Dir = dbDirectory
+	opts.ValueDir = dbDirectory
+
+	// FileIO avoids memory-mapping the entire datastore, which is the
+	// low-memory issue this datastore candidate is intended to address.
+	opts.TableLoadingMode = options.FileIO
+	opts.ValueLogLoadingMode = options.FileIO
+	opts.MaxTableSize = 1 << 16
+	opts.ValueLogFileSize = 1 << 20
+	opts.NumMemtables = 1
+	opts.NumLevelZeroTables = 1
+	opts.NumLevelZeroTablesStall = 2
+	opts.NumCompactors = 1
+
+	db, err := badger.Open(opts)
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+
+	// Reclaim value log space on open; RunValueLogGC returns an error
+	// once no further GC can be performed.
+	for {
+		if db.RunValueLogGC(0.5) != nil {
+			break
+		}
+	}
+
+	return &datastoreDB{badgerDB: db}, nil
+}
+
+// close releases the underlying Badger DB.
+func (db *datastoreDB) close() error {
+	return db.badgerDB.Close()
+}
+
+// view runs fn in a read-only Badger transaction.
+func (db *datastoreDB) view(fn func(tx *datastoreTx) error) error {
+	return db.badgerDB.View(
+		func(tx *badger.Txn) error {
+			err := fn(&datastoreTx{badgerTx: tx})
+			if err != nil {
+				return common.ContextError(err)
+			}
+			return nil
+		})
+}
+
+// update runs fn in a read-write Badger transaction; Badger commits the
+// transaction when the wrapped function returns nil and discards it on
+// error.
+func (db *datastoreDB) update(fn func(tx *datastoreTx) error) error {
+	return db.badgerDB.Update(
+		func(tx *badger.Txn) error {
+			err := fn(&datastoreTx{badgerTx: tx})
+			if err != nil {
+				return common.ContextError(err)
+			}
+			return nil
+		})
+}
+
+// bucket returns a handle whose operations prefix keys with name. No
+// state is created in the store until a key is written.
+func (tx *datastoreTx) bucket(name []byte) *datastoreBucket {
+	return &datastoreBucket{
+		name: name,
+		tx:   tx,
+	}
+}
+
+// clearBucket deletes every key in the named bucket.
+//
+// Keys are first collected (and copied, since iterator keys are only
+// valid until the next advance), then the iterator is closed, and only
+// then are the deletes issued: the original code leaked the iterator
+// and mutated the transaction while iterating. It also passed the
+// cursor's UNPREFIXED keys to badgerTx.Delete, deleting the wrong keys;
+// deleting via bucket.delete re-applies the bucket-name prefix.
+func (tx *datastoreTx) clearBucket(name []byte) error {
+	b := tx.bucket(name)
+	c := b.cursor()
+	keys := make([][]byte, 0)
+	for key := c.firstKey(); key != nil; key = c.nextKey() {
+		keys = append(keys, append([]byte(nil), key...))
+	}
+	c.close()
+	for _, key := range keys {
+		err := b.delete(key)
+		if err != nil {
+			return common.ContextError(err)
+		}
+	}
+	return nil
+}
+
+// get returns the value stored under key, or nil when not found (or on
+// error, since the datastore interface's Get does not return errors).
+// The returned slice is only valid for the duration of the transaction.
+func (b *datastoreBucket) get(key []byte) []byte {
+	// Build prefix+key in a fresh slice: the original appended directly
+	// to b.name, which can clobber bytes shared with the caller's
+	// bucket-name slice whenever that slice has spare capacity.
+	keyWithPrefix := make([]byte, 0, len(b.name)+len(key))
+	keyWithPrefix = append(keyWithPrefix, b.name...)
+	keyWithPrefix = append(keyWithPrefix, key...)
+	item, err := b.tx.badgerTx.Get(keyWithPrefix)
+	if err != nil {
+		if err != badger.ErrKeyNotFound {
+			// The original datastore interface does not return an error from
+			// Get, so emit notice.
+			NoticeAlert("get failed: %s: %s",
+				string(keyWithPrefix), common.ContextError(err))
+		}
+		return nil
+	}
+	value, err := item.Value()
+	if err != nil {
+		NoticeAlert("get failed: %s: %s",
+			string(keyWithPrefix), common.ContextError(err))
+		return nil
+	}
+	return value
+}
+
+// put stores value under key within the bucket's key prefix.
+func (b *datastoreBucket) put(key, value []byte) error {
+	// Build prefix+key in a fresh slice; appending directly to b.name
+	// can clobber bytes shared with the caller's bucket-name slice when
+	// it has spare capacity.
+	keyWithPrefix := make([]byte, 0, len(b.name)+len(key))
+	keyWithPrefix = append(keyWithPrefix, b.name...)
+	keyWithPrefix = append(keyWithPrefix, key...)
+	err := b.tx.badgerTx.Set(keyWithPrefix, value)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// delete removes key from the bucket; deleting a missing key is not an
+// error in Badger.
+func (b *datastoreBucket) delete(key []byte) error {
+	// Build prefix+key in a fresh slice; appending directly to b.name
+	// can clobber bytes shared with the caller's bucket-name slice when
+	// it has spare capacity.
+	keyWithPrefix := make([]byte, 0, len(b.name)+len(key))
+	keyWithPrefix = append(keyWithPrefix, b.name...)
+	keyWithPrefix = append(keyWithPrefix, key...)
+	err := b.tx.badgerTx.Delete(keyWithPrefix)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// cursor returns an iterator positioned over the bucket's key prefix.
+// Value prefetching is disabled since key-only iteration is the common
+// case. The caller must call close when done.
+func (b *datastoreBucket) cursor() *datastoreCursor {
+	opts := badger.DefaultIteratorOptions
+	opts.PrefetchValues = false
+	iterator := b.tx.badgerTx.NewIterator(opts)
+	return &datastoreCursor{badgerIterator: iterator, prefix: b.name}
+}
+
+// firstKey seeks to the start of the bucket prefix and returns the
+// first key, or nil when the bucket is empty.
+func (c *datastoreCursor) firstKey() []byte {
+	c.badgerIterator.Seek(c.prefix)
+	return c.currentKey()
+}
+
+// currentKey returns the current key with the bucket prefix stripped,
+// or nil when iteration has moved past the prefix.
+func (c *datastoreCursor) currentKey() []byte {
+	if !c.badgerIterator.ValidForPrefix(c.prefix) {
+		return nil
+	}
+	item := c.badgerIterator.Item()
+	return item.Key()[len(c.prefix):]
+}
+
+// nextKey advances the iterator and returns the next key, or nil at the
+// end of the bucket.
+func (c *datastoreCursor) nextKey() []byte {
+	c.badgerIterator.Next()
+	return c.currentKey()
+}
+
+// first seeks to the start of the bucket prefix and returns the first
+// key/value pair, or nils when the bucket is empty.
+func (c *datastoreCursor) first() ([]byte, []byte) {
+	c.badgerIterator.Seek(c.prefix)
+	return c.current()
+}
+
+// current returns the current key/value pair with the bucket prefix
+// stripped from the key.
+// NOTE(review): an item.Value() error is swallowed and returned as
+// (nil, nil), which silently ends iteration — confirm this is intended.
+func (c *datastoreCursor) current() ([]byte, []byte) {
+	if !c.badgerIterator.ValidForPrefix(c.prefix) {
+		return nil, nil
+	}
+	item := c.badgerIterator.Item()
+	value, err := item.Value()
+	if err != nil {
+		return nil, nil
+	}
+	return item.Key()[len(c.prefix):], value
+}
+
+// next advances the iterator and returns the next key/value pair, or
+// nils at the end of the bucket.
+func (c *datastoreCursor) next() ([]byte, []byte) {
+	c.badgerIterator.Next()
+	return c.current()
+}
+
+// close releases the underlying Badger iterator; required before the
+// enclosing transaction completes.
+func (c *datastoreCursor) close() {
+	c.badgerIterator.Close()
+}

+ 395 - 0
psiphon/dataStore_files.go

@@ -0,0 +1,395 @@
+// +build FILES_DB
+
+/*
+ * Copyright (c) 2018, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package psiphon
+
+import (
+	"bytes"
+	"encoding/hex"
+	"errors"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+)
+
+// datastoreDB is a simple filesystem-backed key/value store that implements
+// the datastore interface.
+//
+// The current implementation is intended only for experimentation.
+//
+// Buckets are subdirectories, keys are file names (hex-encoded), and values
+// are file contents. Unlike other datastores, update transactions are neither
+// atomic nor isolated; only each put is individually atomic.
+//
+// A buffer pool is used to reduce memory allocation/GC churn from loading
+// file values into memory. Transactions and cursors track and release shared
+// buffers.
+//
+// As with the original datastore interface, value slices are only valid
+// within a transaction; for cursors, there's a further limitation that the
+// value slices are only valid until the next iteration.
+// datastoreDB is the filesystem-backed store handle; see the comment
+// above for the directory/file layout.
+type datastoreDB struct {
+	dataDirectory string
+	bufferPool    sync.Pool
+	lock          sync.RWMutex
+	closed        bool
+}
+
+// datastoreTx represents one view/update "transaction". It tracks the
+// pooled buffers handed out during the transaction so they can be
+// released when the transaction ends.
+type datastoreTx struct {
+	db        *datastoreDB
+	canUpdate bool
+	buffers   []*bytes.Buffer
+}
+
+// datastoreBucket is one bucket subdirectory; a zero-value bucket
+// (nil tx) is returned when bucket creation fails, and all of its
+// operations fail.
+type datastoreBucket struct {
+	bucketDirectory string
+	tx              *datastoreTx
+}
+
+// datastoreCursor iterates a snapshot of the bucket's directory
+// listing; lastBuffer holds the previously returned value's buffer
+// until the next iteration or close.
+type datastoreCursor struct {
+	bucket     *datastoreBucket
+	fileInfos  []os.FileInfo
+	index      int
+	lastBuffer *bytes.Buffer
+}
+
+// datastoreOpenDB creates (if necessary) the file datastore root
+// directory and returns a db handle. No files are held open; each
+// get/put/delete touches the filesystem directly.
+func datastoreOpenDB(rootDataDirectory string) (*datastoreDB, error) {
+
+	dataDirectory := filepath.Join(rootDataDirectory, "psiphon.filesdb")
+	err := os.MkdirAll(dataDirectory, 0700)
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+
+	return &datastoreDB{
+		dataDirectory: dataDirectory,
+		bufferPool: sync.Pool{
+			New: func() interface{} {
+				return new(bytes.Buffer)
+			},
+		},
+	}, nil
+}
+
+// getBuffer takes a reusable buffer from the pool; return it with
+// putBuffer when done.
+func (db *datastoreDB) getBuffer() *bytes.Buffer {
+	return db.bufferPool.Get().(*bytes.Buffer)
+}
+
+// putBuffer empties buffer (retaining its capacity) and returns it to
+// the pool for reuse.
+func (db *datastoreDB) putBuffer(buffer *bytes.Buffer) {
+	buffer.Reset()
+	db.bufferPool.Put(buffer)
+}
+
+// readBuffer loads the contents of filename into a pooled buffer,
+// first applying any partial put commit for that file. It returns
+// (nil, nil) when the file does not exist. On success the caller is
+// responsible for returning the buffer via putBuffer.
+func (db *datastoreDB) readBuffer(filename string) (*bytes.Buffer, error) {
+	// Complete any partial put commit.
+	err := datastoreApplyCommit(filename)
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+	file, err := os.Open(filename)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, common.ContextError(err)
+	}
+	defer file.Close()
+	buffer := db.getBuffer()
+	_, err = buffer.ReadFrom(file)
+	if err != nil {
+		// Return the buffer to the pool; the original dropped it on
+		// this path, defeating the pool's purpose.
+		db.putBuffer(buffer)
+		return nil, common.ContextError(err)
+	}
+	return buffer, nil
+}
+
+// close marks the db closed; subsequent view/update calls fail.
+func (db *datastoreDB) close() error {
+	// close will await any active view and update transactions via this lock.
+	db.lock.Lock()
+	defer db.lock.Unlock()
+	db.closed = true
+	return nil
+}
+
+// view runs fn as a read transaction. Multiple views may run
+// concurrently (read lock); buffers handed out to fn are released when
+// it returns.
+func (db *datastoreDB) view(fn func(tx *datastoreTx) error) error {
+	db.lock.RLock()
+	defer db.lock.RUnlock()
+	if db.closed {
+		return common.ContextError(errors.New("closed"))
+	}
+	tx := &datastoreTx{db: db}
+	defer tx.releaseBuffers()
+	err := fn(tx)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// update runs fn as a write transaction, serialized against all other
+// transactions (write lock). As documented on datastoreDB, updates are
+// not atomic: an error from fn does not roll back earlier puts.
+func (db *datastoreDB) update(fn func(tx *datastoreTx) error) error {
+	db.lock.Lock()
+	defer db.lock.Unlock()
+	if db.closed {
+		return common.ContextError(errors.New("closed"))
+	}
+	tx := &datastoreTx{db: db, canUpdate: true}
+	defer tx.releaseBuffers()
+	err := fn(tx)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// bucket returns the bucket subdirectory (hex-encoded name), creating
+// it if necessary. Note MkdirAll runs on every call, even for existing
+// buckets.
+func (tx *datastoreTx) bucket(name []byte) *datastoreBucket {
+	bucketDirectory := filepath.Join(tx.db.dataDirectory, hex.EncodeToString(name))
+	err := os.MkdirAll(bucketDirectory, 0700)
+	if err != nil {
+		// The original datastore interface does not return an error from Bucket,
+		// so emit notice, and return zero-value bucket for which all
+		// operations will fail.
+		NoticeAlert("bucket failed: %s", common.ContextError(err))
+		return &datastoreBucket{}
+	}
+	return &datastoreBucket{
+		bucketDirectory: bucketDirectory,
+		tx:              tx,
+	}
+}
+
+// clearBucket removes the entire bucket subdirectory and all records in
+// it. The bucket is recreated on the next bucket() call.
+func (tx *datastoreTx) clearBucket(name []byte) error {
+	bucketDirectory := filepath.Join(tx.db.dataDirectory, hex.EncodeToString(name))
+	err := os.RemoveAll(bucketDirectory)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// releaseBuffers returns all buffers handed out during this transaction
+// to the pool; called when the transaction ends, after which any value
+// slices obtained from get are invalid.
+func (tx *datastoreTx) releaseBuffers() {
+	for _, buffer := range tx.buffers {
+		tx.db.putBuffer(buffer)
+	}
+	tx.buffers = nil
+}
+
+// get returns the value stored under key, or nil when not found (or on
+// error, since the datastore interface's Get does not return errors).
+// The returned slice aliases a pooled buffer and is only valid for the
+// duration of the transaction.
+func (b *datastoreBucket) get(key []byte) []byte {
+	if b.tx == nil {
+		return nil
+	}
+	filename := filepath.Join(b.bucketDirectory, hex.EncodeToString(key))
+	valueBuffer, err := b.tx.db.readBuffer(filename)
+	if err != nil {
+		// The original datastore interface does not return an error from Get,
+		// so emit notice.
+		NoticeAlert("get failed: %s", common.ContextError(err))
+		return nil
+	}
+	if valueBuffer == nil {
+		return nil
+	}
+	// Track the buffer so releaseBuffers returns it to the pool when the
+	// transaction ends.
+	b.tx.buffers = append(b.tx.buffers, valueBuffer)
+	return valueBuffer.Bytes()
+}
+
+// put atomically stores value under key using a write/rename commit
+// protocol: write to "<key>.put", rename to "<key>.commit", then rename
+// to the final name via datastoreApplyCommit. A crash mid-put leaves
+// either the old value, a stray .put file, or a .commit file that is
+// applied on the next access.
+func (b *datastoreBucket) put(key, value []byte) error {
+	if b.tx == nil {
+		return common.ContextError(errors.New("bucket not found"))
+	}
+	if !b.tx.canUpdate {
+		return common.ContextError(errors.New("non-update transaction"))
+	}
+
+	filename := filepath.Join(b.bucketDirectory, hex.EncodeToString(key))
+
+	// Complete any partial put commit.
+	err := datastoreApplyCommit(filename)
+	if err != nil {
+		return common.ContextError(err)
+	}
+
+	putFilename := filename + ".put"
+	err = ioutil.WriteFile(putFilename, value, 0600)
+	if err != nil {
+		return common.ContextError(err)
+	}
+
+	commitFilename := filename + ".commit"
+	err = os.Rename(putFilename, commitFilename)
+	if err != nil {
+		return common.ContextError(err)
+	}
+
+	err = datastoreApplyCommit(filename)
+	if err != nil {
+		return common.ContextError(err)
+	}
+
+	return nil
+}
+
+// datastoreApplyCommit completes a partial put: if "<filename>.commit"
+// exists, rename it into place as the final value file. A no-op when no
+// commit file exists.
+// NOTE(review): a Stat error other than IsNotExist falls through to the
+// Rename below — confirm that is the intended handling.
+func datastoreApplyCommit(filename string) error {
+	commitFilename := filename + ".commit"
+	if _, err := os.Stat(commitFilename); err != nil && os.IsNotExist(err) {
+		return nil
+	}
+	// TODO: may not be sufficiently atomic
+	err := os.Rename(commitFilename, filename)
+	if err != nil {
+		return common.ContextError(err)
+	}
+	return nil
+}
+
+// delete removes key's value file along with any transient ".put" and
+// ".commit" files from an interrupted put. Missing files are not an
+// error.
+func (b *datastoreBucket) delete(key []byte) error {
+	if b.tx == nil {
+		return common.ContextError(errors.New("bucket not found"))
+	}
+	filename := filepath.Join(b.bucketDirectory, hex.EncodeToString(key))
+	filenames := []string{filename + ".put", filename + ".commit", filename}
+	for _, filename := range filenames {
+		err := os.Remove(filename)
+		if err != nil && !os.IsNotExist(err) {
+			return common.ContextError(err)
+		}
+	}
+	return nil
+}
+
+// cursor snapshots the bucket's directory listing for iteration. The
+// listing may include transient ".put"/".commit" files; iteration is
+// responsible for skipping them.
+func (b *datastoreBucket) cursor() *datastoreCursor {
+	if b.tx == nil {
+		// The original datastore interface does not return an error from
+		// Cursor; return a zero-value cursor for which all operations
+		// will fail (bucket() already emitted a notice for this case).
+		return &datastoreCursor{}
+	}
+	fileInfos, err := ioutil.ReadDir(b.bucketDirectory)
+	if err != nil {
+		NoticeAlert("cursor failed: %s", common.ContextError(err))
+		return &datastoreCursor{}
+	}
+	return &datastoreCursor{
+		bucket:    b,
+		fileInfos: fileInfos,
+	}
+}
+
+// advance moves the cursor to the next regular data file, skipping the
+// transient ".put"/".commit" files left by the atomic-write protocol.
+//
+// The original loop was broken: the bounds test used "<=" instead of
+// ">=", so it exited before the skip check ever ran; the skip branch
+// had no terminating break, so once past the end it either looped
+// forever or indexed fileInfos out of bounds.
+func (c *datastoreCursor) advance() {
+	if c.bucket == nil {
+		return
+	}
+	for {
+		c.index += 1
+		if c.index >= len(c.fileInfos) {
+			// Past the end; currentKey will report nil.
+			break
+		}
+		// Skip any .put or .commit files
+		if strings.Contains(c.fileInfos[c.index].Name(), ".") {
+			continue
+		}
+		break
+	}
+}
+
+// firstKey positions the cursor at the first data file and returns its
+// key, or nil when the bucket is empty. Positioning goes through
+// nextKey so that transient ".put"/".commit" files at the start of the
+// listing are skipped like everywhere else; the original started at
+// index 0 unconditionally.
+func (c *datastoreCursor) firstKey() []byte {
+	if c.bucket == nil {
+		return nil
+	}
+	c.index = -1
+	return c.nextKey()
+}
+
+// currentKey returns the key (hex-decoded file name) at the current
+// position, or nil when iteration is exhausted or the entry is
+// unexpected (a subdirectory, or a name that is not valid hex).
+func (c *datastoreCursor) currentKey() []byte {
+	if c.bucket == nil {
+		return nil
+	}
+	if c.index >= len(c.fileInfos) {
+		return nil
+	}
+	info := c.fileInfos[c.index]
+	if info.IsDir() {
+		NoticeAlert("cursor failed: unexpected dir")
+		return nil
+	}
+	key, err := hex.DecodeString(info.Name())
+	if err != nil {
+		NoticeAlert("cursor failed: %s", common.ContextError(err))
+		return nil
+	}
+	return key
+}
+
+// nextKey advances to the next data file and returns its key, or nil at
+// the end of the bucket.
+func (c *datastoreCursor) nextKey() []byte {
+	if c.bucket == nil {
+		return nil
+	}
+	c.advance()
+	return c.currentKey()
+}
+
+// first positions the cursor at the first data file and returns its
+// key/value pair, or nils when the bucket is empty. Positioning goes
+// through next so that transient ".put"/".commit" files at the start of
+// the listing are skipped like everywhere else; the original started at
+// index 0 unconditionally.
+func (c *datastoreCursor) first() ([]byte, []byte) {
+	if c.bucket == nil {
+		return nil, nil
+	}
+	c.index = -1
+	return c.next()
+}
+
+// current returns the key/value pair at the current position. The
+// previously returned value's buffer is recycled first, so a value
+// slice is only valid until the next iteration (see the type comment).
+func (c *datastoreCursor) current() ([]byte, []byte) {
+	key := c.currentKey()
+	if key == nil {
+		return nil, nil
+	}
+
+	if c.lastBuffer != nil {
+		c.bucket.tx.db.putBuffer(c.lastBuffer)
+	}
+	c.lastBuffer = nil
+
+	filename := filepath.Join(c.bucket.bucketDirectory, hex.EncodeToString(key))
+	valueBuffer, err := c.bucket.tx.db.readBuffer(filename)
+	// A nil buffer means the file vanished between the directory listing
+	// and this read; report it as an error.
+	if valueBuffer == nil {
+		err = errors.New("unexpected nil value")
+	}
+	if err != nil {
+		NoticeAlert("cursor failed: %s", common.ContextError(err))
+		return nil, nil
+	}
+	c.lastBuffer = valueBuffer
+	return key, valueBuffer.Bytes()
+}
+
+// next advances to the next data file and returns its key/value pair,
+// or nils at the end of the bucket.
+func (c *datastoreCursor) next() ([]byte, []byte) {
+	if c.bucket == nil {
+		return nil, nil
+	}
+	c.advance()
+	return c.current()
+}
+
+// close releases the last value buffer back to the pool; must be called
+// when iteration with current/next is finished.
+func (c *datastoreCursor) close() {
+	if c.lastBuffer != nil {
+		c.bucket.tx.db.putBuffer(c.lastBuffer)
+		c.lastBuffer = nil
+	}
+}

+ 0 - 3
psiphon/limitProtocols_test.go

@@ -24,7 +24,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
-	"path/filepath"
 	"sync"
 	"testing"
 	"time"
@@ -95,8 +94,6 @@ func TestLimitTunnelProtocols(t *testing.T) {
 	}
 	defer os.RemoveAll(testDataDirName)
 
-	os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
-
 	clientConfigJSON := `
     {
         "ClientPlatform" : "Windows",

+ 0 - 1
psiphon/memory_test/memory_test.go

@@ -83,7 +83,6 @@ func runMemoryTest(t *testing.T, testMode int) {
 		os.Exit(1)
 	}
 	defer os.RemoveAll(testDataDirName)
-	os.Remove(filepath.Join(testDataDirName, psiphon.DATA_STORE_FILENAME))
 
 	psiphon.SetEmitDiagnosticNotices(true)
 

+ 0 - 2
psiphon/remoteServerList_test.go

@@ -205,8 +205,6 @@ func testObfuscatedRemoteServerLists(t *testing.T, omitMD5Sums bool) {
 	// mock seeding SLOKs
 	//
 
-	os.Remove(filepath.Join(testDataDirName, DATA_STORE_FILENAME))
-
 	err = OpenDataStore(&Config{DataStoreDirectory: testDataDirName})
 	if err != nil {
 		t.Fatalf("error initializing client datastore: %s", err)

+ 0 - 2
psiphon/server/server_test.go

@@ -79,8 +79,6 @@ func TestMain(m *testing.M) {
 	}
 	defer os.RemoveAll(testDataDirName)
 
-	os.Remove(filepath.Join(testDataDirName, psiphon.DATA_STORE_FILENAME))
-
 	psiphon.SetEmitDiagnosticNotices(true)
 
 	mockWebServerURL, mockWebServerExpectedResponse = runMockWebServer()

+ 4 - 4
psiphon/serverApi.go

@@ -315,7 +315,7 @@ func (serverContext *ServerContext) DoConnectedRequest() error {
 
 	params := serverContext.getBaseAPIParameters()
 
-	lastConnected, err := GetKeyValue(DATA_STORE_LAST_CONNECTED_KEY)
+	lastConnected, err := GetKeyValue(datastoreLastConnectedKey)
 	if err != nil {
 		return common.ContextError(err)
 	}
@@ -361,7 +361,7 @@ func (serverContext *ServerContext) DoConnectedRequest() error {
 	}
 
 	err = SetKeyValue(
-		DATA_STORE_LAST_CONNECTED_KEY, connectedResponse.ConnectedTimestamp)
+		datastoreLastConnectedKey, connectedResponse.ConnectedTimestamp)
 	if err != nil {
 		return common.ContextError(err)
 	}
@@ -515,7 +515,7 @@ func makeStatusRequestPayload(
 	payload["https_requests"] = make([]string, 0)
 
 	persistentStatPayloadNames := make(map[string]string)
-	persistentStatPayloadNames[PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST] = "remote_server_list_stats"
+	persistentStatPayloadNames[datastorePersistentStatTypeRemoteServerList] = "remote_server_list_stats"
 
 	for statType, stats := range persistentStats {
 
@@ -607,7 +607,7 @@ func RecordRemoteServerListStat(
 	}
 
 	return StorePersistentStat(
-		PERSISTENT_STAT_TYPE_REMOTE_SERVER_LIST, remoteServerListStatJson)
+		datastorePersistentStatTypeRemoteServerList, remoteServerListStatJson)
 }
 
 // doGetRequest makes a tunneled HTTPS request and returns the response body.