Explorar el Código

XTLS Vision: Refactor code to use DispatchLink() in VLESS inbound (#5076)

* Xtls: code refactor

- Move more logic to VisionReader/Writer
- Remove XtlsWrite()
- XtlsRead now only handle splice at the outbound
- This helps VLESS inbound to have simple buf.copy() so that we can remove pipe next

* Add bufferFlushNext; Use DispatchLink() in VLESS inbound

* Use TimeoutWrapperReader; clean up timer/buffer
yuhan6665 hace 9 meses
padre
commit
4064f8dd80

+ 17 - 3
common/buf/writer.go

@@ -75,9 +75,10 @@ func (w *BufferToBytesWriter) ReadFrom(reader io.Reader) (int64, error) {
 // BufferedWriter is a Writer with internal buffer.
 // BufferedWriter is a Writer with internal buffer.
 type BufferedWriter struct {
 type BufferedWriter struct {
 	sync.Mutex
 	sync.Mutex
-	writer   Writer
-	buffer   *Buffer
-	buffered bool
+	writer    Writer
+	buffer    *Buffer
+	buffered  bool
+	flushNext bool
 }
 }
 
 
 // NewBufferedWriter creates a new BufferedWriter.
 // NewBufferedWriter creates a new BufferedWriter.
@@ -161,6 +162,12 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error {
 		}
 		}
 	}
 	}
 
 
+	if w.flushNext {
+		w.buffered = false
+		w.flushNext = false
+		return w.flushInternal()
+	}
+
 	return nil
 	return nil
 }
 }
 
 
@@ -201,6 +208,13 @@ func (w *BufferedWriter) SetBuffered(f bool) error {
 	return nil
 	return nil
 }
 }
 
 
+// SetFlushNext will wait the next WriteMultiBuffer to flush and set buffered = false
+func (w *BufferedWriter) SetFlushNext() {
+	w.Lock()
+	defer w.Unlock()
+	w.flushNext = true
+}
+
 // ReadFrom implements io.ReaderFrom.
 // ReadFrom implements io.ReaderFrom.
 func (w *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
 func (w *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
 	if err := w.SetBuffered(false); err != nil {
 	if err := w.SetBuffered(false); err != nil {

+ 125 - 52
proxy/proxy.go

@@ -177,62 +177,108 @@ type VisionReader struct {
 	trafficState *TrafficState
 	trafficState *TrafficState
 	ctx          context.Context
 	ctx          context.Context
 	isUplink     bool
 	isUplink     bool
+	conn         net.Conn
+	input        *bytes.Reader
+	rawInput     *bytes.Buffer
+	ob           *session.Outbound
+
+	// internal
+	directReadCounter stats.Counter
 }
 }
 
 
-func NewVisionReader(reader buf.Reader, state *TrafficState, isUplink bool, context context.Context) *VisionReader {
+func NewVisionReader(reader buf.Reader, trafficState *TrafficState, isUplink bool, ctx context.Context, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, ob *session.Outbound) *VisionReader {
 	return &VisionReader{
 	return &VisionReader{
 		Reader:       reader,
 		Reader:       reader,
-		trafficState: state,
-		ctx:          context,
+		trafficState: trafficState,
+		ctx:          ctx,
 		isUplink:     isUplink,
 		isUplink:     isUplink,
+		conn:         conn,
+		input:        input,
+		rawInput:     rawInput,
+		ob:           ob,
 	}
 	}
 }
 }
 
 
 func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 	buffer, err := w.Reader.ReadMultiBuffer()
 	buffer, err := w.Reader.ReadMultiBuffer()
-	if !buffer.IsEmpty() {
-		var withinPaddingBuffers *bool
-		var remainingContent *int32
-		var remainingPadding *int32
-		var currentCommand *int
-		var switchToDirectCopy *bool
-		if w.isUplink {
-			withinPaddingBuffers = &w.trafficState.Inbound.WithinPaddingBuffers
-			remainingContent = &w.trafficState.Inbound.RemainingContent
-			remainingPadding = &w.trafficState.Inbound.RemainingPadding
-			currentCommand = &w.trafficState.Inbound.CurrentCommand
-			switchToDirectCopy = &w.trafficState.Inbound.UplinkReaderDirectCopy
+	if buffer.IsEmpty() {
+		return buffer, err
+	}
+
+	var withinPaddingBuffers *bool
+	var remainingContent *int32
+	var remainingPadding *int32
+	var currentCommand *int
+	var switchToDirectCopy *bool
+	if w.isUplink {
+		withinPaddingBuffers = &w.trafficState.Inbound.WithinPaddingBuffers
+		remainingContent = &w.trafficState.Inbound.RemainingContent
+		remainingPadding = &w.trafficState.Inbound.RemainingPadding
+		currentCommand = &w.trafficState.Inbound.CurrentCommand
+		switchToDirectCopy = &w.trafficState.Inbound.UplinkReaderDirectCopy
+	} else {
+		withinPaddingBuffers = &w.trafficState.Outbound.WithinPaddingBuffers
+		remainingContent = &w.trafficState.Outbound.RemainingContent
+		remainingPadding = &w.trafficState.Outbound.RemainingPadding
+		currentCommand = &w.trafficState.Outbound.CurrentCommand
+		switchToDirectCopy = &w.trafficState.Outbound.DownlinkReaderDirectCopy
+	}
+
+	if *switchToDirectCopy {
+		if w.directReadCounter != nil {
+			w.directReadCounter.Add(int64(buffer.Len()))
+		}
+		return buffer, err
+	}
+
+	if *withinPaddingBuffers || w.trafficState.NumberOfPacketToFilter > 0 {
+		mb2 := make(buf.MultiBuffer, 0, len(buffer))
+		for _, b := range buffer {
+			newbuffer := XtlsUnpadding(b, w.trafficState, w.isUplink, w.ctx)
+			if newbuffer.Len() > 0 {
+				mb2 = append(mb2, newbuffer)
+			}
+		}
+		buffer = mb2
+		if *remainingContent > 0 || *remainingPadding > 0 || *currentCommand == 0 {
+			*withinPaddingBuffers = true
+		} else if *currentCommand == 1 {
+			*withinPaddingBuffers = false
+		} else if *currentCommand == 2 {
+			*withinPaddingBuffers = false
+			*switchToDirectCopy = true
 		} else {
 		} else {
-			withinPaddingBuffers = &w.trafficState.Outbound.WithinPaddingBuffers
-			remainingContent = &w.trafficState.Outbound.RemainingContent
-			remainingPadding = &w.trafficState.Outbound.RemainingPadding
-			currentCommand = &w.trafficState.Outbound.CurrentCommand
-			switchToDirectCopy = &w.trafficState.Outbound.DownlinkReaderDirectCopy
+			errors.LogInfo(w.ctx, "XtlsRead unknown command ", *currentCommand, buffer.Len())
 		}
 		}
+	}
+	if w.trafficState.NumberOfPacketToFilter > 0 {
+		XtlsFilterTls(buffer, w.trafficState, w.ctx)
+	}
 
 
-		if *withinPaddingBuffers || w.trafficState.NumberOfPacketToFilter > 0 {
-			mb2 := make(buf.MultiBuffer, 0, len(buffer))
-			for _, b := range buffer {
-				newbuffer := XtlsUnpadding(b, w.trafficState, w.isUplink, w.ctx)
-				if newbuffer.Len() > 0 {
-					mb2 = append(mb2, newbuffer)
-				}
+	if *switchToDirectCopy {
+		// XTLS Vision processes TLS-like conn's input and rawInput
+		if inputBuffer, err := buf.ReadFrom(w.input); err == nil && !inputBuffer.IsEmpty() {
+			buffer, _ = buf.MergeMulti(buffer, inputBuffer)
+		}
+		if rawInputBuffer, err := buf.ReadFrom(w.rawInput); err == nil && !rawInputBuffer.IsEmpty() {
+			buffer, _ = buf.MergeMulti(buffer, rawInputBuffer)
+		}
+		*w.input = bytes.Reader{} // release memory
+		w.input = nil
+		*w.rawInput = bytes.Buffer{} // release memory
+		w.rawInput = nil
+
+		if inbound := session.InboundFromContext(w.ctx); inbound != nil && inbound.Conn != nil {
+			if w.isUplink && inbound.CanSpliceCopy == 2 {
+				inbound.CanSpliceCopy = 1
 			}
 			}
-			buffer = mb2
-			if *remainingContent > 0 || *remainingPadding > 0 || *currentCommand == 0 {
-				*withinPaddingBuffers = true
-			} else if *currentCommand == 1 {
-				*withinPaddingBuffers = false
-			} else if *currentCommand == 2 {
-				*withinPaddingBuffers = false
-				*switchToDirectCopy = true
-			} else {
-				errors.LogInfo(w.ctx, "XtlsRead unknown command ", *currentCommand, buffer.Len())
+			if !w.isUplink && w.ob != nil && w.ob.CanSpliceCopy == 2 { // ob need to be passed in due to context can have more than one ob
+				w.ob.CanSpliceCopy = 1
 			}
 			}
 		}
 		}
-		if w.trafficState.NumberOfPacketToFilter > 0 {
-			XtlsFilterTls(buffer, w.trafficState, w.ctx)
-		}
+		readerConn, readCounter, _ := UnwrapRawConn(w.conn)
+		w.directReadCounter = readCounter
+		w.Reader = buf.NewReader(readerConn)
 	}
 	}
 	return buffer, err
 	return buffer, err
 }
 }
@@ -241,28 +287,32 @@ func (w *VisionReader) ReadMultiBuffer() (buf.MultiBuffer, error) {
 // Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic
 // Note Vision probably only make sense as the inner most layer of writer, since it need assess traffic state from origin proxy traffic
 type VisionWriter struct {
 type VisionWriter struct {
 	buf.Writer
 	buf.Writer
-	trafficState      *TrafficState
-	ctx               context.Context
-	writeOnceUserUUID []byte
-	isUplink          bool
+	trafficState *TrafficState
+	ctx          context.Context
+	isUplink     bool
+	conn         net.Conn
+	ob           *session.Outbound
+
+	// internal
+	writeOnceUserUUID  []byte
+	directWriteCounter stats.Counter
 }
 }
 
 
-func NewVisionWriter(writer buf.Writer, state *TrafficState, isUplink bool, context context.Context) *VisionWriter {
-	w := make([]byte, len(state.UserUUID))
-	copy(w, state.UserUUID)
+func NewVisionWriter(writer buf.Writer, trafficState *TrafficState, isUplink bool, ctx context.Context, conn net.Conn, ob *session.Outbound) *VisionWriter {
+	w := make([]byte, len(trafficState.UserUUID))
+	copy(w, trafficState.UserUUID)
 	return &VisionWriter{
 	return &VisionWriter{
 		Writer:            writer,
 		Writer:            writer,
-		trafficState:      state,
-		ctx:               context,
+		trafficState:      trafficState,
+		ctx:               ctx,
 		writeOnceUserUUID: w,
 		writeOnceUserUUID: w,
 		isUplink:          isUplink,
 		isUplink:          isUplink,
+		conn:              conn,
+		ob:                ob,
 	}
 	}
 }
 }
 
 
 func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
 func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
-	if w.trafficState.NumberOfPacketToFilter > 0 {
-		XtlsFilterTls(mb, w.trafficState, w.ctx)
-	}
 	var isPadding *bool
 	var isPadding *bool
 	var switchToDirectCopy *bool
 	var switchToDirectCopy *bool
 	if w.isUplink {
 	if w.isUplink {
@@ -272,6 +322,29 @@ func (w *VisionWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
 		isPadding = &w.trafficState.Inbound.IsPadding
 		isPadding = &w.trafficState.Inbound.IsPadding
 		switchToDirectCopy = &w.trafficState.Inbound.DownlinkWriterDirectCopy
 		switchToDirectCopy = &w.trafficState.Inbound.DownlinkWriterDirectCopy
 	}
 	}
+
+	if *switchToDirectCopy {
+		if inbound := session.InboundFromContext(w.ctx); inbound != nil {
+			if !w.isUplink && inbound.CanSpliceCopy == 2 {
+				inbound.CanSpliceCopy = 1
+			}
+			if w.isUplink && w.ob != nil && w.ob.CanSpliceCopy == 2 {
+				w.ob.CanSpliceCopy = 1
+			}
+		}
+		rawConn, _, writerCounter := UnwrapRawConn(w.conn)
+		w.Writer = buf.NewWriter(rawConn)
+		w.directWriteCounter = writerCounter
+		*switchToDirectCopy = false
+	}
+	if !mb.IsEmpty() &&  w.directWriteCounter != nil {
+		w.directWriteCounter.Add(int64(mb.Len()))
+	}
+
+	if w.trafficState.NumberOfPacketToFilter > 0 {
+		XtlsFilterTls(mb, w.trafficState, w.ctx)
+	}
+
 	if *isPadding {
 	if *isPadding {
 		if len(mb) == 1 && mb[0] == nil {
 		if len(mb) == 1 && mb[0] == nil {
 			mb[0] = XtlsPadding(nil, CommandPaddingContinue, &w.writeOnceUserUUID, true, w.ctx) // we do a long padding to hide vless header
 			mb[0] = XtlsPadding(nil, CommandPaddingContinue, &w.writeOnceUserUUID, true, w.ctx) // we do a long padding to hide vless header

+ 6 - 5
proxy/vless/encoding/addons.go

@@ -3,10 +3,12 @@ package encoding
 import (
 import (
 	"context"
 	"context"
 	"io"
 	"io"
+	"net"
 
 
 	"github.com/xtls/xray-core/common/buf"
 	"github.com/xtls/xray-core/common/buf"
 	"github.com/xtls/xray-core/common/errors"
 	"github.com/xtls/xray-core/common/errors"
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/protocol"
+	"github.com/xtls/xray-core/common/session"
 	"github.com/xtls/xray-core/proxy"
 	"github.com/xtls/xray-core/proxy"
 	"github.com/xtls/xray-core/proxy/vless"
 	"github.com/xtls/xray-core/proxy/vless"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/proto"
@@ -61,15 +63,14 @@ func DecodeHeaderAddons(buffer *buf.Buffer, reader io.Reader) (*Addons, error) {
 }
 }
 
 
 // EncodeBodyAddons returns a Writer that auto-encrypt content written by caller.
 // EncodeBodyAddons returns a Writer that auto-encrypt content written by caller.
-func EncodeBodyAddons(writer io.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, isUplink bool, context context.Context) buf.Writer {
+func EncodeBodyAddons(writer buf.Writer, request *protocol.RequestHeader, requestAddons *Addons, state *proxy.TrafficState, isUplink bool, context context.Context, conn net.Conn, ob *session.Outbound) buf.Writer {
 	if request.Command == protocol.RequestCommandUDP {
 	if request.Command == protocol.RequestCommandUDP {
-		return NewMultiLengthPacketWriter(writer.(buf.Writer))
+		return NewMultiLengthPacketWriter(writer)
 	}
 	}
-	w := buf.NewWriter(writer)
 	if requestAddons.Flow == vless.XRV {
 	if requestAddons.Flow == vless.XRV {
-		w = proxy.NewVisionWriter(w, state, isUplink, context)
+		return proxy.NewVisionWriter(writer, state, isUplink, context, conn, ob)
 	}
 	}
-	return w
+	return writer
 }
 }
 
 
 // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.
 // DecodeBodyAddons returns a Reader from which caller can fetch decrypted body.

+ 2 - 67
proxy/vless/encoding/encoding.go

@@ -1,7 +1,6 @@
 package encoding
 package encoding
 
 
 import (
 import (
-	"bytes"
 	"context"
 	"context"
 	"io"
 	"io"
 
 
@@ -11,7 +10,6 @@ import (
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/protocol"
 	"github.com/xtls/xray-core/common/session"
 	"github.com/xtls/xray-core/common/session"
 	"github.com/xtls/xray-core/common/signal"
 	"github.com/xtls/xray-core/common/signal"
-	"github.com/xtls/xray-core/features/stats"
 	"github.com/xtls/xray-core/proxy"
 	"github.com/xtls/xray-core/proxy"
 	"github.com/xtls/xray-core/proxy/vless"
 	"github.com/xtls/xray-core/proxy/vless"
 )
 )
@@ -171,8 +169,8 @@ func DecodeResponseHeader(reader io.Reader, request *protocol.RequestHeader) (*A
 	return responseAddons, nil
 	return responseAddons, nil
 }
 }
 
 
-// XtlsRead filter and read xtls protocol
-func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, input *bytes.Reader, rawInput *bytes.Buffer, trafficState *proxy.TrafficState, ob *session.Outbound, isUplink bool, ctx context.Context) error {
+// XtlsRead can switch to splice copy
+func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer, conn net.Conn, trafficState *proxy.TrafficState, isUplink bool, ctx context.Context) error {
 	err := func() error {
 	err := func() error {
 		for {
 		for {
 			if isUplink && trafficState.Inbound.UplinkReaderDirectCopy || !isUplink && trafficState.Outbound.DownlinkReaderDirectCopy {
 			if isUplink && trafficState.Inbound.UplinkReaderDirectCopy || !isUplink && trafficState.Outbound.DownlinkReaderDirectCopy {
@@ -181,74 +179,11 @@ func XtlsRead(reader buf.Reader, writer buf.Writer, timer *signal.ActivityTimer,
 				if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil {
 				if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil {
 					writerConn = inbound.Conn
 					writerConn = inbound.Conn
 					inTimer = inbound.Timer
 					inTimer = inbound.Timer
-					if isUplink && inbound.CanSpliceCopy == 2 {
-						inbound.CanSpliceCopy = 1
-					}
-					if !isUplink && ob != nil && ob.CanSpliceCopy == 2 { // ob need to be passed in due to context can change
-						ob.CanSpliceCopy = 1
-					}
 				}
 				}
 				return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer)
 				return proxy.CopyRawConnIfExist(ctx, conn, writerConn, writer, timer, inTimer)
 			}
 			}
 			buffer, err := reader.ReadMultiBuffer()
 			buffer, err := reader.ReadMultiBuffer()
 			if !buffer.IsEmpty() {
 			if !buffer.IsEmpty() {
-				timer.Update()
-				if isUplink && trafficState.Inbound.UplinkReaderDirectCopy || !isUplink && trafficState.Outbound.DownlinkReaderDirectCopy {
-					// XTLS Vision processes TLS-like conn's input and rawInput
-					if inputBuffer, err := buf.ReadFrom(input); err == nil && !inputBuffer.IsEmpty() {
-						buffer, _ = buf.MergeMulti(buffer, inputBuffer)
-					}
-					if rawInputBuffer, err := buf.ReadFrom(rawInput); err == nil && !rawInputBuffer.IsEmpty() {
-						buffer, _ = buf.MergeMulti(buffer, rawInputBuffer)
-					}
-					*input = bytes.Reader{} // release memory
-					input = nil
-					*rawInput = bytes.Buffer{} // release memory
-					rawInput = nil
-				}
-				if werr := writer.WriteMultiBuffer(buffer); werr != nil {
-					return werr
-				}
-			}
-			if err != nil {
-				return err
-			}
-		}
-	}()
-	if err != nil && errors.Cause(err) != io.EOF {
-		return err
-	}
-	return nil
-}
-
-// XtlsWrite filter and write xtls protocol
-func XtlsWrite(reader buf.Reader, writer buf.Writer, timer signal.ActivityUpdater, conn net.Conn, trafficState *proxy.TrafficState, ob *session.Outbound, isUplink bool, ctx context.Context) error {
-	err := func() error {
-		var ct stats.Counter
-		for {
-			buffer, err := reader.ReadMultiBuffer()
-			if isUplink && trafficState.Outbound.UplinkWriterDirectCopy || !isUplink && trafficState.Inbound.DownlinkWriterDirectCopy {
-				if inbound := session.InboundFromContext(ctx); inbound != nil {
-					if !isUplink && inbound.CanSpliceCopy == 2 {
-						inbound.CanSpliceCopy = 1
-					}
-					if isUplink && ob != nil && ob.CanSpliceCopy == 2 {
-						ob.CanSpliceCopy = 1
-					}
-				}
-				rawConn, _, writerCounter := proxy.UnwrapRawConn(conn)
-				writer = buf.NewWriter(rawConn)
-				ct = writerCounter
-				if isUplink {
-					trafficState.Outbound.UplinkWriterDirectCopy = false
-				} else {
-					trafficState.Inbound.DownlinkWriterDirectCopy = false
-				}
-			}
-			if !buffer.IsEmpty() {
-				if ct != nil {
-					ct.Add(int64(buffer.Len()))
-				}
 				timer.Update()
 				timer.Update()
 				if werr := writer.WriteMultiBuffer(buffer); werr != nil {
 				if werr := writer.WriteMultiBuffer(buffer); werr != nil {
 					return werr
 					return werr

+ 15 - 79
proxy/vless/inbound/inbound.go

@@ -31,6 +31,7 @@ import (
 	"github.com/xtls/xray-core/proxy/vless"
 	"github.com/xtls/xray-core/proxy/vless"
 	"github.com/xtls/xray-core/proxy/vless/encoding"
 	"github.com/xtls/xray-core/proxy/vless/encoding"
 	"github.com/xtls/xray-core/proxy/vless/encryption"
 	"github.com/xtls/xray-core/proxy/vless/encryption"
+	"github.com/xtls/xray-core/transport"
 	"github.com/xtls/xray-core/transport/internet/reality"
 	"github.com/xtls/xray-core/transport/internet/reality"
 	"github.com/xtls/xray-core/transport/internet/stat"
 	"github.com/xtls/xray-core/transport/internet/stat"
 	"github.com/xtls/xray-core/transport/internet/tls"
 	"github.com/xtls/xray-core/transport/internet/tls"
@@ -551,89 +552,24 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
 		ctx = session.ContextWithAllowedNetwork(ctx, net.Network_UDP)
 		ctx = session.ContextWithAllowedNetwork(ctx, net.Network_UDP)
 	}
 	}
 
 
-	sessionPolicy = h.policyManager.ForLevel(request.User.Level)
-	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
-	inbound.Timer = timer
-	ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
-
-	link, err := dispatcher.Dispatch(ctx, request.Destination())
-	if err != nil {
-		return errors.New("failed to dispatch request to ", request.Destination()).Base(err).AtWarning()
-	}
-
-	serverReader := link.Reader // .(*pipe.Reader)
-	serverWriter := link.Writer // .(*pipe.Writer)
 	trafficState := proxy.NewTrafficState(userSentID)
 	trafficState := proxy.NewTrafficState(userSentID)
-	postRequest := func() error {
-		defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
-
-		// default: clientReader := reader
-		clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons)
-
-		var err error
-
-		if requestAddons.Flow == vless.XRV {
-			ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice
-			clientReader = proxy.NewVisionReader(clientReader, trafficState, true, ctx1)
-			err = encoding.XtlsRead(clientReader, serverWriter, timer, connection, input, rawInput, trafficState, nil, true, ctx1)
-		} else {
-			// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer
-			err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer))
-		}
-
-		if err != nil {
-			return errors.New("failed to transfer request payload").Base(err).AtInfo()
-		}
-
-		return nil
+	clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons)
+	if requestAddons.Flow == vless.XRV {
+		clientReader = proxy.NewVisionReader(clientReader, trafficState, true, ctx, connection, input, rawInput, nil)
 	}
 	}
 
 
-	getResponse := func() error {
-		defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
-
-		bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
-		if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil {
-			return errors.New("failed to encode response header").Base(err).AtWarning()
-		}
-
-		// default: clientWriter := bufferWriter
-		clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, false, ctx)
-		multiBuffer, err1 := serverReader.ReadMultiBuffer()
-		if err1 != nil {
-			return err1 // ...
-		}
-		if err := clientWriter.WriteMultiBuffer(multiBuffer); err != nil {
-			return err // ...
-		}
-		// Flush; bufferWriter.WriteMultiBuffer now is bufferWriter.writer.WriteMultiBuffer
-		if err := bufferWriter.SetBuffered(false); err != nil {
-			return errors.New("failed to write A response payload").Base(err).AtWarning()
-		}
-
-		var err error
-		if requestAddons.Flow == vless.XRV {
-			err = encoding.XtlsWrite(serverReader, clientWriter, timer, connection, trafficState, nil, false, ctx)
-		} else {
-			// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer
-			err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer))
-		}
-		if err != nil {
-			return errors.New("failed to transfer response payload").Base(err).AtInfo()
-		}
-		// Indicates the end of response payload.
-		switch responseAddons.Flow {
-		default:
-		}
-
-		return nil
+	bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
+	if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil {
+		return errors.New("failed to encode response header").Base(err).AtWarning()
 	}
 	}
-
-	if err := task.Run(ctx, task.OnSuccess(postRequest, task.Close(serverWriter)), getResponse); err != nil {
-		common.Interrupt(serverReader)
-		common.Interrupt(serverWriter)
-		return errors.New("connection ends").Base(err).AtInfo()
+	clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, false, ctx, connection, nil)
+	bufferWriter.SetFlushNext()
+
+	if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
+		Reader: &buf.TimeoutWrapperReader{Reader: clientReader},
+		Writer: clientWriter},
+	); err != nil {
+		return errors.New("failed to dispatch request").Base(err)
 	}
 	}
-
 	return nil
 	return nil
 }
 }

+ 4 - 9
proxy/vless/outbound/outbound.go

@@ -225,7 +225,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 		}
 		}
 
 
 		// default: serverWriter := bufferWriter
 		// default: serverWriter := bufferWriter
-		serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, true, ctx)
+		serverWriter := encoding.EncodeBodyAddons(bufferWriter, request, requestAddons, trafficState, true, ctx, conn, ob)
 		if request.Command == protocol.RequestCommandMux && request.Port == 666 {
 		if request.Command == protocol.RequestCommandMux && request.Port == 666 {
 			serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx))
 			serverWriter = xudp.NewPacketWriter(serverWriter, target, xudp.GetGlobalID(ctx))
 		}
 		}
@@ -253,7 +253,6 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 			return errors.New("failed to write A request payload").Base(err).AtWarning()
 			return errors.New("failed to write A request payload").Base(err).AtWarning()
 		}
 		}
 
 
-		var err error
 		if requestAddons.Flow == vless.XRV {
 		if requestAddons.Flow == vless.XRV {
 			if tlsConn, ok := iConn.(*tls.Conn); ok {
 			if tlsConn, ok := iConn.(*tls.Conn); ok {
 				if tlsConn.ConnectionState().Version != gotls.VersionTLS13 {
 				if tlsConn.ConnectionState().Version != gotls.VersionTLS13 {
@@ -264,12 +263,8 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 					return errors.New(`failed to use `+requestAddons.Flow+`, found outer tls version `, utlsConn.ConnectionState().Version).AtWarning()
 					return errors.New(`failed to use `+requestAddons.Flow+`, found outer tls version `, utlsConn.ConnectionState().Version).AtWarning()
 				}
 				}
 			}
 			}
-			ctx1 := session.ContextWithInbound(ctx, nil) // TODO enable splice
-			err = encoding.XtlsWrite(clientReader, serverWriter, timer, conn, trafficState, ob, true, ctx1)
-		} else {
-			// from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBuffer
-			err = buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer))
 		}
 		}
+		err := buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer))
 		if err != nil {
 		if err != nil {
 			return errors.New("failed to transfer request payload").Base(err).AtInfo()
 			return errors.New("failed to transfer request payload").Base(err).AtInfo()
 		}
 		}
@@ -292,7 +287,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 		// default: serverReader := buf.NewReader(conn)
 		// default: serverReader := buf.NewReader(conn)
 		serverReader := encoding.DecodeBodyAddons(conn, request, responseAddons)
 		serverReader := encoding.DecodeBodyAddons(conn, request, responseAddons)
 		if requestAddons.Flow == vless.XRV {
 		if requestAddons.Flow == vless.XRV {
-			serverReader = proxy.NewVisionReader(serverReader, trafficState, false, ctx)
+			serverReader = proxy.NewVisionReader(serverReader, trafficState, false, ctx, conn, input, rawInput, ob)
 		}
 		}
 		if request.Command == protocol.RequestCommandMux && request.Port == 666 {
 		if request.Command == protocol.RequestCommandMux && request.Port == 666 {
 			if requestAddons.Flow == vless.XRV {
 			if requestAddons.Flow == vless.XRV {
@@ -303,7 +298,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
 		}
 		}
 
 
 		if requestAddons.Flow == vless.XRV {
 		if requestAddons.Flow == vless.XRV {
-			err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, input, rawInput, trafficState, ob, false, ctx)
+			err = encoding.XtlsRead(serverReader, clientWriter, timer, conn, trafficState, false, ctx)
 		} else {
 		} else {
 			// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer
 			// from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBuffer
 			err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer))
 			err = buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer))