Browse Source

Implement NoticeBytesTransferred

* transferstats now also records total bytes sent and received
  for each tunnel server.
* Every second, emit notice with total bytes send and
  received (since last notice) when tunnel is active.
* Used for data transfer stats display in Android client.
* Optional feature, enabled with config.EmitBytesTransferred.
Rod Hynes 10 years ago
parent
commit
4ea0217ec9

+ 8 - 2
SampleApps/Psibot/app/src/main/java/ca/psiphon/PsiphonTunnel.java

@@ -70,6 +70,7 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         public void onClientUpgradeDownloaded(String filename);
         public void onSplitTunnelRegion(String region);
         public void onUntunneledAddress(String address);
+        public void onBytesTransferred(long sent, long received);
     }
 
     private final HostService mHostService;
@@ -311,11 +312,12 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         // bad user input. Since VpnService mode resolves domain names
         // differently (udpgw), invalid domain name user input won't result
         // in SSH port forward failures.
-        // TODO: only enable when
         if (isVpnMode) {
             json.put("PortForwardFailureThreshold", 10);
         }
-        
+
+        json.put("EmitBytesTransferred", true);
+
         if (mLocalSocksProxyPort != 0) {
             // When mLocalSocksProxyPort is set, tun2socks is already configured
             // to use that port value. So we force use of the same port.
@@ -384,6 +386,10 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
 
             } else if (noticeType.equals("UntunneledAddress")) {
                 mHostService.onHomepage(notice.getJSONObject("data").getString("address"));
+
+            } else if (noticeType.equals("BytesTransferred")) {
+                JSONObject data = notice.getJSONObject("data");
+                mHostService.onBytesTransferred(data.getLong("sent"), data.getLong("received"));
             }
 
             if (diagnostic) {

+ 4 - 0
SampleApps/Psibot/app/src/main/java/ca/psiphon/psibot/Service.java

@@ -224,6 +224,10 @@ public class Service extends VpnService
         Log.addEntry("untunneled address: " + address);
     }
 
+    @Override
+    public void onBytesTransferred(long sent, long received) {
+    }
+
     private static String readInputStreamToString(InputStream inputStream) throws IOException {
         return new String(readInputStreamToBytes(inputStream), "UTF-8");
     }

+ 1 - 0
psiphon/config.go

@@ -98,6 +98,7 @@ type Config struct {
 	SplitTunnelDnsServer                string
 	UpgradeDownloadUrl                  string
 	UpgradeDownloadFilename             string
+	EmitBytesTransferred                bool
 }
 
 // LoadConfig parses and validates a JSON format Psiphon config JSON

+ 6 - 0
psiphon/notice.go

@@ -191,6 +191,12 @@ func NoticeClientUpgradeDownloaded(filename string) {
 	outputNotice("ClientUpgradeDownloaded", false, "filename", filename)
 }
 
+// NoticeBytesTransferred reports how many tunneled bytes have been
+// transferred since the last NoticeBytesTransferred.
+func NoticeBytesTransferred(sent, received int64) {
+	outputNotice("BytesTransferred", false, "sent", sent, "received", received)
+}
+
 type noticeObject struct {
 	NoticeType string          `json:"noticeType"`
 	Data       json.RawMessage `json:"data"`

+ 27 - 1
psiphon/transferstats/collector.go

@@ -46,7 +46,9 @@ func newHostStats() *hostStats {
 
 // serverStats holds per-server stats.
 type serverStats struct {
-	hostnameToStats map[string]*hostStats
+	hostnameToStats    map[string]*hostStats
+	totalBytesSent     int64
+	totalBytesReceived int64
 }
 
 func newServerStats() *serverStats {
@@ -94,6 +96,9 @@ func recordStat(stat *statsUpdate) {
 		storedServerStats.hostnameToStats[stat.hostname] = storedHostStats
 	}
 
+	storedServerStats.totalBytesSent += stat.numBytesSent
+	storedServerStats.totalBytesReceived += stat.numBytesReceived
+
 	storedHostStats.numBytesSent += stat.numBytesSent
 	storedHostStats.numBytesReceived += stat.numBytesReceived
 
@@ -123,6 +128,27 @@ func (ss serverStats) MarshalJSON() ([]byte, error) {
 	return json.Marshal(out)
 }
 
+// GetBytesTransferredForServer returns total bytes sent and received since
+// the last call to GetBytesTransferredForServer.
+func GetBytesTransferredForServer(serverID string) (sent, received int64) {
+	allStats.statsMutex.Lock()
+	defer allStats.statsMutex.Unlock()
+
+	stats := allStats.serverIDtoStats[serverID]
+
+	if stats == nil {
+		return
+	}
+
+	sent = stats.totalBytesSent
+	received = stats.totalBytesReceived
+
+	stats.totalBytesSent = 0
+	stats.totalBytesReceived = 0
+
+	return
+}
+
 // GetForServer returns the json-able stats package for the given server.
 // If there are no stats, nil will be returned.
 func GetForServer(serverID string) (payload *serverStats) {

+ 16 - 0
psiphon/tunnel.go

@@ -529,6 +529,14 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
 	}
 
+	// TODO: don't initialize if !config.EmitBytesTransferred
+	noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
+	if !config.EmitBytesTransferred {
+		noticeBytesTransferredTicker.Stop()
+	} else {
+		defer noticeBytesTransferredTicker.Stop()
+	}
+
 	statsTimer := time.NewTimer(nextStatusRequestPeriod())
 	defer statsTimer.Stop()
 
@@ -538,6 +546,14 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	var err error
 	for err == nil {
 		select {
+		case <-noticeBytesTransferredTicker.C:
+			sent, received := transferstats.GetBytesTransferredForServer(
+				tunnel.serverEntry.IpAddress)
+			// Only emit notice when tunnel is not idle.
+			if sent > 0 || received > 0 {
+				NoticeBytesTransferred(sent, received)
+			}
+
 		case <-statsTimer.C:
 			sendStats(tunnel)
 			statsTimer.Reset(nextStatusRequestPeriod())