Bladeren bron

Fixed "cannot start a transaction within a transaction" bug due to missing transaction cleanup; fixed transaction retry logic; enabled sqlite shared cached and WAL mode

Rod Hynes 11 jaren geleden
bovenliggende
commit
6bafeb5a9b
3 gewijzigde bestanden met toevoegingen van 37 en 14 verwijderingen
  1. 1 2
      README.md
  2. 34 11
      psiphon/dataStore.go
  3. 2 1
      psiphon/runTunnel.go

+ 1 - 2
README.md

@@ -15,8 +15,7 @@ This project is currently at the proof-of-concept stage. Current production Psip
 
 ### TODO (proof-of-concept)
 
-* investigate "psiphon.transactionWithRetry: database is locked" errors 
-* shutdown results in log noise: "use of closed network connection"
+* log noise: "use of closed network connection"; "psiphon.relayPortForward: ...: i/o timeout"
 * use ContextError in more places
 * build/test on Android and iOS
 * integrate meek-client

+ 34 - 11
psiphon/dataStore.go

@@ -23,6 +23,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"errors"
+	"fmt"
 	sqlite3 "github.com/mattn/go-sqlite3"
 	"log"
 	"sync"
@@ -50,8 +51,11 @@ func initDataStore() {
         create table if not exists keyValue
             (key text not null,
              value text not null);
+		pragma journal_mode=WAL;
         `
-		db, err := sql.Open("sqlite3", DATA_STORE_FILENAME)
+		db, err := sql.Open(
+			"sqlite3",
+			fmt.Sprintf("file:%s?cache=shared&mode=rwc", DATA_STORE_FILENAME))
 		if err != nil {
 			log.Fatal("initDataStore failed to open database: %s", err)
 		}
@@ -63,11 +67,23 @@ func initDataStore() {
 	})
 }
 
+func canRetry(err error) bool {
+	sqlError, ok := err.(sqlite3.Error)
+	return ok && (sqlError.Code == sqlite3.ErrBusy ||
+		sqlError.Code == sqlite3.ErrLocked ||
+		sqlError.ExtendedCode == sqlite3.ErrLockedSharedCache ||
+		sqlError.ExtendedCode == sqlite3.ErrBusySnapshot)
+}
+
 // transactionWithRetry will retry a write transaction if sqlite3
-// reports ErrBusy or ErrBusySnapshot -- i.e., if the XXXXX
+// reports a table is locked by another writer.
 func transactionWithRetry(updater func(*sql.Tx) error) error {
 	initDataStore()
 	for i := 0; i < 10; i++ {
+		if i > 0 {
+			// Delay on retry
+			time.Sleep(100)
+		}
 		transaction, err := singleton.db.Begin()
 		if err != nil {
 			return ContextError(err)
@@ -75,16 +91,17 @@ func transactionWithRetry(updater func(*sql.Tx) error) error {
 		err = updater(transaction)
 		if err != nil {
 			transaction.Rollback()
-			if sqlError, ok := err.(sqlite3.Error); ok &&
-				(sqlError.Code == sqlite3.ErrBusy ||
-					sqlError.ExtendedCode == sqlite3.ErrBusySnapshot) {
-				time.Sleep(100)
+			if canRetry(err) {
 				continue
 			}
 			return ContextError(err)
 		}
 		err = transaction.Commit()
 		if err != nil {
+			transaction.Rollback()
+			if canRetry(err) {
+				continue
+			}
 			return ContextError(err)
 		}
 		return nil
@@ -120,7 +137,8 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
                 where id = (select id from serverEntry order by rank desc limit 1);
             `)
 		if err != nil {
-			return ContextError(err)
+			// Note: ContextError() would break canRetry()
+			return err
 		}
 		data, err := json.Marshal(serverEntry)
 		if err != nil {
@@ -130,6 +148,9 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
             insert or replace into serverEntry (id, rank, region, data)
             values (?, (select coalesce(max(rank)-1, 0) from serverEntry), ?, ?);
             `, serverEntry.IpAddress, serverEntry.Region, data)
+		if err != nil {
+			return err
+		}
 		// TODO: log after commit
 		if !serverEntryExists {
 			log.Printf("stored server %s", serverEntry.IpAddress)
@@ -149,7 +170,8 @@ func PromoteServerEntry(ipAddress string) error {
             where id = ?;
             `, ipAddress)
 		if err != nil {
-			return ContextError(err)
+			// Note: ContextError() would break canRetry()
+			return err
 		}
 		return nil
 	})
@@ -259,13 +281,13 @@ func HasServerEntries(region string) bool {
 	if region == "" {
 		err = singleton.db.QueryRow("select count(*) from serverEntry;").Scan(&count)
 		if err == nil {
-			log.Printf("stored servers: %d", count)
+			log.Printf("servers: %d", count)
 		}
 	} else {
 		err = singleton.db.QueryRow(
 			"select count(*) from serverEntry where region = ?;", region).Scan(&count)
 		if err == nil {
-			log.Printf("stored servers for region %s: %d", region, count)
+			log.Printf("servers for region %s: %d", region, count)
 		}
 	}
 	return err == nil && count > 0
@@ -303,7 +325,8 @@ func SetKeyValue(key, value string) error {
             values (?, ?);
             `, key, value)
 		if err != nil {
-			return ContextError(err)
+			// Note: ContextError() would break canRetry()
+			return err
 		}
 		return nil
 	})

+ 2 - 1
psiphon/runTunnel.go

@@ -98,7 +98,8 @@ func establishTunnel(config *Config) (tunnel *Tunnel, err error) {
 	cycler, err := NewServerEntryCycler(config.EgressRegion)
 	for selectedTunnel == nil && err == nil {
 		var serverEntry *ServerEntry
-		serverEntry, err = cycler.Next() // don't mask err here, we want to reference it after the loop
+		// Note: don't mask err here, we want to reference it after the loop
+		serverEntry, err = cycler.Next()
 		if err != nil {
 			break
 		}