Răsfoiți Sursa

Stats collector bug fixes
* Tunnel monitoring bytes sent and received were
over-counted due to incorrect logic in PutBack.
* Tunnel monitoring bytes sent and received could
be under-counted due to incorrect logic in
TakeOut.
* Refactored stats collector to make two purposes
more distinct: TakeOut/PutBack stats object for
status requests, and ReportRecent for tunnel
monitoring.

Rod Hynes 10 ani în urmă
părinte
comite
9ccc0abd4a

+ 5 - 4
psiphon/serverApi.go

@@ -303,14 +303,14 @@ func makeStatusRequestUrl(sessionId, baseRequestUrl string, isTunneled bool) str
 // on whether or not the request succeeded.
 type statusRequestPayloadInfo struct {
 	serverId      string
-	transferStats *transferstats.ServerStats
+	transferStats *transferstats.AccumulatedStats
 	tunnelStats   [][]byte
 }
 
 func makeStatusRequestPayload(
 	serverId string) ([]byte, *statusRequestPayloadInfo, error) {
 
-	transferStats := transferstats.GetForServer(serverId)
+	transferStats := transferstats.TakeOutStatsForServer(serverId)
 	tunnelStats, err := TakeOutUnreportedTunnelStats(
 		PSIPHON_API_TUNNEL_STATS_MAX_COUNT)
 	if err != nil {
@@ -324,7 +324,7 @@ func makeStatusRequestPayload(
 
 	payload := make(map[string]interface{})
 
-	hostBytes, bytesTransferred := transferStats.GetStatsForReporting()
+	hostBytes, bytesTransferred := transferStats.GetStatsForStatusRequest()
 	payload["host_bytes"] = hostBytes
 	payload["bytes_transferred"] = bytesTransferred
 
@@ -352,7 +352,8 @@ func makeStatusRequestPayload(
 }
 
 func putBackStatusRequestPayload(payloadInfo *statusRequestPayloadInfo) {
-	transferstats.PutBack(payloadInfo.serverId, payloadInfo.transferStats)
+	transferstats.PutBackStatsForServer(
+		payloadInfo.serverId, payloadInfo.transferStats)
 	err := PutBackUnreportedTunnelStats(payloadInfo.tunnelStats)
 	if err != nil {
 		// These tunnel stats records won't be resent under after a

+ 71 - 55
psiphon/transferstats/collector.go

@@ -39,29 +39,47 @@ type hostStats struct {
 	numBytesReceived int64
 }
 
-func newHostStats() *hostStats {
-	return &hostStats{}
+// AccumulatedStats holds the Psiphon Server API status request data for a
+// given server. To accommodate status requests that may fail, and be retried,
+// the TakeOutStatsForServer/PutBackStatsForServer procedure allows the requester
+// to check out stats for reporting and merge back stats for a later retry.
+type AccumulatedStats struct {
+	hostnameToStats map[string]*hostStats
 }
 
-// ServerStats holds per-server stats.
-type ServerStats struct {
-	hostnameToStats    map[string]*hostStats
-	totalBytesSent     int64
-	totalBytesReceived int64
-}
+// GetStatsForStatusRequest summarizes AccumulatedStats data as
+// required for the Psiphon Server API status request.
+func (stats AccumulatedStats) GetStatsForStatusRequest() (map[string]int64, int64) {
+
+	hostBytes := make(map[string]int64)
+	bytesTransferred := int64(0)
 
-func newServerStats() *ServerStats {
-	return &ServerStats{
-		hostnameToStats: make(map[string]*hostStats),
+	for hostname, hostStats := range stats.hostnameToStats {
+		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
+		bytesTransferred += totalBytes
+		hostBytes[hostname] = totalBytes
 	}
+
+	return hostBytes, bytesTransferred
+}
+
+// serverStats holds per-server stats.
+// accumulatedStats data is payload for the Psiphon status request
+// which is accessed via TakeOut/PutBack.
+// recentBytes data is for tunnel monitoring which is accessed via
+// ReportRecentBytesTransferredForServer.
+type serverStats struct {
+	accumulatedStats    *AccumulatedStats
+	recentBytesSent     int64
+	recentBytesReceived int64
 }
 
 // allStats is the root object that holds stats for all servers and all hosts,
 // as well as the mutex to access them.
 var allStats = struct {
 	statsMutex      sync.RWMutex
-	serverIDtoStats map[string]*ServerStats
-}{serverIDtoStats: make(map[string]*ServerStats)}
+	serverIDtoStats map[string]*serverStats
+}{serverIDtoStats: make(map[string]*serverStats)}
 
 // statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
@@ -72,10 +90,9 @@ type statsUpdate struct {
 }
 
 // recordStats makes sure the given stats update is added to the global
-// collection. Guaranteed to not block.
-// Callers of this function should assume that it "takes control" of the
-// statsUpdate object.
-func recordStat(stat *statsUpdate) {
+// collection. recentBytes are not adjusted when isPutBack is true,
+// as recentBytes aren't subject to TakeOut/PutBack.
+func recordStat(stat *statsUpdate, isPutBack bool) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
@@ -85,42 +102,31 @@ func recordStat(stat *statsUpdate) {
 
 	storedServerStats := allStats.serverIDtoStats[stat.serverID]
 	if storedServerStats == nil {
-		storedServerStats = newServerStats()
+		storedServerStats = &serverStats{
+			accumulatedStats: &AccumulatedStats{
+				hostnameToStats: make(map[string]*hostStats)}}
 		allStats.serverIDtoStats[stat.serverID] = storedServerStats
 	}
 
-	storedHostStats := storedServerStats.hostnameToStats[stat.hostname]
+	storedHostStats := storedServerStats.accumulatedStats.hostnameToStats[stat.hostname]
 	if storedHostStats == nil {
-		storedHostStats = newHostStats()
-		storedServerStats.hostnameToStats[stat.hostname] = storedHostStats
+		storedHostStats = &hostStats{}
+		storedServerStats.accumulatedStats.hostnameToStats[stat.hostname] = storedHostStats
 	}
 
-	storedServerStats.totalBytesSent += stat.numBytesSent
-	storedServerStats.totalBytesReceived += stat.numBytesReceived
-
 	storedHostStats.numBytesSent += stat.numBytesSent
 	storedHostStats.numBytesReceived += stat.numBytesReceived
 
-	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
-}
-
-func (serverStats ServerStats) GetStatsForReporting() (map[string]int64, int64) {
-
-	hostBytes := make(map[string]int64)
-	bytesTransferred := int64(0)
-
-	for hostname, hostStats := range serverStats.hostnameToStats {
-		totalBytes := hostStats.numBytesReceived + hostStats.numBytesSent
-		bytesTransferred += totalBytes
-		hostBytes[hostname] = totalBytes
+	if !isPutBack {
+		storedServerStats.recentBytesSent += stat.numBytesSent
+		storedServerStats.recentBytesReceived += stat.numBytesReceived
 	}
-
-	return hostBytes, bytesTransferred
 }
 
-// GetBytesTransferredForServer returns total bytes sent and received since
-// the last call to GetBytesTransferredForServer.
-func GetBytesTransferredForServer(serverID string) (sent, received int64) {
+// ReportRecentBytesTransferredForServer returns bytes sent and received since
+// the last call to ReportRecentBytesTransferredForServer. The accumulated sent
+// and received are reset to 0 by this call.
+func ReportRecentBytesTransferredForServer(serverID string) (sent, received int64) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
@@ -130,37 +136,47 @@ func GetBytesTransferredForServer(serverID string) (sent, received int64) {
 		return
 	}
 
-	sent = stats.totalBytesSent
-	received = stats.totalBytesReceived
+	sent = stats.recentBytesSent
+	received = stats.recentBytesReceived
 
-	stats.totalBytesSent = 0
-	stats.totalBytesReceived = 0
+	stats.recentBytesSent = 0
+	stats.recentBytesReceived = 0
 
 	return
 }
 
-// GetForServer returns the server stats for the given server.
-func GetForServer(serverID string) (serverStats *ServerStats) {
+// TakeOutStatsForServer borrows the AccumulatedStats for the specified
+// server. When we fail to report these stats, resubmit them with
+// PutBackStatsForServer. Stats will continue to be accumulated between
+// TakeOut and PutBack calls. The recentBytes values are unaffected by
+// TakeOut/PutBack. Returns empty stats if the serverID is not found.
+func TakeOutStatsForServer(serverID string) (accumulatedStats *AccumulatedStats) {
 	allStats.statsMutex.Lock()
 	defer allStats.statsMutex.Unlock()
 
-	serverStats = allStats.serverIDtoStats[serverID]
-	if serverStats == nil {
-		serverStats = newServerStats()
+	newAccumulatedStats := &AccumulatedStats{
+		hostnameToStats: make(map[string]*hostStats)}
+
+	serverStats := allStats.serverIDtoStats[serverID]
+	if serverStats != nil {
+		accumulatedStats = serverStats.accumulatedStats
+		serverStats.accumulatedStats = newAccumulatedStats
+	} else {
+		accumulatedStats = newAccumulatedStats
 	}
-	delete(allStats.serverIDtoStats, serverID)
 	return
 }
 
-// PutBack re-adds a set of server stats to the collection.
-func PutBack(serverID string, serverStats *ServerStats) {
-	for hostname, hoststats := range serverStats.hostnameToStats {
+// PutBackStatsForServer re-adds a set of server stats to the collection.
+func PutBackStatsForServer(serverID string, accumulatedStats *AccumulatedStats) {
+	for hostname, hoststats := range accumulatedStats.hostnameToStats {
 		recordStat(
 			&statsUpdate{
 				serverID:         serverID,
 				hostname:         hostname,
 				numBytesSent:     hoststats.numBytesSent,
 				numBytesReceived: hoststats.numBytesReceived,
-			})
+			},
+			true)
 	}
 }

+ 6 - 4
psiphon/transferstats/conn.go

@@ -47,8 +47,8 @@ type Conn struct {
 }
 
 // NewConn creates a Conn. serverID can be anything that uniquely
-// identifies the server; it will be passed to GetForServer() when retrieving
-// the accumulated stats.
+// identifies the server; it will be passed to TakeOutStatsForServer() when
+// retrieving the accumulated stats.
 func NewConn(nextConn net.Conn, serverID string, regexps *Regexps) *Conn {
 	return &Conn{
 		Conn:           nextConn,
@@ -85,7 +85,8 @@ func (conn *Conn) Write(buffer []byte) (n int, err error) {
 			conn.serverID,
 			conn.hostname,
 			int64(n),
-			0})
+			0},
+			false)
 	}
 
 	return
@@ -108,7 +109,8 @@ func (conn *Conn) Read(buffer []byte) (n int, err error) {
 		conn.serverID,
 		hostname,
 		0,
-		int64(n)})
+		int64(n)},
+		false)
 
 	return
 }

+ 9 - 9
psiphon/transferstats/transferstats_test.go

@@ -93,18 +93,18 @@ func (suite *StatsTestSuite) Test_StatsConn() {
 	resp.Body.Close()
 }
 
-func (suite *StatsTestSuite) Test_GetForServer() {
+func (suite *StatsTestSuite) Test_TakeOutStatsForServer() {
 
 	zeroPayload := newServerStats()
 
-	payload := GetForServer(_SERVER_ID)
+	payload := TakeOutStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "should get zero stats before any traffic")
 
 	resp, err := suite.httpClient.Get("http://example.com/index.html")
 	suite.Nil(err, "need successful http to proceed with tests")
 	resp.Body.Close()
 
-	payload = GetForServer(_SERVER_ID)
+	payload = TakeOutStatsForServer(_SERVER_ID)
 	suite.NotNil(payload, "should receive valid payload for valid server ID")
 
 	payloadJSON, err := json.Marshal(payload)
@@ -113,28 +113,28 @@ func (suite *StatsTestSuite) Test_GetForServer() {
 	suite.Nil(err, "payload JSON should parse successfully")
 
 	// After we retrieve the stats for a server, they should be cleared out of the tracked stats
-	payload = GetForServer(_SERVER_ID)
+	payload = TakeOutStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "after retrieving stats for a server, there should be zero stats (until more data goes through)")
 }
 
-func (suite *StatsTestSuite) Test_PutBack() {
+func (suite *StatsTestSuite) Test_PutBackStatsForServer() {
 	resp, err := suite.httpClient.Get("http://example.com/index.html")
 	suite.Nil(err, "need successful http to proceed with tests")
 	resp.Body.Close()
 
-	payloadToPutBack := GetForServer(_SERVER_ID)
+	payloadToPutBack := TakeOutStatsForServer(_SERVER_ID)
 	suite.NotNil(payloadToPutBack, "should receive valid payload for valid server ID")
 
 	zeroPayload := newServerStats()
 
-	payload := GetForServer(_SERVER_ID)
+	payload := PutBackStatsForServer(_SERVER_ID)
 	suite.Equal(payload, zeroPayload, "should be zero stats after getting them")
 
 	PutBack(_SERVER_ID, payloadToPutBack)
 	// PutBack is asynchronous, so we'll need to wait a moment for it to do its thing
 	<-time.After(100 * time.Millisecond)
 
-	payload = GetForServer(_SERVER_ID)
+	payload = PutBackStatsForServer(_SERVER_ID)
 	suite.NotEqual(payload, zeroPayload, "stats should be re-added after putting back")
 	suite.Equal(payload, payloadToPutBack, "stats should be the same as after the first retrieval")
 }
@@ -217,7 +217,7 @@ func (suite *StatsTestSuite) Test_Regex() {
 		suite.Nil(err)
 		resp.Body.Close()
 
-		payload := GetForServer(_SERVER_ID)
+		payload := TakeOutStatsForServer(_SERVER_ID)
 		suite.NotNil(payload, "should get stats because we made HTTP reqs; %s", scheme)
 
 		expectedHostnames := mapset.NewSet()

+ 1 - 1
psiphon/tunnel.go

@@ -640,7 +640,7 @@ func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
 	for !shutdown && err == nil {
 		select {
 		case <-noticeBytesTransferredTicker.C:
-			sent, received := transferstats.GetBytesTransferredForServer(
+			sent, received := transferstats.ReportRecentBytesTransferredForServer(
 				tunnel.serverEntry.IpAddress)
 
 			if received > 0 {