Просмотр исходного кода

Added flow inspector test for OSSH Prefix

Amir Khan 2 года назад
Родитель
Commit
aa1ade15e5
1 изменённый файл: 256 добавлено, 9 удалено
  1. 256 9
      psiphon/server/server_test.go

+ 256 - 9
psiphon/server/server_test.go

@@ -20,12 +20,15 @@
 package server
 
 import (
+	"bytes"
 	"context"
 	"encoding/base64"
+	"encoding/hex"
 	"encoding/json"
 	std_errors "errors"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -41,6 +44,7 @@ import (
 	"testing"
 	"time"
 
+	socks "github.com/Psiphon-Labs/goptlib"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
@@ -170,6 +174,23 @@ func TestPrefixedOSSH(t *testing.T) {
 			applyPrefix:          true,
 			doDanglingTCPConn:    true,
 			doLogHostProvider:    true,
+			inspectFlows:         true,
+		})
+}
+
+// TestFragmentedPrefixedOSSH exercises the OSSH protocol with a client-side
+// prefix applied and fragmenting forced on, and enables flow inspection
+// (inspectFlows) so runServer can verify the prefix bytes and the
+// fragmented write sizes on the wire.
+func TestFragmentedPrefixedOSSH(t *testing.T) {
+	runServer(t,
+		&runServerConfig{
+			tunnelProtocol:       "OSSH",
+			enableSSHAPIRequests: true,
+			requireAuthorization: true,
+			doTunneledWebRequest: true,
+			doTunneledNTPRequest: true,
+			applyPrefix:          true,
+			forceFragmenting:     true,
+			doDanglingTCPConn:    true,
+			doLogHostProvider:    true,
+			inspectFlows:         true,
 		})
 }
 
@@ -522,6 +543,7 @@ type runServerConfig struct {
 	doDestinationBytes   bool
 	doChangeBytesConfig  bool
 	doLogHostProvider    bool
+	inspectFlows         bool
 }
 
 var (
@@ -572,6 +594,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 	doClientTactics := protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol)
 	doServerTactics := doClientTactics ||
+		runConfig.applyPrefix ||
 		runConfig.forceFragmenting ||
 		runConfig.doBurstMonitor ||
 		runConfig.doDestinationBytes
@@ -676,6 +699,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			runConfig.doBurstMonitor,
 			runConfig.doDestinationBytes,
 			runConfig.applyPrefix,
+			runConfig.forceFragmenting,
 		)
 	}
 
@@ -774,6 +798,17 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		}
 	})
 
+	// run flow inspector if requested
+	var flowInspectorProxy *flowInspectorProxy
+	if runConfig.inspectFlows {
+		flowInspectorProxy, err = newFlowInspectorProxy()
+		if err != nil {
+			t.Fatalf("error starting flow inspector: %s", err)
+		}
+		flowInspectorProxy.start()
+		defer flowInspectorProxy.close()
+	}
+
 	// run server
 
 	serverWaitGroup := new(sync.WaitGroup)
@@ -928,6 +963,12 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	clientConfig.EmitSLOKs = true
 	clientConfig.EmitServerAlerts = true
 
+	if runConfig.inspectFlows {
+		trueVal := true
+		clientConfig.UpstreamProxyURL = fmt.Sprintf("socks5://%s", flowInspectorProxy.listener.Addr())
+		clientConfig.UpstreamProxyAllowAllServerEntrySources = &trueVal
+	}
+
 	if runConfig.doSplitTunnel {
 		clientConfig.SplitTunnelOwnRegion = true
 	}
@@ -963,14 +1004,14 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 		if runConfig.applyPrefix {
 
 			applyParameters[parameters.OSSHPrefixSpecs] = transforms.Specs{
-				"TEST": {{"", "\x00{24}"}},
+				"TEST": {{"", "\x00{200}"}},
 			}
 			applyParameters[parameters.OSSHPrefixScopedSpecNames] = transforms.ScopedSpecNames{
 				"": {"TEST"},
 			}
 			applyParameters[parameters.OSSHPrefixProbability] = 1.0
-			applyParameters[parameters.OSSHPrefixSplitMinDelay] = 1 * time.Millisecond
-			applyParameters[parameters.OSSHPrefixSplitMaxDelay] = 10 * time.Millisecond
+			applyParameters[parameters.OSSHPrefixSplitMinDelay] = "10ms"
+			applyParameters[parameters.OSSHPrefixSplitMaxDelay] = "20ms"
 
 			applyParameters[parameters.OSSHPrefixEnableFragmentor] = runConfig.forceFragmenting
 
@@ -1186,7 +1227,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 			livenessTestSize,
 			runConfig.doBurstMonitor,
 			false,
-			false)
+			false, false)
 
 		p, _ := os.FindProcess(os.Getpid())
 		p.Signal(syscall.SIGUSR1)
@@ -1396,6 +1437,61 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 
 	// Check that datastore had retained/pruned server entries as expected.
 	checkPruneServerEntriesTest(t, runConfig, testDataDirName, pruneServerEntryTestCases)
+
+	// Inspect OSSH prefix flows, if applicable.
+	if runConfig.inspectFlows && runConfig.applyPrefix {
+
+		flows := <-flowInspectorProxy.ch
+		serverFlows := flows[0]
+		clientFlows := flows[1]
+
+		expectedClientPrefix := bytes.Repeat([]byte{0x00}, 200)
+		expectedServerPrefix := bytes.Repeat([]byte{0x01}, 200)
+
+		if runConfig.forceFragmenting {
+
+			// Fragmentor was applied, so check for prefix in stream dump.
+			if !bytes.Equal(clientFlows.streamDump.Bytes()[:200], expectedClientPrefix) {
+				t.Fatal("client flow does not have expected prefix")
+			}
+
+			if !bytes.Equal(serverFlows.streamDump.Bytes()[:200], expectedServerPrefix) {
+				t.Fatal("server flow does not have expected prefix")
+			}
+
+			fragmentorMaxWriteBytes := 100
+			if len(clientFlows.flows[0].data) > fragmentorMaxWriteBytes {
+				t.Fatal("client flow was not fragmented")
+			}
+			if len(serverFlows.flows[0].data) > fragmentorMaxWriteBytes {
+				t.Fatal("server flow was not fragmented")
+			}
+
+		} else {
+			// Fragmentor was not applied, so check for prefix in first flow.
+			if !bytes.Equal(clientFlows.flows[0].data, expectedClientPrefix) {
+				t.Fatal("client flow does not have expected prefix")
+			}
+			if !bytes.Equal(serverFlows.flows[0].data, expectedServerPrefix) {
+				t.Fatal("server flow does not have expected prefix")
+			}
+
+			// Analyze time between prefix and next packet:
+			// client 10-20ms, server 30-40ms, with a standard deviation of 1ms.
+			clientZtest := testSampleInUniformRange(clientFlows.flows[1].timeDelta.Microseconds(), 10000, 20000, 1000)
+			serverZtest := testSampleInUniformRange(serverFlows.flows[1].timeDelta.Microseconds(), 30000, 40000, 1000)
+
+			if !clientZtest {
+				t.Fatalf("client write delay after prefix too high: %f ms",
+					clientFlows.flows[1].timeDelta.Seconds()*1e3)
+			}
+
+			if !serverZtest {
+				t.Fatalf("server write delay after prefix too high: %f ms",
+					serverFlows.flows[1].timeDelta.Seconds()*1e3)
+			}
+		}
+	}
 }
 
 func sendNotificationReceived(c chan<- struct{}) {
@@ -2498,7 +2594,8 @@ func paveTacticsConfigFile(
 	livenessTestSize int,
 	doBurstMonitor bool,
 	doDestinationBytes bool,
-	applyOsshPrefix bool) {
+	applyOsshPrefix bool,
+	enableOsshPrefixFragmenting bool) {
 
 	// Setting LimitTunnelProtocols passively exercises the
 	// server-side LimitTunnelProtocols enforcement.
@@ -2604,12 +2701,14 @@ func paveTacticsConfigFile(
 
 	osshPrefix := ""
 	if applyOsshPrefix {
-		osshPrefix = `
+		osshPrefix = fmt.Sprintf(`
           "ServerOSSHPrefixSpecs": {
-              "TEST": [["", "\\x00{20}"]],
+              "TEST": [["", "\\x01{200}"]]
           },
-          "OSSHPrefixEnableFragmentor": true,
-					`
+          "OSSHPrefixSplitMinDelay": "30ms",
+          "OSSHPrefixSplitMaxDelay": "40ms",
+          "OSSHPrefixEnableFragmentor": %s,
+	`, strconv.FormatBool(enableOsshPrefixFragmenting))
 	}
 
 	tacticsConfigJSON := fmt.Sprintf(
@@ -3001,3 +3100,151 @@ func (v verifyTestCasesStoredLookup) checkStored(t *testing.T, errMessage string
 		t.Fatalf("%s: %+v", errMessage, v)
 	}
 }
+
// Number constrains the numeric types accepted by testSampleInUniformRange.
type Number interface {
	int64 | float64
}

// testSampleInUniformRange returns true if sample is in the range [a, b],
// or within 2 standard deviations of either end of the range.
func testSampleInUniformRange[V Number](sample, a, b, stddev V) bool {
	if sample >= a && sample <= b {
		return true
	}
	// Signed distance from each bound in standard deviations; negative
	// values mean the sample lies below the bound.
	lower := float64(sample-a) / float64(stddev)
	higher := float64(sample-b) / float64(stddev)
	// Accept only samples no more than 2 stddevs below a and no more than
	// 2 stddevs above b. The previous "lower <= 2.0 || higher <= 2.0"
	// accepted any sample below the range, however far: both deltas are
	// negative there, so the disjunction was always true.
	return lower >= -2.0 && higher <= 2.0
}
+
+// flowInspectorProxy is a local SOCKS proxy used in tests to capture and
+// inspect the byte flows exchanged between the Psiphon client and server.
+type flowInspectorProxy struct {
+	// listener accepts SOCKS connections from the client under test.
+	listener *socks.SocksListener
+	// ch delivers the captured [server flows, client flows] pair for each
+	// proxied connection.
+	ch       chan []*flows
+}
+
+func newFlowInspectorProxy() (*flowInspectorProxy, error) {
+	listener, err := socks.ListenSocks("tcp", "127.0.0.1:0")
+	if err != nil {
+		fmt.Printf("socks.ListenSocks failed: %s\n", err)
+		return nil, err
+	}
+	return &flowInspectorProxy{
+		listener: listener,
+		ch:       make(chan []*flows, 1),
+	}, nil
+}
+
+// start launches the proxy's accept loop in a goroutine. For each SOCKS
+// connection accepted, it dials the requested target directly and relays
+// bytes in both directions, teeing each direction through a flow recorder.
+// When both directions finish, the [serverFlows, clientFlows] pair is sent
+// on f.ch. The accept loop exits when the listener is closed (see close).
+func (f *flowInspectorProxy) start() {
+
+	go func() {
+		for {
+			localConn, err := f.listener.AcceptSocks()
+			if err != nil {
+				// Listener closed or fatal accept error: stop accepting.
+				return
+			}
+			go func() {
+				defer localConn.Close()
+				// Dial the client's requested destination directly; the
+				// proxy is transparent apart from the flow capture.
+				remoteConn, err := net.Dial("tcp", localConn.Req.Target)
+				if err != nil {
+					fmt.Printf("net.Dial failed: %s\n", err)
+					return
+				}
+				defer remoteConn.Close()
+				// Complete the SOCKS handshake; the bind address is not
+				// meaningful to the client in this test setup.
+				err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
+				if err != nil {
+					fmt.Printf("localConn.Grant failed: %s\n", err)
+					return
+				}
+
+				waitGroup := new(sync.WaitGroup)
+				waitGroup.Add(1)
+				// server=true records the remote->local (server-to-client)
+				// direction; client is local->remote.
+				serverFlowWriter := newFlowWriter(true)
+				clientFlowWriter := newFlowWriter(false)
+				go func() {
+					defer waitGroup.Done()
+
+					// Copy from remote to local, and tee to serverFlowWriter.
+					io.Copy(localConn, io.TeeReader(remoteConn, serverFlowWriter))
+
+					// fmt.Printf("Server Flows:\n%s\n\n", serverFlowWriter.String())
+
+					// Close both conns so the other direction's Copy unblocks.
+					localConn.Close()
+					remoteConn.Close()
+				}()
+
+				// Copy from local to remote, and tee to clientFlowWriter.
+				io.Copy(remoteConn, io.TeeReader(localConn, clientFlowWriter))
+
+				// fmt.Printf("Client Flows:\n%s\n\n", clientFlowWriter.String())
+
+				localConn.Close()
+				remoteConn.Close()
+				waitGroup.Wait()
+
+				// clientFlowWriter and serverFlowWriter are synchronized by waitGroup.
+				f.ch <- []*flows{serverFlowWriter, clientFlowWriter}
+			}()
+		}
+	}()
+}
+
+func (f *flowInspectorProxy) close() error {
+	return f.listener.Close()
+}
+
+// flow records a single Write observed on one direction of a proxied
+// connection.
+type flow struct {
+	// timeDelta is the time elapsed since the last flow
+	timeDelta time.Duration
+	// data is a copy of the bytes written.
+	data      []byte
+}
+
+// flows captures one direction of a proxied connection: the full byte
+// stream plus each individual Write as a discrete flow. It implements
+// io.Writer so it can serve as an io.TeeReader target.
+type flows struct {
+	// lastTime is the time of the previous Write, used to compute each
+	// flow's timeDelta.
+	lastTime   time.Time
+	// server is true for the server-to-client direction.
+	server     bool
+	// streamDump accumulates the entire byte stream in order.
+	streamDump *bytes.Buffer
+	// flows holds one entry per Write call.
+	flows      []flow
+}
+
+func newFlowWriter(server bool) *flows {
+	return &flows{
+		lastTime:   time.Now(),
+		streamDump: new(bytes.Buffer),
+		server:     server,
+	}
+}
+
+// String returns a string representation of the first 10 flows.
+func (f *flows) String() string {
+	var sb strings.Builder
+	for i, flow := range f.flows[:10] {
+		sb.WriteString(fmt.Sprintf("Flow %d: %.5f ms: %s\n",
+			i, flow.timeDelta.Seconds()*1000, hex.EncodeToString(flow.data)))
+	}
+	if len(f.flows) > 10 {
+		sb.WriteString("...\n")
+	}
+	return sb.String()
+}
+
+func (f *flows) Write(p []byte) (n int, err error) {
+	curTime := time.Now()
+
+	_, err = f.streamDump.Write(p)
+	if err != nil {
+		return 0, err
+	}
+
+	data := make([]byte, len(p))
+	n = copy(data, p)
+	if n < len(p) {
+		return n, io.ErrShortWrite
+	}
+
+	f.flows = append(f.flows, flow{
+		timeDelta: time.Since(f.lastTime),
+		data:      data,
+	})
+
+	f.lastTime = curTime
+
+	return n, err
+}