Browse Source

Stats implementation in progress.

Also merged upstream (required conflict resolution).
Adam Pritchard 11 years ago
parent
commit
b8cbc7be3e

+ 11 - 5
psiphon/controller.go

@@ -30,6 +30,8 @@ import (
 	"net"
 	"sync"
 	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/stats"
 )
 
 // Controller is a tunnel lifecycle coordinator. It manages lists of servers to
@@ -83,9 +85,11 @@ func NewController(config *Config) (controller *Controller) {
 // - a local SOCKS proxy that port forwards through the pool of tunnels
 // - a local HTTP proxy that port forwards through the pool of tunnels
 func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
-
 	Notice(NOTICE_VERSION, VERSION)
 
+	stats.Start()
+	defer stats.Stop()
+
 	socksProxy, err := NewSocksProxy(controller.config, controller)
 	if err != nil {
 		Notice(NOTICE_ALERT, "error initializing local SOCKS proxy: %s", err)
@@ -476,10 +480,12 @@ func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error)
 		}
 		return nil, ContextError(err)
 	}
-	return &TunneledConn{
-			Conn:   tunnelConn,
-			tunnel: tunnel},
-		nil
+
+	conn = &TunneledConn{
+		Conn:   stats.NewStatsConn(tunnelConn, tunnel.GetServerID()),
+		tunnel: tunnel}
+
+	return
 }
 
 // startEstablishing creates a pool of worker goroutines which will

+ 2 - 1
psiphon/meekConn.go

@@ -21,7 +21,6 @@ package psiphon
 
 import (
 	"bytes"
-	"code.google.com/p/go.crypto/nacl/box"
 	"crypto/rand"
 	"encoding/base64"
 	"encoding/json"
@@ -33,6 +32,8 @@ import (
 	"net/url"
 	"sync"
 	"time"
+
+	"code.google.com/p/go.crypto/nacl/box"
 )
 
 // MeekConn is based on meek-client.go from Tor and Psiphon:

+ 136 - 0
psiphon/stats/statsCollector.go

@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package stats
+
+import (
+	"fmt"
+	"sync"
+)
+
+// TODO: What size should this be?
+var _CHANNEL_CAPACITY = 20
+
+type hostnameStats struct {
+	numBytesSent     int64
+	numBytesReceived int64
+}
+
+func newHostnameStats() *hostnameStats {
+	return &hostnameStats{}
+}
+
+// serverStats holds per-server stats.
+// Note that the bytes we're counting are the ones going into the tunnel, so do
+// not include transport overhead.
+type serverStats struct {
+	hostnameToStats map[string]*hostnameStats
+}
+
+func newServerStats() *serverStats {
+	return &serverStats{
+		hostnameToStats: make(map[string]*hostnameStats),
+	}
+}
+
+var allStats struct {
+	serverIDtoStats map[string]*serverStats
+	statsMutex      sync.RWMutex
+	stopSignal      chan struct{}
+	statsChan       chan statsUpdate
+}
+
+// Start initializes and begins stats collection. Must be called once, when the
+// application starts.
+func Start() {
+	if allStats.stopSignal != nil {
+		return
+	}
+
+	allStats.serverIDtoStats = make(map[string]*serverStats)
+	allStats.stopSignal = make(chan struct{})
+	allStats.statsChan = make(chan statsUpdate, _CHANNEL_CAPACITY)
+
+	go processStats()
+}
+
+// Stop ends stats collection. Must be called once, before the application terminates.
+func Stop() {
+	if allStats.stopSignal != nil {
+		close(allStats.stopSignal)
+		allStats.stopSignal = nil
+	}
+}
+
+type statsUpdate struct {
+	serverID         string
+	hostname         string
+	numBytesSent     int
+	numBytesReceived int
+}
+
+func recordStat(newStat statsUpdate) {
+	go func() {
+		allStats.statsChan <- newStat
+	}()
+}
+
+func processStats() {
+	for {
+		select {
+		case stat := <-allStats.statsChan:
+			if stat.hostname == "" {
+				stat.hostname = "(OTHER)"
+			}
+
+			allStats.statsMutex.Lock()
+
+			storedServerStats := allStats.serverIDtoStats[stat.serverID]
+			if storedServerStats == nil {
+				storedServerStats = newServerStats()
+				allStats.serverIDtoStats[stat.serverID] = storedServerStats
+			}
+
+			storedHostnameStats := storedServerStats.hostnameToStats[stat.hostname]
+			if storedHostnameStats == nil {
+				storedHostnameStats = newHostnameStats()
+				storedServerStats.hostnameToStats[stat.hostname] = storedHostnameStats
+			}
+
+			storedHostnameStats.numBytesSent += int64(stat.numBytesSent)
+			storedHostnameStats.numBytesReceived += int64(stat.numBytesReceived)
+
+			fmt.Println(stat.hostname, storedHostnameStats.numBytesSent, storedHostnameStats.numBytesReceived)
+
+			allStats.statsMutex.Unlock()
+
+		default:
+			// Note that we only checking the stopSignal in the default case. This is
+			// because we don't want the statsChan to fill and block the connections
+			// sending to it. The connections have their own signals, so they will
+			// stop themselves, we will drain the channel, and then we will stop.
+			select {
+			case <-allStats.stopSignal:
+				fmt.Println("stats processor stopping")
+				return
+			default:
+			}
+		}
+	}
+}

+ 93 - 0
psiphon/stats/statsConn.go

@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+// Package stats conts and keeps track of session stats. These per-domain bytes
+// transferred and total bytes transferred.
+package stats
+
+/*
+Assumption: The same connection will not be used to access different hostnames
+	(even if, say, those hostnames map to the same server). If this does occur, we
+	will mis-attribute some bytes.
+Assumption: Enough of the first HTTP will be present in the first Write() call
+	for us to a) recognize that it is HTTP, and b) parse the hostname.
+		- If this turns out to not be generally true we will need to add buffering.
+*/
+
+import (
+	"bufio"
+	"bytes"
+	"net"
+	"net/http"
+)
+
+// StatsConn is to be used as an intermediate link in a chain of net.Conn objects.
+// It inspects requests and responses and derives stats from them.
+type StatsConn struct {
+	net.Conn
+	serverID   string
+	hostname   string
+	firstWrite bool
+}
+
+func NewStatsConn(nextConn net.Conn, serverID string) *StatsConn {
+	return &StatsConn{
+		Conn:       nextConn,
+		firstWrite: true,
+	}
+}
+
+// Called when requests are being written out through the tunnel to the remote
+// server.
+func (conn *StatsConn) Write(buffer []byte) (n int, err error) {
+	// First pass the data down the chain.
+	n, err = conn.Conn.Write(buffer)
+
+	// Count stats before we check the error condition. It could happen that the
+	// buffer was partially written and then an error occurred.
+	if n > 0 {
+		// If this is the first request, try to determine the hostname to associate
+		// with this connection.
+		if conn.firstWrite {
+			conn.firstWrite = false
+
+			// Check if this is a HTTP request
+			bufferReader := bufio.NewReader(bytes.NewReader(buffer))
+			httpReq, httpErr := http.ReadRequest(bufferReader)
+			if httpErr == nil {
+				conn.hostname = httpReq.Host
+			}
+		}
+
+		recordStat(statsUpdate{conn.serverID, conn.hostname, n, 0})
+	}
+
+	return
+}
+
+// Called when responses to requests are being read from the remote server.
+func (conn *StatsConn) Read(buffer []byte) (n int, err error) {
+	n, err = conn.Conn.Read(buffer)
+
+	// Count bytes without checking the error condition. It could happen that the
+	// buffer was partially read and then an error occurred.
+	recordStat(statsUpdate{conn.serverID, conn.hostname, 0, n})
+
+	return
+}

+ 80 - 0
psiphon/stats/stats_test.go

@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2014, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package stats
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type StatsTestSuite struct {
+	suite.Suite
+}
+
+func TestStatsTestSuite(t *testing.T) {
+	suite.Run(t, new(StatsTestSuite))
+}
+
+func statsDialer(network, addr string) (conn net.Conn, err error) {
+	fmt.Println("statsDialer", network, addr)
+
+	var subConn net.Conn
+
+	switch network {
+	case "tcp", "tcp4", "tcp6":
+		tcpAddr, err := net.ResolveTCPAddr(network, addr)
+		if err != nil {
+			return nil, err
+		}
+		subConn, err = net.DialTCP(network, nil, tcpAddr)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		err = errors.New("Using an unsupported testing network type")
+		return
+	}
+
+	conn = &StatsConn{
+		Conn: subConn,
+	}
+	err = nil
+	return
+}
+
+func (suite *StatsTestSuite) Test_Blah() {
+	tr := &http.Transport{
+		Dial: statsDialer,
+	}
+
+	client := &http.Client{Transport: tr}
+	resp, err := client.Get("http://s3.amazonaws.com/f58xp-mqce-k1yj/en/index.html")
+	resp.Body.Close()
+	fmt.Println("resp", resp, "; err", err)
+
+	resp, err = client.Get("http://s3.amazonaws.com/f58p-mqce-k1yj/en/index.html")
+	resp.Body.Close()
+	fmt.Println("resp", resp, "; err", err)
+}

+ 8 - 1
psiphon/tunnel.go

@@ -21,7 +21,6 @@ package psiphon
 
 import (
 	"bytes"
-	"code.google.com/p/go.crypto/ssh"
 	"encoding/base64"
 	"encoding/json"
 	"errors"
@@ -30,6 +29,8 @@ import (
 	"strings"
 	"sync/atomic"
 	"time"
+
+	"code.google.com/p/go.crypto/ssh"
 )
 
 // Tunneler specifies the interface required by components that use a tunnel.
@@ -282,3 +283,9 @@ func (tunnel *Tunnel) SignalFailure() {
 	Notice(NOTICE_ALERT, "tunnel received failure signal")
 	tunnel.Close()
 }
+
+// GetServerID provides a unique identifier for the server the tunnel connects to.
+// This ID is consistent between multiple tunnels connected to that server.
+func (tunnel *Tunnel) GetServerID() string {
+	return tunnel.serverEntry.IpAddress
+}

+ 2 - 1
psiphonClient.go

@@ -21,11 +21,12 @@ package main
 
 import (
 	"flag"
-	psiphon "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"log"
 	"os"
 	"os/signal"
 	"sync"
+
+	psiphon "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 )
 
 func main() {