فهرست منبع

Merge pull request #51 from rod-hynes/master

Fix: processStats goro was pinning the CPU. Add flag to enable CPU profiling.
Adam Pritchard 11 سال پیش
والد
کامیت
7143223e75
4 فایل تغییر یافته به همراه 50 افزوده شده و 129 حذف شده
  1. 16 1
      ConsoleClient/psiphonClient.go
  2. 0 3
      psiphon/controller.go
  3. 34 101
      psiphon/stats_collector.go
  4. 0 24
      psiphon/stats_test.go

+ 16 - 1
ConsoleClient/psiphonClient.go

@@ -25,6 +25,7 @@ import (
 	"log"
 	"os"
 	"os/signal"
+	"runtime/pprof"
 	"sync"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
@@ -33,8 +34,13 @@ import (
 func main() {
 
 	var configFilename string
-	flag.StringVar(&configFilename, "config", "", "configuration file")
+	flag.StringVar(&configFilename, "config", "", "configuration input file")
+
+	var profileFilename string
+	flag.StringVar(&profileFilename, "profile", "", "CPU profile output file")
+
 	flag.Parse()
+
 	if configFilename == "" {
 		log.Fatalf("configuration file is required")
 	}
@@ -47,6 +53,15 @@ func main() {
 		log.Fatalf("error processing configuration file: %s", err)
 	}
 
+	if profileFilename != "" {
+		profileFile, err := os.Create(profileFilename)
+		if err != nil {
+			log.Fatalf("error opening profile file: %s", err)
+		}
+		pprof.StartCPUProfile(profileFile)
+		defer pprof.StopCPUProfile()
+	}
+
 	err = psiphon.InitDataStore(config)
 	if err != nil {
 		log.Fatalf("error initializing datastore: %s", err)

+ 0 - 3
psiphon/controller.go

@@ -85,9 +85,6 @@ func NewController(config *Config) (controller *Controller) {
 func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 	Notice(NOTICE_VERSION, VERSION)
 
-	Stats_Start()
-	defer Stats_Stop()
-
 	socksProxy, err := NewSocksProxy(controller.config, controller)
 	if err != nil {
 		Notice(NOTICE_ALERT, "error initializing local SOCKS proxy: %s", err)

+ 34 - 101
psiphon/stats_collector.go

@@ -34,12 +34,6 @@ import (
 // a small amount of memory (< 1KB, probably), but we should still probably add
 // some kind of stale-stats cleanup.
 
-// _CHANNEL_CAPACITY is the size of the channel that connections use to send stats
-// bundles to the collector/processor.
-const (
-	_CHANNEL_CAPACITY = 1000
-)
-
 // Per-host/domain stats.
 // Note that the bytes we're counting are the ones going into the tunnel, so do
 // not include transport overhead.
@@ -64,42 +58,13 @@ func newServerStats() *serverStats {
 }
 
 // allStats is the root object that holds stats for all servers and all hosts,
-// as well as the mutex to access them, the channel to update them, etc.
-var allStats struct {
-	serverIDtoStats    map[string]*serverStats
-	statsMutex         sync.RWMutex
-	stopSignal         chan struct{}
-	statsChan          chan []*statsUpdate
-	processorWaitGroup sync.WaitGroup
-}
-
-// Start initializes and begins stats collection. Must be called once, when the
-// application starts.
-func Stats_Start() {
-	if allStats.stopSignal != nil {
-		return
-	}
-
-	allStats.serverIDtoStats = make(map[string]*serverStats)
-	allStats.stopSignal = make(chan struct{})
-	allStats.statsChan = make(chan []*statsUpdate, _CHANNEL_CAPACITY)
+// as well as the mutex to access them.
+var allStats = struct {
+	statsMutex      sync.RWMutex
+	serverIDtoStats map[string]*serverStats
+}{serverIDtoStats: make(map[string]*serverStats)}
 
-	allStats.processorWaitGroup.Add(1)
-	go processStats()
-}
-
-// Stop ends stats collection. Must be called once, before the application
-// terminates.
-func Stats_Stop() {
-	if allStats.stopSignal != nil {
-		close(allStats.stopSignal)
-		allStats.processorWaitGroup.Wait()
-		allStats.stopSignal = nil
-	}
-}
-
-// Instances of statsUpdate will be sent through the connection-to-collector
-// channel.
+// statsUpdate contains new stats counts to be aggregated.
 type statsUpdate struct {
 	serverID         string
 	hostname         string
@@ -107,64 +72,34 @@ type statsUpdate struct {
 	numBytesReceived int64
 }
 
-// recordStats adds the given stats update is added to the global collection.
+// recordStat makes sure the given stats update is added to the global
+// collection. Guaranteed to not block.
 // Callers of this function should assume that it "takes control" of the
 // statsUpdate object.
-func recordStat(newStat *statsUpdate) {
-	// This function has the potential to block, if statsChan gets full. The
-	// intention is that we give statsChan a big enough buffer that it doesn't
-	// block in normal circumstances
-	statSlice := []*statsUpdate{newStat}
-	allStats.statsChan <- statSlice
-}
-
-// processStats is a goro started by Start() and runs until Stop(). It collects
-// stats provided by StatsConn.
-func processStats() {
-	defer allStats.processorWaitGroup.Done()
-
-	for {
-		select {
-		case statSlice := <-allStats.statsChan:
-			allStats.statsMutex.Lock()
-
-			for _, stat := range statSlice {
-				if stat.hostname == "" {
-					stat.hostname = "(OTHER)"
-				}
-
-				storedServerStats := allStats.serverIDtoStats[stat.serverID]
-				if storedServerStats == nil {
-					storedServerStats = newServerStats()
-					allStats.serverIDtoStats[stat.serverID] = storedServerStats
-				}
+func recordStat(stat *statsUpdate) {
+	allStats.statsMutex.Lock()
+	defer allStats.statsMutex.Unlock()
 
-				storedHostStats := storedServerStats.hostnameToStats[stat.hostname]
-				if storedHostStats == nil {
-					storedHostStats = newHostStats()
-					storedServerStats.hostnameToStats[stat.hostname] = storedHostStats
-				}
+	if stat.hostname == "" {
+		stat.hostname = "(OTHER)"
+	}
 
-				storedHostStats.numBytesSent += stat.numBytesSent
-				storedHostStats.numBytesReceived += stat.numBytesReceived
+	storedServerStats := allStats.serverIDtoStats[stat.serverID]
+	if storedServerStats == nil {
+		storedServerStats = newServerStats()
+		allStats.serverIDtoStats[stat.serverID] = storedServerStats
+	}
 
-				//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
-			}
+	storedHostStats := storedServerStats.hostnameToStats[stat.hostname]
+	if storedHostStats == nil {
+		storedHostStats = newHostStats()
+		storedServerStats.hostnameToStats[stat.hostname] = storedHostStats
+	}
 
-			allStats.statsMutex.Unlock()
+	storedHostStats.numBytesSent += stat.numBytesSent
+	storedHostStats.numBytesReceived += stat.numBytesReceived
 
-		default:
-			// Note that we only checking the stopSignal in the default case. This is
-			// because we don't want the statsChan to fill and block the connections
-			// sending to it. The connections have their own signals, so they will
-			// stop themselves, we will drain the channel, and then we will stop.
-			select {
-			case <-allStats.stopSignal:
-				return
-			default:
-			}
-		}
-	}
+	//fmt.Println("server:", stat.serverID, "host:", stat.hostname, "sent:", storedHostStats.numBytesSent, "received:", storedHostStats.numBytesReceived)
 }
 
 // NextSendPeriod returns the amount of time that should be waited before the
@@ -243,15 +178,13 @@ func GetForServer(serverID string) (payload *serverStats) {
 
 // PutBack re-adds a set of server stats to the collection.
 func PutBack(serverID string, ss *serverStats) {
-	statSlice := make([]*statsUpdate, 0, len(ss.hostnameToStats))
 	for hostname, hoststats := range ss.hostnameToStats {
-		statSlice = append(statSlice, &statsUpdate{
-			serverID:         serverID,
-			hostname:         hostname,
-			numBytesSent:     hoststats.numBytesSent,
-			numBytesReceived: hoststats.numBytesReceived,
-		})
+		recordStat(
+			&statsUpdate{
+				serverID:         serverID,
+				hostname:         hostname,
+				numBytesSent:     hoststats.numBytesSent,
+				numBytesReceived: hoststats.numBytesReceived,
+			})
 	}
-
-	allStats.statsChan <- statSlice
 }

+ 0 - 24
psiphon/stats_test.go

@@ -46,8 +46,6 @@ func TestStatsTestSuite(t *testing.T) {
 }
 
 func (suite *StatsTestSuite) SetupTest() {
-	Stats_Start()
-
 	re := make(Regexps, 0)
 	suite.httpClient = &http.Client{
 		Transport: &http.Transport{
@@ -58,7 +56,6 @@ func (suite *StatsTestSuite) SetupTest() {
 
 func (suite *StatsTestSuite) TearDownTest() {
 	suite.httpClient = nil
-	Stats_Stop()
 }
 
 func makeStatsDialer(serverID string, regexps *Regexps) func(network, addr string) (conn net.Conn, err error) {
@@ -86,16 +83,6 @@ func makeStatsDialer(serverID string, regexps *Regexps) func(network, addr strin
 	}
 }
 
-func (suite *StatsTestSuite) Test_StartStop() {
-	// Make sure Start and Stop calls don't crash
-	Stats_Start()
-	Stats_Start()
-	Stats_Stop()
-	Stats_Stop()
-	Stats_Start()
-	Stats_Stop()
-}
-
 func (suite *StatsTestSuite) Test_NextSendPeriod() {
 	res1 := NextSendPeriod()
 	suite.True(res1 > time.Duration(0), "duration should not be zero")
@@ -252,17 +239,6 @@ func (suite *StatsTestSuite) Test_Regex() {
 	}
 }
 
-func (suite *StatsTestSuite) Test_recordStat() {
-	// The normal operation of this function will get exercised during the
-	// other tests. Here we will quickly record more stats updates than the
-	// channel capacity. The test is just that this function returns, and doesn't
-	// crash or block forever.
-	stat := statsUpdate{"test", "test", 1, 1}
-	for i := 0; i < _CHANNEL_CAPACITY*2; i++ {
-		recordStat(&stat)
-	}
-}
-
 func (suite *StatsTestSuite) Test_getTLSHostname() {
 	// TODO: Create a more robust/antagonistic set of negative tests.
 	// We can write raw TCP to simulate any arbitrary degree of "almost looks