Explorar el Código

Implemented fixes and changes from code review

- Renamed stats.Start and stats.Stop to be clearer that they will be part of that package (when we do sub-packages).
- Made /status request actually succeed.
- Added final stats sending. Except... it doesn't quite succeed yet. The MeekConn seems to be already closed.
- Added WaitGroup to ensure stats.Stop doesn't return until the processStats goro has stopped. Also avoids possible panic due to settings allStats.stopSignal to nil before processStats uses it.
- Changes and comments to clear lint warnings.
Adam Pritchard hace 11 años
padre
commit
86211e9646
Se han modificado 7 ficheros con 79 adiciones y 40 borrados
  1. 17 10
      psiphon/controller.go
  2. 16 4
      psiphon/serverApi.go
  3. 23 10
      psiphon/stats_collector.go
  4. 6 0
      psiphon/stats_regexp.go
  5. 14 14
      psiphon/stats_test.go
  6. 2 0
      psiphon/tunnel.go
  7. 1 2
      psiphonClient.go

+ 17 - 10
psiphon/controller.go

@@ -85,8 +85,8 @@ func NewController(config *Config) (controller *Controller) {
 func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
 	Notice(NOTICE_VERSION, VERSION)
 
-	Start()
-	defer Stop()
+	Stats_Start()
+	defer Stats_Stop()
 
 	socksProxy, err := NewSocksProxy(controller.config, controller)
 	if err != nil {
@@ -411,18 +411,13 @@ func (controller *Controller) operateTunnel(tunnel *Tunnel) {
 			err = errors.New("tunnel closed unexpectedly")
 
 		case <-controller.shutdownBroadcast:
+			// Send final stats
+			sendStats(tunnel, session, true)
 			Notice(NOTICE_INFO, "shutdown operate tunnel")
 			return
 
 		case <-statsTimer.C:
-			payload := GetForServer(tunnel.serverEntry.IpAddress)
-			if payload != nil {
-				err := session.DoStatusRequest(payload)
-				if err != nil {
-					Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
-					PutBack(tunnel.serverEntry.IpAddress, payload)
-				}
-			}
+			sendStats(tunnel, session, false)
 			statsTimer.Reset(NextSendPeriod())
 		}
 	}
@@ -440,6 +435,18 @@ func (controller *Controller) operateTunnel(tunnel *Tunnel) {
 	}
 }
 
+// sendStats is a helper for sending session stats to the server.
+func sendStats(tunnel *Tunnel, session *Session, final bool) {
+	payload := GetForServer(tunnel.serverEntry.IpAddress)
+	if payload != nil {
+		err := session.DoStatusRequest(payload, final)
+		if err != nil {
+			Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
+			PutBack(tunnel.serverEntry.IpAddress, payload)
+		}
+	}
+}
+
 // TunneledConn implements net.Conn and wraps a port foward connection.
 // It is used to hook into Read and Write to observe I/O errors and
 // report these errors back to the tunnel monitor as port forward failures.

+ 16 - 4
psiphon/serverApi.go

@@ -72,13 +72,25 @@ func NewSession(config *Config, tunnel *Tunnel) (session *Session, err error) {
 	return session, nil
 }
 
-func (session *Session) DoStatusRequest(payload json.Marshaler) error {
-	payloadJson, err := json.Marshal(payload)
+// DoStatusRequest makes a /status request to the server, sending session stats.
+// final should be true if this is the last such request before disconnecting.
+func (session *Session) DoStatusRequest(statsPayload json.Marshaler, final bool) error {
+	statsPayloadJSON, err := json.Marshal(statsPayload)
 	if err != nil {
 		return err
 	}
-	url := session.buildRequestUrl("status")
-	err = session.doPostRequest(url, "application/json", bytes.NewReader(payloadJson))
+
+	connected := "1"
+	if final {
+		connected = "0"
+	}
+
+	url := session.buildRequestUrl(
+		"status",
+		&ExtraParam{"session_id", session.tunnel.sessionId},
+		&ExtraParam{"connected", connected})
+
+	err = session.doPostRequest(url, "application/json", bytes.NewReader(statsPayloadJSON))
 	return err
 }
 

+ 23 - 10
psiphon/stats_collector.go

@@ -63,15 +63,16 @@ func newServerStats() *serverStats {
 // allStats is the root object that holds stats for all servers and all hosts,
 // as well as the mutex to access them, the channel to update them, etc.
 var allStats struct {
-	serverIDtoStats map[string]*serverStats
-	statsMutex      sync.RWMutex
-	stopSignal      chan struct{}
-	statsChan       chan []statsUpdate
+	serverIDtoStats    map[string]*serverStats
+	statsMutex         sync.RWMutex
+	stopSignal         chan struct{}
+	statsChan          chan []statsUpdate
+	processorWaitGroup sync.WaitGroup
 }
 
 // Start initializes and begins stats collection. Must be called once, when the
 // application starts.
-func Start() {
+func Stats_Start() {
 	if allStats.stopSignal != nil {
 		return
 	}
@@ -80,14 +81,16 @@ func Start() {
 	allStats.stopSignal = make(chan struct{})
 	allStats.statsChan = make(chan []statsUpdate, _CHANNEL_CAPACITY)
 
+	allStats.processorWaitGroup.Add(1)
 	go processStats()
 }
 
 // Stop ends stats collection. Must be called once, before the application
 // terminates.
-func Stop() {
+func Stats_Stop() {
 	if allStats.stopSignal != nil {
 		close(allStats.stopSignal)
+		allStats.processorWaitGroup.Wait()
 		allStats.stopSignal = nil
 	}
 }
@@ -122,6 +125,8 @@ func recordStat(newStat statsUpdate) {
 // processStats is a goro started by Start() and runs until Stop(). It collects
 // stats provided by StatsConn.
 func processStats() {
+	defer allStats.processorWaitGroup.Done()
+
 	for {
 		select {
 		case statSlice := <-allStats.statsChan:
@@ -188,22 +193,20 @@ func (ss serverStats) MarshalJSON() ([]byte, error) {
 	out := make(map[string]interface{})
 
 	var padding []byte
-	padding_size, err := MakeSecureRandomInt(256)
+	paddingSize, err := MakeSecureRandomInt(256)
 	// In case of randomness fail, we're going to proceed with zero padding.
 	// TODO: Is this okay?
 	if err != nil {
 		Notice(NOTICE_ALERT, "stats.serverStats.MarshalJSON: MakeSecureRandomInt failed")
 		padding = make([]byte, 0)
 	} else {
-		padding, err = MakeSecureRandomBytes(padding_size)
+		padding, err = MakeSecureRandomBytes(paddingSize)
 		if err != nil {
 			Notice(NOTICE_ALERT, "stats.serverStats.MarshalJSON: MakeSecureRandomBytes failed")
 			padding = make([]byte, 0)
 		}
 	}
 
-	out["padding"] = base64.StdEncoding.EncodeToString(padding)
-
 	hostBytes := make(map[string]int64)
 	bytesTransferred := int64(0)
 
@@ -216,6 +219,16 @@ func (ss serverStats) MarshalJSON() ([]byte, error) {
 	out["bytes_transferred"] = bytesTransferred
 	out["host_bytes"] = hostBytes
 
+	// Print the notice before adding the padding, since it's not interesting
+	noticeJSON, _ := json.Marshal(out)
+	Notice(NOTICE_INFO, "sending stats: %s %s", noticeJSON, err)
+
+	out["padding"] = base64.StdEncoding.EncodeToString(padding)
+
+	// We're not using these fields, but the server requires them
+	out["page_views"] = make([]string, 0)
+	out["https_requests"] = make([]string, 0)
+
 	return json.Marshal(out)
 }
 

+ 6 - 0
psiphon/stats_regexp.go

@@ -26,8 +26,12 @@ type regexpReplace struct {
 	replace string
 }
 
+// Regexps holds the regular expressions and replacement strings used for
+// transforming URLs and hostnames into a stats-appropriate forms.
 type Regexps []regexpReplace
 
+// MakeRegexps takes the raw string-map form of the regex-replace pairs
+// returned by the server handshake and turns them into a usable object.
 func MakeRegexps(pageViewRegexes, httpsRequestRegexes []map[string]string) *Regexps {
 	regexps := make(Regexps, 0)
 
@@ -57,6 +61,8 @@ func MakeRegexps(pageViewRegexes, httpsRequestRegexes []map[string]string) *Rege
 	return &regexps
 }
 
+// regexHostname processes hostname through the given regexps and returns the
+// string that should be used for stats.
 func regexHostname(hostname string, regexps *Regexps) (statsHostname string) {
 	statsHostname = "(OTHER)"
 	for _, rr := range *regexps {

+ 14 - 14
psiphon/stats_test.go

@@ -43,7 +43,7 @@ func TestStatsTestSuite(t *testing.T) {
 }
 
 func (suite *StatsTestSuite) SetupTest() {
-	Start()
+	Stats_Start()
 
 	re := make(Regexps, 0)
 	suite.httpClient = &http.Client{
@@ -55,7 +55,7 @@ func (suite *StatsTestSuite) SetupTest() {
 
 func (suite *StatsTestSuite) TearDownTest() {
 	suite.httpClient = nil
-	Stop()
+	Stats_Stop()
 }
 
 func makeStatsDialer(serverID string, regexps *Regexps) func(network, addr string) (conn net.Conn, err error) {
@@ -73,7 +73,7 @@ func makeStatsDialer(serverID string, regexps *Regexps) func(network, addr strin
 				return nil, err
 			}
 		default:
-			err = errors.New("Using an unsupported testing network type")
+			err = errors.New("using an unsupported testing network type")
 			return
 		}
 
@@ -85,12 +85,12 @@ func makeStatsDialer(serverID string, regexps *Regexps) func(network, addr strin
 
 func (suite *StatsTestSuite) Test_StartStop() {
 	// Make sure Start and Stop calls don't crash
-	Start()
-	Start()
-	Stop()
-	Stop()
-	Start()
-	Stop()
+	Stats_Start()
+	Stats_Start()
+	Stats_Stop()
+	Stats_Stop()
+	Stats_Start()
+	Stats_Stop()
 }
 
 func (suite *StatsTestSuite) Test_NextSendPeriod() {
@@ -126,9 +126,9 @@ func (suite *StatsTestSuite) Test_GetForServer() {
 	payload = GetForServer(_SERVER_ID)
 	suite.NotNil(payload, "should receive valid payload for valid server ID")
 
-	payloadJson, err := json.Marshal(payload)
-	var parsedJson interface{}
-	err = json.Unmarshal(payloadJson, &parsedJson)
+	payloadJSON, err := json.Marshal(payload)
+	var parsedJSON interface{}
+	err = json.Unmarshal(payloadJSON, &parsedJSON)
 	suite.Nil(err, "payload JSON should parse successfully")
 
 	// After we retrieve the stats for a server, they should be cleared out of the tracked stats
@@ -234,7 +234,7 @@ func (suite *StatsTestSuite) Test_Regex() {
 	expectedHostnames.Add("replacement")
 
 	hostnames := make([]interface{}, 0)
-	for hostname, _ := range payload.hostnameToStats {
+	for hostname := range payload.hostnameToStats {
 		hostnames = append(hostnames, hostname)
 	}
 
@@ -251,7 +251,7 @@ func (suite *StatsTestSuite) Test_recordStat() {
 	// release it.
 	allStats.statsMutex.Lock()
 	stat := statsUpdate{"test", "test", 1, 1}
-	for i := 0; i < _CHANNEL_CAPACITY*2; i += 1 {
+	for i := 0; i < _CHANNEL_CAPACITY*2; i++ {
 		recordStat(stat)
 	}
 	allStats.statsMutex.Unlock()

+ 2 - 0
psiphon/tunnel.go

@@ -291,10 +291,12 @@ func (tunnel *Tunnel) ServerID() string {
 	return tunnel.serverEntry.IpAddress
 }
 
+// StatsRegexps gets the Regexps used for the statistics for this tunnel.
 func (tunnel *Tunnel) StatsRegexps() *Regexps {
 	return tunnel.regexps
 }
 
+// SetStatsRegexps sets the Regexps used for the statistics for this tunnel.
 func (tunnel *Tunnel) SetStatsRegexps(regexps *Regexps) {
 	tunnel.regexps = regexps
 }

+ 1 - 2
psiphonClient.go

@@ -21,12 +21,11 @@ package main
 
 import (
 	"flag"
+	psiphon "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"log"
 	"os"
 	"os/signal"
 	"sync"
-
-	psiphon "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 )
 
 func main() {