Browse Source

Fix packet tunnel OSL bugs

- osl and tun package UpdateProgress functions had
  reverse bytes read/written parameters, causing the
  incorrect counts to be used

- source/destination were reversed in flow ID, causing
  OSL logic to lookup incorrect IPs

- add automated test check for expected flow IP
Rod Hynes 6 years ago
parent
commit
5f9796e281
3 changed files with 31 additions and 26 deletions
  1. 6 6
      psiphon/common/tun/tun.go
  2. 21 16
      psiphon/common/tun/tun_test.go
  3. 4 4
      psiphon/server/tunnelServer.go

+ 6 - 6
psiphon/common/tun/tun.go

@@ -334,7 +334,7 @@ type AllowedPortChecker func(upstreamIPAddress net.IP, port int) bool
 // flow activity. Values passed to UpdateProgress are bytes transferred
 // and flow duration since the previous UpdateProgress.
 type FlowActivityUpdater interface {
-	UpdateProgress(upstreamBytes, downstreamBytes int64, durationNanoseconds int64)
+	UpdateProgress(downstreamBytes, upstreamBytes int64, durationNanoseconds int64)
 }
 
 // FlowActivityUpdaterMaker is a function which returns a list of
@@ -346,8 +346,8 @@ type FlowActivityUpdaterMaker func(
 // MetricsUpdater is a function which receives a checkpoint summary
 // of application bytes transferred through a packet tunnel.
 type MetricsUpdater func(
-	TCPApplicationBytesUp, TCPApplicationBytesDown,
-	UDPApplicationBytesUp, UDPApplicationBytesDown int64)
+	TCPApplicationBytesDown, TCPApplicationBytesUp,
+	UDPApplicationBytesDown, UDPApplicationBytesUp int64)
 
 // ClientConnected handles new client connections, creating or resuming
 // a session and returns with client packet handlers running.
@@ -1347,7 +1347,7 @@ func (session *session) updateFlow(
 	}
 
 	for _, updater := range flowState.activityUpdaters {
-		updater.UpdateProgress(upstreamBytes, downstreamBytes, durationNanoseconds)
+		updater.UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds)
 	}
 }
 
@@ -2327,11 +2327,11 @@ func processPacket(
 
 		if direction == packetDirectionServerUpstream {
 			ID.set(
-				destinationIPAddress, destinationPort, sourceIPAddress, sourcePort, protocol)
+				sourceIPAddress, sourcePort, destinationIPAddress, destinationPort, protocol)
 
 		} else if direction == packetDirectionServerDownstream {
 			ID.set(
-				sourceIPAddress, sourcePort, destinationIPAddress, destinationPort, protocol)
+				destinationIPAddress, destinationPort, sourceIPAddress, sourcePort, protocol)
 		}
 
 		isTrackingFlow = session.isTrackingFlow(ID)

+ 21 - 16
psiphon/common/tun/tun_test.go

@@ -111,15 +111,20 @@ func testTunneledTCP(t *testing.T, useIPv6 bool) {
 
 	var flowCounter bytesTransferredCounter
 
-	flowActivityUpdaterMaker := func(_ string, _ net.IP) []FlowActivityUpdater {
+	flowActivityUpdaterMaker := func(_ string, IPAddress net.IP) []FlowActivityUpdater {
+
+		if IPAddress.String() != testTCPServer.getListenerIPAddress() {
+			t.Fatalf("unexpected flow IP address")
+		}
+
 		return []FlowActivityUpdater{&flowCounter}
 	}
 
 	var metricsCounter bytesTransferredCounter
 
-	metricsUpdater := func(TCPApplicationBytesUp, TCPApplicationBytesDown, _, _ int64) {
+	metricsUpdater := func(TCPApplicationBytesDown, TCPApplicationBytesUp, _, _ int64) {
 		metricsCounter.UpdateProgress(
-			TCPApplicationBytesUp, TCPApplicationBytesDown, 0)
+			TCPApplicationBytesDown, TCPApplicationBytesUp, 0)
 	}
 
 	testServer, err := startTestServer(useIPv6, MTU, flowActivityUpdaterMaker, metricsUpdater)
@@ -253,25 +258,25 @@ func testTunneledTCP(t *testing.T, useIPv6 bool) {
 
 	expectedBytesTransferred := CONCURRENT_CLIENT_COUNT * TCP_RELAY_TOTAL_SIZE
 
-	upstreamBytesTransferred, downstreamBytesTransferred, _ := flowCounter.Get()
-	if upstreamBytesTransferred < expectedBytesTransferred {
-		t.Fatalf("unexpected flow upstreamBytesTransferred: %d; expected at least %d",
-			upstreamBytesTransferred, expectedBytesTransferred)
-	}
+	downstreamBytesTransferred, upstreamBytesTransferred, _ := flowCounter.Get()
 	if downstreamBytesTransferred < expectedBytesTransferred {
 		t.Fatalf("unexpected flow downstreamBytesTransferred: %d; expected at least %d",
 			downstreamBytesTransferred, expectedBytesTransferred)
 	}
-
-	upstreamBytesTransferred, downstreamBytesTransferred, _ = metricsCounter.Get()
 	if upstreamBytesTransferred < expectedBytesTransferred {
-		t.Fatalf("unexpected metrics upstreamBytesTransferred: %d; expected at least %d",
+		t.Fatalf("unexpected flow upstreamBytesTransferred: %d; expected at least %d",
 			upstreamBytesTransferred, expectedBytesTransferred)
 	}
+
+	downstreamBytesTransferred, upstreamBytesTransferred, _ = metricsCounter.Get()
 	if downstreamBytesTransferred < expectedBytesTransferred {
 		t.Fatalf("unexpected metrics downstreamBytesTransferred: %d; expected at least %d",
 			downstreamBytesTransferred, expectedBytesTransferred)
 	}
+	if upstreamBytesTransferred < expectedBytesTransferred {
+		t.Fatalf("unexpected metrics upstreamBytesTransferred: %d; expected at least %d",
+			upstreamBytesTransferred, expectedBytesTransferred)
+	}
 
 	testServer.stop()
 
@@ -282,22 +287,22 @@ type bytesTransferredCounter struct {
 	// Note: 64-bit ints used with atomic operations are placed
 	// at the start of struct to ensure 64-bit alignment.
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	upstreamBytes       int64
 	downstreamBytes     int64
+	upstreamBytes       int64
 	durationNanoseconds int64
 }
 
 func (counter *bytesTransferredCounter) UpdateProgress(
-	upstreamBytes, downstreamBytes int64, durationNanoseconds int64) {
+	downstreamBytes, upstreamBytes int64, durationNanoseconds int64) {
 
-	atomic.AddInt64(&counter.upstreamBytes, upstreamBytes)
 	atomic.AddInt64(&counter.downstreamBytes, downstreamBytes)
+	atomic.AddInt64(&counter.upstreamBytes, upstreamBytes)
 	atomic.AddInt64(&counter.durationNanoseconds, durationNanoseconds)
 }
 
 func (counter *bytesTransferredCounter) Get() (int64, int64, int64) {
-	return atomic.LoadInt64(&counter.upstreamBytes),
-		atomic.LoadInt64(&counter.downstreamBytes),
+	return atomic.LoadInt64(&counter.downstreamBytes),
+		atomic.LoadInt64(&counter.upstreamBytes),
 		atomic.LoadInt64(&counter.durationNanoseconds)
 }
 

+ 4 - 4
psiphon/server/tunnelServer.go

@@ -1829,14 +1829,14 @@ func (sshClient *sshClient) handleNewPacketTunnelChannel(
 	}
 
 	metricUpdater := func(
-		TCPApplicationBytesUp, TCPApplicationBytesDown,
-		UDPApplicationBytesUp, UDPApplicationBytesDown int64) {
+		TCPApplicationBytesDown, TCPApplicationBytesUp,
+		UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
 
 		sshClient.Lock()
-		sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
 		sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
-		sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
+		sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
 		sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
+		sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
 		sshClient.Unlock()
 	}