| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617 |
- package splithttp
- import (
- "context"
- gotls "crypto/tls"
- "fmt"
- "io"
- "math/rand"
- "net/http"
- "net/http/httptrace"
- "net/url"
- reflect "reflect"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "github.com/apernet/quic-go"
- "github.com/apernet/quic-go/http3"
- "github.com/xtls/xray-core/common"
- "github.com/xtls/xray-core/common/buf"
- "github.com/xtls/xray-core/common/errors"
- "github.com/xtls/xray-core/common/net"
- "github.com/xtls/xray-core/common/signal/done"
- "github.com/xtls/xray-core/common/uuid"
- "github.com/xtls/xray-core/transport/internet"
- "github.com/xtls/xray-core/transport/internet/browser_dialer"
- "github.com/xtls/xray-core/transport/internet/hysteria/congestion"
- "github.com/xtls/xray-core/transport/internet/hysteria/udphop"
- "github.com/xtls/xray-core/transport/internet/reality"
- "github.com/xtls/xray-core/transport/internet/stat"
- "github.com/xtls/xray-core/transport/internet/tls"
- "github.com/xtls/xray-core/transport/pipe"
- "golang.org/x/net/http2"
- )
- type dialerConf struct {
- net.Destination
- *internet.MemoryStreamConfig
- }
- var (
- globalDialerMap map[dialerConf]*XmuxManager
- globalDialerAccess sync.Mutex
- )
- func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (DialerClient, *XmuxClient) {
- realityConfig := reality.ConfigFromStreamSettings(streamSettings)
- if browser_dialer.HasBrowserDialer() && realityConfig == nil {
- return &BrowserDialerClient{transportConfig: streamSettings.ProtocolSettings.(*Config)}, nil
- }
- globalDialerAccess.Lock()
- defer globalDialerAccess.Unlock()
- if globalDialerMap == nil {
- globalDialerMap = make(map[dialerConf]*XmuxManager)
- }
- key := dialerConf{dest, streamSettings}
- xmuxManager, found := globalDialerMap[key]
- if !found {
- transportConfig := streamSettings.ProtocolSettings.(*Config)
- var xmuxConfig XmuxConfig
- if transportConfig.Xmux != nil {
- xmuxConfig = *transportConfig.Xmux
- }
- xmuxManager = NewXmuxManager(xmuxConfig, func() XmuxConn {
- return createHTTPClient(dest, streamSettings)
- })
- globalDialerMap[key] = xmuxManager
- }
- xmuxClient := xmuxManager.GetXmuxClient(ctx)
- return xmuxClient.XmuxConn.(DialerClient), xmuxClient
- }
- func decideHTTPVersion(tlsConfig *tls.Config, realityConfig *reality.Config) string {
- if realityConfig != nil {
- return "2"
- }
- if tlsConfig == nil {
- return "1.1"
- }
- if len(tlsConfig.NextProtocol) != 1 {
- return "2"
- }
- if tlsConfig.NextProtocol[0] == "http/1.1" {
- return "1.1"
- }
- if tlsConfig.NextProtocol[0] == "h3" {
- return "3"
- }
- return "2"
- }
- func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
- tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
- realityConfig := reality.ConfigFromStreamSettings(streamSettings)
- httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
- if httpVersion == "3" {
- dest.Network = net.Network_UDP // better to keep this line
- }
- var gotlsConfig *gotls.Config
- if tlsConfig != nil {
- gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest))
- }
- transportConfig := streamSettings.ProtocolSettings.(*Config)
- dialContext := func(ctxInner context.Context) (net.Conn, error) {
- conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings)
- if err != nil {
- return nil, err
- }
- if streamSettings.TcpmaskManager != nil {
- newConn, err := streamSettings.TcpmaskManager.WrapConnClient(conn)
- if err != nil {
- conn.Close()
- return nil, errors.New("mask err").Base(err)
- }
- conn = newConn
- }
- if realityConfig != nil {
- return reality.UClient(conn, realityConfig, ctxInner, dest)
- }
- if gotlsConfig != nil {
- if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
- conn = tls.UClient(conn, gotlsConfig, fingerprint)
- if err := conn.(*tls.UConn).HandshakeContext(ctxInner); err != nil {
- return nil, err
- }
- } else {
- conn = tls.Client(conn, gotlsConfig)
- }
- }
- return conn, nil
- }
- var keepAlivePeriod time.Duration
- if streamSettings.ProtocolSettings.(*Config).Xmux != nil {
- keepAlivePeriod = time.Duration(streamSettings.ProtocolSettings.(*Config).Xmux.HKeepAlivePeriod) * time.Second
- }
- var transport http.RoundTripper
- if httpVersion == "3" {
- quicParams := streamSettings.QuicParams
- if quicParams == nil {
- quicParams = &internet.QuicParams{}
- }
- if quicParams.UdpHop == nil {
- quicParams.UdpHop = &internet.UdpHop{}
- }
- quicConfig := &quic.Config{
- InitialStreamReceiveWindow: quicParams.InitStreamReceiveWindow,
- MaxStreamReceiveWindow: quicParams.MaxStreamReceiveWindow,
- InitialConnectionReceiveWindow: quicParams.InitConnReceiveWindow,
- MaxConnectionReceiveWindow: quicParams.MaxConnReceiveWindow,
- MaxIdleTimeout: time.Duration(quicParams.MaxIdleTimeout) * time.Second,
- KeepAlivePeriod: time.Duration(quicParams.KeepAlivePeriod) * time.Second,
- MaxIncomingStreams: quicParams.MaxIncomingStreams,
- DisablePathMTUDiscovery: quicParams.DisablePathMtuDiscovery,
- }
- if quicParams.MaxIdleTimeout == 0 {
- quicConfig.MaxIdleTimeout = net.ConnIdleTimeout
- }
- if quicParams.KeepAlivePeriod == 0 {
- if keepAlivePeriod == 0 {
- quicConfig.KeepAlivePeriod = net.QuicgoH3KeepAlivePeriod
- }
- }
- if quicParams.MaxIncomingStreams == 0 {
- // these two are defaults of quic-go/http3. the default of quic-go (no
- // http3) is different, so it is hardcoded here for clarity.
- // https://github.com/quic-go/quic-go/blob/b8ea5c798155950fb5bbfdd06cad1939c9355878/http3/client.go#L36-L39
- quicConfig.MaxIncomingStreams = -1
- }
- transport = &http3.Transport{
- QUICConfig: quicConfig,
- TLSClientConfig: gotlsConfig,
- Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (*quic.Conn, error) {
- udphopDialer := func(addr *net.UDPAddr) (net.PacketConn, error) {
- conn, err := internet.DialSystem(ctx, net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)), streamSettings.SocketSettings)
- if err != nil {
- errors.LogDebug(context.Background(), "skip hop: failed to dial to dest")
- conn.Close()
- return nil, errors.New()
- }
- var udpConn net.PacketConn
- switch c := conn.(type) {
- case *internet.PacketConnWrapper:
- udpConn = c.PacketConn
- case *net.UDPConn:
- udpConn = c
- default:
- errors.LogDebug(context.Background(), "skip hop: udphop requires being at the outermost level ", reflect.TypeOf(c))
- conn.Close()
- return nil, errors.New()
- }
- return udpConn, nil
- }
- var index int
- if len(quicParams.UdpHop.Ports) > 0 {
- index = rand.Intn(len(quicParams.UdpHop.Ports))
- dest.Port = net.Port(quicParams.UdpHop.Ports[index])
- }
- conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
- if err != nil {
- return nil, err
- }
- var udpConn net.PacketConn
- var udpAddr *net.UDPAddr
- switch c := conn.(type) {
- case *internet.PacketConnWrapper:
- udpConn = c.PacketConn
- udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
- if err != nil {
- conn.Close()
- return nil, err
- }
- case *net.UDPConn:
- udpConn = c
- udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
- if err != nil {
- conn.Close()
- return nil, err
- }
- default:
- udpConn = &internet.FakePacketConn{Conn: c}
- udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
- if err != nil {
- conn.Close()
- return nil, err
- }
- if len(quicParams.UdpHop.Ports) > 0 {
- conn.Close()
- return nil, errors.New("udphop requires being at the outermost level ", reflect.TypeOf(c))
- }
- }
- if len(quicParams.UdpHop.Ports) > 0 {
- addr := &udphop.UDPHopAddr{
- IP: udpAddr.IP,
- Ports: quicParams.UdpHop.Ports,
- }
- udpConn, err = udphop.NewUDPHopPacketConn(addr, index, quicParams.UdpHop.IntervalMin, quicParams.UdpHop.IntervalMax, udphopDialer, udpConn)
- if err != nil {
- conn.Close()
- return nil, errors.New("udphop err").Base(err)
- }
- }
- if streamSettings.UdpmaskManager != nil {
- udpConn, err = streamSettings.UdpmaskManager.WrapPacketConnClient(udpConn)
- if err != nil {
- conn.Close()
- return nil, errors.New("mask err").Base(err)
- }
- }
- quicConn, err := quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
- if err != nil {
- return nil, err
- }
- switch quicParams.Congestion {
- case "force-brutal":
- errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion brutal bytes per second ", quicParams.BrutalUp)
- congestion.UseBrutal(quicConn, quicParams.BrutalUp)
- case "reno":
- errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion reno")
- default:
- errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion bbr")
- congestion.UseBBR(quicConn)
- }
- return quicConn, nil
- },
- }
- } else if httpVersion == "2" {
- if keepAlivePeriod == 0 {
- keepAlivePeriod = net.ChromeH2KeepAlivePeriod
- }
- if keepAlivePeriod < 0 {
- keepAlivePeriod = 0
- }
- transport = &http2.Transport{
- DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
- return dialContext(ctxInner)
- },
- IdleConnTimeout: net.ConnIdleTimeout,
- ReadIdleTimeout: keepAlivePeriod,
- }
- } else {
- httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) {
- return dialContext(ctxInner)
- }
- transport = &http.Transport{
- DialTLSContext: httpDialContext,
- DialContext: httpDialContext,
- IdleConnTimeout: net.ConnIdleTimeout,
- // chunked transfer download with KeepAlives is buggy with
- // http.Client and our custom dial context.
- DisableKeepAlives: true,
- }
- }
- client := &DefaultDialerClient{
- transportConfig: transportConfig,
- client: &http.Client{
- Transport: transport,
- },
- httpVersion: httpVersion,
- uploadRawPool: &sync.Pool{},
- dialUploadConn: dialContext,
- }
- return client
- }
- func init() {
- common.Must(internet.RegisterTransportDialer(protocolName, Dial))
- }
- func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
- tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
- realityConfig := reality.ConfigFromStreamSettings(streamSettings)
- httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
- if httpVersion == "3" {
- dest.Network = net.Network_UDP
- }
- transportConfiguration := streamSettings.ProtocolSettings.(*Config)
- var requestURL url.URL
- if tlsConfig != nil || realityConfig != nil {
- requestURL.Scheme = "https"
- } else {
- requestURL.Scheme = "http"
- }
- requestURL.Host = transportConfiguration.Host
- if requestURL.Host == "" && tlsConfig != nil {
- requestURL.Host = tlsConfig.ServerName
- }
- if requestURL.Host == "" && realityConfig != nil {
- requestURL.Host = realityConfig.ServerName
- }
- if requestURL.Host == "" {
- requestURL.Host = dest.Address.String()
- }
- requestURL.Path = transportConfiguration.GetNormalizedPath()
- requestURL.RawQuery = transportConfiguration.GetNormalizedQuery()
- httpClient, xmuxClient := getHTTPClient(ctx, dest, streamSettings)
- mode := transportConfiguration.Mode
- if mode == "" || mode == "auto" {
- mode = "packet-up"
- if realityConfig != nil {
- mode = "stream-one"
- if transportConfiguration.DownloadSettings != nil {
- mode = "stream-up"
- }
- }
- }
- sessionId := ""
- if mode != "stream-one" {
- sessionIdUuid := uuid.New()
- sessionId = sessionIdUuid.String()
- }
- errors.LogInfo(ctx, fmt.Sprintf("XHTTP is dialing to %s, mode %s, HTTP version %s, host %s", dest, mode, httpVersion, requestURL.Host))
- requestURL2 := requestURL
- httpClient2 := httpClient
- xmuxClient2 := xmuxClient
- if transportConfiguration.DownloadSettings != nil {
- globalDialerAccess.Lock()
- if streamSettings.DownloadSettings == nil {
- streamSettings.DownloadSettings = common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings))
- if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.Penetrate {
- streamSettings.DownloadSettings.SocketSettings = streamSettings.SocketSettings
- }
- }
- globalDialerAccess.Unlock()
- memory2 := streamSettings.DownloadSettings
- dest2 := *memory2.Destination // just panic
- tlsConfig2 := tls.ConfigFromStreamSettings(memory2)
- realityConfig2 := reality.ConfigFromStreamSettings(memory2)
- httpVersion2 := decideHTTPVersion(tlsConfig2, realityConfig2)
- if httpVersion2 == "3" {
- dest2.Network = net.Network_UDP
- }
- if tlsConfig2 != nil || realityConfig2 != nil {
- requestURL2.Scheme = "https"
- } else {
- requestURL2.Scheme = "http"
- }
- config2 := memory2.ProtocolSettings.(*Config)
- requestURL2.Host = config2.Host
- if requestURL2.Host == "" && tlsConfig2 != nil {
- requestURL2.Host = tlsConfig2.ServerName
- }
- if requestURL2.Host == "" && realityConfig2 != nil {
- requestURL2.Host = realityConfig2.ServerName
- }
- if requestURL2.Host == "" {
- requestURL2.Host = dest2.Address.String()
- }
- requestURL2.Path = config2.GetNormalizedPath()
- requestURL2.RawQuery = config2.GetNormalizedQuery()
- httpClient2, xmuxClient2 = getHTTPClient(ctx, dest2, memory2)
- errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
- }
- if xmuxClient != nil {
- xmuxClient.OpenUsage.Add(1)
- }
- if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
- xmuxClient2.OpenUsage.Add(1)
- }
- var closed atomic.Int32
- reader, writer := io.Pipe()
- conn := splitConn{
- writer: writer,
- onClose: func() {
- if closed.Add(1) > 1 {
- return
- }
- if xmuxClient != nil {
- xmuxClient.OpenUsage.Add(-1)
- }
- if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
- xmuxClient2.OpenUsage.Add(-1)
- }
- },
- }
- var err error
- if mode == "stream-one" {
- requestURL.Path = transportConfiguration.GetNormalizedPath()
- if xmuxClient != nil {
- xmuxClient.LeftRequests.Add(-1)
- }
- conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, false)
- if err != nil { // browser dialer only
- return nil, err
- }
- return stat.Connection(&conn), nil
- } else { // stream-down
- if xmuxClient2 != nil {
- xmuxClient2.LeftRequests.Add(-1)
- }
- conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(ctx, requestURL2.String(), sessionId, nil, false)
- if err != nil { // browser dialer only
- return nil, err
- }
- }
- if mode == "stream-up" {
- if xmuxClient != nil {
- xmuxClient.LeftRequests.Add(-1)
- }
- _, _, _, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, true)
- if err != nil { // browser dialer only
- return nil, err
- }
- return stat.Connection(&conn), nil
- }
- scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
- scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
- if scMaxEachPostBytes.From <= 0 {
- panic("`scMaxEachPostBytes` should be bigger than 0")
- }
- maxUploadSize := scMaxEachPostBytes.rand()
- // WithSizeLimit(0) will still allow single bytes to pass, and a lot of
- // code relies on this behavior. Subtract 1 so that together with
- // uploadWriter wrapper, exact size limits can be enforced
- // uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
- uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(max(0, maxUploadSize-buf.Size)))
- conn.writer = uploadWriter{
- uploadPipeWriter,
- maxUploadSize,
- }
- go func() {
- var seq int64
- var lastWrite time.Time
- for {
- // by offloading the uploads into a buffered pipe, multiple conn.Write
- // calls get automatically batched together into larger POST requests.
- // without batching, bandwidth is extremely limited.
- remainder, err := uploadPipeReader.ReadMultiBuffer()
- if err != nil {
- break
- }
- doSplit := atomic.Bool{}
- for doSplit.Store(true); doSplit.Load(); {
- var chunk buf.MultiBuffer
- remainder, chunk = buf.SplitSize(remainder, maxUploadSize)
- if chunk.IsEmpty() {
- break
- }
- wroteRequest := done.New()
- ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
- WroteRequest: func(httptrace.WroteRequestInfo) {
- wroteRequest.Close()
- },
- })
- seqStr := strconv.FormatInt(seq, 10)
- seq += 1
- if scMinPostsIntervalMs.From > 0 {
- time.Sleep(time.Duration(scMinPostsIntervalMs.rand())*time.Millisecond - time.Since(lastWrite))
- }
- lastWrite = time.Now()
- if xmuxClient != nil && (xmuxClient.LeftRequests.Add(-1) <= 0 ||
- (xmuxClient.UnreusableAt != time.Time{} && lastWrite.After(xmuxClient.UnreusableAt))) {
- httpClient, xmuxClient = getHTTPClient(ctx, dest, streamSettings)
- }
- go func() {
- err := httpClient.PostPacket(
- ctx,
- requestURL.String(),
- sessionId,
- seqStr,
- chunk,
- )
- wroteRequest.Close()
- if err != nil {
- errors.LogInfoInner(ctx, err, "failed to send upload")
- uploadPipeReader.Interrupt()
- doSplit.Store(false)
- }
- }()
- if _, ok := httpClient.(*DefaultDialerClient); ok {
- <-wroteRequest.Wait()
- }
- }
- }
- }()
- return stat.Connection(&conn), nil
- }
- // A wrapper around pipe that ensures the size limit is exactly honored.
- //
- // The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
- // single MultiBuffer exceeds the size limit, and then starts blocking on the
- // next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
- // bytes than the size limit. We work around this by splitting a potentially
- // too large write up into multiple.
- type uploadWriter struct {
- *pipe.Writer
- maxLen int32
- }
- func (w uploadWriter) Write(b []byte) (int, error) {
- /*
- capacity := int(w.maxLen - w.Len())
- if capacity > 0 && capacity < len(b) {
- b = b[:capacity]
- }
- */
- buffer := buf.MultiBufferContainer{}
- common.Must2(buffer.Write(b))
- var writed int
- for _, buff := range buffer.MultiBuffer {
- err := w.WriteMultiBuffer(buf.MultiBuffer{buff})
- if err != nil {
- return writed, err
- }
- writed += int(buff.Len())
- }
- return writed, nil
- }
|