Bladeren bron

Merge pull request #130 from rod-hynes/master

Datastore and stats collector bug fixes
Rod Hynes 10 jaren geleden
bovenliggende
commit
4f5a838991

+ 5 - 1
psiphon/controller.go

@@ -626,7 +626,11 @@ func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 
 	// Promote this successful tunnel to first rank so it's one
 	// of the first candidates next time establish runs.
-	PromoteServerEntry(tunnel.serverEntry.IpAddress)
+	// Connecting to a TargetServerEntry does not change the
+	// ranking.
+	if controller.config.TargetServerEntry == "" {
+		PromoteServerEntry(tunnel.serverEntry.IpAddress)
+	}
 
 	return len(controller.tunnels), true
 }

+ 64 - 7
psiphon/dataStore.go

@@ -107,6 +107,16 @@ func InitDataStore(config *Config) (err error) {
 			return
 		}
 
+		// Run consistency checks on the datastore and emit errors for diagnostic
+		// purposes. We assume this will complete quickly for typical size Psiphon datastores.
+		db.View(func(tx *bolt.Tx) error {
+			err := <-tx.Check()
+			if err != nil {
+				NoticeAlert("boltdb Check(): %s", err)
+			}
+			return nil
+		})
+
 		singleton.db = db
 
 		// The migrateServerEntries function requires the data store is
@@ -239,6 +249,18 @@ func PromoteServerEntry(ipAddress string) error {
 	checkInitDataStore()
 
 	err := singleton.db.Update(func(tx *bolt.Tx) error {
+
+		// Ensure the corresponding entry exists before
+		// inserting into rank.
+		bucket := tx.Bucket([]byte(serverEntriesBucket))
+		data := bucket.Get([]byte(ipAddress))
+		if data == nil {
+			NoticeAlert(
+				"PromoteServerEntry: ignoring unknown server entry: %s",
+				ipAddress)
+			return nil
+		}
+
 		return insertRankedServerEntry(tx, ipAddress, 0)
 	})
 
@@ -500,7 +522,12 @@ func (iterator *ServerEntryIterator) Next() (serverEntry *ServerEntry, err error
 		var data []byte
 		err = singleton.db.View(func(tx *bolt.Tx) error {
 			bucket := tx.Bucket([]byte(serverEntriesBucket))
-			data = bucket.Get([]byte(serverEntryId))
+			value := bucket.Get([]byte(serverEntryId))
+			if value != nil {
+				// Must make a copy as slice is only valid within transaction.
+				data = make([]byte, len(value))
+				copy(data, value)
+			}
 			return nil
 		})
 		if err != nil {
@@ -684,7 +711,12 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
 
 	err = singleton.db.View(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket([]byte(splitTunnelRouteDataBucket))
-		data = bucket.Get([]byte(region))
+		value := bucket.Get([]byte(region))
+		if value != nil {
+			// Must make a copy as slice is only valid within transaction.
+			data = make([]byte, len(value))
+			copy(data, value)
+		}
 		return nil
 	})
 
@@ -840,17 +872,35 @@ func TakeOutUnreportedTunnelStats(maxCount int) ([][]byte, error) {
 		bucket := tx.Bucket([]byte(tunnelStatsBucket))
 		cursor := bucket.Cursor()
 		for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
+
+			// Perform a test JSON unmarshaling. In case of data corruption or a bug,
+			// skip the record.
+			var jsonData interface{}
+			err := json.Unmarshal(key, &jsonData)
+			if err != nil {
+				NoticeAlert(
+					"Invalid key in TakeOutUnreportedTunnelStats: %s: %s",
+					string(key), err)
+				continue
+			}
+
 			if 0 == bytes.Compare(value, tunnelStatsStateUnreported) {
-				err := bucket.Put(key, tunnelStatsStateReporting)
-				if err != nil {
-					return err
-				}
-				tunnelStats = append(tunnelStats, key)
+				// Must make a copy as slice is only valid within transaction.
+				data := make([]byte, len(key))
+				copy(data, key)
+				tunnelStats = append(tunnelStats, data)
 				if len(tunnelStats) >= maxCount {
 					break
 				}
 			}
 		}
+		for _, key := range tunnelStats {
+			err := bucket.Put(key, tunnelStatsStateReporting)
+			if err != nil {
+				return err
+			}
+		}
+
 		return nil
 	})
 
@@ -914,8 +964,15 @@ func resetAllTunnelStatsToUnreported() error {
 
 	err := singleton.db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket([]byte(tunnelStatsBucket))
+		resetKeys := make([][]byte, 0)
 		cursor := bucket.Cursor()
 		for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
+			resetKeys = append(resetKeys, key)
+		}
+		// TODO: data mutation is done outside cursor. Is this
+		// strictly necessary in this case?
+		// https://godoc.org/github.com/boltdb/bolt#Cursor
+		for _, key := range resetKeys {
 			err := bucket.Put(key, tunnelStatsStateUnreported)
 			if err != nil {
 				return err

+ 1 - 2
psiphon/meekConn.go

@@ -157,8 +157,7 @@ func DialMeek(
 		// classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server
 		// selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't
 		// exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after
-		// some short period. This is similar to the "unidentified protocol" attack outlined in selectProtocol().
-		// A similar weighted selection defense may be appropriate.
+		// some short period. This is mitigated by the "impaired" protocol classification mechanism.
 
 		dialer = NewCustomTLSDialer(
 			&CustomTLSConfig{

+ 5 - 4
psiphon/serverApi.go

@@ -303,14 +303,14 @@ func makeStatusRequestUrl(sessionId, baseRequestUrl string, isTunneled bool) str
 // on whether or not the request succeeded.
 type statusRequestPayloadInfo struct {
 	serverId      string
-	transferStats *transferstats.ServerStats
+	transferStats *transferstats.AccumulatedStats
 	tunnelStats   [][]byte
 }
 
 func makeStatusRequestPayload(
 	serverId string) ([]byte, *statusRequestPayloadInfo, error) {
 
-	transferStats := transferstats.GetForServer(serverId)
+	transferStats := transferstats.TakeOutStatsForServer(serverId)
 	tunnelStats, err := TakeOutUnreportedTunnelStats(
 		PSIPHON_API_TUNNEL_STATS_MAX_COUNT)
 	if err != nil {
@@ -324,7 +324,7 @@ func makeStatusRequestPayload(
 
 	payload := make(map[string]interface{})
 
-	hostBytes, bytesTransferred := transferStats.GetStatsForReporting()
+	hostBytes, bytesTransferred := transferStats.GetStatsForStatusRequest()
 	payload["host_bytes"] = hostBytes
 	payload["bytes_transferred"] = bytesTransferred
 
@@ -352,7 +352,8 @@ func makeStatusRequestPayload(
 }
 
 func putBackStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
-	transferstats.PutBack(payloadInfo.serverId, payloadInfo.transferStats)
+	transferstats.PutBackStatsForServer(
+		payloadInfo.serverId, payloadInfo.transferStats)
 	err := PutBackUnreportedTunnelStats(payloadInfo.tunnelStats)
 	if err != nil {
 		// These tunnel stats records won't be resent until after a

+ 73 - 55
psiphon/transferstats/collector.go

@@ -39,29 +39,47 @@ type hostStats struct {
 	numBytesReceived int64
 }
 
-func newHostStats() *hostStats {
-	return &hostStats{}
+// AccumulatedStats holds the Psiphon Server API status request data for a
+// given server. To accommodate status requests that may fail, and be retried,
+// the TakeOutStatsForServer/PutBackStatsForServer procedure allows the requester
+// to check out stats for reporting and merge back stats for a later retry.
+type AccumulatedStats struct {
+	hostnameToStats map[string]*hostStats
 }
 
-// ServerStats holds per-server stats.
-type ServerStats struct {
-	hostnameToStats    map[string]*hostStats
-	totalBytesSent     int64
-	totalBytesReceived int64
-}
+// GetStatsForStatusRequest summarizes AccumulatedStats data as
+// required for the Psiphon Server API status request.
+func (stats AccumulatedStats) GetStatsForStatusRequest() (map[string]int64, int64) {
+
+	hostBytes := make(map[string]int64)
+	bytesTransferred := int64(0)
 
-func newServerStats() *ServerStats {
-	return &ServerStats{
-		hostnameToStats: make(map[string]*hostStats),
+	for hostname, hostStats := range stats.hostnameToStats {
+		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
+		bytesTransferred += totalBytes
+		hostBytes[hostname] = totalBytes
 	}
+
+	return hostBytes, bytesTransferred
+}
+
+// serverStats holds per-server stats.
+// accumulatedStats data is payload for the Psiphon status request
+// which is accessed via TakeOut/PutBack.
+// recentBytes data is for tunnel monitoring which is accessed via
+// ReportRecentBytesTransferredForServer.
+type serverStats struct {
+	accumulatedStats    *AccumulatedStats
+	recentBytesSent     int64
+	recentBytesReceived int64
 }
 
 // allStats is the root object that holds stats for all servers and all hosts,
 // as well as the mutex to access them.
 var allStats = struct {
 	statsMutex      sync.RWMutex
-	serverIDtoStats map[string]*ServerStats
-}{serverIDtoStats: make(map[string]*ServerStats)}
+	serverIDtoStats map[string]*serverStats
+}{serverIDtoStats: make(map[string]*serverStats)}
 
 // statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
@@ -72,10 +90,9 @@ type statsUpdate struct {
 }
 
 // recordStat makes sure the given stats update is added to the global
-// collection. Guaranteed to not block.
-// Callers of this function should assume that it "takes control" of the
-// statsUpdate object.
-func recordStat(stat *statsUpdate) {
+// collection. recentBytes are not adjusted when isPutBack is true,
+// as recentBytes aren't subject to TakeOut/PutBack.
+func recordStat(stat *statsUpdate, isPutBack bool) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
@@ -85,42 +102,31 @@ func recordStat(stat *statsUpdate) {
 
 	storedServerStats := allStats.serverIDtoStats[stat.serverID]
 	if storedServerStats == nil {
-		storedServerStats = newServerStats()
+		storedServerStats = &serverStats{
+			accumulatedStats: &AccumulatedStats{
+				hostnameToStats: make(map[string]*hostStats)}}
 		allStats.serverIDtoStats[stat.serverID] = storedServerStats
 	}
 
-	storedHostStats := storedServerStats.hostnameToStats[stat.hostname]
+	storedHostStats := storedServerStats.accumulatedStats.hostnameToStats[stat.hostname]
 	if storedHostStats == nil {
-		storedHostStats = newHostStats()
-		storedServerStats.hostnameToStats[stat.hostname] = storedHostStats
+		storedHostStats = &hostStats{}
+		storedServerStats.accumulatedStats.hostnameToStats[stat.hostname] = storedHostStats
 	}
 
-	storedServerStats.totalBytesSent += stat.numBytesSent
-	storedServerStats.totalBytesReceived += stat.numBytesReceived
-
 	storedHostStats.numBytesSent += stat.numBytesSent
 	storedHostStats.numBytesReceived += stat.numBytesReceived
 
-	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
-}
-
-func (serverStats ServerStats) GetStatsForReporting() (map[string]int64, int64) {
-
-	hostBytes := make(map[string]int64)
-	bytesTransferred := int64(0)
-
-	for hostname, hostStats := range serverStats.hostnameToStats {
-		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
-		bytesTransferred += totalBytes
-		hostBytes[hostname] = totalBytes
+	if !isPutBack {
+		storedServerStats.recentBytesSent += stat.numBytesSent
+		storedServerStats.recentBytesReceived += stat.numBytesReceived
 	}
-
-	return hostBytes, bytesTransferred
 }
 
-// GetBytesTransferredForServer returns total bytes sent and received since
-// the last call to GetBytesTransferredForServer.
-func GetBytesTransferredForServer(serverID string) (sent, received int64) {
+// ReportRecentBytesTransferredForServer returns bytes sent and received since
+// the last call to ReportRecentBytesTransferredForServer. The accumulated sent
+// and received are reset to 0 by this call.
+func ReportRecentBytesTransferredForServer(serverID string) (sent, received int64) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
@@ -130,37 +136,49 @@ func GetBytesTransferredForServer(serverID string) (sent, received int64) {
 		return
 	}
 
-	sent = stats.totalBytesSent
-	received = stats.totalBytesReceived
+	sent = stats.recentBytesSent
+	received = stats.recentBytesReceived
 
-	stats.totalBytesSent = 0
-	stats.totalBytesReceived = 0
+	stats.recentBytesSent = 0
+	stats.recentBytesReceived = 0
 
 	return
 }
 
-// GetForServer returns the server stats for the given server.
-func GetForServer(serverID string) (serverStats *ServerStats) {
+// TakeOutStatsForServer borrows the AccumulatedStats for the specified
+// server. When we fail to report these stats, resubmit them with
+// PutBackStatsForServer. Stats will continue to be accumulated between
+// TakeOut and PutBack calls. The recentBytes values are unaffected by
+// TakeOut/PutBack. Returns empty stats if the serverID is not found.
+func TakeOutStatsForServer(serverID string) (accumulatedStats *AccumulatedStats) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
-	serverStats = allStats.serverIDtoStats[serverID]
-	if serverStats == nil {
-		serverStats = newServerStats()
+	newAccumulatedStats := &AccumulatedStats{
+		hostnameToStats: make(map[string]*hostStats)}
+
+	// Note: for an existing serverStats, only the accumulatedStats is
+	// affected; the recentBytes fields are not changed.
+	serverStats := allStats.serverIDtoStats[serverID]
+	if serverStats != nil {
+		accumulatedStats = serverStats.accumulatedStats
+		serverStats.accumulatedStats = newAccumulatedStats
+	} else {
+		accumulatedStats = newAccumulatedStats
 	}
-	delete(allStats.serverIDtoStats, serverID)
 	return
 }
 
-// PutBack re-adds a set of server stats to the collection.
-func PutBack(serverID string, serverStats *ServerStats) {
-	for hostname, hoststats := range serverStats.hostnameToStats {
+// PutBackStatsForServer re-adds a set of server stats to the collection.
+func PutBackStatsForServer(serverID string, accumulatedStats *AccumulatedStats) {
+	for hostname, hoststats := range accumulatedStats.hostnameToStats {
 		recordStat(
 			&statsUpdate{
 				serverID:         serverID,
 				hostname:         hostname,
 				numBytesSent:     hoststats.numBytesSent,
 				numBytesReceived: hoststats.numBytesReceived,
-			})
+			},
+			true)
 	}
 }

+ 6 - 4
psiphon/transferstats/conn.go

@@ -47,8 +47,8 @@ type Conn struct {
 }
 
 // NewConn creates a Conn. serverID can be anything that uniquely
-// identifies the server; it will be passed to GetForServer() when retrieving
-// the accumulated stats.
+// identifies the server; it will be passed to TakeOutStatsForServer() when
+// retrieving the accumulated stats.
 func NewConn(nextConn net.Conn, serverID string, regexps *Regexps) *Conn {
 	return &Conn{
 		Conn:           nextConn,
@@ -85,7 +85,8 @@ func (conn *Conn) Write(buffer []byte) (n int, err error) {
 			conn.serverID,
 			conn.hostname,
 			int64(n),
-			0})
+			0},
+			false)
 	}
 
 	return
@@ -108,7 +109,8 @@ func (conn *Conn) Read(buffer []byte) (n int, err error) {
 		conn.serverID,
 		hostname,
 		0,
-		int64(n)})
+		int64(n)},
+		false)
 
 	return
 }

+ 9 - 9
psiphon/transferstats/transferstats_test.go

@@ -93,18 +93,18 @@ func (suite *StatsTestSuite) Test_StatsConn() {
 	resp.Body.Close()
 }
 
-func (suite *StatsTestSuite) Test_GetForServer() {
+func (suite *StatsTestSuite) Test_TakeOutStatsForServer() {
 
 	zeroPayload := newServerStats()
 
-	payload := GetForServer(_SERVER_ID)
+	payload := TakeOutStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "should get zero stats before any traffic")
 
 	resp, err := suite.httpClient.Get("http://example.com/index.html")
 	suite.Nil(err, "need successful http to proceed with tests")
 	resp.Body.Close()
 
-	payload = GetForServer(_SERVER_ID)
+	payload = TakeOutStatsForServer(_SERVER_ID)
 	suite.NotNil(payload, "should receive valid payload for valid server ID")
 
 	payloadJSON, err := json.Marshal(payload)
@@ -113,28 +113,28 @@ func (suite *StatsTestSuite) Test_GetForServer() {
 	suite.Nil(err, "payload JSON should parse successfully")
 
 	// After we retrieve the stats for a server, they should be cleared out of the tracked stats
-	payload = GetForServer(_SERVER_ID)
+	payload = TakeOutStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "after retrieving stats for a server, there should be zero stats (until more data goes through)")
 }
 
-func (suite *StatsTestSuite) Test_PutBack() {
+func (suite *StatsTestSuite) Test_PutBackStatsForServer() {
 	resp, err := suite.httpClient.Get("http://example.com/index.html")
 	suite.Nil(err, "need successful http to proceed with tests")
 	resp.Body.Close()
 
-	payloadToPutBack := GetForServer(_SERVER_ID)
+	payloadToPutBack := TakeOutStatsForServer(_SERVER_ID)
 	suite.NotNil(payloadToPutBack, "should receive valid payload for valid server ID")
 
 	zeroPayload := newServerStats()
 
-	payload := GetForServer(_SERVER_ID)
+	payload := PutBackStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "should be zero stats after getting them")
 
 	PutBack(_SERVER_ID, payloadToPutBack)
 	// PutBack is asynchronous, so we'll need to wait a moment for it to do its thing
 	<-time.After(100 * time.Millisecond)
 
-	payload = GetForServer(_SERVER_ID)
+	payload = PutBackStatsForServer(_SERVER_ID)
 	suite.NotEqual(payload, zeroPayload, "stats should be re-added after putting back")
 	suite.Equal(payload, payloadToPutBack, "stats should be the same as after the first retrieval")
 }
@@ -217,7 +217,7 @@ func (suite *StatsTestSuite) Test_Regex() {
 		suite.Nil(err)
 		resp.Body.Close()
 
-		payload := GetForServer(_SERVER_ID)
+		payload := TakeOutStatsForServer(_SERVER_ID)
 		suite.NotNil(payload, "should get stats because we made HTTP reqs; %s", scheme)
 
 		expectedHostnames := mapset.NewSet()

+ 1 - 1
psiphon/tunnel.go

@@ -640,7 +640,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	for !shutdown && err == nil {
 		select {
 		case <-noticeBytesTransferredTicker.C:
-			sent, received := transferstats.GetBytesTransferredForServer(
+			sent, received := transferstats.ReportRecentBytesTransferredForServer(
 				tunnel.serverEntry.IpAddress)
 
 			if received > 0 {