소스 검색

Stream remote server lists with StreamReadAuthenticatedDataPackage

- Add StreamingServerEntryDecoder to consumes data package
  payload stream, splitting and process server entry lines
- Add Datastore support to consume server entry stream
- Document allocation/garbage collection limitation
- Remove redundant StoreServerEntries shuffle
- Remove unrealized ValidateServerEntry and move its minor
  functionality to StoreServerEntry
- Not yet streaming: embedded server lists and OSLs
Rod Hynes 8 년 전
부모
커밋
5e7186d03f
7개의 변경된 파일135개의 추가작업 그리고 77개의 파일을 삭제
  1. 1 1
      ConsoleClient/main.go
  2. 1 1
      MobileLibrary/psi/psi.go
  3. 11 3
      psiphon/common/authPackage.go
  4. 55 24
      psiphon/common/protocol/serverEntry.go
  5. 46 14
      psiphon/dataStore.go
  6. 21 27
      psiphon/remoteServerList.go
  7. 0 7
      psiphon/serverApi.go

+ 1 - 1
ConsoleClient/main.go

@@ -181,7 +181,7 @@ func main() {
 				return
 			}
 			// TODO: stream embedded server list data? also, the cast makes an unnecessary copy of a large buffer?
-			serverEntries, err := protocol.DecodeAndValidateServerEntryList(
+			serverEntries, err := protocol.DecodeServerEntryList(
 				string(serverEntryList),
 				common.GetCurrentTimestamp(),
 				protocol.SERVER_ENTRY_SOURCE_EMBEDDED)

+ 1 - 1
MobileLibrary/psi/psi.go

@@ -95,7 +95,7 @@ func Start(
 		return fmt.Errorf("error initializing datastore: %s", err)
 	}
 
-	serverEntries, err := protocol.DecodeAndValidateServerEntryList(
+	serverEntries, err := protocol.DecodeServerEntryList(
 		embeddedServerEntryList,
 		common.GetCurrentTimestamp(),
 		protocol.SERVER_ENTRY_SOURCE_EMBEDDED)

+ 11 - 3
psiphon/common/authPackage.go

@@ -510,14 +510,22 @@ func (streamer *limitedJSONValueStreamer) Read(p []byte) (int, error) {
 		var n int
 		n, err = streamer.reader.Read(p[i : i+1])
 
+		// process n == 1 before handling err, in case err is io.EOF
 		if n == 1 {
 			if p[i] == '"' {
-				n = 0
 				streamer.eof = true
 				err = io.EOF
 			} else if p[i] == '\\' {
-				n = 0
-				err = ContextError(errors.New("unsupported escaped character"))
+				if err == nil {
+					// Psiphon server list string values contain '\n', so support
+					// that required case.
+					n, err = streamer.reader.Read(p[i : i+1])
+					if n == 1 && p[i] == 'n' {
+						p[i] = '\n'
+					} else {
+						err = ContextError(errors.New("unsupported escaped character"))
+					}
+				}
 			}
 		}
 

+ 55 - 24
psiphon/common/protocol/serverEntry.go

@@ -20,12 +20,13 @@
 package protocol
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"net"
+	"io"
 	"strings"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -187,25 +188,10 @@ func DecodeServerEntry(
 	return serverEntry, nil
 }
 
-// ValidateServerEntry checks for malformed server entries.
-// Currently, it checks for a valid ipAddress. This is important since
-// handshake requests submit back to the server a list of known server
-// IP addresses and the handshake API expects well-formed inputs.
-// TODO: validate more fields
-func ValidateServerEntry(serverEntry *ServerEntry) error {
-	ipAddr := net.ParseIP(serverEntry.IpAddress)
-	if ipAddr == nil {
-		errMsg := fmt.Sprintf("server entry has invalid IpAddress: '%s'", serverEntry.IpAddress)
-		return common.ContextError(errors.New(errMsg))
-	}
-	return nil
-}
-
-// DecodeAndValidateServerEntryList extracts server entries from the list encoding
+// DecodeServerEntryList extracts server entries from the list encoding
 // used by remote server lists and Psiphon server handshake requests.
-// Each server entry is validated and invalid entries are skipped.
 // See DecodeServerEntry for note on serverEntrySource/timestamp.
-func DecodeAndValidateServerEntryList(
+func DecodeServerEntryList(
 	encodedServerEntryList, timestamp,
 	serverEntrySource string) (serverEntries []*ServerEntry, err error) {
 
@@ -221,13 +207,58 @@ func DecodeAndValidateServerEntryList(
 			return nil, common.ContextError(err)
 		}
 
-		if ValidateServerEntry(serverEntry) != nil {
-			// Skip this entry and continue with the next one
-			// TODO: invoke a logging callback
-			continue
-		}
-
 		serverEntries = append(serverEntries, serverEntry)
 	}
 	return serverEntries, nil
 }
+
+// StreamingServerEntryDecoder performs the DecodeServerEntryList
+// operation, loading only one server entry into memory at a time.
+type StreamingServerEntryDecoder struct {
+	scanner           *bufio.Scanner
+	timestamp         string
+	serverEntrySource string
+}
+
+// NewStreamingServerEntryDecoder creates a new StreamingServerEntryDecoder.
+func NewStreamingServerEntryDecoder(
+	encodedServerEntryListReader io.Reader,
+	timestamp, serverEntrySource string) *StreamingServerEntryDecoder {
+
+	return &StreamingServerEntryDecoder{
+		scanner:           bufio.NewScanner(encodedServerEntryListReader),
+		timestamp:         timestamp,
+		serverEntrySource: serverEntrySource,
+	}
+}
+
+// Next reads and decodes the next server entry from the input stream,
+// returning a nil server entry when the stream is complete.
+//
+// Limitations:
+// - Each encoded server entry line cannot exceed bufio.MaxScanTokenSize,
+//   the default buffer size which this decoder uses. This is 64K.
+// - DecodeServerEntry is called on each encoded server entry line, which
+//   will allocate memory to hex decode and JSON deserialze the server
+//   entry. As this is not presently reusing a fixed buffer, each call
+//   will allocate additional memory; garbage collection is necessary to
+//   reclaim that memory for reuse for the next server entry. Memory-
+//   constrained users could call runtime.GC() after each call to Next.
+//
+func (decoder *StreamingServerEntryDecoder) Next() (*ServerEntry, error) {
+
+	if !decoder.scanner.Scan() {
+		return nil, common.ContextError(decoder.scanner.Err())
+	}
+
+	// TODO: use scanner.Bytes which doesn't allocate, instead of scanner.Text
+
+	// TODO: skip this entry and continue if can't decode?
+	serverEntry, err := DecodeServerEntry(
+		decoder.scanner.Text(), decoder.timestamp, decoder.serverEntrySource)
+	if err != nil {
+		return nil, common.ContextError(err)
+	}
+
+	return serverEntry, nil
+}

+ 46 - 14
psiphon/dataStore.go

@@ -25,8 +25,10 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"net"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"sync"
 	"time"
@@ -186,11 +188,11 @@ func checkInitDataStore() {
 func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) error {
 	checkInitDataStore()
 
-	// Server entries should already be validated before this point,
-	// so instead of skipping we fail with an error.
-	err := protocol.ValidateServerEntry(serverEntry)
-	if err != nil {
-		return common.ContextError(errors.New("invalid server entry"))
+	ipAddr := net.ParseIP(serverEntry.IpAddress)
+	if ipAddr == nil {
+		NoticeAlert("skip storing server with invalid IP address: %s", serverEntry.IpAddress)
+		// Returns no error so callers such as StoreServerEntries won't abort
+		return nil
 	}
 
 	// BoltDB implementation note:
@@ -201,7 +203,7 @@ func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) e
 	// values (e.g., many servers support all protocols), performance
 	// is expected to be acceptable.
 
-	err = singleton.db.Update(func(tx *bolt.Tx) error {
+	err := singleton.db.Update(func(tx *bolt.Tx) error {
 
 		serverEntries := tx.Bucket([]byte(serverEntriesBucket))
 
@@ -249,18 +251,11 @@ func StoreServerEntry(serverEntry *protocol.ServerEntry, replaceIfExists bool) e
 	return nil
 }
 
-// StoreServerEntries shuffles and stores a list of server entries.
-// Shuffling is performed on imported server entrues as part of client-side
-// load balancing.
+// StoreServerEntries stores a list of server entries.
 // There is an independent transaction for each entry insert/update.
 func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists bool) error {
 	checkInitDataStore()
 
-	for index := len(serverEntries) - 1; index > 0; index-- {
-		swapIndex := rand.Intn(index + 1)
-		serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
-	}
-
 	for _, serverEntry := range serverEntries {
 		err := StoreServerEntry(serverEntry, replaceIfExists)
 		if err != nil {
@@ -275,6 +270,43 @@ func StoreServerEntries(serverEntries []*protocol.ServerEntry, replaceIfExists b
 	return nil
 }
 
+// StreamingStoreServerEntries stores a list of server entries.
+// There is an independent transaction for each entry insert/update.
+func StreamingStoreServerEntries(
+	serverEntries *protocol.StreamingServerEntryDecoder, replaceIfExists bool) error {
+
+	checkInitDataStore()
+
+	for {
+		serverEntry, err := serverEntries.Next()
+		if err != nil {
+			return common.ContextError(err)
+		}
+
+		if serverEntry == nil {
+			// No more server entries
+			return nil
+		}
+
+		err = StoreServerEntry(serverEntry, replaceIfExists)
+		if err != nil {
+			return common.ContextError(err)
+		}
+
+		// Both StreamingServerEntryDecoder.Next and StoreServerEntry allocate
+		// memory. To approximate true fixed-memory streaming, garbage collect
+		// to reclaim that memory for the next iteration.
+		// TODO: measure effectiveness and performance penalty of this call
+		runtime.GC()
+	}
+
+	// Since there has possibly been a significant change in the server entries,
+	// take this opportunity to update the available egress regions.
+	ReportAvailableRegions()
+
+	return nil
+}
+
 // PromoteServerEntry assigns the top rank (one more than current
 // max rank) to the specified server entry. Server candidates are
 // iterated in decending rank order, so this server entry will be

+ 21 - 27
psiphon/remoteServerList.go

@@ -23,8 +23,8 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
-	"os"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -70,12 +70,15 @@ func FetchCommonRemoteServerList(
 		return nil
 	}
 
-	serverListPayload, err := unpackRemoteServerListFile(config, config.RemoteServerListDownloadFilename)
+	serverListPayload, err := common.StreamingReadAuthenticatedDataPackage(
+		config.RemoteServerListDownloadFilename,
+		config.RemoteServerListSignaturePublicKey)
 	if err != nil {
-		return fmt.Errorf("failed to unpack common remote server list: %s", common.ContextError(err))
+		return fmt.Errorf("failed to read remote server list: %s", common.ContextError(err))
 	}
+	defer serverListPayload.Close()
 
-	err = storeServerEntries(serverListPayload, protocol.SERVER_ENTRY_SOURCE_REMOTE)
+	err = streamingStoreServerEntries(serverListPayload, protocol.SERVER_ENTRY_SOURCE_REMOTE)
 	if err != nil {
 		return fmt.Errorf("failed to store common remote server list: %s", common.ContextError(err))
 	}
@@ -356,45 +359,36 @@ func downloadRemoteServerListFile(
 	return responseETag, nil
 }
 
-// unpackRemoteServerListFile reads a file that contains an
-// authenticated data package, validates the package, and
-// returns the payload.
-func unpackRemoteServerListFile(
-	config *Config, filename string) (string, error) {
+func storeServerEntries(serverList, serverEntrySource string) error {
 
-	fileReader, err := os.Open(filename)
+	serverEntries, err := protocol.DecodeServerEntryList(
+		serverList,
+		common.GetCurrentTimestamp(),
+		serverEntrySource)
 	if err != nil {
-		return "", common.ContextError(err)
+		return common.ContextError(err)
 	}
-	defer fileReader.Close()
 
-	dataPackage, err := ioutil.ReadAll(fileReader)
-	if err != nil {
-		return "", common.ContextError(err)
-	}
+	// TODO: record stats for newly discovered servers
 
-	payload, err := common.ReadAuthenticatedDataPackage(
-		dataPackage, config.RemoteServerListSignaturePublicKey)
+	err = StoreServerEntries(serverEntries, true)
 	if err != nil {
-		return "", common.ContextError(err)
+		return common.ContextError(err)
 	}
 
-	return payload, nil
+	return nil
 }
 
-func storeServerEntries(serverList, serverEntrySource string) error {
+func streamingStoreServerEntries(serverListReader io.Reader, serverEntrySource string) error {
 
-	serverEntries, err := protocol.DecodeAndValidateServerEntryList(
-		serverList,
+	serverEntries := protocol.NewStreamingServerEntryDecoder(
+		serverListReader,
 		common.GetCurrentTimestamp(),
 		serverEntrySource)
-	if err != nil {
-		return common.ContextError(err)
-	}
 
 	// TODO: record stats for newly discovered servers
 
-	err = StoreServerEntries(serverEntries, true)
+	err := StreamingStoreServerEntries(serverEntries, true)
 	if err != nil {
 		return common.ContextError(err)
 	}

+ 0 - 7
psiphon/serverApi.go

@@ -198,13 +198,6 @@ func (serverContext *ServerContext) doHandshakeRequest() error {
 			return common.ContextError(err)
 		}
 
-		err = protocol.ValidateServerEntry(serverEntry)
-		if err != nil {
-			// Skip this entry and continue with the next one
-			NoticeAlert("invalid server entry: %s", err)
-			continue
-		}
-
 		decodedServerEntries = append(decodedServerEntries, serverEntry)
 	}