Procházet zdrojové kódy

More changes based on code review feedback.

- Returning proper error type from DoStatusRequest
- Increased stats channel capacity
- Passing pointers to statsUpdate rather than copies of the struct. This should be much more efficient.
Adam Pritchard před 11 roky
rodič
revize
960665c85f
4 změnil soubory, kde provedl 21 přidání a 16 odebrání
  1. 2 2
      psiphon/serverApi.go
  2. 16 11
      psiphon/stats_collector.go
  3. 2 2
      psiphon/stats_conn.go
  4. 1 1
      psiphon/stats_test.go

+ 2 - 2
psiphon/serverApi.go

@@ -77,7 +77,7 @@ func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
 func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool) error {
 	statsPayloadJSON, err := json.Marshal(statsPayload)
 	if err != nil {
-		return err
+		return ContextError(err)
 	}
 
 	connected := "1"
@@ -91,7 +91,7 @@ func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool)
 		&ExtraParam{"connected", connected})
 
 	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
-	return err
+	return ContextError(err)
 }
 
 // doHandshakeRequest performs the handshake API request. The handshake

+ 16 - 11
psiphon/stats_collector.go

@@ -34,8 +34,9 @@ import (
 // a small amount of memory (< 1KB, probably), but we should still probably add
 // some kind of stale-stats cleanup.
 
-// TODO: What size should this be?
-var _CHANNEL_CAPACITY = 20
+// _CHANNEL_CAPACITY is the size of the channel that connections use to send stats
+// bundles to the collector/processor.
+var _CHANNEL_CAPACITY = 1000
 
 // Per-host/domain stats.
 // Note that the bytes we're counting are the ones going into the tunnel, so do
@@ -66,7 +67,7 @@ var allStats struct {
 	serverIDtoStats    map[string]*serverStats
 	statsMutex         sync.RWMutex
 	stopSignal         chan struct{}
-	statsChan          chan []statsUpdate
+	statsChan          chan []*statsUpdate
 	processorWaitGroup sync.WaitGroup
 }
 
@@ -79,7 +80,7 @@ func Stats_Start() {
 
 	allStats.serverIDtoStats = make(map[string]*serverStats)
 	allStats.stopSignal = make(chan struct{})
-	allStats.statsChan = make(chan []statsUpdate, _CHANNEL_CAPACITY)
+	allStats.statsChan = make(chan []*statsUpdate, _CHANNEL_CAPACITY)
 
 	allStats.processorWaitGroup.Add(1)
 	go processStats()
@@ -104,10 +105,12 @@ type statsUpdate struct {
 	numBytesReceived int64
 }
 
-// recordStats makes sure the given stats update is added to the global collection.
-// Guaranteed to not block.
-func recordStat(newStat statsUpdate) {
-	statSlice := []statsUpdate{newStat}
+// recordStats makes sure the given stats update is added to the global
+// collection. Guaranteed to not block.
+// Callers of this function should assume that it "takes control" of the
+// statsUpdate object.
+func recordStat(newStat *statsUpdate) {
+	statSlice := []*statsUpdate{newStat}
 	// Priority: Don't block connections when updating stats. We can't just
 	// write to the statsChan, since that will block if it's full. We could
 	// launch a goroutine for each update, but that seems like  unnecessary
@@ -192,6 +195,8 @@ func NextSendPeriod() (duration time.Duration) {
 func (ss serverStats) MarshalJSON() ([]byte, error) {
 	out := make(map[string]interface{})
 
+	// Add a random amount of padding to help prevent stats updates from being
+	// a predictable size (which often happens when the connection is quiet).
 	var padding []byte
 	paddingSize, err := MakeSecureRandomInt(256)
 	// In case of randomness fail, we're going to proceed with zero padding.
@@ -221,7 +226,7 @@ func (ss serverStats) MarshalJSON() ([]byte, error) {
 
 	// Print the notice before adding the padding, since it's not interesting
 	noticeJSON, _ := json.Marshal(out)
-	Notice(NOTICE_INFO, "sending stats: %s %s", noticeJSON, err)
+	Notice(NOTICE_INFO, "sending stats: %s", noticeJSON)
 
 	out["padding"] = base64.StdEncoding.EncodeToString(padding)
 
@@ -245,9 +250,9 @@ func GetForServer(serverID string) (payload *serverStats) {
 
 // PutBack re-adds a set of server stats to the collection.
 func PutBack(serverID string, ss *serverStats) {
-	statSlice := make([]statsUpdate, 0, len(ss.hostnameToStats))
+	statSlice := make([]*statsUpdate, 0, len(ss.hostnameToStats))
 	for hostname, hoststats := range ss.hostnameToStats {
-		statSlice = append(statSlice, statsUpdate{
+		statSlice = append(statSlice, &statsUpdate{
 			serverID:         serverID,
 			hostname:         hostname,
 			numBytesSent:     hoststats.numBytesSent,

+ 2 - 2
psiphon/stats_conn.go

@@ -83,7 +83,7 @@ func (conn *StatsConn) Write(buffer []byte) (n int, err error) {
 			}
 		}
 
-		recordStat(statsUpdate{
+		recordStat(&statsUpdate{
 			conn.serverID,
 			conn.hostname,
 			int64(n),
@@ -99,7 +99,7 @@ func (conn *StatsConn) Read(buffer []byte) (n int, err error) {
 
 	// Count bytes without checking the error condition. It could happen that the
 	// buffer was partially read and then an error occurred.
-	recordStat(statsUpdate{
+	recordStat(&statsUpdate{
 		conn.serverID,
 		conn.hostname,
 		0,

+ 1 - 1
psiphon/stats_test.go

@@ -252,7 +252,7 @@ func (suite *StatsTestSuite) Test_recordStat() {
 	allStats.statsMutex.Lock()
 	stat := statsUpdate{"test", "test", 1, 1}
 	for i := 0; i < _CHANNEL_CAPACITY*2; i++ {
-		recordStat(stat)
+		recordStat(&stat)
 	}
 	allStats.statsMutex.Unlock()
 }