| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- diff --git a/Temp/inproxy-proto2/inproxy/inproxy_test.go b/Temp/inproxy-proto2/inproxy/inproxy_test.go
- index 334befb1..6da022d3 100644
- --- a/Temp/inproxy-proto2/inproxy/inproxy_test.go
- +++ b/Temp/inproxy-proto2/inproxy/inproxy_test.go
- @@ -80,12 +80,15 @@ func runTestInProxy() error {
- //numProxies := 10
- //proxyMaxClients := 5
- //numClients := 100
- - numProxies := 5
- - proxyMaxClients := 2
- - numClients := 10
- + numProxies := 1
- + proxyMaxClients := 1
- + numClients := 5
-
- + // *TEMP*
- + //bytesToSend := 1 << 20
- + //messageSize := 1 << 10
- bytesToSend := 1 << 20
- - messageSize := 1 << 10
- + messageSize := 1024
- targetElapsedSeconds := 2
-
- baseMetrics := common.APIParameters{
- @@ -349,6 +352,9 @@ func runTestInProxy() error {
-
- makeClientFunc := func(isTCP bool) func() error {
-
- + // *TEMP*
- + isTCP = false
- +
- // *DOC* use echo server address as proxy destination; alternate TCP/UDP
- var network, addr string
- if isTCP {
- @@ -429,6 +435,10 @@ func runTestInProxy() error {
- if bytesToSend-n < m {
- m = bytesToSend - n
- }
- +
- + // *TEMP*
- + fmt.Printf(" > SEND: %x\n", prefix(sendBytes[n:n+m]))
- +
- _, err := conn.Write(sendBytes[n : n+m])
- if err != nil {
- return errors.Trace(err)
- @@ -446,7 +456,15 @@ func runTestInProxy() error {
- if err != nil {
- return errors.Trace(err)
- }
- +
- + // *TEMP*
- + fmt.Printf(" > RECV: %x\n", prefix(buf[:m]))
- +
- if !bytes.Equal(sendBytes[n:n+m], buf[:m]) {
- +
- + // *TEMP*
- + fmt.Printf(" > RECV: !EQUAL\n")
- +
- // *DOC* index logged to diagnose out-of-order or dropped message vs. entirely wrong bytes
- return errors.Tracef(
- "unexpected bytes: expected at index %d, received at index %d",
- @@ -636,6 +654,13 @@ func runTCPEchoServer(listener net.Listener) {
- }
- }
-
- +func prefix(b []byte) []byte {
- + if len(b) <= 8 {
- + return b
- + }
- + return b[:8]
- +}
- +
- func runUDPEchoServer(packetConn net.PacketConn) {
- buf := make([]byte, 65536)
- for {
- @@ -643,6 +668,10 @@ func runUDPEchoServer(packetConn net.PacketConn) {
- if err != nil {
- return
- }
- +
- + // *TEMP*
- + fmt.Printf(" > RELAY: %x\n", prefix(buf[:n]))
- +
- _, err = packetConn.WriteTo(buf[:n], addr)
- if err != nil {
- return
- diff --git a/Temp/inproxy-proto2/inproxy/proxy.go b/Temp/inproxy-proto2/inproxy/proxy.go
- index fd6e158f..2531e276 100644
- --- a/Temp/inproxy-proto2/inproxy/proxy.go
- +++ b/Temp/inproxy-proto2/inproxy/proxy.go
- @@ -27,6 +27,8 @@ import (
- "sync/atomic"
- "time"
-
- + "fmt"
- +
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/pion/webrtc/v3"
- @@ -403,11 +405,45 @@ func (p *Proxy) proxyOneClient(ctx context.Context) error {
- waitGroup := new(sync.WaitGroup)
- relayErrors := make(chan error, 2)
-
- + // *TEMP*
- + myCopy := func(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
- + for {
- + nr, er := src.Read(buf)
- + if nr > 0 {
- + fmt.Printf(" > COPY: %x\n", prefix(buf[:nr]))
- + nw, ew := dst.Write(buf[0:nr])
- + if nw < 0 || nr < nw {
- + nw = 0
- + if ew == nil {
- + ew = errors.TraceNew("write invalid")
- + }
- + }
- + written += int64(nw)
- + if ew != nil {
- + err = ew
- + break
- + }
- + if nr != nw {
- + err = io.ErrShortWrite
- + break
- + }
- + }
- + if er != nil {
- + if er != io.EOF {
- + err = er
- + }
- + break
- + }
- + }
- + return written, err
- + }
- +
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- - // *TODO* doc: for packet conn, io.Copy buffer must be packet MTU; it's 32K
- - _, err = io.Copy(webRTCConn, upstreamConn)
- + // *TODO* doc: for packet conn, io.Copy buffer must be packet MTU; it's 32K; need 64K?
- + var buf [65536]byte
- + _, err = io.CopyBuffer(webRTCConn, upstreamConn, buf[:])
- if err != nil {
- relayErrors <- errors.Trace(err)
- return
- @@ -417,7 +453,10 @@ func (p *Proxy) proxyOneClient(ctx context.Context) error {
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- - _, err := io.Copy(upstreamConn, webRTCConn)
- + var buf [65536]byte
- + // *TEMP*
- + //_, err := io.CopyBuffer(upstreamConn, webRTCConn, buf[:])
- + _, err := myCopy(upstreamConn, webRTCConn, buf[:])
- if err != nil {
- relayErrors <- errors.Trace(err)
- return
|