Browse Source

Merge pull request #561 from rod-hynes/master

Add Open/CloseDataStore calls to GetTactics via TacticsStorer
Rod Hynes 5 years ago
parent
commit
7d3068606e

+ 3 - 2
ClientLibrary/clientlib/clientlib.go

@@ -118,7 +118,7 @@ var ErrTimeout = std_errors.New("clientlib: tunnel establishment timeout")
 func StartTunnel(ctx context.Context,
 	configJSON []byte, embeddedServerEntryList string,
 	params Parameters, paramsDelta ClientParametersDelta,
-	noticeReceiver func(NoticeEvent)) (tunnel *PsiphonTunnel, err error) {
+	noticeReceiver func(NoticeEvent)) (tunnel *PsiphonTunnel, retErr error) {
 
 	config, err := psiphon.LoadConfig(configJSON)
 	if err != nil {
@@ -176,7 +176,7 @@ func StartTunnel(ctx context.Context,
 	}
 	// Make sure we close the datastore in case of error
 	defer func() {
-		if err != nil {
+		if retErr != nil {
 			psiphon.CloseDataStore()
 		}
 	}()
@@ -287,6 +287,7 @@ func StartTunnel(ctx context.Context,
 }
 
 // Stop stops/disconnects/shuts down the tunnel. It is safe to call when not connected.
+// Not safe to call concurrently with Start.
 func (tunnel *PsiphonTunnel) Stop() {
 	if tunnel.stopController != nil {
 		tunnel.stopController()

+ 1 - 0
MobileLibrary/psi/psi.go

@@ -183,6 +183,7 @@ func Start(
 
 	controller, err = psiphon.NewController(config)
 	if err != nil {
+		psiphon.CloseDataStore()
 		return fmt.Errorf("error initializing controller: %s", err)
 	}
 

+ 168 - 39
psiphon/dataStore.go

@@ -22,6 +22,7 @@ package psiphon
 import (
 	"bytes"
 	"encoding/json"
+	"math"
 	"sync"
 	"time"
 
@@ -53,20 +54,41 @@ var (
 	datastorePersistentStatTypeFailedTunnel     = string(datastoreFailedTunnelStatsBucket)
 	datastoreServerEntryFetchGCThreshold        = 10
 
-	datastoreMutex    sync.RWMutex
-	activeDatastoreDB *datastoreDB
+	datastoreMutex          sync.RWMutex
+	datastoreReferenceCount int64
+	activeDatastoreDB       *datastoreDB
 )
 
-// OpenDataStore opens and initializes the singleton data store instance.
+// OpenDataStore opens and initializes the singleton datastore instance.
+//
+// Nested Open/CloseDataStore calls are supported: OpenDataStore will succeed
+// when called when the datastore is initialized. Every call to OpenDataStore
+// must be paired with a corresponding call to CloseDataStore to ensure the
+// datastore is closed.
 func OpenDataStore(config *Config) error {
 
 	datastoreMutex.Lock()
 
+	if datastoreReferenceCount < 0 || datastoreReferenceCount == math.MaxInt64 {
+		datastoreMutex.Unlock()
+		return errors.Tracef(
+			"invalid datastore reference count: %d", datastoreReferenceCount)
+	}
+	datastoreReferenceCount += 1
+	if datastoreReferenceCount > 1 {
+		var err error
+		if activeDatastoreDB == nil {
+			err = errors.TraceNew("datastore unexpectedly closed")
+		}
+		datastoreMutex.Unlock()
+		return err
+	}
+
 	existingDB := activeDatastoreDB
 
 	if existingDB != nil {
 		datastoreMutex.Unlock()
-		return errors.TraceNew("db already open")
+		return errors.TraceNew("datastore unexpectedly open")
 	}
 
 	newDB, err := datastoreOpenDB(config.GetDataStoreDirectory())
@@ -84,31 +106,46 @@ func OpenDataStore(config *Config) error {
 	return nil
 }
 
-// CloseDataStore closes the singleton data store instance, if open.
+// CloseDataStore closes the singleton datastore instance, if open.
 func CloseDataStore() {
 
 	datastoreMutex.Lock()
 	defer datastoreMutex.Unlock()
 
+	if datastoreReferenceCount <= 0 {
+		NoticeWarning(
+			"invalid datastore reference count: %d", datastoreReferenceCount)
+		return
+	}
+	datastoreReferenceCount -= 1
+	if datastoreReferenceCount > 0 {
+		return
+	}
+
 	if activeDatastoreDB == nil {
 		return
 	}
 
 	err := activeDatastoreDB.close()
 	if err != nil {
-		NoticeWarning("failed to close database: %s", errors.Trace(err))
+		NoticeWarning("failed to close datastore: %s", errors.Trace(err))
 	}
 
 	activeDatastoreDB = nil
 }
 
+// datastoreView runs a read-only transaction, making datastore buckets and
+// values available to the supplied function.
+//
+// Bucket value slices are only valid for the duration of the transaction and
+// _must_ not be referenced directly outside the transaction.
 func datastoreView(fn func(tx *datastoreTx) error) error {
 
 	datastoreMutex.RLock()
 	defer datastoreMutex.RUnlock()
 
 	if activeDatastoreDB == nil {
-		return errors.TraceNew("database not open")
+		return errors.TraceNew("datastore not open")
 	}
 
 	err := activeDatastoreDB.view(fn)
@@ -118,6 +155,11 @@ func datastoreView(fn func(tx *datastoreTx) error) error {
 	return err
 }
 
+// datastoreUpdate runs a read-write transaction, making datastore buckets and
+// values available to the supplied function.
+//
+// Bucket value slices are only valid for the duration of the transaction and
+// _must_ not be referenced directly outside the transaction.
 func datastoreUpdate(fn func(tx *datastoreTx) error) error {
 
 	datastoreMutex.RLock()
@@ -134,7 +176,7 @@ func datastoreUpdate(fn func(tx *datastoreTx) error) error {
 	return err
 }
 
-// StoreServerEntry adds the server entry to the data store.
+// StoreServerEntry adds the server entry to the datastore.
 //
 // When a server entry already exists for a given server, it will be
 // replaced only if replaceIfExists is set or if the the ConfigurationVersion
@@ -1672,14 +1714,14 @@ func DeleteSLOKs() error {
 
 // SetSLOK stores a SLOK key, referenced by its ID. The bool
 // return value indicates whether the SLOK was already stored.
-func SetSLOK(id, key []byte) (bool, error) {
+func SetSLOK(id, slok []byte) (bool, error) {
 
 	var duplicate bool
 
 	err := datastoreUpdate(func(tx *datastoreTx) error {
 		bucket := tx.bucket(datastoreSLOKsBucket)
 		duplicate = bucket.get(id) != nil
-		err := bucket.put([]byte(id), []byte(key))
+		err := bucket.put(id, slok)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1697,11 +1739,16 @@ func SetSLOK(id, key []byte) (bool, error) {
 // value is nil if the SLOK is not found.
 func GetSLOK(id []byte) ([]byte, error) {
 
-	var key []byte
+	var slok []byte
 
 	err := datastoreView(func(tx *datastoreTx) error {
 		bucket := tx.bucket(datastoreSLOKsBucket)
-		key = bucket.get(id)
+		value := bucket.get(id)
+		if value != nil {
+			// Must make a copy as slice is only valid within transaction.
+			slok = make([]byte, len(value))
+			copy(slok, value)
+		}
 		return nil
 	})
 
@@ -1709,7 +1756,7 @@ func GetSLOK(id []byte) ([]byte, error) {
 		return nil, errors.Trace(err)
 	}
 
-	return key, nil
+	return slok, nil
 }
 
 func makeDialParametersKey(serverIPAddress, networkID []byte) []byte {
@@ -1737,22 +1784,28 @@ func GetDialParameters(serverIPAddress, networkID string) (*DialParameters, erro
 
 	key := makeDialParametersKey([]byte(serverIPAddress), []byte(networkID))
 
-	data, err := getBucketValue(datastoreDialParametersBucket, key)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
+	var dialParams *DialParameters
 
-	if data == nil {
-		return nil, nil
-	}
+	err := getBucketValue(
+		datastoreDialParametersBucket,
+		key,
+		func(value []byte) error {
+			if value == nil {
+				return nil
+			}
 
-	// Note: unlike with server entries, this record is not deleted when the
-	// unmarshal fails, as the caller should proceed with the dial without dial
-	// parameters; and when when the dial succeeds, new dial parameters will be
-	// written over this record.
+			// Note: unlike with server entries, this record is not deleted when the
+			// unmarshal fails, as the caller should proceed with the dial without dial
+			// parameters; and when when the dial succeeds, new dial parameters will be
+			// written over this record.
 
-	var dialParams *DialParameters
-	err = json.Unmarshal(data, &dialParams)
+			err := json.Unmarshal(value, &dialParams)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			return nil
+		})
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -1770,28 +1823,93 @@ func DeleteDialParameters(serverIPAddress, networkID string) error {
 }
 
 // TacticsStorer implements tactics.Storer.
+//
+// Each TacticsStorer datastore operation is wrapped with
+// OpenDataStore/CloseDataStore, which enables a limited degree of
+// multiprocess datastore synchronization:
+//
+// One process runs a Controller. Another process runs a stand-alone operation
+// which accesses tactics via GetTactics. For example, SendFeedback.
+//
+// When the Controller is running, it holds an exclusive lock on the datastore
+// and TacticsStorer operations in GetTactics in another process will fail.
+// The stand-alone operation should proceed without tactics. In many cases,
+// this is acceptable since any stand-alone operation network traffic will be
+// tunneled.
+//
+// When the Controller is not running, the TacticsStorer operations in
+// GetTactics in another process will succeed, with no operation holding a
+// datastore lock for longer than the handful of milliseconds required to
+// perform a single datastore operation.
+//
+// If the Controller is started while the stand-alone operation is in
+// progress, the Controller start will not be blocked by the brief
+// TacticsStorer datastore locks; the bolt Open call, in particular, has a 1
+// second lock aquisition timeout.
+//
+// In this scheme, no attempt is made to detect interleaving datastore writes;
+// that is, if a different process writes tactics in between GetTactics calls
+// to GetTacticsRecord and then SetTacticsRecord. This is because all tactics
+// writes are considered fresh and valid.
 type TacticsStorer struct {
+	config *Config
 }
 
 func (t *TacticsStorer) SetTacticsRecord(networkID string, record []byte) error {
-	return setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
+	err := OpenDataStore(t.config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer CloseDataStore()
+	err = setBucketValue(datastoreTacticsBucket, []byte(networkID), record)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return nil
 }
 
 func (t *TacticsStorer) GetTacticsRecord(networkID string) ([]byte, error) {
-	return getBucketValue(datastoreTacticsBucket, []byte(networkID))
+	err := OpenDataStore(t.config)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	defer CloseDataStore()
+	value, err := copyBucketValue(datastoreTacticsBucket, []byte(networkID))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return value, nil
 }
 
 func (t *TacticsStorer) SetSpeedTestSamplesRecord(networkID string, record []byte) error {
-	return setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
+	err := OpenDataStore(t.config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer CloseDataStore()
+	err = setBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID), record)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return nil
 }
 
 func (t *TacticsStorer) GetSpeedTestSamplesRecord(networkID string) ([]byte, error) {
-	return getBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
+	err := OpenDataStore(t.config)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	defer CloseDataStore()
+	value, err := copyBucketValue(datastoreSpeedTestSamplesBucket, []byte(networkID))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return value, nil
 }
 
 // GetTacticsStorer creates a TacticsStorer.
-func GetTacticsStorer() *TacticsStorer {
-	return &TacticsStorer{}
+func GetTacticsStorer(config *Config) *TacticsStorer {
+	return &TacticsStorer{config: config}
 }
 
 // GetAffinityServerEntryAndDialParameters fetches the current affinity server
@@ -1867,21 +1985,19 @@ func setBucketValue(bucket, key, value []byte) error {
 	return nil
 }
 
-func getBucketValue(bucket, key []byte) ([]byte, error) {
-
-	var value []byte
+func getBucketValue(bucket, key []byte, valueCallback func([]byte) error) error {
 
 	err := datastoreView(func(tx *datastoreTx) error {
 		bucket := tx.bucket(bucket)
-		value = bucket.get(key)
-		return nil
+		value := bucket.get(key)
+		return valueCallback(value)
 	})
 
 	if err != nil {
-		return nil, errors.Trace(err)
+		return errors.Trace(err)
 	}
 
-	return value, nil
+	return nil
 }
 
 func deleteBucketValue(bucket, key []byte) error {
@@ -1897,3 +2013,16 @@ func deleteBucketValue(bucket, key []byte) error {
 
 	return nil
 }
+
+func copyBucketValue(bucket, key []byte) ([]byte, error) {
+	var valueCopy []byte
+	err := getBucketValue(bucket, key, func(value []byte) error {
+		if value != nil {
+			// Must make a copy as slice is only valid within transaction.
+			valueCopy = make([]byte, len(value))
+			copy(valueCopy, value)
+		}
+		return nil
+	})
+	return valueCopy, err
+}

+ 0 - 3
psiphon/dataStoreRecovery_test.go

@@ -216,7 +216,6 @@ func TestBoltResiliency(t *testing.T) {
 	if err != nil {
 		t.Fatalf("OpenDataStore failed: %s", err)
 	}
-	defer CloseDataStore()
 
 	paveServerEntries()
 
@@ -241,7 +240,6 @@ func TestBoltResiliency(t *testing.T) {
 	if err != nil {
 		t.Fatalf("OpenDataStore failed: %s", err)
 	}
-	defer CloseDataStore()
 
 	<-noticeResetDatastore
 
@@ -277,7 +275,6 @@ func TestBoltResiliency(t *testing.T) {
 	if err != nil {
 		t.Fatalf("OpenDataStore failed: %s", err)
 	}
-	defer CloseDataStore()
 
 	<-noticeResetDatastore
 

+ 1 - 1
psiphon/dataStore_bolt.go

@@ -195,7 +195,7 @@ func (db *datastoreDB) close() error {
 
 func (db *datastoreDB) view(fn func(tx *datastoreTx) error) (reterr error) {
 
-	// Any bolt function that performs mmap buffer accesses can raise SIGBUS  due
+	// Any bolt function that performs mmap buffer accesses can raise SIGBUS due
 	// to underlying storage changes, such as a truncation of the datastore file
 	// or removal or network attached storage, etc.
 	//

+ 5 - 2
psiphon/serverApi.go

@@ -153,7 +153,10 @@ func (serverContext *ServerContext) doHandshakeRequest(
 		networkID = serverContext.tunnel.config.GetNetworkID()
 
 		err := tactics.SetTacticsAPIParameters(
-			serverContext.tunnel.config.clientParameters, GetTacticsStorer(), networkID, params)
+			serverContext.tunnel.config.clientParameters,
+			GetTacticsStorer(serverContext.tunnel.config),
+			networkID,
+			params)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -307,7 +310,7 @@ func (serverContext *ServerContext) doHandshakeRequest(
 		if payload != nil {
 
 			tacticsRecord, err := tactics.HandleTacticsPayload(
-				GetTacticsStorer(),
+				GetTacticsStorer(serverContext.tunnel.config),
 				networkID,
 				payload)
 			if err != nil {

+ 9 - 2
psiphon/tactics.go

@@ -40,6 +40,13 @@ import (
 // Callers are responsible for ensuring that the input context eventually
 // cancels, and should synchronize GetTactics calls to ensure no unintended
 // concurrent fetch attempts occur.
+//
+// GetTactics implements a limited workaround for multiprocess datastore
+// synchronization, enabling, for example, SendFeedback in one process to
+// access tactics as long as a Controller is not running in another process;
+// and without blocking the Controller from starting. Accessing tactics is
+// most critical for untunneled network operations; when a Controller is
+// running, a tunnel may be used. See TacticsStorer for more details.
 func GetTactics(ctx context.Context, config *Config) {
 
 	// Limitation: GetNetworkID may not account for device VPN status, so
@@ -53,7 +60,7 @@ func GetTactics(ctx context.Context, config *Config) {
 	//    remote egress region/ISP, not the local region/ISP.
 
 	tacticsRecord, err := tactics.UseStoredTactics(
-		GetTacticsStorer(),
+		GetTacticsStorer(config),
 		config.GetNetworkID())
 	if err != nil {
 		NoticeWarning("get stored tactics failed: %s", err)
@@ -230,7 +237,7 @@ func fetchTactics(
 	tacticsRecord, err := tactics.FetchTactics(
 		ctx,
 		config.clientParameters,
-		GetTacticsStorer(),
+		GetTacticsStorer(config),
 		config.GetNetworkID,
 		apiParams,
 		serverEntry.Region,

+ 1 - 1
psiphon/tunnel.go

@@ -1454,7 +1454,7 @@ func (tunnel *Tunnel) sendSshKeepAlive(
 
 			err = tactics.AddSpeedTestSample(
 				tunnel.config.GetClientParameters(),
-				GetTacticsStorer(),
+				GetTacticsStorer(tunnel.config),
 				tunnel.config.GetNetworkID(),
 				tunnel.dialParams.ServerEntry.Region,
 				tunnel.dialParams.TunnelProtocol,