diff --git a/Temp/inproxy-proto2/inproxy/inproxy_test.go b/Temp/inproxy-proto2/inproxy/inproxy_test.go index 334befb1..6da022d3 100644 --- a/Temp/inproxy-proto2/inproxy/inproxy_test.go +++ b/Temp/inproxy-proto2/inproxy/inproxy_test.go @@ -80,12 +80,15 @@ func runTestInProxy() error { //numProxies := 10 //proxyMaxClients := 5 //numClients := 100 - numProxies := 5 - proxyMaxClients := 2 - numClients := 10 + numProxies := 1 + proxyMaxClients := 1 + numClients := 5 + // *TEMP* + //bytesToSend := 1 << 20 + //messageSize := 1 << 10 bytesToSend := 1 << 20 - messageSize := 1 << 10 + messageSize := 1024 targetElapsedSeconds := 2 baseMetrics := common.APIParameters{ @@ -349,6 +352,9 @@ func runTestInProxy() error { makeClientFunc := func(isTCP bool) func() error { + // *TEMP* + isTCP = false + // *DOC* use echo server address as proxy destination; alternate TCP/UDP var network, addr string if isTCP { @@ -429,6 +435,10 @@ func runTestInProxy() error { if bytesToSend-n < m { m = bytesToSend - n } + + // *TEMP* + fmt.Printf(" > SEND: %x\n", prefix(sendBytes[n:n+m])) + _, err := conn.Write(sendBytes[n : n+m]) if err != nil { return errors.Trace(err) @@ -446,7 +456,15 @@ func runTestInProxy() error { if err != nil { return errors.Trace(err) } + + // *TEMP* + fmt.Printf(" > RECV: %x\n", prefix(buf[:m])) + if !bytes.Equal(sendBytes[n:n+m], buf[:m]) { + + // *TEMP* + fmt.Printf(" > RECV: !EQUAL\n") + // *DOC* index logged to diagnose out-of-order or dropped message vs. entirely wrong bytes return errors.Tracef( "unexpected bytes: expected at index %d, received at index %d", @@ -636,6 +654,13 @@ func runTCPEchoServer(listener net.Listener) { } } +func prefix(b []byte) []byte { + if len(b) <= 8 { + return b + } + return b[:8] +} + func runUDPEchoServer(packetConn net.PacketConn) { buf := make([]byte, 65536) for { @@ -643,6 +668,10 @@ func runUDPEchoServer(packetConn net.PacketConn) { if err != nil { return } + + // *TEMP* + fmt.Printf(" > RELAY: %x\n", prefix(buf[:n])) + _, err = packetConn.WriteTo(buf[:n], addr) if err != nil { return diff --git a/Temp/inproxy-proto2/inproxy/proxy.go b/Temp/inproxy-proto2/inproxy/proxy.go index fd6e158f..2531e276 100644 --- a/Temp/inproxy-proto2/inproxy/proxy.go +++ b/Temp/inproxy-proto2/inproxy/proxy.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + "fmt" + "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors" "github.com/pion/webrtc/v3" @@ -403,11 +405,45 @@ func (p *Proxy) proxyOneClient(ctx context.Context) error { waitGroup := new(sync.WaitGroup) relayErrors := make(chan error, 2) + // *TEMP* + myCopy := func(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) { + for { + nr, er := src.Read(buf) + if nr > 0 { + fmt.Printf(" > COPY: %x\n", prefix(buf[:nr])) + nw, ew := dst.Write(buf[0:nr]) + if nw < 0 || nr < nw { + nw = 0 + if ew == nil { + ew = errors.TraceNew("write invalid") + } + } + written += int64(nw) + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + return written, err + } + waitGroup.Add(1) go func() { defer waitGroup.Done() - // *TODO* doc: for packet conn, io.Copy buffer must be packet MTU; it's 32K - _, err = io.Copy(webRTCConn, upstreamConn) + // *TODO* doc: for packet conn, io.Copy buffer must be packet MTU; it's 32K; need 64K? + var buf [65536]byte + _, err = io.CopyBuffer(webRTCConn, upstreamConn, buf[:]) if err != nil { relayErrors <- errors.Trace(err) return @@ -417,7 +453,10 @@ func (p *Proxy) proxyOneClient(ctx context.Context) error { waitGroup.Add(1) go func() { defer waitGroup.Done() - _, err := io.Copy(upstreamConn, webRTCConn) + var buf [65536]byte + // *TEMP* + //_, err := io.CopyBuffer(upstreamConn, webRTCConn, buf[:]) + _, err := myCopy(upstreamConn, webRTCConn, buf[:]) if err != nil { relayErrors <- errors.Trace(err) return