dialer.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. package splithttp
  2. import (
  3. "context"
  4. gotls "crypto/tls"
  5. "fmt"
  6. "io"
  7. "math/rand"
  8. "net/http"
  9. "net/http/httptrace"
  10. "net/url"
  11. reflect "reflect"
  12. "strconv"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/apernet/quic-go"
  17. "github.com/apernet/quic-go/http3"
  18. "github.com/xtls/xray-core/common"
  19. "github.com/xtls/xray-core/common/buf"
  20. "github.com/xtls/xray-core/common/errors"
  21. "github.com/xtls/xray-core/common/net"
  22. "github.com/xtls/xray-core/common/signal/done"
  23. "github.com/xtls/xray-core/common/uuid"
  24. "github.com/xtls/xray-core/transport/internet"
  25. "github.com/xtls/xray-core/transport/internet/browser_dialer"
  26. "github.com/xtls/xray-core/transport/internet/hysteria/congestion"
  27. "github.com/xtls/xray-core/transport/internet/hysteria/udphop"
  28. "github.com/xtls/xray-core/transport/internet/reality"
  29. "github.com/xtls/xray-core/transport/internet/stat"
  30. "github.com/xtls/xray-core/transport/internet/tls"
  31. "github.com/xtls/xray-core/transport/pipe"
  32. "golang.org/x/net/http2"
  33. )
  34. type dialerConf struct {
  35. net.Destination
  36. *internet.MemoryStreamConfig
  37. }
  38. var (
  39. globalDialerMap map[dialerConf]*XmuxManager
  40. globalDialerAccess sync.Mutex
  41. )
  42. func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (DialerClient, *XmuxClient) {
  43. realityConfig := reality.ConfigFromStreamSettings(streamSettings)
  44. if browser_dialer.HasBrowserDialer() && realityConfig == nil {
  45. return &BrowserDialerClient{transportConfig: streamSettings.ProtocolSettings.(*Config)}, nil
  46. }
  47. globalDialerAccess.Lock()
  48. defer globalDialerAccess.Unlock()
  49. if globalDialerMap == nil {
  50. globalDialerMap = make(map[dialerConf]*XmuxManager)
  51. }
  52. key := dialerConf{dest, streamSettings}
  53. xmuxManager, found := globalDialerMap[key]
  54. if !found {
  55. transportConfig := streamSettings.ProtocolSettings.(*Config)
  56. var xmuxConfig XmuxConfig
  57. if transportConfig.Xmux != nil {
  58. xmuxConfig = *transportConfig.Xmux
  59. }
  60. xmuxManager = NewXmuxManager(xmuxConfig, func() XmuxConn {
  61. return createHTTPClient(dest, streamSettings)
  62. })
  63. globalDialerMap[key] = xmuxManager
  64. }
  65. xmuxClient := xmuxManager.GetXmuxClient(ctx)
  66. return xmuxClient.XmuxConn.(DialerClient), xmuxClient
  67. }
  68. func decideHTTPVersion(tlsConfig *tls.Config, realityConfig *reality.Config) string {
  69. if realityConfig != nil {
  70. return "2"
  71. }
  72. if tlsConfig == nil {
  73. return "1.1"
  74. }
  75. if len(tlsConfig.NextProtocol) != 1 {
  76. return "2"
  77. }
  78. if tlsConfig.NextProtocol[0] == "http/1.1" {
  79. return "1.1"
  80. }
  81. if tlsConfig.NextProtocol[0] == "h3" {
  82. return "3"
  83. }
  84. return "2"
  85. }
  86. func createHTTPClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient {
  87. tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
  88. realityConfig := reality.ConfigFromStreamSettings(streamSettings)
  89. httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
  90. if httpVersion == "3" {
  91. dest.Network = net.Network_UDP // better to keep this line
  92. }
  93. var gotlsConfig *gotls.Config
  94. if tlsConfig != nil {
  95. gotlsConfig = tlsConfig.GetTLSConfig(tls.WithDestination(dest))
  96. }
  97. transportConfig := streamSettings.ProtocolSettings.(*Config)
  98. dialContext := func(ctxInner context.Context) (net.Conn, error) {
  99. conn, err := internet.DialSystem(ctxInner, dest, streamSettings.SocketSettings)
  100. if err != nil {
  101. return nil, err
  102. }
  103. if streamSettings.TcpmaskManager != nil {
  104. newConn, err := streamSettings.TcpmaskManager.WrapConnClient(conn)
  105. if err != nil {
  106. conn.Close()
  107. return nil, errors.New("mask err").Base(err)
  108. }
  109. conn = newConn
  110. }
  111. if realityConfig != nil {
  112. return reality.UClient(conn, realityConfig, ctxInner, dest)
  113. }
  114. if gotlsConfig != nil {
  115. if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
  116. conn = tls.UClient(conn, gotlsConfig, fingerprint)
  117. if err := conn.(*tls.UConn).HandshakeContext(ctxInner); err != nil {
  118. return nil, err
  119. }
  120. } else {
  121. conn = tls.Client(conn, gotlsConfig)
  122. }
  123. }
  124. return conn, nil
  125. }
  126. var keepAlivePeriod time.Duration
  127. if streamSettings.ProtocolSettings.(*Config).Xmux != nil {
  128. keepAlivePeriod = time.Duration(streamSettings.ProtocolSettings.(*Config).Xmux.HKeepAlivePeriod) * time.Second
  129. }
  130. var transport http.RoundTripper
  131. if httpVersion == "3" {
  132. quicParams := streamSettings.QuicParams
  133. if quicParams == nil {
  134. quicParams = &internet.QuicParams{}
  135. }
  136. if quicParams.UdpHop == nil {
  137. quicParams.UdpHop = &internet.UdpHop{}
  138. }
  139. quicConfig := &quic.Config{
  140. InitialStreamReceiveWindow: quicParams.InitStreamReceiveWindow,
  141. MaxStreamReceiveWindow: quicParams.MaxStreamReceiveWindow,
  142. InitialConnectionReceiveWindow: quicParams.InitConnReceiveWindow,
  143. MaxConnectionReceiveWindow: quicParams.MaxConnReceiveWindow,
  144. MaxIdleTimeout: time.Duration(quicParams.MaxIdleTimeout) * time.Second,
  145. KeepAlivePeriod: time.Duration(quicParams.KeepAlivePeriod) * time.Second,
  146. MaxIncomingStreams: quicParams.MaxIncomingStreams,
  147. DisablePathMTUDiscovery: quicParams.DisablePathMtuDiscovery,
  148. }
  149. if quicParams.MaxIdleTimeout == 0 {
  150. quicConfig.MaxIdleTimeout = net.ConnIdleTimeout
  151. }
  152. if quicParams.KeepAlivePeriod == 0 {
  153. if keepAlivePeriod == 0 {
  154. quicConfig.KeepAlivePeriod = net.QuicgoH3KeepAlivePeriod
  155. }
  156. }
  157. if quicParams.MaxIncomingStreams == 0 {
  158. // these two are defaults of quic-go/http3. the default of quic-go (no
  159. // http3) is different, so it is hardcoded here for clarity.
  160. // https://github.com/quic-go/quic-go/blob/b8ea5c798155950fb5bbfdd06cad1939c9355878/http3/client.go#L36-L39
  161. quicConfig.MaxIncomingStreams = -1
  162. }
  163. transport = &http3.Transport{
  164. QUICConfig: quicConfig,
  165. TLSClientConfig: gotlsConfig,
  166. Dial: func(ctx context.Context, addr string, tlsCfg *gotls.Config, cfg *quic.Config) (*quic.Conn, error) {
  167. udphopDialer := func(addr *net.UDPAddr) (net.PacketConn, error) {
  168. conn, err := internet.DialSystem(ctx, net.UDPDestination(net.IPAddress(addr.IP), net.Port(addr.Port)), streamSettings.SocketSettings)
  169. if err != nil {
  170. errors.LogDebug(context.Background(), "skip hop: failed to dial to dest")
  171. conn.Close()
  172. return nil, errors.New()
  173. }
  174. var udpConn net.PacketConn
  175. switch c := conn.(type) {
  176. case *internet.PacketConnWrapper:
  177. udpConn = c.PacketConn
  178. case *net.UDPConn:
  179. udpConn = c
  180. default:
  181. errors.LogDebug(context.Background(), "skip hop: udphop requires being at the outermost level ", reflect.TypeOf(c))
  182. conn.Close()
  183. return nil, errors.New()
  184. }
  185. return udpConn, nil
  186. }
  187. var index int
  188. if len(quicParams.UdpHop.Ports) > 0 {
  189. index = rand.Intn(len(quicParams.UdpHop.Ports))
  190. dest.Port = net.Port(quicParams.UdpHop.Ports[index])
  191. }
  192. conn, err := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
  193. if err != nil {
  194. return nil, err
  195. }
  196. var udpConn net.PacketConn
  197. var udpAddr *net.UDPAddr
  198. switch c := conn.(type) {
  199. case *internet.PacketConnWrapper:
  200. udpConn = c.PacketConn
  201. udpAddr, err = net.ResolveUDPAddr("udp", c.Dest.String())
  202. if err != nil {
  203. conn.Close()
  204. return nil, err
  205. }
  206. case *net.UDPConn:
  207. udpConn = c
  208. udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
  209. if err != nil {
  210. conn.Close()
  211. return nil, err
  212. }
  213. default:
  214. udpConn = &internet.FakePacketConn{Conn: c}
  215. udpAddr, err = net.ResolveUDPAddr("udp", c.RemoteAddr().String())
  216. if err != nil {
  217. conn.Close()
  218. return nil, err
  219. }
  220. if len(quicParams.UdpHop.Ports) > 0 {
  221. conn.Close()
  222. return nil, errors.New("udphop requires being at the outermost level ", reflect.TypeOf(c))
  223. }
  224. }
  225. if len(quicParams.UdpHop.Ports) > 0 {
  226. addr := &udphop.UDPHopAddr{
  227. IP: udpAddr.IP,
  228. Ports: quicParams.UdpHop.Ports,
  229. }
  230. udpConn, err = udphop.NewUDPHopPacketConn(addr, index, quicParams.UdpHop.IntervalMin, quicParams.UdpHop.IntervalMax, udphopDialer, udpConn)
  231. if err != nil {
  232. conn.Close()
  233. return nil, errors.New("udphop err").Base(err)
  234. }
  235. }
  236. if streamSettings.UdpmaskManager != nil {
  237. udpConn, err = streamSettings.UdpmaskManager.WrapPacketConnClient(udpConn)
  238. if err != nil {
  239. conn.Close()
  240. return nil, errors.New("mask err").Base(err)
  241. }
  242. }
  243. quicConn, err := quic.DialEarly(ctx, udpConn, udpAddr, tlsCfg, cfg)
  244. if err != nil {
  245. return nil, err
  246. }
  247. switch quicParams.Congestion {
  248. case "force-brutal":
  249. errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion brutal bytes per second ", quicParams.BrutalUp)
  250. congestion.UseBrutal(quicConn, quicParams.BrutalUp)
  251. case "reno":
  252. errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion reno")
  253. default:
  254. errors.LogDebug(context.Background(), quicConn.RemoteAddr(), " ", "congestion bbr")
  255. congestion.UseBBR(quicConn)
  256. }
  257. return quicConn, nil
  258. },
  259. }
  260. } else if httpVersion == "2" {
  261. if keepAlivePeriod == 0 {
  262. keepAlivePeriod = net.ChromeH2KeepAlivePeriod
  263. }
  264. if keepAlivePeriod < 0 {
  265. keepAlivePeriod = 0
  266. }
  267. transport = &http2.Transport{
  268. DialTLSContext: func(ctxInner context.Context, network string, addr string, cfg *gotls.Config) (net.Conn, error) {
  269. return dialContext(ctxInner)
  270. },
  271. IdleConnTimeout: net.ConnIdleTimeout,
  272. ReadIdleTimeout: keepAlivePeriod,
  273. }
  274. } else {
  275. httpDialContext := func(ctxInner context.Context, network string, addr string) (net.Conn, error) {
  276. return dialContext(ctxInner)
  277. }
  278. transport = &http.Transport{
  279. DialTLSContext: httpDialContext,
  280. DialContext: httpDialContext,
  281. IdleConnTimeout: net.ConnIdleTimeout,
  282. // chunked transfer download with KeepAlives is buggy with
  283. // http.Client and our custom dial context.
  284. DisableKeepAlives: true,
  285. }
  286. }
  287. client := &DefaultDialerClient{
  288. transportConfig: transportConfig,
  289. client: &http.Client{
  290. Transport: transport,
  291. },
  292. httpVersion: httpVersion,
  293. uploadRawPool: &sync.Pool{},
  294. dialUploadConn: dialContext,
  295. }
  296. return client
  297. }
  298. func init() {
  299. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  300. }
  301. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
  302. tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
  303. realityConfig := reality.ConfigFromStreamSettings(streamSettings)
  304. httpVersion := decideHTTPVersion(tlsConfig, realityConfig)
  305. if httpVersion == "3" {
  306. dest.Network = net.Network_UDP
  307. }
  308. transportConfiguration := streamSettings.ProtocolSettings.(*Config)
  309. var requestURL url.URL
  310. if tlsConfig != nil || realityConfig != nil {
  311. requestURL.Scheme = "https"
  312. } else {
  313. requestURL.Scheme = "http"
  314. }
  315. requestURL.Host = transportConfiguration.Host
  316. if requestURL.Host == "" && tlsConfig != nil {
  317. requestURL.Host = tlsConfig.ServerName
  318. }
  319. if requestURL.Host == "" && realityConfig != nil {
  320. requestURL.Host = realityConfig.ServerName
  321. }
  322. if requestURL.Host == "" {
  323. requestURL.Host = dest.Address.String()
  324. }
  325. requestURL.Path = transportConfiguration.GetNormalizedPath()
  326. requestURL.RawQuery = transportConfiguration.GetNormalizedQuery()
  327. httpClient, xmuxClient := getHTTPClient(ctx, dest, streamSettings)
  328. mode := transportConfiguration.Mode
  329. if mode == "" || mode == "auto" {
  330. mode = "packet-up"
  331. if realityConfig != nil {
  332. mode = "stream-one"
  333. if transportConfiguration.DownloadSettings != nil {
  334. mode = "stream-up"
  335. }
  336. }
  337. }
  338. sessionId := ""
  339. if mode != "stream-one" {
  340. sessionIdUuid := uuid.New()
  341. sessionId = sessionIdUuid.String()
  342. }
  343. errors.LogInfo(ctx, fmt.Sprintf("XHTTP is dialing to %s, mode %s, HTTP version %s, host %s", dest, mode, httpVersion, requestURL.Host))
  344. requestURL2 := requestURL
  345. httpClient2 := httpClient
  346. xmuxClient2 := xmuxClient
  347. if transportConfiguration.DownloadSettings != nil {
  348. globalDialerAccess.Lock()
  349. if streamSettings.DownloadSettings == nil {
  350. streamSettings.DownloadSettings = common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings))
  351. if streamSettings.SocketSettings != nil && streamSettings.SocketSettings.Penetrate {
  352. streamSettings.DownloadSettings.SocketSettings = streamSettings.SocketSettings
  353. }
  354. }
  355. globalDialerAccess.Unlock()
  356. memory2 := streamSettings.DownloadSettings
  357. dest2 := *memory2.Destination // just panic
  358. tlsConfig2 := tls.ConfigFromStreamSettings(memory2)
  359. realityConfig2 := reality.ConfigFromStreamSettings(memory2)
  360. httpVersion2 := decideHTTPVersion(tlsConfig2, realityConfig2)
  361. if httpVersion2 == "3" {
  362. dest2.Network = net.Network_UDP
  363. }
  364. if tlsConfig2 != nil || realityConfig2 != nil {
  365. requestURL2.Scheme = "https"
  366. } else {
  367. requestURL2.Scheme = "http"
  368. }
  369. config2 := memory2.ProtocolSettings.(*Config)
  370. requestURL2.Host = config2.Host
  371. if requestURL2.Host == "" && tlsConfig2 != nil {
  372. requestURL2.Host = tlsConfig2.ServerName
  373. }
  374. if requestURL2.Host == "" && realityConfig2 != nil {
  375. requestURL2.Host = realityConfig2.ServerName
  376. }
  377. if requestURL2.Host == "" {
  378. requestURL2.Host = dest2.Address.String()
  379. }
  380. requestURL2.Path = config2.GetNormalizedPath()
  381. requestURL2.RawQuery = config2.GetNormalizedQuery()
  382. httpClient2, xmuxClient2 = getHTTPClient(ctx, dest2, memory2)
  383. errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
  384. }
  385. if xmuxClient != nil {
  386. xmuxClient.OpenUsage.Add(1)
  387. }
  388. if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
  389. xmuxClient2.OpenUsage.Add(1)
  390. }
  391. var closed atomic.Int32
  392. reader, writer := io.Pipe()
  393. conn := splitConn{
  394. writer: writer,
  395. onClose: func() {
  396. if closed.Add(1) > 1 {
  397. return
  398. }
  399. if xmuxClient != nil {
  400. xmuxClient.OpenUsage.Add(-1)
  401. }
  402. if xmuxClient2 != nil && xmuxClient2 != xmuxClient {
  403. xmuxClient2.OpenUsage.Add(-1)
  404. }
  405. },
  406. }
  407. var err error
  408. if mode == "stream-one" {
  409. requestURL.Path = transportConfiguration.GetNormalizedPath()
  410. if xmuxClient != nil {
  411. xmuxClient.LeftRequests.Add(-1)
  412. }
  413. conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, false)
  414. if err != nil { // browser dialer only
  415. return nil, err
  416. }
  417. return stat.Connection(&conn), nil
  418. } else { // stream-down
  419. if xmuxClient2 != nil {
  420. xmuxClient2.LeftRequests.Add(-1)
  421. }
  422. conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(ctx, requestURL2.String(), sessionId, nil, false)
  423. if err != nil { // browser dialer only
  424. return nil, err
  425. }
  426. }
  427. if mode == "stream-up" {
  428. if xmuxClient != nil {
  429. xmuxClient.LeftRequests.Add(-1)
  430. }
  431. _, _, _, err = httpClient.OpenStream(ctx, requestURL.String(), sessionId, reader, true)
  432. if err != nil { // browser dialer only
  433. return nil, err
  434. }
  435. return stat.Connection(&conn), nil
  436. }
  437. scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes()
  438. scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs()
  439. if scMaxEachPostBytes.From <= 0 {
  440. panic("`scMaxEachPostBytes` should be bigger than 0")
  441. }
  442. maxUploadSize := scMaxEachPostBytes.rand()
  443. // WithSizeLimit(0) will still allow single bytes to pass, and a lot of
  444. // code relies on this behavior. Subtract 1 so that together with
  445. // uploadWriter wrapper, exact size limits can be enforced
  446. // uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
  447. uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(max(0, maxUploadSize-buf.Size)))
  448. conn.writer = uploadWriter{
  449. uploadPipeWriter,
  450. maxUploadSize,
  451. }
  452. go func() {
  453. var seq int64
  454. var lastWrite time.Time
  455. for {
  456. // by offloading the uploads into a buffered pipe, multiple conn.Write
  457. // calls get automatically batched together into larger POST requests.
  458. // without batching, bandwidth is extremely limited.
  459. remainder, err := uploadPipeReader.ReadMultiBuffer()
  460. if err != nil {
  461. break
  462. }
  463. doSplit := atomic.Bool{}
  464. for doSplit.Store(true); doSplit.Load(); {
  465. var chunk buf.MultiBuffer
  466. remainder, chunk = buf.SplitSize(remainder, maxUploadSize)
  467. if chunk.IsEmpty() {
  468. break
  469. }
  470. wroteRequest := done.New()
  471. ctx := httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
  472. WroteRequest: func(httptrace.WroteRequestInfo) {
  473. wroteRequest.Close()
  474. },
  475. })
  476. seqStr := strconv.FormatInt(seq, 10)
  477. seq += 1
  478. if scMinPostsIntervalMs.From > 0 {
  479. time.Sleep(time.Duration(scMinPostsIntervalMs.rand())*time.Millisecond - time.Since(lastWrite))
  480. }
  481. lastWrite = time.Now()
  482. if xmuxClient != nil && (xmuxClient.LeftRequests.Add(-1) <= 0 ||
  483. (xmuxClient.UnreusableAt != time.Time{} && lastWrite.After(xmuxClient.UnreusableAt))) {
  484. httpClient, xmuxClient = getHTTPClient(ctx, dest, streamSettings)
  485. }
  486. go func() {
  487. err := httpClient.PostPacket(
  488. ctx,
  489. requestURL.String(),
  490. sessionId,
  491. seqStr,
  492. chunk,
  493. )
  494. wroteRequest.Close()
  495. if err != nil {
  496. errors.LogInfoInner(ctx, err, "failed to send upload")
  497. uploadPipeReader.Interrupt()
  498. doSplit.Store(false)
  499. }
  500. }()
  501. if _, ok := httpClient.(*DefaultDialerClient); ok {
  502. <-wroteRequest.Wait()
  503. }
  504. }
  505. }
  506. }()
  507. return stat.Connection(&conn), nil
  508. }
  509. // A wrapper around pipe that ensures the size limit is exactly honored.
  510. //
  511. // The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
  512. // single MultiBuffer exceeds the size limit, and then starts blocking on the
  513. // next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
  514. // bytes than the size limit. We work around this by splitting a potentially
  515. // too large write up into multiple.
  516. type uploadWriter struct {
  517. *pipe.Writer
  518. maxLen int32
  519. }
  520. func (w uploadWriter) Write(b []byte) (int, error) {
  521. /*
  522. capacity := int(w.maxLen - w.Len())
  523. if capacity > 0 && capacity < len(b) {
  524. b = b[:capacity]
  525. }
  526. */
  527. buffer := buf.MultiBufferContainer{}
  528. common.Must2(buffer.Write(b))
  529. var writed int
  530. for _, buff := range buffer.MultiBuffer {
  531. err := w.WriteMultiBuffer(buf.MultiBuffer{buff})
  532. if err != nil {
  533. return writed, err
  534. }
  535. writed += int(buff.Len())
  536. }
  537. return writed, nil
  538. }